3541 Ole John Aske 2011-08-17 [merge]
Merge 70 -> 70-spj
modified:
storage/ndb/src/ndbapi/NdbQueryOperation.cpp
storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp
3540 Ole John Aske 2011-08-17 [merge]
Merge
modified:
storage/ndb/src/ndbapi/NdbQueryOperation.cpp
=== modified file 'storage/ndb/src/ndbapi/NdbQueryOperation.cpp'
--- a/storage/ndb/src/ndbapi/NdbQueryOperation.cpp 2011-08-17 13:09:14 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryOperation.cpp 2011-08-17 13:27:04 +0000
@@ -187,9 +187,9 @@ public:
{ return m_fragNo; }
/**
- * Prepare for receiving another batch.
+ * Prepare for receiving another batch of results.
*/
- void reset();
+ void prepareNextReceiveSet();
/**
* Prepare for reading another batch of results.
@@ -403,7 +403,7 @@ public:
void prepare();
/** Prepare for receiving next batch of scan results. */
- void reset();
+ void prepareNextReceiveSet();
NdbReceiver& getReceiver()
{ return m_receiver; }
@@ -738,27 +738,6 @@ NdbResultStream::prepare()
m_resultSets[m_recv].m_buffer);
} //NdbResultStream::prepare
-void
-NdbResultStream::reset()
-{
- assert (isScanQuery());
-
- m_iterState = Iter_notStarted;
- m_currentRow = tupleNotFound;
- m_resultSets[m_recv].prepareReceive(m_receiver);
-
- /**
- * If this stream will get new rows in the next batch, then so will
- * all of its descendants.
- */
- for (Uint32 childNo = 0; childNo < m_operation.getNoOfChildOperations();
- childNo++)
- {
- NdbQueryOperationImpl& child = m_operation.getChildOperation(childNo);
- m_rootFrag.getResultStream(child).reset();
- }
-} //NdbResultStream::reset
-
/** Locate, and return 'tupleNo', of first tuple with specified parentId.
* parentId == tupleNotFound is use as a special value for iterating results
* from the root operation in the order which they was inserted by
@@ -851,7 +830,6 @@ NdbResultStream::firstResult()
return tupleNotFound;
} //NdbResultStream::firstResult()
-
Uint16
NdbResultStream::nextResult()
{
@@ -867,7 +845,6 @@ NdbResultStream::nextResult()
return tupleNotFound;
} //NdbResultStream::nextResult()
-
/**
* Callback when a TRANSID_AI signal (receive row) is processed.
*/
@@ -891,6 +868,32 @@ NdbResultStream::execTRANSID_AI(const Ui
} // NdbResultStream::execTRANSID_AI()
/**
+ * Make preparation for another batch of results to be received.
+ * This NdbResultStream, and all its sibling will receive a batch
+ * of results from the datanodes.
+ */
+void
+NdbResultStream::prepareNextReceiveSet()
+{
+ assert (isScanQuery());
+
+ m_iterState = Iter_notStarted;
+ m_currentRow = tupleNotFound;
+ m_resultSets[m_recv].prepareReceive(m_receiver);
+
+ /**
+ * If this stream will get new rows in the next batch, then so will
+ * all of its descendants.
+ */
+ for (Uint32 childNo = 0; childNo < m_operation.getNoOfChildOperations();
+ childNo++)
+ {
+ NdbQueryOperationImpl& child = m_operation.getChildOperation(childNo);
+ m_rootFrag.getResultStream(child).prepareNextReceiveSet();
+ }
+} //NdbResultStream::prepareNextReceiveSet
+
+/**
* Make preparations for another batch of result to be read:
* - Fill in parent/child result correlations in m_tupleSet[]
* - ... or reset m_tupleSet[] if we reuse the previous.
@@ -911,7 +914,8 @@ NdbResultStream::prepareResultSet(Uint32
/**
* Prepare NdbResultSet for reading - either the next received
- * from datanodes or reuse current.
+ * from datanodes or reuse the last as has been determined by
+ * ::prepareNextReceiveSet()
*/
if (m_tupleSet!=NULL)
{
@@ -1210,7 +1214,7 @@ bool NdbRootFragment::hasReceivedMore()
return (m_availResultSets > 0);
}
-void NdbRootFragment::reset()
+void NdbRootFragment::prepareNextReceiveSet()
{
assert(m_fragNo!=voidFragNo);
assert(m_outstandingResults == 0);
@@ -1218,13 +1222,14 @@ void NdbRootFragment::reset()
for (unsigned opNo=0; opNo<m_query->getNoOfOperations(); opNo++)
{
- if (!m_resultStreams[opNo].isSubScanComplete(m_remainingScans))
+ NdbResultStream& resultStream = getResultStream(opNo);
+ if (!resultStream.isSubScanComplete(m_remainingScans))
{
/**
- * Reset m_resultStreams[] and all its descendants, since all these
+ * Reset resultStream and all its descendants, since all these
* streams will get a new set of rows in the next batch.
*/
- m_resultStreams[opNo].reset();
+ resultStream.prepareNextReceiveSet();
}
}
m_confReceived = false;
@@ -3194,7 +3199,7 @@ NdbQueryImpl::sendFetchMore(NdbRootFragm
NdbRootFragment* rootFrag = rootFrags[i];
assert(rootFrag->isFragBatchComplete());
assert(!rootFrag->finalBatchReceived());
- rootFrag->reset();
+ rootFrag->prepareNextReceiveSet();
}
Ndb& ndb = *getNdbTransaction().getNdb();
@@ -3383,22 +3388,27 @@ int NdbQueryImpl::isPrunable(bool& pruna
NdbQueryImpl::OrderedFragSet::OrderedFragSet():
m_capacity(0),
m_activeFragCount(0),
- m_emptiedFragCount(0),
+ m_fetchMoreFragCount(0),
m_finalFragCount(0),
m_ordering(NdbQueryOptions::ScanOrdering_void),
m_keyRecord(NULL),
m_resultRecord(NULL),
m_activeFrags(NULL),
- m_emptiedFrags(NULL)
+ m_fetchMoreFrags(NULL)
{
}
NdbQueryImpl::OrderedFragSet::~OrderedFragSet()
{
m_activeFrags = NULL;
- m_emptiedFrags= NULL;
+ m_fetchMoreFrags = NULL;
}
+void NdbQueryImpl::OrderedFragSet::clear()
+{
+ m_activeFragCount = 0;
+ m_fetchMoreFragCount = 0;
+}
void
NdbQueryImpl::OrderedFragSet::prepare(NdbBulkAllocator& allocator,
@@ -3419,9 +3429,9 @@ NdbQueryImpl::OrderedFragSet::prepare(Nd
reinterpret_cast<NdbRootFragment**>(allocator.allocObjMem(capacity));
bzero(m_activeFrags, capacity * sizeof(NdbRootFragment*));
- m_emptiedFrags =
+ m_fetchMoreFrags =
reinterpret_cast<NdbRootFragment**>(allocator.allocObjMem(capacity));
- bzero(m_emptiedFrags, capacity * sizeof(NdbRootFragment*));
+ bzero(m_fetchMoreFrags, capacity * sizeof(NdbRootFragment*));
}
m_ordering = ordering;
m_keyRecord = keyRecord;
@@ -3486,10 +3496,10 @@ NdbQueryImpl::OrderedFragSet::reorganize
}
else
{
- m_emptiedFrags[m_emptiedFragCount++] = frag;
+ m_fetchMoreFrags[m_fetchMoreFragCount++] = frag;
}
m_activeFragCount--;
- assert(m_activeFragCount + m_emptiedFragCount + m_finalFragCount
+ assert(m_activeFragCount + m_fetchMoreFragCount + m_finalFragCount
<= m_capacity);
return; // Remaining m_activeFrags[] are sorted
@@ -3539,7 +3549,7 @@ NdbQueryImpl::OrderedFragSet::reorganize
}
assert(verifySortOrder());
}
- assert(m_activeFragCount + m_emptiedFragCount + m_finalFragCount
+ assert(m_activeFragCount + m_fetchMoreFragCount + m_finalFragCount
<= m_capacity);
} // OrderedFragSet::reorganize()
@@ -3565,23 +3575,16 @@ NdbQueryImpl::OrderedFragSet::prepareMor
}
} // for all 'rootFrags[]'
- assert(m_activeFragCount + m_emptiedFragCount + m_finalFragCount
+ assert(m_activeFragCount + m_fetchMoreFragCount + m_finalFragCount
<= m_capacity);
} // OrderedFragSet::prepareMoreResults()
-void NdbQueryImpl::OrderedFragSet::clear()
-{
- m_activeFragCount = 0;
- m_emptiedFragCount = 0;
- m_finalFragCount = 0;
-}
-
Uint32
NdbQueryImpl::OrderedFragSet::getFetchMore(NdbRootFragment** &frags)
{
- const Uint32 cnt = m_emptiedFragCount;
- frags = m_emptiedFrags;
- m_emptiedFragCount = 0;
+ const int cnt = m_fetchMoreFragCount;
+ frags = m_fetchMoreFrags;
+ m_fetchMoreFragCount = 0;
return cnt;
}
=== modified file 'storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp'
--- a/storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp 2011-08-17 12:06:04 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp 2011-08-17 13:16:22 +0000
@@ -333,14 +333,16 @@ private:
private:
- /** Max no of fragments.*/
+ /** No of fragments to read until '::finalBatchReceived()'.*/
int m_capacity;
/** Number of fragments in 'm_activeFrags'.*/
int m_activeFragCount;
- /** Number of fragments in 'm_emptiedFrags'. */
- int m_emptiedFragCount;
- /** Number of fragments where the final batch has been received
- * and consumed.*/
+ /** Number of fragments in 'm_fetchMoreFrags'. */
+ int m_fetchMoreFragCount;
+ /**
+ * Number of fragments where the final batch has been received
+ * and consumed.
+ */
int m_finalFragCount;
/** Ordering of index scan result.*/
NdbQueryOptions::ScanOrdering m_ordering;
@@ -348,15 +350,18 @@ private:
const NdbRecord* m_keyRecord;
/** Needed for comparing records when ordering results.*/
const NdbRecord* m_resultRecord;
- /** Fragments where some tuples in the current batch has not yet been
- * consumed.*/
+ /**
+ * Fragments where some tuples in the current ResultSet has not
+ * yet been consumed.
+ */
NdbRootFragment** m_activeFrags;
- /** Fragments where all tuples in the current batch have been consumed,
- * but where there are more batches to fetch.*/
- NdbRootFragment** m_emptiedFrags;
- // No copying.
- OrderedFragSet(const OrderedFragSet&);
- OrderedFragSet& operator=(const OrderedFragSet&);
+ /**
+ * Fragments from which we should request more ResultSets.
+ * Either due to the current batch has been consumed, or double buffering
+ * of result sets allows us to request another batch before the current
+ * has been consumed.
+ */
+ NdbRootFragment** m_fetchMoreFrags;
/** Add a complete fragment that has been received.*/
void add(NdbRootFragment& frag);
@@ -369,6 +374,10 @@ private:
/** For debugging purposes.*/
bool verifySortOrder() const;
+
+ // No copying.
+ OrderedFragSet(const OrderedFragSet&);
+ OrderedFragSet& operator=(const OrderedFragSet&);
}; // class OrderedFragSet
/** The interface that is visible to the application developer.*/
No bundle (reason: useless for push emails).
| Thread |
|---|
| • bzr push into mysql-5.1-telco-7.0-spj-scan-vs-scan branch(ole.john.aske:3540 to 3541) | Ole John Aske | 22 Aug |