3540 Ole John Aske 2011-08-17 [merge]
Merge
modified:
storage/ndb/src/ndbapi/NdbQueryOperation.cpp
3539 Ole John Aske 2011-08-17 [merge]
Merge
modified:
storage/ndb/include/ndbapi/NdbReceiver.hpp
storage/ndb/src/ndbapi/NdbQueryOperation.cpp
storage/ndb/src/ndbapi/NdbReceiver.cpp
=== modified file 'storage/ndb/src/ndbapi/NdbQueryOperation.cpp'
--- a/storage/ndb/src/ndbapi/NdbQueryOperation.cpp 2011-08-17 12:53:58 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryOperation.cpp 2011-08-17 13:09:14 +0000
@@ -325,6 +325,55 @@ private:
int m_idMapNext;
}; //NdbRootFragment
+/**
+ * 'class NdbResultSet' is a helper for 'class NdbResultStream'.
+ * It manages the buffers which rows are received into and
+ * read from.
+ */
+class NdbResultSet
+{
+ friend class NdbResultStream;
+
+public:
+ explicit NdbResultSet();
+
+ void init(NdbQueryImpl& query,
+ Uint32 maxRows, Uint32 rowSize);
+
+ void prepareReceive(NdbReceiver& receiver)
+ {
+ m_rowCount = 0;
+ receiver.prepareReceive(m_buffer);
+ }
+
+ void prepareRead(NdbReceiver& receiver)
+ {
+ receiver.prepareRead(m_buffer,m_rowCount);
+ }
+
+ Uint32 getRowCount() const
+ { return m_rowCount; }
+
+ char* getRow(Uint32 tupleNo) const
+ { return (m_buffer + (tupleNo*m_rowSize)); }
+
+private:
+ /** No copying.*/
+ NdbResultSet(const NdbResultSet&);
+ NdbResultSet& operator=(const NdbResultSet&);
+
+ /** The buffers which we receive the results into */
+ char* m_buffer;
+
+ /** Used for checking if buffer overrun occurred. */
+ Uint32* m_batchOverflowCheck;
+
+ Uint32 m_rowSize;
+
+ /** The current #rows in 'm_buffer'.*/
+ Uint32 m_rowCount;
+
+}; // class NdbResultSet
/**
* This class manages the subset of result data for one operation that is
@@ -362,11 +411,8 @@ public:
const NdbReceiver& getReceiver() const
{ return m_receiver; }
- Uint32 getRowCount() const
- { return m_rowCount; }
-
- char* getRow(Uint32 tupleNo) const
- { return (m_buffer + (tupleNo*m_rowSize)); }
+ const char* getCurrentRow() const
+ { return m_receiver.peek_row(); }
/**
* Process an incomming tuple for this stream. Extract parent and own tuple
@@ -386,9 +432,9 @@ public:
bool prepareResultSet(Uint32 remainingScans);
/**
- * Navigate within the current result batch to resp. first and next row.
+ * Navigate within the current ResultSet to resp. first and next row.
* For non-parent operations in the pushed query, navigation is with respect
- * to any preceding parents which results in this NdbResultStream depends on.
+ * to any preceding parents which results in this ResultSet depends on.
* Returns either the tupleNo within TupleSet[] which we navigated to, or
* tupleNotFound().
*/
@@ -504,25 +550,22 @@ private:
/** The receiver object that unpacks transid_AI messages.*/
NdbReceiver m_receiver;
- /** The buffers which we receive the results into */
- char* m_buffer;
-
- /** Used for checking if buffer overrun occurred. */
- Uint32* m_batchOverflowCheck;
-
- Uint32 m_rowSize;
-
- /** The number of transid_AI messages received.*/
- Uint32 m_rowCount;
+ /**
+ * ResultSets are received into and read from this stream,
+ * intended to be extended into double buffered ResultSet later.
+ */
+ NdbResultSet m_resultSets[1];
+ Uint32 m_read; // We read from m_resultSets[m_read]
+ Uint32 m_recv; // We receive into m_resultSets[m_recv]
/** This is the state of the iterator used by firstResult(), nextResult().*/
enum
{
/** The first row has not been fetched yet.*/
Iter_notStarted,
- /** Is iterating the resultset, (implies 'm_currentRow!=tupleNotFound').*/
+ /** Is iterating the ResultSet, (implies 'm_currentRow!=tupleNotFound').*/
Iter_started,
- /** Last row for current batch has been returned.*/
+ /** Last row for current ResultSet has been returned.*/
Iter_finished
} m_iterState;
@@ -600,6 +643,34 @@ void* NdbBulkAllocator::allocObjMem(Uint
return m_nextObjNo > m_maxObjs ? NULL : result;
}
+///////////////////////////////////////////
+///////// NdbResultSet methods ///////////
+///////////////////////////////////////////
+NdbResultSet::NdbResultSet() :
+ m_buffer(NULL),
+ m_batchOverflowCheck(NULL),
+ m_rowSize(0),
+ m_rowCount(0)
+{}
+
+void
+NdbResultSet::init(NdbQueryImpl& query,
+ Uint32 maxRows,
+ Uint32 rowSize)
+{
+ m_rowSize = rowSize;
+ {
+ const int bufferSize = rowSize * maxRows;
+ NdbBulkAllocator& bufferAlloc = query.getRowBufferAlloc();
+ m_buffer = reinterpret_cast<char*>(bufferAlloc.allocObjMem(bufferSize));
+
+ // So that we can test for buffer overrun.
+ m_batchOverflowCheck =
+ reinterpret_cast<Uint32*>(bufferAlloc.allocObjMem(sizeof(Uint32)));
+ *m_batchOverflowCheck = 0xacbd1234;
+ }
+}
+
//////////////////////////////////////////////
///////// NdbResultStream methods ///////////
//////////////////////////////////////////////
@@ -621,10 +692,7 @@ NdbResultStream::NdbResultStream(NdbQuer
| (operation.getQueryOperationDef().getMatchType() != NdbQueryOptions::MatchAll
? Is_Inner_Join : 0))),
m_receiver(operation.getQuery().getNdbTransaction().getNdb()),
- m_buffer(NULL),
- m_batchOverflowCheck(NULL),
- m_rowSize(0),
- m_rowCount(0),
+ m_resultSets(), m_read(0xffffffff), m_recv(0),
m_iterState(Iter_notStarted),
m_currentRow(tupleNotFound),
m_maxRows(0),
@@ -645,12 +713,8 @@ NdbResultStream::prepare()
const Uint32 rowSize = m_operation.getRowSize();
NdbQueryImpl &query = m_operation.getQuery();
- m_rowSize = rowSize;
-
/* Parent / child correlation is only relevant for scan type queries
- * Don't create m_parentTupleId[] and m_childTupleIdx[] for lookups!
- * Neither is these structures required for operations not having respective
- * child or parent operations.
+ * Don't create a m_tupleSet with these correlation id's for lookups!
*/
if (isScanQuery())
{
@@ -662,14 +726,7 @@ NdbResultStream::prepare()
else
m_maxRows = 1;
- const int bufferSize = rowSize * m_maxRows;
- NdbBulkAllocator& bufferAlloc = query.getRowBufferAlloc();
- m_buffer = reinterpret_cast<char*>(bufferAlloc.allocObjMem(bufferSize));
-
- // So that we can test for buffer overrun.
- m_batchOverflowCheck =
- reinterpret_cast<Uint32*>(bufferAlloc.allocObjMem(sizeof(Uint32)));
- *m_batchOverflowCheck = 0xacbd1234;
+ m_resultSets[0].init(query, m_maxRows, rowSize);
m_receiver.init(NdbReceiver::NDB_QUERY_OPERATION, false, &m_operation);
m_receiver.do_setup_ndbrecord(
@@ -678,7 +735,7 @@ NdbResultStream::prepare()
0 /*key_size*/,
0 /*read_range_no*/,
rowSize,
- m_buffer);
+ m_resultSets[m_recv].m_buffer);
} //NdbResultStream::prepare
void
@@ -686,12 +743,10 @@ NdbResultStream::reset()
{
assert (isScanQuery());
- // Root scan-operation need a ScanTabConf to complete
- m_rowCount = 0;
m_iterState = Iter_notStarted;
m_currentRow = tupleNotFound;
+ m_resultSets[m_recv].prepareReceive(m_receiver);
- m_receiver.prepareSend();
/**
* If this stream will get new rows in the next batch, then so will
* all of its descendants.
@@ -717,11 +772,11 @@ NdbResultStream::findTupleWithParentId(U
{
assert ((parentId==tupleNotFound) == (m_parent==NULL));
- if (likely(m_rowCount>0))
+ if (likely(m_resultSets[m_read].m_rowCount>0))
{
if (m_tupleSet==NULL)
{
- assert (m_rowCount <= 1);
+ assert (m_resultSets[m_read].m_rowCount <= 1);
return 0;
}
@@ -788,7 +843,7 @@ NdbResultStream::firstResult()
if ((m_currentRow=findTupleWithParentId(parentId)) != tupleNotFound)
{
m_iterState = Iter_started;
- m_receiver.setCurrentRow(m_buffer, m_currentRow);
+ m_receiver.setCurrentRow(m_resultSets[m_read].m_buffer, m_currentRow);
return m_currentRow;
}
@@ -805,7 +860,7 @@ NdbResultStream::nextResult()
(m_currentRow=findNextTuple(m_currentRow)) != tupleNotFound)
{
m_iterState = Iter_started;
- m_receiver.setCurrentRow(m_buffer, m_currentRow);
+ m_receiver.setCurrentRow(m_resultSets[m_read].m_buffer, m_currentRow);
return m_currentRow;
}
m_iterState = Iter_finished;
@@ -821,7 +876,7 @@ NdbResultStream::execTRANSID_AI(const Ui
TupleCorrelation correlation)
{
m_receiver.execTRANSID_AI(ptr, len);
- m_rowCount++;
+ m_resultSets[m_recv].m_rowCount++;
if (isScanQuery())
{
@@ -848,11 +903,14 @@ NdbResultStream::prepareResultSet(Uint32
bool isComplete = isSubScanComplete(remainingScans); //Childs with more rows
assert(isComplete || isScanResult()); //Lookups always 'complete'
- // Set correct #rows received in the NdbReceiver.
- getReceiver().m_result_rows = getRowCount();
+ m_read = m_recv;
+ NdbResultSet& readResult = m_resultSets[m_read];
+
+ // Set correct buffer and #rows received by this ResultSet.
+ readResult.prepareRead(m_receiver);
/**
- * Prepare NdbResultStream for reading - either the next received
+ * Prepare NdbResultSet for reading - either the next received
* from datanodes or reuse current.
*/
if (m_tupleSet!=NULL)
@@ -865,7 +923,7 @@ NdbResultStream::prepareResultSet(Uint32
else
{
// Makes all rows in 'TupleSet' available (clear 'm_skip' flag)
- for (Uint32 tupleNo=0; tupleNo<getRowCount(); tupleNo++)
+ for (Uint32 tupleNo=0; tupleNo<readResult.getRowCount(); tupleNo++)
{
m_tupleSet[tupleNo].m_skip = false;
}
@@ -892,7 +950,7 @@ NdbResultStream::prepareResultSet(Uint32
if (m_tupleSet!=NULL)
{
- for (Uint32 tupleNo=0; tupleNo<getRowCount(); tupleNo++)
+ for (Uint32 tupleNo=0; tupleNo<readResult.getRowCount(); tupleNo++)
{
if (!m_tupleSet[tupleNo].m_skip)
{
@@ -929,20 +987,23 @@ NdbResultStream::prepareResultSet(Uint32
/**
* Fill m_tupleSet[] with correlation data between parent
* and child tuples. The 'TupleCorrelation' is stored as
- * and extra Uint32 after each row received
+ * and extra Uint32 after each row in the NdbResultSet
* by execTRANSID_AI().
*
* NOTE: In order to reduce work done when holding the
* transporter mutex, the 'TupleCorrelation' is only stored
* in the buffer when it arrives. Later (here) we build the
* correlation hashMap immediately before we prepare to
- * read the NdbResultStream.
+ * read the NdbResultSet.
*/
void
NdbResultStream::buildResultCorrelations()
{
+ const NdbResultSet& readResult = m_resultSets[m_read];
+
// Buffer overrun check.
- assert(m_batchOverflowCheck==NULL || *m_batchOverflowCheck==0xacbd1234);
+ assert(readResult.m_batchOverflowCheck==NULL ||
+ *readResult.m_batchOverflowCheck==0xacbd1234);
//if (m_tupleSet!=NULL)
{
@@ -952,10 +1013,10 @@ NdbResultStream::buildResultCorrelations
m_tupleSet[i].m_hash_head = tupleNotFound;
}
- /* Rebuild correlation & hashmap from received buffers */
- for (Uint32 tupleNo=0; tupleNo<m_rowCount; tupleNo++)
+ /* Rebuild correlation & hashmap from 'readResult' */
+ for (Uint32 tupleNo=0; tupleNo<readResult.m_rowCount; tupleNo++)
{
- const Uint32* row = (Uint32*)getRow(tupleNo+1);
+ const Uint32* row = (Uint32*)readResult.getRow(tupleNo+1);
const TupleCorrelation correlation(row[-1]);
const Uint16 tupleId = correlation.getTupleId();
@@ -3941,7 +4002,7 @@ NdbQueryOperationImpl::nextResult(bool f
void
NdbQueryOperationImpl::fetchRow(NdbResultStream& resultStream)
{
- const char* buff = resultStream.getReceiver().peek_row();
+ const char* buff = resultStream.getCurrentRow();
assert(buff!=NULL || (m_firstRecAttr==NULL && m_ndbRecord==NULL));
m_isRowNull = false;
@@ -5090,7 +5151,7 @@ NdbOut& operator<<(NdbOut& out, const Nd
}
NdbOut& operator<<(NdbOut& out, const NdbResultStream& stream){
- out << " m_rowCount: " << stream.m_rowCount;
+ out << " received rows: " << stream.m_resultSets[stream.m_recv].getRowCount();
return out;
}
No bundle (reason: useless for push emails).
| Thread |
|---|
| • bzr push into mysql-5.1-telco-7.0-spj-scan-vs-scan branch(ole.john.aske:3539 to 3540) | Ole John Aske | 22 Aug |