3343 Ole John Aske 2010-11-08
spj-svs: Introduce usage of 'Node sequence' as part of nodefailure detection.
Implemented based on current implementation in NdbScanOperation.cpp
modified:
storage/ndb/src/ndbapi/NdbQueryOperation.cpp
3342 Ole John Aske 2010-11-08
spj-svs: Corrected a potential mutex issue accessing 'm_finalBatchFrags' within ::nextRootResult)
m_finalBatchFrags requires the PullGuard mutex to be locked before accesing it. However, in
::nextRootResult() it was accessed wo/ mutex.
Moved the mutex violating logic inside :awaitMoreResults() where the mutex is locked,
and extended the return values from ::awaitMoreResults() to include
'FetchResult_noMoreCache'.
modified:
storage/ndb/src/ndbapi/NdbQueryOperation.cpp
storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp
=== modified file 'storage/ndb/src/ndbapi/NdbQueryOperation.cpp'
--- a/storage/ndb/src/ndbapi/NdbQueryOperation.cpp 2010-11-08 09:38:33 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryOperation.cpp 2010-11-08 12:12:08 +0000
@@ -1793,12 +1793,12 @@ NdbQueryImpl::awaitMoreResults(bool forc
assert (m_scanTransaction);
assert (m_state==Executing);
- Ndb* const ndb = m_transaction.getNdb();
+ NdbImpl* const ndb = m_transaction.getNdb()->theImpl;
{
/* This part needs to be done under mutex due to synchronization with
* receiver thread.
*/
- PollGuard poll_guard(*ndb->theImpl);
+ PollGuard poll_guard(*ndb);
/* There may be pending (asynchronous received, mutex protected) errors
* from TC / datanodes. Propogate these into m_error.code in 'API space'.
@@ -1829,31 +1829,27 @@ NdbQueryImpl::awaitMoreResults(bool forc
: FetchResult_noMoreData;
}
+ TransporterFacade* tp = ndb->m_transporter_facade;
+ const Uint32 timeout = ndb->get_waitfor_timeout();
+ const Uint32 nodeId = m_transaction.getConnectedNodeId();
+ const Uint32 seq = m_transaction.theNodeSequence;
+
/* More results are on the way, so we wait for them.*/
const FetchResult waitResult = static_cast<FetchResult>
- (poll_guard.wait_scan(3*ndb->theImpl->get_waitfor_timeout(),
- m_transaction.getConnectedNodeId(),
+ (poll_guard.wait_scan(3*timeout,
+ nodeId,
forceSend));
- switch (waitResult) {
- case FetchResult_ok: // SCAN_TABREF, may have setFetchTerminated() w/ errors
- break;
- case FetchResult_timeOut:
+ if (tp->getNodeSequence(nodeId) != seq)
+ setFetchTerminated(Err_NodeFailCausedAbort,false);
+ else if (likely(waitResult == FetchResult_ok))
+ continue;
+ else if (waitResult == FetchResult_timeOut)
setFetchTerminated(Err_ReceiveTimedOut,false);
- break;
- case FetchResult_nodeFail:
+ else
setFetchTerminated(Err_NodeFailCausedAbort,false);
- break;
- default:
- assert(false);
- }
- assert (!m_error.code);
- /* Getting here is not an error. PollGuard::wait_scan() will return
- * when a complete batch (for a fragment) is available for *any* active
- * scan in this transaction. So we must wait again for the next arriving
- * batch.
- */
+ assert (!m_error.code);
} // while(!hasReceivedError())
} // Terminates scope of 'PollGuard'
@@ -2636,7 +2632,9 @@ NdbQueryImpl::sendFetchMore(NdbRootFragm
secs[ScanNextReq::ReceiverIdsSectionNum].sectionIter = &receiverIdIter;
secs[ScanNextReq::ReceiverIdsSectionNum].sz = 1;
- TransporterFacade* const facade = ndb.theImpl->m_transporter_facade;
+ TransporterFacade* const tp = ndb.theImpl->m_transporter_facade;
+ Uint32 nodeId = m_transaction.getConnectedNodeId();
+ Uint32 seq = m_transaction.theNodeSequence;
/* This part needs to be done under mutex due to synchronization with
* receiver thread.
@@ -2648,29 +2646,24 @@ NdbQueryImpl::sendFetchMore(NdbRootFragm
// Errors arrived inbetween ::await released mutex, and fetchMore grabbed it
return -1;
}
- const int res =
- facade->sendSignal(&tSignal,
- getNdbTransaction().getConnectedNodeId(),
- secs,
- 1);
- if (unlikely(res == -1))
+ if (tp->getNodeSequence(nodeId) != seq ||
+ tp->sendSignal(&tSignal, nodeId, secs, 1) != 0)
{
setErrorCode(Err_NodeFailCausedAbort);
return -1;
}
- m_pendingFrags++;
- assert(m_pendingFrags <= getRootFragCount());
-
if (forceSend)
{
// Flush signals to TC.
- facade->forceSend(ndb.theNdbBlockNumber);
+ tp->forceSend(ndb.theNdbBlockNumber);
}
else
{
- facade->checkForceSend(ndb.theNdbBlockNumber);
+ tp->checkForceSend(ndb.theNdbBlockNumber);
}
+ m_pendingFrags++;
+ assert(m_pendingFrags <= getRootFragCount());
return 0;
} // NdbQueryImpl::sendFetchMore()
@@ -2679,36 +2672,38 @@ NdbQueryImpl::closeTcCursor(bool forceSe
{
assert (m_queryDef.isScanQuery());
- Ndb* const ndb = m_transaction.getNdb();
- Uint32 timeout = ndb->theImpl->get_waitfor_timeout();
+ NdbImpl* const ndb = m_transaction.getNdb()->theImpl;
+
+ TransporterFacade* tp = ndb->m_transporter_facade;
+ const Uint32 timeout = ndb->get_waitfor_timeout();
+ const Uint32 nodeId = m_transaction.getConnectedNodeId();
+ const Uint32 seq = m_transaction.theNodeSequence;
/* This part needs to be done under mutex due to synchronization with
* receiver thread.
*/
- PollGuard poll_guard(*ndb->theImpl);
+ PollGuard poll_guard(*ndb);
-//Uint32 seq = m_transaction->theNodeSequence;
-//if (seq != tp->getNodeSequence(m_transaction.getConnectedNodeId())) // TODO
-//{}
+ if (unlikely(tp->getNodeSequence(nodeId) != seq))
+ {
+ setErrorCode(Err_NodeFailCausedAbort);
+ return -1; // Transporter disconnected and reconnected, no need to close
+ }
/* Wait for outstanding scan results from current batch fetch */
while (m_pendingFrags > 0)
{
- const FetchResult waitResult = static_cast<FetchResult>
- (poll_guard.wait_scan(3*timeout,
- m_transaction.getConnectedNodeId(),
- forceSend));
- switch (waitResult) {
- case FetchResult_ok: // SCAN_TABREF, may have setFetchTerminated() w/ errors
- break;
- case FetchResult_timeOut:
- setFetchTerminated(Err_ReceiveTimedOut,false);
- break;
- case FetchResult_nodeFail:
+ const FetchResult result = static_cast<FetchResult>
+ (poll_guard.wait_scan(3*timeout, nodeId, forceSend));
+
+ if (unlikely(tp->getNodeSequence(nodeId) != seq))
setFetchTerminated(Err_NodeFailCausedAbort,false);
- break;
- default:
- assert(false);
+ else if (unlikely(result != FetchResult_ok))
+ {
+ if (result == FetchResult_timeOut)
+ setFetchTerminated(Err_ReceiveTimedOut,false);
+ else
+ setFetchTerminated(Err_NodeFailCausedAbort,false);
}
if (hasReceivedError())
{
@@ -2733,21 +2728,17 @@ NdbQueryImpl::closeTcCursor(bool forceSe
/* Wait for close to be confirmed: */
while (m_pendingFrags > 0)
{
- const FetchResult waitResult = static_cast<FetchResult>
- (poll_guard.wait_scan(3*timeout,
- m_transaction.getConnectedNodeId(),
- forceSend));
- switch (waitResult) {
- case FetchResult_ok:
- break;
- case FetchResult_timeOut:
- setFetchTerminated(Err_ReceiveTimedOut,false);
- break;
- case FetchResult_nodeFail:
+ const FetchResult result = static_cast<FetchResult>
+ (poll_guard.wait_scan(3*timeout, nodeId, forceSend));
+
+ if (unlikely(tp->getNodeSequence(nodeId) != seq))
setFetchTerminated(Err_NodeFailCausedAbort,false);
- break;
- default:
- assert(false);
+ if (unlikely(result != FetchResult_ok))
+ {
+ if (result == FetchResult_timeOut)
+ setFetchTerminated(Err_ReceiveTimedOut,false);
+ else
+ setFetchTerminated(Err_NodeFailCausedAbort,false);
}
if (hasReceivedError())
{
Attachment: [text/bzr-bundle] bzr/ole.john.aske@oracle.com-20101108121208-itigmpnjo5cvk395.bundle
| Thread |
|---|
| • bzr push into mysql-5.1-telco-7.0-spj-scan-vs-scan branch(ole.john.aske:3342 to 3343) | Ole John Aske | 8 Nov |