3331 Ole John Aske 2010-10-29
Recommit after resolving merge conflicts:
spj-svs: Fixed mutex / mt concurrency problem when receiving fragment data.
Ensure that the class NdbQueryImpl member fields m_pendingFrags & m_finalBatchFrags
are only accessed when the PollGuard mutex is locked.
modified:
storage/ndb/src/ndbapi/NdbQueryOperation.cpp
storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp
3330 jonas oreland 2010-10-29 [merge]
ndb - spj svs - merge
modified:
mysql-test/suite/ndb/r/ndb_gis.result
sql/ha_ndbcluster.cc
=== modified file 'storage/ndb/src/ndbapi/NdbQueryOperation.cpp'
--- a/storage/ndb/src/ndbapi/NdbQueryOperation.cpp 2010-10-26 12:41:00 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryOperation.cpp 2010-10-29 21:21:45 +0000
@@ -1743,7 +1743,6 @@ NdbQueryImpl::awaitMoreResults(bool forc
/* Check if there are any more completed fragments available.*/
if (m_queryDef.isScanQuery())
{
- assert (m_state==Executing);
assert (m_scanTransaction);
Ndb* const ndb = m_transaction.getNdb();
@@ -1756,6 +1755,7 @@ NdbQueryImpl::awaitMoreResults(bool forc
* receiver thread.
*/
PollGuard poll_guard(*ndb->theImpl);
+ assert (m_state==Executing);
/* m_fullFrags contains any fragments that are complete (for this batch)
* but have not yet been moved (under mutex protection) to
@@ -1998,12 +1998,17 @@ NdbQueryImpl::execCLOSE_SCAN_REP(bool ne
}
}
-
+/*
+ ::incrementPendingFrags() is intended to be called when receiving signals only.
+ The PollGuard mutex is then set and the shared 'm_pendingFrags' can safely be updated.
+*/
bool
NdbQueryImpl::incrementPendingFrags(int increment)
{
m_pendingFrags += increment;
- assert(m_pendingFrags < 1<<15); // Check against underflow.
+ assert(m_pendingFrags < 1<<15); // Check against underflow.
+ assert(m_pendingFrags <= m_rootFragCount); // .... and overflow
+
if (traceSignals) {
ndbout << "NdbQueryImpl::incrementPendingFrags(" << increment << "): "
<< ", pendingFrags=" << m_pendingFrags << endl;
@@ -2032,8 +2037,6 @@ NdbQueryImpl::prepareSend()
return -1;
}
- assert (m_pendingFrags==0);
-
// Determine execution parameters 'batch size'.
// May be user specified (TODO), and/or, limited/specified by config values
//
@@ -2043,13 +2046,13 @@ NdbQueryImpl::prepareSend()
* and unordered scans.*/
if (getQueryOperation(0U).m_parallelism > 0)
{
- m_pendingFrags = m_rootFragCount
+ m_rootFragCount
= MIN(getRoot().getQueryOperationDef().getTable().getFragmentCount(),
getQueryOperation(0U).m_parallelism);
}
else
{
- m_pendingFrags = m_rootFragCount
+ m_rootFragCount
= getRoot().getQueryOperationDef().getTable().getFragmentCount();
}
Ndb* const ndb = m_transaction.getNdb();
@@ -2070,7 +2073,7 @@ NdbQueryImpl::prepareSend()
}
else // Lookup query
{
- m_pendingFrags = m_rootFragCount = 1;
+ m_rootFragCount = 1;
}
// Some preparation for later batchsize calculations pr. (sub) scan
@@ -2112,10 +2115,10 @@ NdbQueryImpl::prepareSend()
}
int error;
if (unlikely((error = m_applFrags.prepare(getRoot().getOrdering(),
- m_pendingFrags,
+ m_rootFragCount,
keyRec,
getRoot().m_ndbRecord)) != 0)
- || (error = m_fullFrags.prepare(m_pendingFrags)) != 0) {
+ || (error = m_fullFrags.prepare(m_rootFragCount)) != 0) {
setErrorCodeAbort(error);
return -1;
}
@@ -2147,6 +2150,7 @@ NdbQueryImpl::prepareSend()
ndbout << endl;
#endif
+ assert (m_pendingFrags==0);
m_state = Prepared;
return 0;
} // NdbQueryImpl::prepareSend
@@ -2220,7 +2224,10 @@ const Uint32* InitialReceiverIdIterator:
/******************************************************************************
-int doSend()
+int doSend() Send serialized queryTree and parameters encapsulated in
+ either a SCAN_TABREQ or TCKEYREQ to TC.
+
+NOTE: The TransporterFacade mutex is already set by callee.
Return Value: Return >0 : send was succesful, returns number of signals sent
Return -1: In all other case.
@@ -2461,6 +2468,9 @@ NdbQueryImpl::doSend(int nodeId, bool la
getNoOfLeafOperations());
} // if
+ assert (m_pendingFrags==0);
+ m_pendingFrags = m_rootFragCount;
+
// Shrink memory footprint by removing structures not required after ::execute()
m_keyInfo.releaseExtend();
m_attrInfo.releaseExtend();
@@ -2494,9 +2504,6 @@ NdbQueryImpl::sendFetchMore(NdbRootFragm
assert(!emptyFrag.finalBatchReceived());
assert(m_queryDef.isScanQuery());
- m_pendingFrags++;
- assert(m_pendingFrags <= getRootFragCount());
-
emptyFrag.reset();
for (unsigned opNo=0; opNo<m_countOperations; opNo++)
@@ -2562,6 +2569,8 @@ NdbQueryImpl::sendFetchMore(NdbRootFragm
// Error: 'Send to NDB failed'
return -1;
}
+ m_pendingFrags++;
+ assert(m_pendingFrags <= getRootFragCount());
if (forceSend)
{
@@ -2663,6 +2672,10 @@ NdbQueryImpl::closeTcCursor(bool forceSe
return 0;
} //NdbQueryImpl::closeTcCursor
+
+/*
+ This method is called with the PollGuard mutex held on the transporter.
+*/
int
NdbQueryImpl::sendClose(int nodeId)
{
@@ -2692,6 +2705,10 @@ NdbQueryImpl::sendClose(int nodeId)
} // NdbQueryImpl::sendClose()
+/*
+ This method is always called with PollGuard mutex held on the transporter.
+ (As m_pendingFrags is accessed both from API sender and receiver)
+*/
bool
NdbQueryImpl::isBatchComplete() const {
#ifndef NDEBUG
=== modified file 'storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp'
--- a/storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp 2010-10-12 13:00:44 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp 2010-10-29 21:21:45 +0000
@@ -364,8 +364,10 @@ private:
*/
Uint32 m_globalCursor;
- /** Number of root fragments not yet completed within the current batch.*/
- Uint32 m_pendingFrags;
+ /** Number of root fragments not yet completed within the current batch.
+ * Only access w/ PollGuard mutex as it is also updated by receiver threa
+ */
+ Uint32 m_pendingFrags; // BEWARE: protect with PollGuard mutex
/** Number of fragments to be read by the root operation. (1 if root
* operation is a lookup)*/
@@ -394,7 +396,7 @@ private:
* m_finalBatchFrags==m_rootFragCount, all tuples for the final batches may
* still not have been received (i.e. m_pendingFrags>0).
*/
- Uint32 m_finalBatchFrags;
+ Uint32 m_finalBatchFrags; // BEWARE: protect with PollGuard mutex
/** Number of IndexBounds set by API (index scans only) */
Uint32 m_num_bounds;
Attachment: [text/bzr-bundle] bzr/ole.john.aske@oracle.com-20101029212145-ip0tz1guh8iwghol.bundle
| Thread |
|---|
| • bzr push into mysql-5.1-telco-7.0-spj-scan-vs-scan branch(ole.john.aske:3330 to 3331) | Ole John Aske | 29 Oct |