#At file:///net/fimafeng09/export/home/tmp/oleja/mysql/mysql-5.1-telco-7.0-spj-scan-scan/ based on revid:jonas@stripped
3340 Ole John Aske 2010-11-05
sps-svs: Fixed improper handling of SCANREF's, timeout and nodefailure when
wait_scan() wait for more results to arrive.
Fix introduce the member field 'm_errorCode' in class SharedFragStack. m_errorCode is used
to temporary store errors received from TC / datanodes. As these error are received
asyncronous by the receiver thread, they should not be stored directly into m_error.code.
Instead we check m_errorCode for errors whenever we have locked the PollGuard mutex. At
this point any received errors may be made moved into 'm_error.code' where it is made
visible, and returned to the aplication.
This fix also contains several fixed for ::closeTcCursor() related to when we should
wait for pending results, and when to send a 'close' to TC.
Furthermore, execTRANSID_ID has been enhanced to ignore counting of 'pending' fragments
if we are in an error situation. These are 'leftover' TRANSID_AI's arriving after a SCANREF, and
before TC and LQH has been requested to close its cursors.
modified:
storage/ndb/src/ndbapi/NdbQueryOperation.cpp
storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp
storage/ndb/src/ndbapi/NdbTransactionScan.cpp
=== modified file 'storage/ndb/src/ndbapi/NdbQueryOperation.cpp'
--- a/storage/ndb/src/ndbapi/NdbQueryOperation.cpp 2010-11-02 11:36:28 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryOperation.cpp 2010-11-05 14:45:27 +0000
@@ -47,7 +47,7 @@ STATIC_CONST(Err_MemoryAlloc = 4000);
STATIC_CONST(Err_SendFailed = 4002);
STATIC_CONST(Err_FunctionNotImplemented = 4003);
STATIC_CONST(Err_UnknownColumn = 4004);
-STATIC_CONST(Err_ReceiveFromNdbFailed = 4008);
+STATIC_CONST(Err_ReceiveTimedOut = 4008);
STATIC_CONST(Err_NodeFailCausedAbort = 4028);
STATIC_CONST(Err_WrongFieldLength = 4209);
STATIC_CONST(Err_MixRecAttrAndRecord = 4284);
@@ -1688,76 +1688,77 @@ NdbQueryImpl::nextRootResult(bool fetchA
* accessed by the application thread, so it is safe to use it without
* locks.
*/
- while (m_state != EndOfData) // Or likely: return when 'gotRow'
+ while (m_state != EndOfData) // Or likely: return when 'gotRow' or error
{
const NdbRootFragment* rootFrag = m_applFrags.getCurrent();
if (unlikely(rootFrag==NULL))
{
/* m_applFrags is empty, so we cannot get more results without
* possibly blocking.
+ *
+ * ::awaitMoreResults() will either copy fragments that are already
+ * complete (under mutex protection), or block until data
+ * previously requested arrives.
*/
- if (fetchAllowed)
- {
- /* fetchMoreResults() will either copy fragments that are already
- * complete (under mutex protection), or block until more data arrives.
- */
- const FetchResult fetchResult = awaitMoreResults(forceSend);
- switch (fetchResult) {
-
- case FetchResult_ok:
- break;
-
- case FetchResult_otherError:
- assert (m_error.code != 0);
- setErrorCode(m_error.code);
- return NdbQuery::NextResult_error;
-
- case FetchResult_sendFail:
- setErrorCode(Err_NodeFailCausedAbort);
- return NdbQuery::NextResult_error;
-
- case FetchResult_nodeFail:
- setErrorCode(Err_NodeFailCausedAbort);
- return NdbQuery::NextResult_error;
+ const FetchResult fetchResult = awaitMoreResults(forceSend);
+ switch (fetchResult) {
- case FetchResult_timeOut:
- setErrorCode(Err_ReceiveFromNdbFailed);
- return NdbQuery::NextResult_error;
+ case FetchResult_ok: // OK - got data wo/ error
+ assert (m_error.code == 0);
+ rootFrag = m_applFrags.getCurrent();
+ assert (rootFrag!=NULL);
+ break;
- case FetchResult_scanComplete:
- getRoot().nullifyResult();
+ case FetchResult_noMoreData: // No data, no error
+ assert (m_error.code == 0);
+ assert (m_applFrags.getCurrent()==NULL);
+ getRoot().nullifyResult();
+ if (!fetchAllowed)
+ {
+ return NdbQuery::NextResult_bufferEmpty;
+ }
+ else if (m_finalBatchFrags == getRootFragCount())
+ {
+ m_state = EndOfData;
+ postFetchRelease();
return NdbQuery::NextResult_scanComplete;
-
- default:
- assert(false);
}
+ break; // ::sendFetchMore() will request more results
+
+ case FetchResult_gotError: // Error in 'm_error.code'
+ assert (m_error.code != 0);
+ return NdbQuery::NextResult_error;
+
+ default:
+ assert(false);
}
- else
- {
- // There are no more cached records in NdbApi
- getRoot().nullifyResult();
- return NdbQuery::NextResult_bufferEmpty;
- }
- rootFrag = m_applFrags.getCurrent();
}
else
{
assert(rootFrag->isFragBatchComplete());
rootFrag->getResultStream(0).nextResult(); // Consume current
- m_applFrags.reorganize();
+ m_applFrags.reorganize(); // Calculate new current
// Reorg. may update 'current' RootFragment
rootFrag = m_applFrags.getCurrent();
+ }
+ /**
+ * If allowed to request more rows from datanodes, we do this asynch
+ * and request more rows as soon as we have consumed all rows from a
+ * fragment. ::awaitMoreResults() may eventually block and wait for these
+ * when required.
+ */
+ if (fetchAllowed)
+ {
// Ask for a new batch if we emptied one.
- NdbRootFragment* const emptyFrag = m_applFrags.getEmpty();
- if (emptyFrag != NULL)
+ NdbRootFragment* emptyFrag = m_applFrags.getEmpty();
+ while (emptyFrag != NULL)
{
if (sendFetchMore(*emptyFrag, forceSend) != 0)
{
- setErrorCode(Err_NodeFailCausedAbort); // Node fail
return NdbQuery::NextResult_error;
}
- assert(m_applFrags.getEmpty() == NULL);
+ emptyFrag = m_applFrags.getEmpty();
}
}
@@ -1767,13 +1768,18 @@ NdbQueryImpl::nextRootResult(bool fetchA
getRoot().fetchRow(rootFrag->getResultStream(0));
return NdbQuery::NextResult_gotRow;
}
- }
+ } // m_state != EndOfData
assert (m_state == EndOfData);
return NdbQuery::NextResult_scanComplete;
} //NdbQueryImpl::nextRootResult()
+/**
+ * Wait for more scan results which already has been REQuested to arrive.
+ * @return 0 if some rows did arrive, a negative value if there are errors (in m_error.code),
+ * and 1 of there are no more rows to receive.
+ */
NdbQueryImpl::FetchResult
NdbQueryImpl::awaitMoreResults(bool forceSend)
{
@@ -1783,83 +1789,77 @@ NdbQueryImpl::awaitMoreResults(bool forc
if (m_queryDef.isScanQuery())
{
assert (m_scanTransaction);
+ assert (m_state==Executing);
Ndb* const ndb = m_transaction.getNdb();
- Uint32 timeout = ndb->theImpl->get_waitfor_timeout();
-
- while (likely(m_error.code==0))
{
- {
- /* This part needs to be done under mutex due to synchronization with
- * receiver thread.
- */
- PollGuard poll_guard(*ndb->theImpl);
- assert (m_state==Executing);
+ /* This part needs to be done under mutex due to synchronization with
+ * receiver thread.
+ */
+ PollGuard poll_guard(*ndb->theImpl);
+ /* There may be pending (asynchronous received, mutex protected) errors
+ * from TC / datanodes. Propogate these into m_error.code in 'API space'.
+ */
+ while (likely(!hasReceivedError()))
+ {
/* m_fullFrags contains any fragments that are complete (for this batch)
* but have not yet been moved (under mutex protection) to
* m_applFrags.
*/
- if (m_fullFrags.size()==0) {
- if (m_pendingFrags == 0)
- {
- assert(m_finalBatchFrags == getRootFragCount());
- m_state = EndOfData;
- postFetchRelease();
- return FetchResult_scanComplete;
- }
- else
- {
- /* More results are on the way, so we wait for them.*/
- const FetchResult waitResult = static_cast<FetchResult>
- (poll_guard.wait_scan(3*timeout,
- m_transaction.getConnectedNodeId(),
- forceSend));
- if (waitResult != FetchResult_ok)
- {
- return waitResult;
- }
- }
- } // if (m_fullFrags.size()==0)
-
NdbRootFragment* frag;
while ((frag=m_fullFrags.pop()) != NULL)
{
m_applFrags.add(*frag);
}
- }
+ if (m_applFrags.getCurrent() != NULL)
+ {
+ return FetchResult_ok;
+ }
- /**
- * We may get batches that are empty but still not the final batch
- * for that fragment. If so, ask for a new batch.
- */
- NdbRootFragment* emptyFrag = m_applFrags.getEmpty();
- while (emptyFrag != NULL)
- {
- if (unlikely(sendFetchMore(*emptyFrag, forceSend) != 0))
+ /* There are noe more available frament results available without
+ * first waiting for more to be received from datanodes
+ */
+ if (m_pendingFrags == 0)
{
- return FetchResult_sendFail;
- }
- emptyFrag = m_applFrags.getEmpty();
- }
+ // 'No more *pending* results', ::sendFetchMore() may make more available
+ return FetchResult_noMoreData;
+ }
- if (m_applFrags.getCurrent() != NULL)
- {
- return FetchResult_ok;
- }
+ /* More results are on the way, so we wait for them.*/
+ const FetchResult waitResult = static_cast<FetchResult>
+ (poll_guard.wait_scan(3*ndb->theImpl->get_waitfor_timeout(),
+ m_transaction.getConnectedNodeId(),
+ forceSend));
- /* Getting here is not an error. PollGuard::wait_scan() will return
- * when a complete batch (for a fragment) is available for *any* active
- * scan in this transaction. So we must wait again for the next arriving
- * batch.
- */
- } // while(likely(m_error.code==0))
+ switch (waitResult) {
+ case FetchResult_ok: // SCAN_TABREF, may have setFetchTerminated() w/ errors
+ break;
+ case FetchResult_timeOut:
+ setFetchTerminated(Err_ReceiveTimedOut,false);
+ break;
+ case FetchResult_nodeFail:
+ setFetchTerminated(Err_NodeFailCausedAbort,false);
+ break;
+ default:
+ assert(false);
+ }
+ assert (!m_error.code);
- // 'while' terminated by m_error.code
- assert (m_error.code);
- return FetchResult_otherError;
+ /* Getting here is not an error. PollGuard::wait_scan() will return
+ * when a complete batch (for a fragment) is available for *any* active
+ * scan in this transaction. So we must wait again for the next arriving
+ * batch.
+ */
+ } // while(!hasReceivedError())
+ } // Terminates scope of 'PollGuard'
- } else { // is a Lookup query
+ // Fall through only if ::hasReceivedError()
+ assert (m_error.code);
+ return FetchResult_gotError;
+ }
+ else // is a Lookup query
+ {
/* The root operation is a lookup. Lookups are guaranteed to be complete
* before NdbTransaction::execute() returns. Therefore we do not set
* the lock, because we know that the signal receiver thread will not
@@ -1873,9 +1873,7 @@ NdbQueryImpl::awaitMoreResults(bool forc
* - There was no matching row for an inner join.
* - or, the application called nextResult() twice for a lookup query.
*/
- m_state = EndOfData;
- postFetchRelease();
- return FetchResult_scanComplete;
+ return FetchResult_noMoreData;
}
else
{
@@ -1887,7 +1885,7 @@ NdbQueryImpl::awaitMoreResults(bool forc
return FetchResult_ok;
}
} // if(m_queryDef.isScanQuery())
-} //NdbQueryImpl::fetchMoreResults
+} //NdbQueryImpl::awaitMoreResults
void
NdbQueryImpl::handleBatchComplete(Uint32 fragNo)
@@ -1918,6 +1916,7 @@ NdbQueryImpl::closeSingletonScans()
if (getRoot().m_resultStreams[0]->firstResult() != tupleNotFound) {
m_fullFrags.push(m_rootFrags[0]);
}
+ m_finalBatchFrags++;
} //NdbQueryImpl::closeSingletonScans
int
@@ -1991,6 +1990,42 @@ NdbQueryImpl::setErrorCodeAbort(int aErr
}
+/*
+ * ::setFetchTerminated() Should only be called with mutex locked.
+ * Register result fetching as completed (possibly prematurely, w/ errorCode).
+ */
+void
+NdbQueryImpl::setFetchTerminated(int errorCode, bool needClose)
+{
+ assert(m_finalBatchFrags < getRootFragCount());
+ if (!needClose)
+ {
+ m_finalBatchFrags = getRootFragCount();
+ }
+ if (errorCode!=0)
+ {
+ m_fullFrags.m_errorCode = errorCode;
+ }
+ m_pendingFrags = 0;
+} // NdbQueryImpl::setFetchTerminated()
+
+
+/* There may be pending (asynchronous received, mutex protected) errors
+ * from TC / datanodes. Propogate these into 'API space'.
+ * ::hasReceivedError() Should only be called with mutex locked
+ */
+bool
+NdbQueryImpl::hasReceivedError()
+{
+ if (unlikely(m_fullFrags.m_errorCode))
+ {
+ setErrorCode(m_fullFrags.m_errorCode);
+ return true;
+ }
+ return false;
+} // NdbQueryImpl::hasReceivedError
+
+
bool
NdbQueryImpl::execTCKEYCONF()
{
@@ -2023,18 +2058,13 @@ NdbQueryImpl::execTCKEYCONF()
}
void
-NdbQueryImpl::execCLOSE_SCAN_REP(bool needClose)
+NdbQueryImpl::execCLOSE_SCAN_REP(int errorCode, bool needClose)
{
if (traceSignals)
{
ndbout << "NdbQueryImpl::execCLOSE_SCAN_REP()" << endl;
}
- assert(m_finalBatchFrags < getRootFragCount());
- m_pendingFrags = 0;
- if(!needClose)
- {
- m_finalBatchFrags = getRootFragCount();
- }
+ setFetchTerminated(errorCode,needClose);
}
/*
@@ -2044,6 +2074,15 @@ NdbQueryImpl::execCLOSE_SCAN_REP(bool ne
bool
NdbQueryImpl::incrementPendingFrags(int increment)
{
+ if (unlikely(m_fullFrags.m_errorCode != 0))
+ {
+ /* Received fragment data after a SCANREF() (timeout?) has terminated the scan.
+ * We are about to close this query, and didn't expect there to be anything 'pending'.
+ */
+ assert (m_pendingFrags==0);
+ return (m_pendingFrags==0);
+ }
+
m_pendingFrags += increment;
assert(m_pendingFrags < 1<<15); // Check against underflow.
assert(m_pendingFrags <= m_rootFragCount); // .... and overflow
@@ -2530,7 +2569,6 @@ NdbQueryImpl::doSend(int nodeId, bool la
int sendFetchMore() - Fetch another scan batch, optionaly closing the scan
Request another batch of rows to be retrieved from the scan.
- Transporter mutex is locked before this method is called.
Return Value: 0 if send succeeded, -1 otherwise.
Parameters: emptyFrag: Root frgament for which to ask for another batch.
@@ -2600,6 +2638,12 @@ NdbQueryImpl::sendFetchMore(NdbRootFragm
* receiver thread.
*/
PollGuard poll_guard(*ndb.theImpl);
+
+ if (unlikely(hasReceivedError()))
+ {
+ // Errors arrived inbetween ::await released mutex, and fetchMore grabbed it
+ return -1;
+ }
const int res =
facade->sendSignal(&tSignal,
getNdbTransaction().getConnectedNodeId(),
@@ -2607,7 +2651,7 @@ NdbQueryImpl::sendFetchMore(NdbRootFragm
1);
if (unlikely(res == -1))
{
- // Error: 'Send to NDB failed'
+ setErrorCode(Err_NodeFailCausedAbort);
return -1;
}
m_pendingFrags++;
@@ -2639,40 +2683,49 @@ NdbQueryImpl::closeTcCursor(bool forceSe
*/
PollGuard poll_guard(*ndb->theImpl);
+//Uint32 seq = m_transaction->theNodeSequence;
+//if (seq != tp->getNodeSequence(m_transaction.getConnectedNodeId())) // TODO
+//{}
+
/* Wait for outstanding scan results from current batch fetch */
- while (m_error.code==0 && !isBatchComplete())
+ while (m_pendingFrags > 0)
{
const FetchResult waitResult = static_cast<FetchResult>
(poll_guard.wait_scan(3*timeout,
m_transaction.getConnectedNodeId(),
forceSend));
switch (waitResult) {
- case FetchResult_ok:
+ case FetchResult_ok: // SCAN_TABREF, may have setFetchTerminated() w/ errors
break;
- case FetchResult_nodeFail:
- setErrorCode(Err_NodeFailCausedAbort); // Node fail
- return -1;
case FetchResult_timeOut:
- setErrorCode(Err_ReceiveFromNdbFailed); // Timeout
- return -1;
+ setFetchTerminated(Err_ReceiveTimedOut,false);
+ break;
+ case FetchResult_nodeFail:
+ setFetchTerminated(Err_NodeFailCausedAbort,false);
+ break;
default:
assert(false);
}
+ if (hasReceivedError())
+ {
+ break;
+ }
} // while
- assert(m_pendingFrags==0 || m_error.code != 0);
- m_error.code = 0; // Ignore possible errorcode caused by previous fetching
-
- /* Discard pending result in order to not confuse later counting of m_finalBatchFrags */
- m_fullFrags.clear();
+ assert(m_pendingFrags==0);
+ m_fullFrags.clear(); // Throw any unhandled results
+ m_fullFrags.m_errorCode = 0; // Clear errors caused by previous fetching
+ m_error.code = 0;
- if (m_finalBatchFrags<getRootFragCount()) // TC has an open scan cursor.
+ if (m_finalBatchFrags < getRootFragCount()) // TC has an open scan cursor.
{
- /* Send SCANREQ(close) */
+ /* Send SCAN_NEXTREQ(close) */
const int error = sendClose(m_transaction.getConnectedNodeId());
if (unlikely(error))
return error;
+ assert(m_finalBatchFrags+m_pendingFrags==getRootFragCount());
+
/* Wait for close to be confirmed: */
while (m_pendingFrags > 0)
{
@@ -2682,49 +2735,35 @@ NdbQueryImpl::closeTcCursor(bool forceSe
forceSend));
switch (waitResult) {
case FetchResult_ok:
- if (unlikely(m_error.code)) // Close request itself failed, keep error
- {
- setErrorCode(m_error.code);
- return -1;
- }
- NdbRootFragment* frag;
- while ((frag=m_fullFrags.pop()) != NULL)
- {
- if (frag->finalBatchReceived())
- {
- // This was the final batch for that root fragment.
- m_finalBatchFrags++;
- }
- }
break;
- case FetchResult_nodeFail:
- setErrorCode(Err_NodeFailCausedAbort); // Node fail
- return -1;
case FetchResult_timeOut:
- setErrorCode(Err_ReceiveFromNdbFailed); // Timeout
- return -1;
+ setFetchTerminated(Err_ReceiveTimedOut,false);
+ break;
+ case FetchResult_nodeFail:
+ setFetchTerminated(Err_NodeFailCausedAbort,false);
+ break;
default:
assert(false);
}
+ if (hasReceivedError())
+ {
+ break;
+ }
} // while
} // if
- assert(m_finalBatchFrags == getRootFragCount());
return 0;
} //NdbQueryImpl::closeTcCursor
/*
- This method is called with the PollGuard mutex held on the transporter.
-*/
+ * This method is called with the PollGuard mutex held on the transporter.
+ */
int
NdbQueryImpl::sendClose(int nodeId)
{
assert(m_finalBatchFrags < getRootFragCount());
-
m_pendingFrags = getRootFragCount() - m_finalBatchFrags;
- assert(m_pendingFrags > 0);
- assert(m_pendingFrags < 1<<15); // Check against underflow.
Ndb& ndb = *m_transaction.getNdb();
NdbApiSignal tSignal(&ndb);
@@ -2746,28 +2785,6 @@ NdbQueryImpl::sendClose(int nodeId)
} // NdbQueryImpl::sendClose()
-/*
- This method is always called with PollGuard mutex held on the transporter.
- (As m_pendingFrags is accessed both from API sender and receiver)
-*/
-bool
-NdbQueryImpl::isBatchComplete() const {
-#ifndef NDEBUG
- if (!m_error.code)
- {
- Uint32 count = 0;
- for(Uint32 i = 0; i < getRootFragCount(); i++){
- if(!m_rootFrags[i].isFragBatchComplete()){
- count++;
- }
- }
- assert(count == m_pendingFrags);
- }
-#endif
- return m_pendingFrags == 0;
-}
-
-
int NdbQueryImpl::isPrunable(bool& prunable)
{
if (m_prunability == Prune_Unknown)
@@ -2788,17 +2805,18 @@ int NdbQueryImpl::isPrunable(bool& pruna
/****************
- * NdbQueryImpl::FragStack methods.
+ * NdbQueryImpl::SharedFragStack methods.
***************/
-NdbQueryImpl::FragStack::FragStack():
+NdbQueryImpl::SharedFragStack::SharedFragStack():
+ m_errorCode(0),
m_capacity(0),
m_current(-1),
- m_array(NULL){
-}
+ m_array(NULL)
+{}
int
-NdbQueryImpl::FragStack::prepare(int capacity)
+NdbQueryImpl::SharedFragStack::prepare(int capacity)
{
assert(m_array==NULL);
assert(m_capacity==0);
@@ -2812,7 +2830,8 @@ NdbQueryImpl::FragStack::prepare(int cap
}
void
-NdbQueryImpl::FragStack::push(NdbRootFragment& frag){
+NdbQueryImpl::SharedFragStack::push(NdbRootFragment& frag)
+{
m_current++;
assert(m_current<m_capacity);
m_array[m_current] = &frag;
@@ -4323,7 +4342,8 @@ NdbQueryOperationImpl::execTRANSID_AI(co
bool ret = false;
NdbRootFragment* rootFrag = NULL;
- if(getQueryDef().isScanQuery()){
+ if(getQueryDef().isScanQuery())
+ {
const Uint32 receiverId = CorrelationData(ptr, len).getRootReceiverId();
/** receiverId holds the Id of the receiver of the corresponding stream
@@ -4360,7 +4380,9 @@ NdbQueryOperationImpl::execTRANSID_AI(co
// Wake up appl thread when we have data, or entire query batch completed.
ret = true;
}
- } else { // Lookup query
+ }
+ else
+ { // Lookup query
// The root operation is a lookup.
m_resultStreams[0]->execTRANSID_AI(ptr, len);
=== modified file 'storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp'
--- a/storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp 2010-10-29 21:21:45 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp 2010-11-05 14:45:27 +0000
@@ -152,7 +152,7 @@ public:
bool execTCKEYCONF();
/** Process SCAN_TABCONF w/ EndOfData which is a 'Close Scan Reply'. */
- void execCLOSE_SCAN_REP(bool isClosed);
+ void execCLOSE_SCAN_REP(int errorCode, bool needClose);
/** Determines if query has completed and may be garbage collected
* A query is not considder complete until the client has
@@ -199,24 +199,30 @@ public:
{ return m_rootFragCount; }
private:
- /** Possible return values from NdbQuery::awaitMoreResults. Integer values
- * matches those returned from PoolGuard::waitScan().
+ /** Possible return values from NdbQueryImpl::awaitMoreResults. Integer values
+ * matches those returned from PoolGuard::wait_scan().
*/
enum FetchResult{
- FetchResult_otherError = -4,
+ FetchResult_gotError = -4, // There is an error avail in 'm_error.code'
FetchResult_sendFail = -3,
FetchResult_nodeFail = -2,
FetchResult_timeOut = -1,
FetchResult_ok = 0,
- FetchResult_scanComplete = 1
+ FetchResult_noMoreData = 1
};
- /** A stack of NdbRootFragment pointers. */
- class FragStack{
+ /** A stack of NdbRootFragment pointers.
+ * NdbRootFragments which are 'BatchComplete' are pushed on this stack by
+ * the receiver thread, and later pop'ed into the application thread when
+ * it need more results to process.
+ * Due to this shared usage, the PollGuard mutex must be set before
+ * accessing SharedFragStack.
+ */
+ class SharedFragStack{
public:
- explicit FragStack();
+ explicit SharedFragStack();
- ~FragStack() {
+ ~SharedFragStack() {
delete[] m_array;
}
@@ -238,16 +244,21 @@ private:
m_current = -1;
}
+ /** Possible error received from TC / datanodes. */
+ int m_errorCode;
+
private:
/** Capacity of stack.*/
int m_capacity;
+
/** Index of current top of stack.*/
int m_current;
NdbRootFragment** m_array;
+
// No copying.
- FragStack(const FragStack&);
- FragStack& operator=(const FragStack&);
- }; // class FragStack
+ SharedFragStack(const SharedFragStack&);
+ SharedFragStack& operator=(const SharedFragStack&);
+ }; // class SharedFragStack
class OrderedFragSet{
public:
@@ -389,7 +400,7 @@ private:
/** Root frgaments that have received a complete batch. Shared between
* application thread and receiving thread. Access should be mutex protected.
*/
- FragStack m_fullFrags;
+ SharedFragStack m_fullFrags; // BEWARE: protect with PollGuard mutex
/** Number of root fragments for which confirmation for the final batch
* (with tcPtrI=RNIL) has been received. Observe that even if
@@ -448,14 +459,24 @@ private:
/** Navigate to the next result from the root operation. */
NdbQuery::NextResultOutcome nextRootResult(bool fetchAllowed, bool forceSend);
- /** Get more scan results, ask datanodes for the next batch if necessary.*/
- FetchResult awaitMoreResults(bool forceSend);
-
/** Send SCAN_NEXTREQ signal to fetch another batch from a scan query
* @return 0 if send succeeded, -1 otherwise.
*/
int sendFetchMore(NdbRootFragment& emptyFrag, bool forceSend);
+ /** Wait for more scan results which already has been REQuested to arrive.
+ * @return 0 if some rows did arrive, a negative value if there are errors (in m_error.code),
+ * and 1 of there are no more rows to receive.
+ */
+ FetchResult awaitMoreResults(bool forceSend);
+
+ /** Check if we have received an error from TC, or datanodes.
+ * @return 'true' if an error is pending, 'false' otherwise.
+ */
+ bool hasReceivedError(); // Need mutex lock
+
+ void setFetchTerminated(int aErrorCode, bool needClose); // Need mutex lock
+
/** Close cursor on TC */
int closeTcCursor(bool forceSend);
@@ -482,9 +503,6 @@ private:
*/
bool incrementPendingFrags(int increment);
- /** Check if batch is complete (no outstanding messages).*/
- bool isBatchComplete() const;
-
/** A complete batch has been received for a given root fragment
* Update whatever required before the appl. is allowed to navigate the result.
*/
=== modified file 'storage/ndb/src/ndbapi/NdbTransactionScan.cpp'
--- a/storage/ndb/src/ndbapi/NdbTransactionScan.cpp 2010-10-11 13:18:51 +0000
+++ b/storage/ndb/src/ndbapi/NdbTransactionScan.cpp 2010-11-05 14:45:27 +0000
@@ -55,8 +55,7 @@ NdbTransaction::receiveSCAN_TABREF(const
} else {
assert (m_scanningQuery);
- m_scanningQuery->execCLOSE_SCAN_REP(ref->closeNeeded);
- m_scanningQuery->setErrorCode(ref->errorCode);
+ m_scanningQuery->execCLOSE_SCAN_REP(ref->errorCode, ref->closeNeeded);
if(!ref->closeNeeded){
return 0;
}
@@ -100,7 +99,7 @@ NdbTransaction::receiveSCAN_TABCONF(cons
theScanningOp->execCLOSE_SCAN_REP();
} else {
assert (m_scanningQuery);
- m_scanningQuery->execCLOSE_SCAN_REP(false);
+ m_scanningQuery->execCLOSE_SCAN_REP(0, false);
}
return 1; // -> Finished
}
Attachment: [text/bzr-bundle] bzr/ole.john.aske@oracle.com-20101105144527-rsl314qx5w7slhyr.bundle
| Thread |
|---|
| • bzr commit into mysql-5.1-telco-7.0-spj-scan-vs-scan branch(ole.john.aske:3340) | Ole John Aske | 5 Nov |