4462 Ole John Aske 2011-08-17
Refactoring as part of preparing SPJ results to be double (multi...?) buffered.
'class SharedFragStack' which was intended as a container for fragments
which has received a complete batch of rows is removed.
It is replaced with a set of (mutex protected) methods in
'class NdbRootFragment' and OrderedFragSet::prepareMoreResults()
The rational for this refactoring is that the SharedFragStack container
wasn't a particular good fit to represent the state when *multiple*
batches of (double buffered) result set may be available.
modified:
storage/ndb/src/ndbapi/NdbQueryOperation.cpp
storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp
4461 Jonas Oreland 2011-08-17 [merge]
ndb - merge wl4124-new2
removed:
mysql-test/suite/ndb/r/ndb_statistics.result
mysql-test/suite/ndb/t/ndb_statistics.test
added:
mysql-test/suite/ndb/r/ndb_index_stat.result
mysql-test/suite/ndb/r/ndb_statistics0.result
mysql-test/suite/ndb/r/ndb_statistics1.result
mysql-test/suite/ndb/t/ndb_index_stat.test
mysql-test/suite/ndb/t/ndb_index_stat_enable.inc
mysql-test/suite/ndb/t/ndb_statistics.inc
mysql-test/suite/ndb/t/ndb_statistics0.test
mysql-test/suite/ndb/t/ndb_statistics1.test
modified:
mysql-test/suite/ndb/r/ndb_restore_misc.result
mysql-test/suite/ndb/t/ndb_restore_misc.test
sql/ha_ndb_index_stat.cc
sql/ha_ndb_index_stat.h
sql/ha_ndbcluster.cc
sql/ha_ndbcluster.h
storage/ndb/include/ndb_constants.h
storage/ndb/include/ndbapi/NdbIndexStat.hpp
storage/ndb/include/util/NdbPack.hpp
storage/ndb/src/common/util/NdbPack.cpp
storage/ndb/src/ndbapi/NdbIndexStat.cpp
storage/ndb/src/ndbapi/NdbIndexStatImpl.cpp
storage/ndb/src/ndbapi/NdbIndexStatImpl.hpp
storage/ndb/test/ndbapi/testIndexStat.cpp
storage/ndb/tools/ndb_index_stat.cpp
=== modified file 'storage/ndb/src/ndbapi/NdbQueryOperation.cpp'
--- a/storage/ndb/src/ndbapi/NdbQueryOperation.cpp 2011-08-17 08:10:27 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryOperation.cpp 2011-08-17 12:06:04 +0000
@@ -181,6 +181,8 @@ public:
*/
void init(NdbQueryImpl& query, Uint32 fragNo);
+ static void clear(NdbRootFragment* frags, Uint32 noOfFrags);
+
Uint32 getFragNo() const
{ return m_fragNo; }
@@ -192,7 +194,11 @@ public:
/**
* Prepare for reading another batch of results.
*/
- void prepareResultSet();
+ void grabNextResultSet(); // Need mutex lock
+
+ bool hasReceivedMore() const; // Need mutex lock
+
+ void setReceivedMore(); // Need mutex lock
void incrOutstandingResults(Int32 delta)
{
@@ -273,12 +279,19 @@ private:
NdbResultStream* m_resultStreams;
/**
- * The number of outstanding TCKEYREF or TRANSID_AI
- * messages for the fragment. This includes both messages related to the
+ * Number of available prefetched ResultSets which are completely
+ * received. Will be made available for reading by calling
+ * ::grabNextResultSet()
+ */
+ Uint32 m_availResultSets; // Need mutex
+
+ /**
+ * The number of outstanding TCKEYREF or TRANSID_AI messages to receive
+ * for the fragment. This includes both messages related to the
* root operation and any descendant operation that was instantiated as
* a consequence of tuples found by the root operation.
- * This number may temporarily be negative if e.g. TRANSID_AI arrives before
- * SCAN_TABCONF.
+ * This number may temporarily be negative if e.g. TRANSID_AI arrives
+ * before SCAN_TABCONF.
*/
Int32 m_outstandingResults;
@@ -287,7 +300,8 @@ private:
* operation accesses (i.e. one for a lookup, all for a table scan).
*
* Each element is true iff a SCAN_TABCONF (for that fragment) or
- * TCKEYCONF message has been received */
+ * TCKEYCONF message has been received
+ */
bool m_confReceived;
/**
@@ -1034,6 +1048,7 @@ NdbRootFragment::NdbRootFragment():
m_query(NULL),
m_fragNo(voidFragNo),
m_resultStreams(NULL),
+ m_availResultSets(0),
m_outstandingResults(0),
m_confReceived(false),
m_remainingScans(0),
@@ -1094,6 +1109,45 @@ NdbRootFragment::getResultStream(Uint32
return m_resultStreams[operationNo];
}
+/**
+ * Throw any pending ResultSets from specified rootFrags[]
+ */
+//static
+void NdbRootFragment::clear(NdbRootFragment* rootFrags, Uint32 noOfFrags)
+{
+ if (rootFrags != NULL)
+ {
+ for (Uint32 fragNo = 0; fragNo < noOfFrags; fragNo++)
+ {
+ rootFrags[fragNo].m_availResultSets = 0;
+ }
+ }
+}
+
+/**
+ * Signal that another complete ResultSet is available for
+ * this NdbRootFragment.
+ * Need mutex lock as 'm_availResultSets' is accesed both from
+ * receiver and application thread.
+ */
+void NdbRootFragment::setReceivedMore()
+{
+ assert(m_availResultSets==0);
+ m_availResultSets++;
+}
+
+/**
+ * Check if another ResultSets has been received and is available
+ * for reading. It will be given to the application thread when it
+ * call ::grabNextResultSet().
+ * Need mutex lock as 'm_availResultSets' is accesed both from
+ * receiver and application thread.
+ */
+bool NdbRootFragment::hasReceivedMore() const
+{
+ return (m_availResultSets > 0);
+}
+
void NdbRootFragment::reset()
{
assert(m_fragNo!=voidFragNo);
@@ -1114,8 +1168,17 @@ void NdbRootFragment::reset()
m_confReceived = false;
}
-void NdbRootFragment::prepareResultSet()
+/**
+ * Let the application thread takes ownership of an available
+ * ResultSet, prepare it for reading first row.
+ * Need mutex lock as 'm_availResultSets' is accesed both from
+ * receiver and application thread.
+ */
+void NdbRootFragment::grabNextResultSet()
{
+ assert(m_availResultSets>0);
+ m_availResultSets--;
+
NdbResultStream& rootStream = getResultStream(0);
rootStream.prepareResultSet(m_remainingScans);
@@ -1137,7 +1200,7 @@ void NdbRootFragment::setConfReceived(Ui
bool NdbRootFragment::finalBatchReceived() const
{
- return getReceiverTcPtrI()==RNIL;
+ return m_confReceived && getReceiverTcPtrI()==RNIL;
}
bool NdbRootFragment::isEmpty() const
@@ -1587,6 +1650,7 @@ NdbQueryImpl::NdbQueryImpl(NdbTransactio
m_next(NULL),
m_queryDef(&queryDef),
m_error(),
+ m_errorReceived(0),
m_transaction(trans),
m_scanTransaction(NULL),
m_operations(0),
@@ -1596,7 +1660,6 @@ NdbQueryImpl::NdbQueryImpl(NdbTransactio
m_rootFragCount(0),
m_rootFrags(NULL),
m_applFrags(),
- m_fullFrags(),
m_finalBatchFrags(0),
m_num_bounds(0),
m_shortestBound(0xffffffff),
@@ -2060,10 +2123,9 @@ NdbQueryImpl::nextResult(bool fetchAllow
NdbQuery::NextResultOutcome
NdbQueryImpl::nextRootResult(bool fetchAllowed, bool forceSend)
{
- /* To minimize lock contention, each query has two separate root fragment
- * conatiners (m_fullFrags and m_applFrags). m_applFrags is only
- * accessed by the application thread, so it is safe to use it without
- * locks.
+ /* To minimize lock contention, each query has the separate root fragment
+ * conatiner 'm_applFrags'. m_applFrags is only accessed by the application
+ * thread, so it is safe to use it without locks.
*/
while (m_state != EndOfData) // Or likely: return when 'gotRow' or error
{
@@ -2179,23 +2241,17 @@ NdbQueryImpl::awaitMoreResults(bool forc
*/
while (likely(!hasReceivedError()))
{
- /* m_fullFrags contains any fragments that are complete (for this batch)
- * but have not yet been moved (under mutex protection) to
- * m_applFrags.
+ /* Scan m_rootFrags (under mutex protection) for fragments
+ * which has received a complete batch. Add these to m_applFrags.
*/
- NdbRootFragment* frag;
- while ((frag=m_fullFrags.pop()) != NULL)
- {
- frag->prepareResultSet();
- m_applFrags.add(*frag);
- }
+ m_applFrags.prepareMoreResults(m_rootFrags,m_rootFragCount);
if (m_applFrags.getCurrent() != NULL)
{
return FetchResult_ok;
}
- /* There are noe more available frament results available without
- * first waiting for more to be received from datanodes
+ /* There are no more available fragment results available without
+ * first waiting for more to be received from the datanodes
*/
if (m_pendingFrags == 0)
{
@@ -2236,16 +2292,9 @@ NdbQueryImpl::awaitMoreResults(bool forc
/* The root operation is a lookup. Lookups are guaranteed to be complete
* before NdbTransaction::execute() returns. Therefore we do not set
* the lock, because we know that the signal receiver thread will not
- * be accessing m_fullFrags at this time.
+ * be accessing m_rootFrags at this time.
*/
- NdbRootFragment* frag;
- if ((frag=m_fullFrags.pop()) != NULL)
- {
- frag->prepareResultSet();
- m_applFrags.add(*frag);
- }
- assert(m_fullFrags.pop()==NULL); // Only one stream for lookups.
-
+ m_applFrags.prepareMoreResults(m_rootFrags,m_rootFragCount);
if (m_applFrags.getCurrent() != NULL)
{
return FetchResult_ok;
@@ -2266,8 +2315,8 @@ NdbQueryImpl::awaitMoreResults(bool forc
/*
::handleBatchComplete() is intended to be called when receiving signals only.
- The PollGuard mutex is then set and the shared 'm_pendingFrags',
- 'm_finalBatchFrags' and 'm_fullFrags' can safely be updated.
+ The PollGuard mutex is then set and the shared 'm_pendingFrags' and
+ 'm_finalBatchFrags' can safely be updated and ::setReceivedMore() signaled.
returns: 'true' when application thread should be resumed.
*/
@@ -2286,7 +2335,7 @@ NdbQueryImpl::handleBatchComplete(NdbRoo
* terminated the scan. We are about to close this query,
* and didn't expect any more data - ignore it!
*/
- if (likely(m_fullFrags.m_errorCode == 0))
+ if (likely(m_errorReceived == 0))
{
assert(rootFrag.isFragBatchComplete());
@@ -2300,10 +2349,10 @@ NdbQueryImpl::handleBatchComplete(NdbRoo
assert(m_finalBatchFrags <= m_rootFragCount);
}
- /* When application thread ::awaitMoreResults() it will later be moved
- * from m_fullFrags to m_applFrags under mutex protection.
+ /* When application thread ::awaitMoreResults() it will later be
+ * added to m_applFrags under mutex protection.
*/
- m_fullFrags.push(rootFrag);
+ rootFrag.setReceivedMore();
return true;
}
@@ -2327,7 +2376,7 @@ NdbQueryImpl::close(bool forceSend)
}
// Throw any pending results
- m_fullFrags.clear();
+ NdbRootFragment::clear(m_rootFrags,m_rootFragCount);
m_applFrags.clear();
Ndb* const ndb = m_transaction.getNdb();
@@ -2423,7 +2472,7 @@ NdbQueryImpl::setFetchTerminated(int err
}
if (errorCode!=0)
{
- m_fullFrags.m_errorCode = errorCode;
+ m_errorReceived = errorCode;
}
m_pendingFrags = 0;
} // NdbQueryImpl::setFetchTerminated()
@@ -2436,9 +2485,9 @@ NdbQueryImpl::setFetchTerminated(int err
bool
NdbQueryImpl::hasReceivedError()
{
- if (unlikely(m_fullFrags.m_errorCode))
+ if (unlikely(m_errorReceived))
{
- setErrorCode(m_fullFrags.m_errorCode);
+ setErrorCode(m_errorReceived);
return true;
}
return false;
@@ -2543,8 +2592,7 @@ NdbQueryImpl::prepareSend()
}
// Allocate space for ptrs to NdbResultStream and NdbRootFragment objects.
error = m_pointerAlloc.init(m_rootFragCount *
- (SharedFragStack::pointersPerFragment +
- OrderedFragSet::pointersPerFragment));
+ (OrderedFragSet::pointersPerFragment));
if (error != 0)
{
setErrorCode(error);
@@ -2631,21 +2679,17 @@ NdbQueryImpl::prepareSend()
keyRec = getRoot().getQueryOperationDef().getIndex()->getDefaultRecord();
assert(keyRec!=NULL);
}
- if (unlikely((error = m_applFrags.prepare(m_pointerAlloc,
- getRoot().getOrdering(),
- m_rootFragCount,
- keyRec,
- getRoot().m_ndbRecord)) != 0)
- || (error = m_fullFrags.prepare(m_pointerAlloc,
- m_rootFragCount)) != 0) {
- setErrorCode(error);
- return -1;
- }
+ m_applFrags.prepare(m_pointerAlloc,
+ getRoot().getOrdering(),
+ m_rootFragCount,
+ keyRec,
+ getRoot().m_ndbRecord);
if (getQueryDef().isScanQuery())
{
NdbRootFragment::buildReciverIdMap(m_rootFrags, m_rootFragCount);
}
+
#ifdef TRACE_SERIALIZATION
ndbout << "Serialized ATTRINFO : ";
for(Uint32 i = 0; i < m_attrInfo.getSize(); i++){
@@ -3183,8 +3227,8 @@ NdbQueryImpl::closeTcCursor(bool forceSe
} // while
assert(m_pendingFrags==0);
- m_fullFrags.clear(); // Throw any unhandled results
- m_fullFrags.m_errorCode = 0; // Clear errors caused by previous fetching
+ NdbRootFragment::clear(m_rootFrags,m_rootFragCount);
+ m_errorReceived = 0; // Clear errors caused by previous fetching
m_error.code = 0;
if (m_finalBatchFrags < getRootFragCount()) // TC has an open scan cursor.
@@ -3271,39 +3315,6 @@ int NdbQueryImpl::isPrunable(bool& pruna
/****************
- * NdbQueryImpl::SharedFragStack methods.
- ***************/
-
-NdbQueryImpl::SharedFragStack::SharedFragStack():
- m_errorCode(0),
- m_capacity(0),
- m_current(-1),
- m_array(NULL)
-{}
-
-int
-NdbQueryImpl::SharedFragStack::prepare(NdbBulkAllocator& allocator,
- int capacity)
-{
- assert(m_array==NULL);
- assert(m_capacity==0);
- if (capacity > 0)
- { m_capacity = capacity;
- m_array =
- reinterpret_cast<NdbRootFragment**>(allocator.allocObjMem(capacity));
- }
- return 0;
-}
-
-void
-NdbQueryImpl::SharedFragStack::push(NdbRootFragment& frag)
-{
- m_current++;
- assert(m_current<m_capacity);
- m_array[m_current] = &frag;
-}
-
-/****************
* NdbQueryImpl::OrderedFragSet methods.
***************/
@@ -3327,7 +3338,7 @@ NdbQueryImpl::OrderedFragSet::~OrderedFr
}
-int
+void
NdbQueryImpl::OrderedFragSet::prepare(NdbBulkAllocator& allocator,
NdbQueryOptions::ScanOrdering ordering,
int capacity,
@@ -3345,6 +3356,7 @@ NdbQueryImpl::OrderedFragSet::prepare(Nd
m_activeFrags =
reinterpret_cast<NdbRootFragment**>(allocator.allocObjMem(capacity));
bzero(m_activeFrags, capacity * sizeof(NdbRootFragment*));
+
m_emptiedFrags =
reinterpret_cast<NdbRootFragment**>(allocator.allocObjMem(capacity));
bzero(m_emptiedFrags, capacity * sizeof(NdbRootFragment*));
@@ -3352,7 +3364,6 @@ NdbQueryImpl::OrderedFragSet::prepare(Nd
m_ordering = ordering;
m_keyRecord = keyRecord;
m_resultRecord = resultRecord;
- return 0;
} // OrderedFragSet::prepare()
@@ -3479,6 +3490,23 @@ NdbQueryImpl::OrderedFragSet::add(NdbRoo
reorganize(); // Move into position
} // OrderedFragSet::add()
+void
+NdbQueryImpl::OrderedFragSet::prepareMoreResults(NdbRootFragment rootFrags[], Uint32 cnt)
+{
+ for (Uint32 fragNo = 0; fragNo < cnt; fragNo++)
+ {
+ NdbRootFragment& rootFrag = rootFrags[fragNo];
+ if (rootFrag.hasReceivedMore()) // Another ResultSet is available
+ {
+ rootFrag.grabNextResultSet(); // Get new ResultSet.
+ add(rootFrag); // Make avail. to appl. thread
+ }
+ } // for all 'rootFrags[]'
+
+ assert(m_activeFragCount + m_emptiedFragCount + m_finalFragCount
+ <= m_capacity);
+} // OrderedFragSet::prepareMoreResults()
+
void NdbQueryImpl::OrderedFragSet::clear()
{
m_activeFragCount = 0;
=== modified file 'storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp'
--- a/storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp 2011-08-17 08:10:27 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp 2011-08-17 12:06:04 +0000
@@ -271,58 +271,13 @@ private:
FetchResult_noMoreCache = 2
};
- /** A stack of NdbRootFragment pointers.
- * NdbRootFragments which are 'BatchComplete' are pushed on this stack by
- * the receiver thread, and later pop'ed into the application thread when
- * it need more results to process.
- * Due to this shared usage, the PollGuard mutex must be set before
- * accessing SharedFragStack.
+ /**
+ * Container of root fragments that the application is currently
+ * iterating over. 'Owned' by application thread and can be accesed
+ * without requiring a mutex lock.
+ * RootFragments are appended to a OrderedFragSet by ::prepareMoreResults()
+ *
*/
- class SharedFragStack{
- public:
- // For calculating need for dynamically allocated memory.
- static const Uint32 pointersPerFragment = 1;
-
- explicit SharedFragStack();
-
- /**
- * Prepare internal datastructures.
- * param[in] allocator For allocating arrays of pointers.
- * param[in] capacity Max no of root fragments.
- * @return 0 if ok, else errorcode
- */
- int prepare(NdbBulkAllocator& allocator, int capacity);
-
- NdbRootFragment* pop() {
- return m_current>=0 ? m_array[m_current--] : NULL;
- }
-
- void push(NdbRootFragment& frag);
-
- int size() const {
- return (m_current+1);
- }
-
- void clear() {
- m_current = -1;
- }
-
- /** Possible error received from TC / datanodes. */
- int m_errorCode;
-
- private:
- /** Capacity of stack.*/
- int m_capacity;
-
- /** Index of current top of stack.*/
- int m_current;
- NdbRootFragment** m_array;
-
- // No copying.
- SharedFragStack(const SharedFragStack&);
- SharedFragStack& operator=(const SharedFragStack&);
- }; // class SharedFragStack
-
class OrderedFragSet{
public:
// For calculating need for dynamically allocated memory.
@@ -339,11 +294,20 @@ private:
* param[in] capacity Max no of root fragments.
* @return 0 if ok, else errorcode
*/
- int prepare(NdbBulkAllocator& allocator,
- NdbQueryOptions::ScanOrdering ordering,
- int capacity,
- const NdbRecord* keyRecord,
- const NdbRecord* resultRecord);
+ void prepare(NdbBulkAllocator& allocator,
+ NdbQueryOptions::ScanOrdering ordering,
+ int capacity,
+ const NdbRecord* keyRecord,
+ const NdbRecord* resultRecord);
+
+ /**
+ * Add root fragments with completed ResultSets to this OrderedFragSet.
+ * The PollGuard mutex must locked, and under its protection
+ * completed root fragments are 'consumed' from rootFrags[] and
+ * added to OrderedFragSet where it become available for the
+ * application thread.
+ */
+ void prepareMoreResults(NdbRootFragment rootFrags[], Uint32 cnt); // Need mutex lock
/** Get the root fragment from which to read the next row.*/
NdbRootFragment* getCurrent() const;
@@ -355,9 +319,6 @@ private:
*/
void reorganize();
- /** Add a complete fragment that has been received.*/
- void add(NdbRootFragment& frag);
-
/** Reset object to an empty state.*/
void clear();
@@ -397,6 +358,9 @@ private:
OrderedFragSet(const OrderedFragSet&);
OrderedFragSet& operator=(const OrderedFragSet&);
+ /** Add a complete fragment that has been received.*/
+ void add(NdbRootFragment& frag);
+
/** For sorting fragment reads according to index value of first record.
* Also f1<f2 if f2 has reached end of data and f1 has not.
* @return 1 if f1>f2, 0 if f1==f2, -1 if f1<f2.*/
@@ -433,6 +397,14 @@ private:
/** Possible error status of this query.*/
NdbError m_error;
+
+ /**
+ * Possible error received from TC / datanodes.
+ * Only access w/ PollGuard mutex as it is set by receiver thread.
+ * Checked and moved into 'm_error' with ::hasReceivedError().
+ */
+ int m_errorReceived; // BEWARE: protect with PollGuard mutex
+
/** Transaction in which this query instance executes.*/
NdbTransaction& m_transaction;
@@ -452,7 +424,7 @@ private:
Uint32 m_globalCursor;
/** Number of root fragments not yet completed within the current batch.
- * Only access w/ PollGuard mutex as it is also updated by receiver threa
+ * Only access w/ PollGuard mutex as it is also updated by receiver thread
*/
Uint32 m_pendingFrags; // BEWARE: protect with PollGuard mutex
@@ -473,11 +445,6 @@ private:
*/
OrderedFragSet m_applFrags;
- /** Root frgaments that have received a complete batch. Shared between
- * application thread and receiving thread. Access should be mutex protected.
- */
- SharedFragStack m_fullFrags; // BEWARE: protect with PollGuard mutex
-
/** Number of root fragments for which confirmation for the final batch
* (with tcPtrI=RNIL) has been received. Observe that even if
* m_finalBatchFrags==m_rootFragCount, all tuples for the final batches may
@@ -557,7 +524,8 @@ private:
bool forceSend);
/** Wait for more scan results which already has been REQuested to arrive.
- * @return 0 if some rows did arrive, a negative value if there are errors (in m_error.code),
+ * @return 0 if some rows did arrive, a negative value if there are errors
+ * (in m_error.code),
* and 1 of there are no more rows to receive.
*/
FetchResult awaitMoreResults(bool forceSend);
No bundle (reason: useless for push emails).
| Thread |
|---|
| • bzr push into mysql-5.1-telco-7.0 branch (ole.john.aske:4461 to 4462) | Ole John Aske | 22 Aug |