3532 Ole John Aske 2011-08-16 [merge]
Merge
modified:
storage/ndb/src/ndbapi/NdbQueryOperation.cpp
storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp
3531 Ole John Aske 2011-08-15 [merge]
Merge
modified:
storage/ndb/src/ndbapi/NdbQueryOperation.cpp
=== modified file 'storage/ndb/src/ndbapi/NdbQueryOperation.cpp'
--- a/storage/ndb/src/ndbapi/NdbQueryOperation.cpp 2011-08-15 12:38:54 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryOperation.cpp 2011-08-16 08:02:39 +0000
@@ -148,6 +148,8 @@ public:
explicit NdbRootFragment();
+ ~NdbRootFragment();
+
/**
* Initialize object.
* @param query Enclosing query.
@@ -198,9 +200,11 @@ public:
* @param operationNo The id of the operation.
* @return The result stream for this root fragment.
*/
- NdbResultStream& getResultStream(Uint32 operationNo) const
- { return m_query->getQueryOperation(operationNo).getResultStream(m_fragNo); }
-
+ NdbResultStream& getResultStream(Uint32 operationNo) const;
+
+ NdbResultStream& getResultStream(const NdbQueryOperationImpl& op) const
+ { return getResultStream(op.getQueryOperationDef().getQueryOperationIx()); }
+
Uint32 getReceiverId() const;
Uint32 getReceiverTcPtrI() const;
@@ -215,6 +219,9 @@ public:
*/
bool isEmpty() const;
+ /** Release resources after last row has been returned */
+ void postFetchRelease();
+
private:
STATIC_CONST( voidFragNo = 0xffffffff);
@@ -224,6 +231,9 @@ private:
/** Number of the root operation fragment.*/
Uint32 m_fragNo;
+ /** For processing results originating from this root fragment (Array of).*/
+ NdbResultStream* m_resultStreams;
+
/**
* The number of outstanding TCKEYREF or TRANSID_AI
* messages for the fragment. This includes both messages related to the
@@ -275,26 +285,19 @@ public:
* @param operation The operation for which we will receive results.
* @param rootFragNo 0..n-1 when the root operation reads from n fragments.
*/
- explicit NdbResultStream(NdbQueryOperationImpl& operation, Uint32 rootFragNo);
+ explicit NdbResultStream(NdbQueryOperationImpl& operation,
+ NdbRootFragment& rootFrag);
~NdbResultStream();
/**
* Prepare for receiving first results.
- * @return possible error code.
*/
- int prepare();
+ void prepare();
/** Prepare for receiving next batch of scan results. */
void reset();
- /**
- * 0..n-1 if the root operation reads from n fragments. This stream holds data
- * derived from one of those fragments.
- */
- Uint32 getRootFragNo() const
- { return m_rootFragNo; }
-
NdbReceiver& getReceiver()
{ return m_receiver; }
@@ -418,22 +421,27 @@ public:
};
private:
- /** This stream handles results derived from the m_rootFragNo'th
- * fragment of the root operation.*/
- const Uint32 m_rootFragNo;
+ /**
+ * This stream handles results derived from specified
+ * m_rootFrag of the root operation.
+ */
+ const NdbRootFragment& m_rootFrag;
+
+ /** Operation to which this resultStream belong.*/
+ NdbQueryOperationImpl& m_operation;
/** The receiver object that unpacks transid_AI messages.*/
NdbReceiver m_receiver;
+ /** Used for checking if buffer overrun occurred. */
+ Uint32* m_batchOverflowCheck;
+
/** Max #rows which this stream may recieve in its buffer structures */
Uint32 m_maxRows;
/** The number of transid_AI messages received.*/
Uint32 m_rowCount;
- /** Operation to which this resultStream belong.*/
- NdbQueryOperationImpl& m_operation;
-
/** This is the state of the iterator used by firstResult(), nextResult().*/
enum
{
@@ -528,12 +536,15 @@ void* NdbBulkAllocator::allocObjMem(Uint
///////// NdbResultStream methods ///////////
//////////////////////////////////////////////
-NdbResultStream::NdbResultStream(NdbQueryOperationImpl& operation, Uint32 rootFragNo):
- m_rootFragNo(rootFragNo),
+NdbResultStream::NdbResultStream(NdbQueryOperationImpl& operation,
+ NdbRootFragment& rootFrag)
+:
+ m_rootFrag(rootFrag),
+ m_operation(operation),
m_receiver(operation.getQuery().getNdbTransaction().getNdb()),
+ m_batchOverflowCheck(NULL),
m_maxRows(0),
m_rowCount(0),
- m_operation(operation),
m_iterState(Iter_notStarted),
m_currentRow(tupleNotFound),
m_subScanComplete(true),
@@ -548,9 +559,12 @@ NdbResultStream::~NdbResultStream()
}
}
-int // Return 0 if ok, else errorcode
+void
NdbResultStream::prepare()
{
+ const Uint32 rowSize = m_operation.getRowSize();
+ NdbQueryImpl &query = m_operation.getQuery();
+
/* Parent / child correlation is only relevant for scan type queries
* Don't create m_parentTupleId[] and m_childTupleIdx[] for lookups!
* Neither is these structures required for operations not having respective
@@ -560,7 +574,7 @@ NdbResultStream::prepare()
{
m_maxRows = m_operation.getMaxBatchRows();
m_tupleSet =
- new (m_operation.getQuery().getTupleSetAlloc().allocObjMem(m_maxRows))
+ new (query.getTupleSetAlloc().allocObjMem(m_maxRows))
TupleSet[m_maxRows];
clearTupleSet();
@@ -568,9 +582,25 @@ NdbResultStream::prepare()
else
m_maxRows = 1;
- return 0;
-} //NdbResultStream::prepare
+ const int bufferSize = rowSize * m_maxRows;
+ NdbBulkAllocator& bufferAlloc = query.getRowBufferAlloc();
+ char* const rowBuf = reinterpret_cast<char*>(bufferAlloc
+ .allocObjMem(bufferSize));
+
+ // So that we can test for buffer overrun.
+ m_batchOverflowCheck =
+ reinterpret_cast<Uint32*>(bufferAlloc.allocObjMem(sizeof(Uint32)));
+ *m_batchOverflowCheck = 0xacbd1234;
+ m_receiver.init(NdbReceiver::NDB_QUERY_OPERATION, false, &m_operation);
+ m_receiver.do_setup_ndbrecord(
+ m_operation.getNdbRecord(),
+ m_maxRows,
+ 0 /*key_size*/,
+ 0 /*read_range_no*/,
+ rowSize,
+ rowBuf);
+} //NdbResultStream::prepare
void
NdbResultStream::reset()
@@ -592,7 +622,7 @@ NdbResultStream::reset()
childNo++)
{
NdbQueryOperationImpl& child = m_operation.getChildOperation(childNo);
- child.getResultStream(getRootFragNo()).reset();
+ m_rootFrag.getResultStream(child).reset();
}
} //NdbResultStream::reset
@@ -623,7 +653,7 @@ NdbResultStream::isAllSubScansComplete()
childNo++)
{
const NdbQueryOperationImpl& child = m_operation.getChildOperation(childNo);
- const NdbResultStream& childStream = child.getResultStream(getRootFragNo());
+ const NdbResultStream& childStream = m_rootFrag.getResultStream(child);
if (!childStream.isAllSubScansComplete())
return false;
}
@@ -744,7 +774,7 @@ NdbResultStream::firstResult()
Uint16 parentId = tupleNotFound;
if (parent!=NULL)
{
- const NdbResultStream& parentStream = parent->getResultStream(m_rootFragNo);
+ const NdbResultStream& parentStream = m_rootFrag.getResultStream(*parent);
parentId = parentStream.getCurrentTupleId();
if (parentId == tupleNotFound)
@@ -791,8 +821,8 @@ NdbResultStream::execTRANSID_AI(const Ui
{
const CorrelationData correlData(ptr, len);
- assert(m_operation.getRoot().getResultStream(m_rootFragNo)
- .m_receiver.getId() == correlData.getRootReceiverId());
+ assert(m_rootFrag.getResultStream(0).m_receiver.getId() ==
+ correlData.getRootReceiverId());
m_receiver.execTRANSID_AI(ptr, len - CorrelationData::wordCount);
@@ -829,6 +859,9 @@ NdbResultStream::execTRANSID_AI(const Ui
void
NdbResultStream::handleBatchComplete()
{
+ // Buffer overrun check.
+ assert(m_batchOverflowCheck==NULL || *m_batchOverflowCheck==0xacbd1234);
+
for (Uint32 tupleNo=0; tupleNo<getRowCount(); tupleNo++)
{
m_tupleSet[tupleNo].m_skip = false;
@@ -837,7 +870,7 @@ NdbResultStream::handleBatchComplete()
for (Uint32 childNo=0; childNo < m_operation.getNoOfChildOperations(); childNo++)
{
const NdbQueryOperationImpl& child = m_operation.getChildOperation(childNo);
- NdbResultStream& childStream = child.getResultStream(m_rootFragNo);
+ NdbResultStream& childStream = m_rootFrag.getResultStream(child);
childStream.handleBatchComplete();
const bool isInnerJoin = child.getQueryOperationDef().getMatchType() != NdbQueryOptions::MatchAll;
@@ -925,6 +958,7 @@ NdbRootFragment::receiverIdLookup(NdbRoo
NdbRootFragment::NdbRootFragment():
m_query(NULL),
m_fragNo(voidFragNo),
+ m_resultStreams(NULL),
m_outstandingResults(0),
m_confReceived(false),
m_idMapHead(-1),
@@ -932,11 +966,56 @@ NdbRootFragment::NdbRootFragment():
{
}
+NdbRootFragment::~NdbRootFragment()
+{
+ assert(m_resultStreams==NULL);
+}
+
void NdbRootFragment::init(NdbQueryImpl& query, Uint32 fragNo)
{
assert(m_fragNo==voidFragNo);
m_query = &query;
m_fragNo = fragNo;
+
+ m_resultStreams = reinterpret_cast<NdbResultStream*>
+ (query.getResultStreamAlloc().allocObjMem(query.getNoOfOperations()));
+ assert(m_resultStreams!=NULL);
+
+ for (unsigned opNo=0; opNo<query.getNoOfOperations(); opNo++)
+ {
+ NdbQueryOperationImpl& op = query.getQueryOperation(opNo);
+ new (&m_resultStreams[opNo]) NdbResultStream(op,*this);
+ m_resultStreams[opNo].prepare();
+ }
+}
+
+/**
+ * Release what we want need anymore after last available row has been
+ * returned from datanodes.
+ */
+void
+NdbRootFragment::postFetchRelease()
+{
+ if (m_resultStreams != NULL)
+ {
+ for (unsigned opNo=0; opNo<m_query->getNoOfOperations(); opNo++)
+ {
+ m_resultStreams[opNo].~NdbResultStream();
+ }
+ }
+ /**
+ * Don't 'delete' the object as it was in-place constructed from
+ * ResultStreamAlloc'ed memory. Memory is released by
+ * ResultStreamAlloc::reset().
+ */
+ m_resultStreams = NULL;
+}
+
+NdbResultStream&
+NdbRootFragment::getResultStream(Uint32 operationNo) const
+{
+ assert(m_resultStreams);
+ return m_resultStreams[operationNo];
}
void NdbRootFragment::reset()
@@ -1493,7 +1572,14 @@ NdbQueryImpl::~NdbQueryImpl()
void
NdbQueryImpl::postFetchRelease()
{
- if (m_operations != NULL) {
+ if (m_rootFrags != NULL)
+ {
+ for (unsigned i=0; i<m_rootFragCount; i++)
+ { m_rootFrags[i].postFetchRelease();
+ }
+ }
+ if (m_operations != NULL)
+ {
for (unsigned i=0; i<m_countOperations; i++)
{ m_operations[i].postFetchRelease();
}
@@ -2088,10 +2174,11 @@ NdbQueryImpl::awaitMoreResults(bool forc
returns: 'true' when application thread should be resumed.
*/
bool
-NdbQueryImpl::handleBatchComplete(Uint32 fragNo)
+NdbQueryImpl::handleBatchComplete(NdbRootFragment& rootFrag)
{
if (traceSignals) {
- ndbout << "NdbQueryImpl::handleBatchComplete, fragNo=" << fragNo
+ ndbout << "NdbQueryImpl::handleBatchComplete"
+ << ", fragNo=" << rootFrag.getFragNo()
<< ", pendingFrags=" << (m_pendingFrags-1)
<< ", finalBatchFrags=" << m_finalBatchFrags
<< endl;
@@ -2105,7 +2192,6 @@ NdbQueryImpl::handleBatchComplete(Uint32
if (likely(m_fullFrags.m_errorCode == 0))
{
NdbQueryOperationImpl& root = getRoot();
- NdbRootFragment& rootFrag = m_rootFrags[fragNo];
assert(rootFrag.isFragBatchComplete());
assert(m_pendingFrags > 0); // Check against underflow.
@@ -2121,7 +2207,7 @@ NdbQueryImpl::handleBatchComplete(Uint32
if (getQueryDef().isScanQuery())
{
// Only required for scans
- root.getResultStream(fragNo).handleBatchComplete();
+ rootFrag.getResultStream(root).handleBatchComplete();
// Only ordered scans has to wait until all pending completed
resume = (m_pendingFrags==0) ||
@@ -2129,7 +2215,7 @@ NdbQueryImpl::handleBatchComplete(Uint32
}
else
{
- assert(root.m_resultStreams[fragNo]->getReceiver().m_tcPtrI==RNIL);
+ assert(rootFrag.getResultStream(root).getReceiver().m_tcPtrI==RNIL);
assert(m_finalBatchFrags==1);
assert(m_pendingFrags==0); // Lookup query should be complete now.
resume = true;
@@ -2137,7 +2223,7 @@ NdbQueryImpl::handleBatchComplete(Uint32
/* Position at the first (sorted?) row available from this fragments.
*/
- root.m_resultStreams[fragNo]->firstResult();
+ rootFrag.getResultStream(root).firstResult();
/* When application thread ::awaitMoreResults() it will later be moved
* from m_fullFrags to m_applFrags under mutex protection.
@@ -2290,22 +2376,22 @@ NdbQueryImpl::execTCKEYCONF()
ndbout << "NdbQueryImpl::execTCKEYCONF()" << endl;
}
assert(!getQueryDef().isScanQuery());
+ NdbRootFragment& rootFrag = m_rootFrags[0];
// We will get 1 + #leaf-nodes TCKEYCONF for a lookup...
- m_rootFrags[0].setConfReceived();
- m_rootFrags[0].incrOutstandingResults(-1);
+ rootFrag.setConfReceived();
+ rootFrag.incrOutstandingResults(-1);
bool ret = false;
- if (m_rootFrags[0].isFragBatchComplete())
+ if (rootFrag.isFragBatchComplete())
{
- ret = handleBatchComplete(0);
+ ret = handleBatchComplete(rootFrag);
}
if (traceSignals) {
ndbout << "NdbQueryImpl::execTCKEYCONF(): returns:" << ret
<< ", m_pendingFrags=" << m_pendingFrags
- << ", *getRoot().m_resultStreams[0]="
- << *getRoot().m_resultStreams[0]
+ << ", rootStream= {" << rootFrag.getResultStream(0) << "}"
<< endl;
}
return ret;
@@ -2382,9 +2468,7 @@ NdbQueryImpl::prepareSend()
// Allocate space for ptrs to NdbResultStream and NdbRootFragment objects.
error = m_pointerAlloc.init(m_rootFragCount *
(SharedFragStack::pointersPerFragment +
- OrderedFragSet::pointersPerFragment +
- // Pointers to NdbResultStream objects.
- getNoOfOperations()));
+ OrderedFragSet::pointersPerFragment));
if (error != 0)
{
setErrorCode(error);
@@ -2395,21 +2479,22 @@ NdbQueryImpl::prepareSend()
getRoot().calculateBatchedRows(NULL);
getRoot().setBatchedRows(1);
- /** Calculate total amount of row buffer space for all operations and
- * fragments.*/
+ /**
+ * Calculate total amount of row buffer space for all operations and
+ * fragments.
+ */
Uint32 totalBuffSize = 0;
for (Uint32 opNo = 0; opNo < getNoOfOperations(); opNo++)
{
- NdbQueryOperationImpl& op = getQueryOperation(opNo);
-
- op.m_bufferSize = op.getRowSize() * op.getMaxBatchRows();
- totalBuffSize += op.m_bufferSize;
+ const NdbQueryOperationImpl& op = getQueryOperation(opNo);
+ totalBuffSize += (op.getRowSize() * op.getMaxBatchRows());
}
- /** Add one word per operation for buffer overrun check. We add a word
+ /**
+ * Add one word per ResultStream for buffer overrun check. We add a word
* rather than a byte to avoid possible alignment problems.
*/
- m_rowBufferAlloc.init(totalBuffSize * m_rootFragCount +
- sizeof(Uint32) * getNoOfOperations());
+ m_rowBufferAlloc.init(m_rootFragCount *
+ (totalBuffSize + (sizeof(Uint32) * getNoOfOperations())) );
if (getQueryDef().isScanQuery())
{
Uint32 totalRows = 0;
@@ -2424,15 +2509,28 @@ NdbQueryImpl::prepareSend()
return -1;
}
}
- // 1. Build receiver structures for each QueryOperation.
- // 2. Fill in parameters (into ATTRINFO) for QueryTree.
- // (Has to complete *after* ::prepareReceiver() as QueryTree params
- // refer receiver id's.)
- //
+
+ /**
+ * Allocate and initialize fragment state variables.
+ * Will also cause a ResultStream object containing a
+ * NdbReceiver to be constructed for each operation in QueryTree
+ */
+ m_rootFrags = new NdbRootFragment[m_rootFragCount];
+ if (m_rootFrags == NULL)
+ {
+ setErrorCode(Err_MemoryAlloc);
+ return -1;
+ }
+ for (Uint32 i = 0; i<m_rootFragCount; i++)
+ {
+ m_rootFrags[i].init(*this, i); // Set fragment number.
+ }
+
+ // Fill in parameters (into ATTRINFO) for QueryTree.
for (Uint32 i = 0; i < m_countOperations; i++) {
- int error;
- if (unlikely((error = m_operations[i].prepareReceiver()) != 0)
- || (error = m_operations[i].prepareAttrInfo(m_attrInfo)) != 0) {
+ const int error = m_operations[i].prepareAttrInfo(m_attrInfo);
+ if (unlikely(error))
+ {
setErrorCode(error);
return -1;
}
@@ -2468,23 +2566,6 @@ NdbQueryImpl::prepareSend()
return -1;
}
- /**
- * Allocate and initialize fragment state variables.
- */
- m_rootFrags = new NdbRootFragment[m_rootFragCount];
- if(m_rootFrags == NULL)
- {
- setErrorCode(Err_MemoryAlloc);
- return -1;
- }
- else
- {
- for(Uint32 i = 0; i<m_rootFragCount; i++)
- {
- m_rootFrags[i].init(*this, i); // Set fragment number.
- }
- }
-
if (getQueryDef().isScanQuery())
{
NdbRootFragment::buildReciverIdMap(m_rootFrags, m_rootFragCount);
@@ -2854,17 +2935,15 @@ Remark:
int
NdbQueryImpl::sendFetchMore(NdbRootFragment& emptyFrag, bool forceSend)
{
- assert(getRoot().m_resultStreams!=NULL);
assert(!emptyFrag.finalBatchReceived());
assert(getQueryDef().isScanQuery());
- const Uint32 fragNo = emptyFrag.getFragNo();
emptyFrag.reset();
for (unsigned opNo=0; opNo<m_countOperations; opNo++)
{
NdbResultStream& resultStream =
- getQueryOperation(opNo).getResultStream(fragNo);
+ emptyFrag.getResultStream(opNo);
if (!resultStream.isSubScanComplete())
{
@@ -3386,10 +3465,7 @@ NdbQueryOperationImpl::NdbQueryOperation
m_parent(NULL),
m_children(def.getNoOfChildOperations()),
m_maxBatchRows(0), // >0: User specified prefered value, ==0: Use default CFG values
- m_resultStreams(NULL),
m_params(),
- m_bufferSize(0),
- m_batchOverflowCheck(NULL),
m_resultBuffer(NULL),
m_resultRef(NULL),
m_isRowNull(true),
@@ -3438,10 +3514,10 @@ NdbQueryOperationImpl::NdbQueryOperation
NdbQueryOperationImpl::~NdbQueryOperationImpl()
{
- // We expect ::postFetchRelease to have deleted fetch related structures when fetch completed.
- // Either by fetching through last row, or calling ::close() which forcefully terminates fetch
- assert (m_batchOverflowCheck == NULL);
- assert (m_resultStreams == NULL);
+ /**
+ * We expect ::postFetchRelease to have deleted fetch related structures when fetch completed.
+ * Either by fetching through last row, or calling ::close() which forcefully terminates fetch
+ */
assert (m_firstRecAttr == NULL);
assert (m_interpretedCode == NULL);
} //NdbQueryOperationImpl::~NdbQueryOperationImpl()
@@ -3453,22 +3529,6 @@ NdbQueryOperationImpl::~NdbQueryOperatio
void
NdbQueryOperationImpl::postFetchRelease()
{
- // Buffer overrun check.
- assert(m_batchOverflowCheck==NULL || *m_batchOverflowCheck == 0xacbd1234);
- m_batchOverflowCheck = NULL;
-
- if (m_resultStreams != NULL)
- {
- for (int i = static_cast<int>(getQuery().getRootFragCount())-1; i >= 0; i--)
- {
- if (m_resultStreams[i] != NULL)
- {
- m_resultStreams[i]->~NdbResultStream();
- }
- }
- }
- m_resultStreams = NULL;
-
Ndb* const ndb = m_queryImpl.getNdbTransaction().getNdb();
NdbRecAttr* recAttr = m_firstRecAttr;
while (recAttr != NULL) {
@@ -3702,7 +3762,7 @@ NdbQueryOperationImpl::firstResult()
if (rootFrag != NULL)
{
- NdbResultStream& resultStream = *m_resultStreams[rootFrag->getFragNo()];
+ NdbResultStream& resultStream = rootFrag->getResultStream(*this);
if (resultStream.firstResult() != tupleNotFound)
{
fetchRow(resultStream);
@@ -3742,7 +3802,7 @@ NdbQueryOperationImpl::nextResult(bool f
const NdbRootFragment* rootFrag = m_queryImpl.m_applFrags.getCurrent();
if (rootFrag!=NULL)
{
- NdbResultStream& resultStream = *m_resultStreams[rootFrag->getFragNo()];
+ NdbResultStream& resultStream = rootFrag->getResultStream(*this);
if (resultStream.nextResult() != tupleNotFound)
{
fetchRow(resultStream);
@@ -4073,55 +4133,9 @@ NdbQueryOperationImpl::setBatchedRows(Ui
}
}
-
-int
-NdbQueryOperationImpl::prepareReceiver()
-{
- // Construct receiver streams and prepare them for receiving scan result
- assert(m_resultStreams==NULL);
- assert(m_queryImpl.getRootFragCount() > 0);
-
- m_resultStreams = reinterpret_cast<NdbResultStream**>
- (getQuery().getPointerAlloc().allocObjMem(m_queryImpl.getRootFragCount()));
-
- for(Uint32 i = 0; i<m_queryImpl.getRootFragCount(); i++) {
- m_resultStreams[i] = NULL; // Init to legal contents for d'tor
- }
- for(Uint32 i = 0; i<m_queryImpl.getRootFragCount(); i++) {
- m_resultStreams[i] = new (getQuery().m_resultStreamAlloc.allocObjMem(1))
- NdbResultStream(*this, i);
- const int error = m_resultStreams[i]->prepare();
- if (unlikely(error)) {
- return error;
- }
-
- m_resultStreams[i]->getReceiver().init(NdbReceiver::NDB_QUERY_OPERATION,
- false, this);
- char* const rowBuf = reinterpret_cast<char*>(getQuery().getRowBufferAlloc()
- .allocObjMem(m_bufferSize));
- m_resultStreams[i]->getReceiver()
- .do_setup_ndbrecord(m_ndbRecord,
- getMaxBatchRows(),
- 0 /*key_size*/,
- 0 /*read_range_no*/,
- getRowSize(),
- rowBuf);
- m_resultStreams[i]->getReceiver().prepareSend();
- }
- // So that we can test for for buffer overrun.
- m_batchOverflowCheck =
- reinterpret_cast<Uint32*>(getQuery().getRowBufferAlloc()
- .allocObjMem(sizeof(Uint32)));
- *m_batchOverflowCheck = 0xacbd1234;
- return 0;
-}//NdbQueryOperationImpl::prepareReceiver
-
int
NdbQueryOperationImpl::prepareAttrInfo(Uint32Buffer& attrInfo)
{
- // ::prepareReceiver() need to complete first:
- assert (m_resultStreams != NULL);
-
const NdbQueryOperationDefImpl& def = getQueryOperationDef();
/**
@@ -4604,14 +4618,14 @@ NdbQueryOperationImpl::execTRANSID_AI(co
}
// Process result values.
- m_resultStreams[rootFragNo]->execTRANSID_AI(ptr, len);
+ rootFrag->getResultStream(*this).execTRANSID_AI(ptr, len);
rootFrag->incrOutstandingResults(-1);
bool ret = false;
if (rootFrag->isFragBatchComplete())
{
- ret = m_queryImpl.handleBatchComplete(rootFragNo);
+ ret = m_queryImpl.handleBatchComplete(*rootFrag);
}
if (traceSignals) {
@@ -4653,7 +4667,6 @@ NdbQueryOperationImpl::execTCKEYREF(cons
}
}
- Uint32 rootFragNo = 0;
NdbRootFragment& rootFrag = getQuery().m_rootFrags[0];
if (ref->errorCode != DbspjErr::NodeFailure)
@@ -4678,13 +4691,12 @@ NdbQueryOperationImpl::execTCKEYREF(cons
bool ret = false;
if (rootFrag.isFragBatchComplete())
{
- ret = m_queryImpl.handleBatchComplete(rootFragNo);
+ ret = m_queryImpl.handleBatchComplete(rootFrag);
}
if (traceSignals) {
ndbout << "NdbQueryOperationImpl::execTCKEYREF(): returns:" << ret
- << ", *getRoot().m_resultStreams[0] {"
- << *getRoot().m_resultStreams[0] << "}"
+ << ", resultStream= {" << rootFrag.getResultStream(*this) << "}"
<< ", *this=" << *this << endl;
}
return ret;
@@ -4720,12 +4732,13 @@ NdbQueryOperationImpl::execSCAN_TABCONF(
rootFrag->incrOutstandingResults(rowCount);
// Handle for SCAN_NEXTREQ, RNIL -> EOF
- NdbResultStream& resultStream = *m_resultStreams[rootFrag->getFragNo()];
+ NdbResultStream& resultStream = rootFrag->getResultStream(*this);
resultStream.getReceiver().m_tcPtrI = tcPtrI;
if(traceSignals){
- ndbout << " resultStream(root) {" << resultStream << "} fragNo"
- << rootFrag->getFragNo() << endl;
+ ndbout << " resultStream {" << rootFrag->getResultStream(*this)
+ << "} fragNo" << rootFrag->getFragNo()
+ << endl;
}
const NdbQueryDefImpl& queryDef = m_queryImpl.getQueryDef();
@@ -4757,7 +4770,7 @@ NdbQueryOperationImpl::execSCAN_TABCONF(
if (rootFrag->isFragBatchComplete())
{
/* This fragment is now complete */
- ret = m_queryImpl.handleBatchComplete(rootFrag->getFragNo());
+ ret = m_queryImpl.handleBatchComplete(*rootFrag);
}
if (traceSignals) {
ndbout << "NdbQueryOperationImpl::execSCAN_TABCONF():, returns:" << ret
@@ -4893,13 +4906,6 @@ int NdbQueryOperationImpl::setBatchSize(
return 0;
}
-NdbResultStream&
-NdbQueryOperationImpl::getResultStream(Uint32 rootFragNo) const
-{
- assert(rootFragNo < getQuery().getRootFragCount());
- return *m_resultStreams[rootFragNo];
-}
-
bool
NdbQueryOperationImpl::hasInterpretedCode() const
{
@@ -4938,15 +4944,8 @@ NdbQueryOperationImpl::prepareInterprete
Uint32
NdbQueryOperationImpl::getIdOfReceiver() const {
- return m_resultStreams[0]->getReceiver().getId();
-}
-
-
-const NdbReceiver&
-NdbQueryOperationImpl::getReceiver(Uint32 recNo) const {
- assert(recNo<getQuery().getRootFragCount());
- assert(m_resultStreams!=NULL);
- return m_resultStreams[recNo]->getReceiver();
+ NdbRootFragment& rootFrag = m_queryImpl.m_rootFrags[0];
+ return rootFrag.getResultStream(*this).getReceiver().getId();
}
Uint32 NdbQueryOperationImpl::getRowSize() const
@@ -4975,7 +4974,8 @@ NdbOut& operator<<(NdbOut& out, const Nd
out << " m_queryImpl: " << &op.m_queryImpl;
out << " m_operationDef: " << &op.m_operationDef;
for(Uint32 i = 0; i<op.m_queryImpl.getRootFragCount(); i++){
- out << " m_resultStream[" << i << "]{" << *op.m_resultStreams[i] << "}";
+ NdbRootFragment& rootFrag = op.m_queryImpl.m_rootFrags[i];
+ out << " m_resultStream[" << i << "]{" << rootFrag.getResultStream(op) << "}";
}
out << " m_isRowNull " << op.m_isRowNull;
out << " ]";
=== modified file 'storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp'
--- a/storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp 2011-06-20 13:25:48 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp 2011-08-16 07:56:53 +0000
@@ -247,9 +247,15 @@ public:
Uint32 getRootFragCount() const
{ return m_rootFragCount; }
+ NdbBulkAllocator& getResultStreamAlloc()
+ { return m_resultStreamAlloc; }
+
NdbBulkAllocator& getTupleSetAlloc()
{ return m_tupleSetAlloc; }
+ NdbBulkAllocator& getRowBufferAlloc()
+ { return m_rowBufferAlloc; }
+
private:
/** Possible return values from NdbQueryImpl::awaitMoreResults.
* A subset of the integer values also matches those returned
@@ -577,13 +583,10 @@ private:
* the result.
* @return: 'true' if its time to resume appl. threads
*/
- bool handleBatchComplete(Uint32 rootFragNo);
+ bool handleBatchComplete(NdbRootFragment& rootFrag);
NdbBulkAllocator& getPointerAlloc()
{ return m_pointerAlloc; }
-
- NdbBulkAllocator& getRowBufferAlloc()
- { return m_rowBufferAlloc; }
}; // class NdbQueryImpl
@@ -705,10 +708,6 @@ public:
int setInterpretedCode(const NdbInterpretedCode& code);
bool hasInterpretedCode() const;
- NdbResultStream& getResultStream(Uint32 rootFragNo) const;
-
- const NdbReceiver& getReceiver(Uint32 rootFragNo) const;
-
/** Verify magic number.*/
bool checkMagicNumber() const
{ return m_magic == MAGIC; }
@@ -719,6 +718,12 @@ public:
Uint32 getMaxBatchRows() const
{ return m_maxBatchRows; }
+ /** Get size of row as required to buffer it. */
+ Uint32 getRowSize() const;
+
+ const NdbRecord* getNdbRecord() const
+ { return m_ndbRecord; }
+
private:
STATIC_CONST (MAGIC = 0xfade1234);
@@ -742,16 +747,9 @@ private:
/** Max rows (per resultStream) in a scan batch.*/
Uint32 m_maxBatchRows;
- /** For processing results from this operation (Array of).*/
- NdbResultStream** m_resultStreams;
/** Buffer for parameters in serialized format */
Uint32Buffer m_params;
- /** Buffer size allocated for *each* ResultStream/Receiver when
- * fetching results.*/
- Uint32 m_bufferSize;
- /** Used for checking if buffer overrun occurred. */
- Uint32* m_batchOverflowCheck;
/** User specified buffer for final storage of result.*/
char* m_resultBuffer;
/** User specified pointer to application pointer that should be
@@ -819,9 +817,6 @@ private:
Uint32 calculateBatchedRows(const NdbQueryOperationImpl* closestScan);
void setBatchedRows(Uint32 batchedRows);
- /** Construct and prepare receiver streams for result processing. */
- int prepareReceiver();
-
/** Prepare ATTRINFO for execution. (Add execution params++)
* @return possible error code.*/
int prepareAttrInfo(Uint32Buffer& attrInfo);
@@ -863,7 +858,6 @@ private:
bool diskInUserProjection() const
{ return m_diskInUserProjection; }
- Uint32 getRowSize() const;
}; // class NdbQueryOperationImpl
No bundle (reason: useless for push emails).
| Thread |
|---|
| • bzr push into mysql-5.1-telco-7.0-spj-scan-vs-scan branch(ole.john.aske:3531 to 3532) | Ole John Aske | 17 Aug |