3355 Ole John Aske 2010-11-09
sps-svs: refactoring, no functional changes. - recommit after merge conflicts
Cleaned up and removed duplicated code in receiving result related to ::handleBatchComplete().
- More logic related to counting of 'm_pendingFrags' and 'm_finalBatchFrags' which was
duplicated in several signal ::execFOO methods has been moved into ::handleBatchComplete()
- ::incrementPendingFrags() has been removed, and its logic integrated into ::handleBatchComplete
- ::closeSingletonScans() (Which was not a 'close' at all) has been removed
by making receive of lookup queries more similar to scans.
- Introduced ::findResultStream(Uint32 receiverId) which contains functionality
previously duplicated in ::execTRANSID_AI() and ::execSCAN_TABCONF().
-
modified:
storage/ndb/src/ndbapi/NdbQueryOperation.cpp
storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp
3354 Jonas Oreland 2010-11-09
ndb spj-svs - Add support for scan-scan queries to Hugo
modified:
storage/ndb/test/include/HugoQueries.hpp
storage/ndb/test/include/HugoQueryBuilder.hpp
storage/ndb/test/src/HugoQueries.cpp
storage/ndb/test/src/HugoQueryBuilder.cpp
storage/ndb/test/tools/hugoJoin.cpp
=== modified file 'storage/ndb/src/ndbapi/NdbQueryOperation.cpp'
--- a/storage/ndb/src/ndbapi/NdbQueryOperation.cpp 2010-11-08 15:18:22 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryOperation.cpp 2010-11-09 14:55:01 +0000
@@ -1867,60 +1867,94 @@ NdbQueryImpl::awaitMoreResults(bool forc
* the lock, because we know that the signal receiver thread will not
* be accessing m_fullFrags at this time.
*/
- NdbRootFragment* frag = m_fullFrags.pop();
- if (frag==NULL)
+ NdbRootFragment* frag;
+ if ((frag=m_fullFrags.pop()) != NULL)
{
- /* Getting here means that either:
- * - No results was returned (TCKEYREF)
- * - There was no matching row for an inner join.
- * - or, the application called nextResult() twice for a lookup query.
- */
- assert(m_finalBatchFrags == getRootFragCount());
- return FetchResult_noMoreData;
+ m_applFrags.add(*frag);
}
- else
+ assert(m_fullFrags.pop()==NULL); // Only one stream for lookups.
+
+ if (m_applFrags.getCurrent() != NULL)
{
- /* Move fragment from receiver thread's container to application
- * thread's container.*/
- assert(!frag->isEmpty());
- m_applFrags.add(*frag);
- assert(m_fullFrags.pop()==NULL); // Only one stream for lookups.
return FetchResult_ok;
}
+
+ /* Getting here means that either:
+ * - No results was returned (TCKEYREF)
+ * - There was no matching row for an inner join.
+ * - or, the application called nextResult() twice for a lookup query.
+ */
+ assert(m_pendingFrags == 0);
+ assert(m_finalBatchFrags == getRootFragCount());
+ return FetchResult_noMoreData;
} // if(m_queryDef.isScanQuery())
+
} //NdbQueryImpl::awaitMoreResults
-void
+
+/*
+ ::handleBatchComplete() is intended to be called when receiving signals only.
+ The PollGuard mutex is then set and the shared 'm_pendingFrags',
+ 'm_finalBatchFrags' and 'm_fullFrags' can safely be updated.
+
+ returns: 'true' when application thread should be resumed.
+*/
+bool
NdbQueryImpl::handleBatchComplete(Uint32 fragNo)
{
- assert(m_rootFrags[fragNo].isFragBatchComplete());
- getRoot().handleBatchComplete(fragNo);
+ if (traceSignals) {
+ ndbout << "NdbQueryImpl::handleBatchComplete, fragNo=" << fragNo
+ << ", pendingFrags=" << (m_pendingFrags-1)
+ << ", finalBatchFrags=" << m_finalBatchFrags
+ << endl;
+ }
+ bool resume = false;
+
+ /* May received fragment data after a SCANREF() (timeout?)
+ * terminated the scan. We are about to close this query,
+ * and didn't expect any more data - ignore it!
+ */
+ if (likely(m_fullFrags.m_errorCode == 0))
+ {
+ NdbQueryOperationImpl& root = getRoot();
+ NdbRootFragment& rootFrag = m_rootFrags[fragNo];
+ assert(rootFrag.isFragBatchComplete());
- // Position at the first (sorted?) row available from this fragments.
- getRoot().m_resultStreams[fragNo]->firstResult();
-}
+ assert(m_pendingFrags > 0); // Check against underflow.
+ assert(m_pendingFrags <= m_rootFragCount); // .... and overflow
+ m_pendingFrags--;
-void
-NdbQueryImpl::closeSingletonScans()
-{
- assert(!getQueryDef().isScanQuery());
- for(Uint32 i = 0; i<getNoOfOperations(); i++){
- NdbQueryOperationImpl& operation = getQueryOperation(i);
- NdbResultStream& resultStream = *operation.m_resultStreams[0];
- /** Now we have received all tuples for all operations. We can thus call
- * execSCANOPCONF() with the right row count.
+ if (rootFrag.finalBatchReceived())
+ {
+ m_finalBatchFrags++;
+ assert(m_finalBatchFrags <= m_rootFragCount);
+ }
+
+ if (getQueryDef().isScanQuery())
+ {
+ root.handleBatchComplete(fragNo); // Only required for scans
+ resume = (m_pendingFrags==0) || true;
+ }
+ else
+ {
+ assert(root.m_resultStreams[fragNo]->getReceiver().m_tcPtrI==RNIL);
+ assert(m_finalBatchFrags==1);
+ assert(m_pendingFrags==0); // Lookup query should be complete now.
+ resume = true;
+ }
+
+ /* Position at the first (sorted?) row available from this fragments.
*/
- resultStream.getReceiver()
- .execSCANOPCONF(RNIL, 0, resultStream.getRowCount());
- }
- /* nextResult() will later move it from m_fullFrags to m_applFrags
- * under mutex protection.
- */
- if (getRoot().m_resultStreams[0]->firstResult() != tupleNotFound) {
- m_fullFrags.push(m_rootFrags[0]);
+ root.m_resultStreams[fragNo]->firstResult();
+
+ /* When application thread ::awaitMoreResults() it will later be moved
+ * from m_fullFrags to m_applFrags under mutex protection.
+ */
+ m_fullFrags.push(rootFrag);
}
- m_finalBatchFrags++;
-} //NdbQueryImpl::closeSingletonScans
+
+ return resume;
+} // NdbQueryImpl::handleBatchComplete
int
NdbQueryImpl::close(bool forceSend)
@@ -2042,12 +2076,9 @@ NdbQueryImpl::execTCKEYCONF()
m_rootFrags[0].incrOutstandingResults(-1);
bool ret = false;
- if (m_rootFrags[0].isFragBatchComplete()) {
- /* If this root fragment is complete, verify that the query is also
- * complete for this batch.
- */
- ret = incrementPendingFrags(-1);
- assert(ret);
+ if (m_rootFrags[0].isFragBatchComplete())
+ {
+ ret = handleBatchComplete(0);
}
if (traceSignals) {
@@ -2058,7 +2089,7 @@ NdbQueryImpl::execTCKEYCONF()
<< endl;
}
return ret;
-}
+} // NdbQueryImpl::execTCKEYCONF
void
NdbQueryImpl::execCLOSE_SCAN_REP(int errorCode, bool needClose)
@@ -2070,41 +2101,6 @@ NdbQueryImpl::execCLOSE_SCAN_REP(int err
setFetchTerminated(errorCode,needClose);
}
-/*
- ::incrementPendingFrags() is intended to be called when receiving signals only.
- The PollGuard mutex is then set and the shared 'm_pendingFrags' can safely be updated.
-*/
-bool
-NdbQueryImpl::incrementPendingFrags(int increment)
-{
- if (unlikely(m_fullFrags.m_errorCode != 0))
- {
- /* Received fragment data after a SCANREF() (timeout?) has terminated the scan.
- * We are about to close this query, and didn't expect there to be anything 'pending'.
- */
- assert (m_pendingFrags==0);
- return (m_pendingFrags==0);
- }
-
- m_pendingFrags += increment;
- assert(m_pendingFrags < 1<<15); // Check against underflow.
- assert(m_pendingFrags <= m_rootFragCount); // .... and overflow
-
- if (traceSignals) {
- ndbout << "NdbQueryImpl::incrementPendingFrags(" << increment << "): "
- << ", pendingFrags=" << m_pendingFrags << endl;
- }
-
- if (m_pendingFrags==0) {
- if (!getQueryDef().isScanQuery()) {
- closeSingletonScans();
- }
- return true;
- } else {
- return false;
- }
-}
-
int
NdbQueryImpl::prepareSend()
{
@@ -3506,9 +3502,9 @@ NdbQueryOperationImpl::handleBatchComple
NdbQueryOperationImpl& child = getChildOperation(i);
child.handleBatchComplete(fragNo);
}
-
m_resultStreams[fragNo]->handleBatchComplete();
-}
+
+} // NdbQueryOperationImpl::handleBatchComplete
void
@@ -4321,7 +4317,6 @@ NdbQueryOperationImpl::prepareLookupKeyI
default:
assert(false);
}
-
}
if (unlikely(keyInfo.isMemoryExhausted())) {
@@ -4332,17 +4327,29 @@ NdbQueryOperationImpl::prepareLookupKeyI
} // NdbQueryOperationImpl::prepareLookupKeyInfo
-bool
-NdbQueryOperationImpl::execTRANSID_AI(const Uint32* ptr, Uint32 len){
- if (traceSignals) {
- ndbout << "NdbQueryOperationImpl::execTRANSID_AI()"
- << " operation no: "
- << getQueryOperationDef().getQueryOperationIx() << endl;
+Uint32
+NdbQueryOperationImpl::findResultStream(Uint32 receiverId) const
+{
+ Uint32 rootFragNo;
+ Uint32 rootFragCount = getQuery().getRootFragCount();
+ assert(&getRoot() == this);
+
+ for (rootFragNo = 0; rootFragNo<rootFragCount; rootFragNo++)
+ {
+ if (m_resultStreams[rootFragNo]->getReceiver().getId() == receiverId)
+ return rootFragNo;
}
- bool ret = false;
- NdbRootFragment* rootFrag = NULL;
- if(getQueryDef().isScanQuery())
+ assert(false);
+ return rootFragCount;
+} // NdbQueryOperationImpl::findResultStream
+
+
+bool
+NdbQueryOperationImpl::execTRANSID_AI(const Uint32* ptr, Uint32 len)
+{
+ Uint32 rootFragNo = 0;
+ if (getQueryDef().isScanQuery())
{
const Uint32 receiverId = CorrelationData(ptr, len).getRootReceiverId();
@@ -4350,50 +4357,26 @@ NdbQueryOperationImpl::execTRANSID_AI(co
* of the root operation. We can thus find the correct root fragment
* number.
*/
- Uint32 rootFragNo;
- for(rootFragNo = 0;
- rootFragNo<getQuery().getRootFragCount() &&
- getRoot().m_resultStreams[rootFragNo]->getReceiver().getId()
- != receiverId;
- rootFragNo++);
- assert(rootFragNo<getQuery().getRootFragCount());
-
- if (traceSignals) {
- ndbout << "NdbQueryOperationImpl::execTRANSID_AI()"
- << " fragment no: " << rootFragNo
- << endl;
- }
-
- // Process result values.
- m_resultStreams[rootFragNo]->execTRANSID_AI(ptr, len);
-
- rootFrag = &getQuery().m_rootFrags[rootFragNo];
- rootFrag->incrOutstandingResults(-1);
-
- if (rootFrag->isFragBatchComplete()) {
- m_queryImpl.incrementPendingFrags(-1);
- m_queryImpl.handleBatchComplete(rootFragNo);
-
- /* nextResult() will later move it from m_fullFrags to m_applFrags
- * under mutex protection.*/
- m_queryImpl.m_fullFrags.push(*rootFrag);
- // Wake up appl thread when we have data, or entire query batch completed.
- ret = true;
- }
+ rootFragNo = getRoot().findResultStream(receiverId);
+ }
+ if (traceSignals) {
+ ndbout << "NdbQueryOperationImpl::execTRANSID_AI()"
+ << ", operation no: " << getQueryOperationDef().getQueryOperationIx()
+ << ", fragment no: " << rootFragNo
+ << endl;
+ }
+
+ // Process result values.
+ m_resultStreams[rootFragNo]->execTRANSID_AI(ptr, len);
+
+ NdbRootFragment& rootFrag = m_queryImpl.m_rootFrags[rootFragNo];
+ rootFrag.incrOutstandingResults(-1);
+
+ bool ret = false;
+ if (rootFrag.isFragBatchComplete())
+ {
+ ret = m_queryImpl.handleBatchComplete(rootFragNo);
}
- else
- { // Lookup query
- // The root operation is a lookup.
- m_resultStreams[0]->execTRANSID_AI(ptr, len);
-
- rootFrag = &getQuery().m_rootFrags[0];
- rootFrag->incrOutstandingResults(-1);
-
- if (rootFrag->isFragBatchComplete()) {
- ret = m_queryImpl.incrementPendingFrags(-1);
- assert(ret); // The query should be complete now.
- }
- } // end lookup
if (traceSignals) {
ndbout << "NdbQueryOperationImpl::execTRANSID_AI(): returns:" << ret
@@ -4404,7 +4387,8 @@ NdbQueryOperationImpl::execTRANSID_AI(co
bool
-NdbQueryOperationImpl::execTCKEYREF(const NdbApiSignal* aSignal){
+NdbQueryOperationImpl::execTCKEYREF(const NdbApiSignal* aSignal)
+{
if (traceSignals) {
ndbout << "NdbQueryOperationImpl::execTCKEYREF()" << endl;
}
@@ -4432,6 +4416,9 @@ NdbQueryOperationImpl::execTCKEYREF(cons
}
}
+ Uint32 rootFragNo = 0;
+ NdbRootFragment& rootFrag = getQuery().m_rootFrags[0];
+
if (ref->errorCode != DbspjErr::NodeFailure)
{
// Compensate for children results not produced.
@@ -4443,18 +4430,18 @@ NdbQueryOperationImpl::execTCKEYREF(cons
{
cnt += getNoOfLeafOperations();
}
- getQuery().m_rootFrags[0].incrOutstandingResults(- Int32(cnt));
+ rootFrag.incrOutstandingResults(- Int32(cnt));
}
else
{
// consider frag-batch complete
- getQuery().m_rootFrags[0].clearOutstandingResults();
+ rootFrag.clearOutstandingResults();
}
bool ret = false;
- if (getQuery().m_rootFrags[0].isFragBatchComplete()) {
- ret = m_queryImpl.incrementPendingFrags(-1);
- assert(ret); // The query should be complete now.
+ if (rootFrag.isFragBatchComplete())
+ {
+ ret = m_queryImpl.handleBatchComplete(rootFragNo);
}
if (traceSignals) {
@@ -4482,14 +4469,9 @@ NdbQueryOperationImpl::execSCAN_TABCONF(
// For now, only the root operation may be a scan.
assert(&getRoot() == this);
assert(m_operationDef.isScanOperation());
- Uint32 fragNo;
+
// Find root fragment number.
- for(fragNo = 0;
- fragNo<getQuery().getRootFragCount() &&
- &getRoot().m_resultStreams[fragNo]
- ->getReceiver() != receiver;
- fragNo++);
- assert(fragNo<getQuery().getRootFragCount());
+ Uint32 fragNo = findResultStream(receiver->getId());
NdbRootFragment& rootFrag = getQuery().m_rootFrags[fragNo];
rootFrag.setConfReceived();
@@ -4499,10 +4481,6 @@ NdbQueryOperationImpl::execSCAN_TABCONF(
NdbResultStream& resultStream = *m_resultStreams[fragNo];
resultStream.getReceiver().m_tcPtrI = tcPtrI;
- if (rootFrag.finalBatchReceived())
- {
- m_queryImpl.m_finalBatchFrags++;
- }
if(traceSignals){
ndbout << " resultStream(root) {" << resultStream << "} fragNo="
<< fragNo << endl;
@@ -4536,17 +4514,12 @@ NdbQueryOperationImpl::execSCAN_TABCONF(
// Check that nodeMask does not have more bits than we have operations.
UNUSED(finalOpDef);
assert(nodeMask >> (1+finalOpDef.getQueryOperationId()) == 0);
+
bool ret = false;
- if (rootFrag.isFragBatchComplete()) {
- /* This fragment is now complete*/
- m_queryImpl.incrementPendingFrags(-1);
- m_queryImpl.handleBatchComplete(fragNo);
-
- /* nextResult() will later move it from m_fullFrags to m_applFrags
- * under mutex protection.*/
- m_queryImpl.m_fullFrags.push(rootFrag);
- // Wake up now.
- ret = true;
+ if (rootFrag.isFragBatchComplete())
+ {
+ /* This fragment is now complete */
+ ret = m_queryImpl.handleBatchComplete(fragNo);
}
if (traceSignals) {
ndbout << "NdbQueryOperationImpl::execSCAN_TABCONF():, returns:" << ret
=== modified file 'storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp'
--- a/storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp 2010-11-08 15:18:22 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp 2010-11-09 14:55:01 +0000
@@ -487,28 +487,18 @@ private:
*/
int sendClose(int nodeId);
- /** Close scan receivers used for lookups. (Since scans and lookups should
- * have the same semantics for nextResult(), lookups use scan-type
- * NdbReceiver objects.)
- */
- void closeSingletonScans();
-
const NdbQuery& getInterface() const
{ return m_interface; }
NdbQueryOperationImpl& getRoot() const
{ return getQueryOperation(0U); }
- /** Count number of completed root fragments within this batch.
- * @param increment Change in count of completed root frgaments.
- * @return True if batch is complete.
- */
- bool incrementPendingFrags(int increment);
-
/** A complete batch has been received for a given root fragment
- * Update whatever required before the appl. is allowed to navigate the result.
+ * Update whatever required before the appl. is allowed to navigate
+ * the result.
+ * @return: 'true' if its time to resume appl. threads
*/
- void handleBatchComplete(Uint32 rootFragNo);
+ bool handleBatchComplete(Uint32 rootFragNo);
}; // class NdbQueryImpl
@@ -779,6 +769,8 @@ private:
* NdbReceiver::m_query_operation_impl here.*/
Uint32 getIdOfReceiver() const;
+ Uint32 findResultStream(Uint32 receiverId) const;
+
/**
* If the operation has a scan filter, append the corresponding
* interpreter code to a buffer.
No bundle (reason: useless for push emails).
| Thread |
|---|
| • bzr push into mysql-5.1-telco-7.0-spj-scan-vs-scan branch(ole.john.aske:3354 to 3355) | Ole John Aske | 9 Nov |