#At file:///home/jonas/src/70-spj/ based on revid:jonas@stripped
2969 Jonas Oreland 2009-11-04 [merge]
ndb - spj - merge main
modified:
storage/ndb/CMakeLists.txt
storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp
storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp
storage/ndb/src/ndbapi/NdbQueryOperation.cpp
storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp
storage/ndb/src/ndbapi/NdbTransactionScan.cpp
=== modified file 'storage/ndb/CMakeLists.txt'
--- a/storage/ndb/CMakeLists.txt 2009-10-19 09:44:26 +0000
+++ b/storage/ndb/CMakeLists.txt 2009-11-03 10:11:56 +0000
@@ -38,7 +38,11 @@ IF(WITHOUT_PARTITION_STORAGE_ENGINE)
ENDIF(WITHOUT_PARTITION_STORAGE_ENGINE)
INCLUDE("${PROJECT_SOURCE_DIR}/storage/mysql_storage_engine.cmake")
-INCLUDE_DIRECTORIES(${CMAKE_SOURCE_DIR}/storage/ndb/include)
+INCLUDE_DIRECTORIES(${CMAKE_SOURCE_DIR}/storage/ndb/include
+ ${CMAKE_SOURCE_DIR}/storage/ndb/src/ndbapi
+ ${CMAKE_SOURCE_DIR}/storage/ndb/include/util
+)
+
SET(NDBCLUSTER_SOURCES
../../sql/ha_ndbcluster.cc
../../sql/ha_ndbcluster_cond.cc
=== modified file 'storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp'
--- a/storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp 2009-11-02 18:03:43 +0000
+++ b/storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp 2009-11-04 10:58:11 +0000
@@ -247,11 +247,17 @@ public:
/**
* SCAN_FRAGCONF is received
*/
- SF_STARTED = 2
+ SF_STARTED = 2,
+
+ /**
+ * SCAN_NEXTREQ(close) has been sent to datanodes
+ */
+ SF_CLOSING = 3
};
- Uint32 m_scan_state; // Only valid is TreeNodeState >= TN_ACTIVE
+ Uint32 m_scan_state; // Only valid if TreeNodeState >= TN_ACTIVE
Uint32 m_scan_status; // fragmentCompleted
+ bool m_pending_close; // SCAN_NEXTREQ(close) pending while SF_RUNNING
/** True if signal has been received since sending
* last SCAN_FRAGREQ/SCAN_NEXTREQ*/
bool m_scan_fragconf_received;
@@ -357,7 +363,7 @@ public:
/**
* Is attrinfo "constructed"
- * (implies key info will be disowned (by send-signal)
+ * (implies attr info will be disowned (by send-signal)
*/
T_ATTRINFO_CONSTRUCTED = 0x8,
=== modified file 'storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp 2009-11-04 11:10:35 +0000
+++ b/storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp 2009-11-04 11:12:17 +0000
@@ -534,6 +534,7 @@ Dbspj::do_init(Request* requestP, const
requestP->m_transId[1] = req->transId2;
requestP->m_node_mask.clear();
requestP->m_rootResultData = req->resultData;
+ requestP->m_currentNodePtrI = RNIL;
}
void
@@ -950,32 +951,51 @@ Dbspj::execSCAN_NEXTREQ(Signal* signal)
Ptr<TreeNode> treeNodePtr;
m_treenode_pool.getPtr(treeNodePtr, requestPtr.p->m_currentNodePtrI);
- if (req->closeFlag == ZTRUE && // Requested close scan
- treeNodePtr.p->m_scanfrag_data.m_scan_status == 2) // Is closed on LQH
+ if (treeNodePtr.p->m_scanfrag_data.m_scan_state == ScanFragData::SF_CLOSING)
{
jam();
-
/**
- * TODO this needs more elaborate *abort* handling
+ * Duplicate of a close request already sent to datanodes.
+ * Ignore this and wait for reply on pending request.
*/
- ScanFragConf* conf = reinterpret_cast<ScanFragConf*>(signal->getDataPtrSend());
-
- conf->senderData = requestPtr.p->m_senderData;
- conf->transId1 = requestPtr.p->m_transId[0];
- conf->transId2 = requestPtr.p->m_transId[1];
- conf->completedOps = 0;
- conf->fragmentCompleted = 2; // =ZSCAN_FRAG_CLOSED -> Finished...
- conf->total_len = 0; // Not supported...
+ DEBUG("execSCAN_NEXTREQ, is SF_CLOSING -> ignore request");
+ return;
+ }
- DEBUG("execSCAN_NEXTREQ(close), fragmentCompleted:" << conf->fragmentCompleted);
+ if (req->closeFlag == ZTRUE) // Requested close scan
+ {
+ if (treeNodePtr.p->m_scanfrag_data.m_scan_status == 2) // Is closed on LQH
+ {
+ jam();
+ ndbassert (treeNodePtr.p->m_scanfrag_data.m_scan_state != ScanFragData::SF_RUNNING)
- sendSignal(requestPtr.p->m_senderRef, GSN_SCAN_FRAGCONF, signal,
- ScanFragConf::SignalLength, JBB);
+ ScanFragConf* conf = reinterpret_cast<ScanFragConf*>(signal->getDataPtrSend());
+ conf->senderData = requestPtr.p->m_senderData;
+ conf->transId1 = requestPtr.p->m_transId[0];
+ conf->transId2 = requestPtr.p->m_transId[1];
+ conf->completedOps = 0;
+ conf->fragmentCompleted = 2; // =ZSCAN_FRAG_CLOSED -> Finished...
+ conf->total_len = 0; // Not supported...
+
+ DEBUG("execSCAN_NEXTREQ(close), LQH has conf'ed 'w/ ZSCAN_FRAG_CLOSED");
+ sendSignal(requestPtr.p->m_senderRef, GSN_SCAN_FRAGCONF, signal,
+ ScanFragConf::SignalLength, JBB);
- cleanup(requestPtr);
- return;
+ cleanup(requestPtr);
+ return;
+ }
+ else if (treeNodePtr.p->m_scanfrag_data.m_scan_state == ScanFragData::SF_RUNNING)
+ {
+ jam();
+ DEBUG("execSCAN_NEXTREQ, make PENDING CLOSE");
+ treeNodePtr.p->m_scanfrag_data.m_pending_close = true;
+ return;
+ }
+ // else; fallthrough & send to datanodes:
}
+ ndbassert (!treeNodePtr.p->m_scanfrag_data.m_pending_close);
+ ndbassert (treeNodePtr.p->m_scanfrag_data.m_scan_status != 2)
ndbrequire(treeNodePtr.p->m_info != 0 &&
treeNodePtr.p->m_info->m_execSCAN_NEXTREQ != 0);
(this->*(treeNodePtr.p->m_info->m_execSCAN_NEXTREQ))(signal,
@@ -1765,6 +1785,10 @@ Dbspj::scanFrag_build(Build_context& ctx
treeNodePtr.p->m_info = &g_ScanFragOpInfo;
treeNodePtr.p->m_bits |= TreeNode::T_ATTR_INTERPRETED;
+ treeNodePtr.p->m_scanfrag_data.m_scan_state = ScanFragData::SF_IDLE;
+ treeNodePtr.p->m_scanfrag_data.m_scan_status = 0;
+ treeNodePtr.p->m_scanfrag_data.m_pending_close = false;
+
ScanFragReq*dst=(ScanFragReq*)treeNodePtr.p->m_scanfrag_data.m_scanFragReq;
dst->senderData = treeNodePtr.i;
dst->resultRef = reference();
@@ -1807,6 +1831,7 @@ Dbspj::scanFrag_build(Build_context& ctx
DEBUG("param len: " << param->len);
if (unlikely(param->len < QN_ScanFragParameters::NodeSize))
{
+ jam();
DEBUG_CRASH();
break;
}
@@ -1826,6 +1851,7 @@ Dbspj::scanFrag_build(Build_context& ctx
nodeDA, treeBits, paramDA, paramBits);
if (unlikely(err != 0))
{
+ jam();
DEBUG_CRASH();
break;
}
@@ -2012,6 +2038,7 @@ Dbspj::scanFrag_send(Signal* signal,
NDB_ARRAY_SIZE(treeNodePtr.p->m_scanfrag_data.m_scanFragReq),
JBB, &handle);
+ ndbassert (!treeNodePtr.p->m_scanfrag_data.m_pending_close);
treeNodePtr.p->m_scanfrag_data.m_scan_state = ScanFragData::SF_RUNNING;
treeNodePtr.p->m_scanfrag_data.m_scan_status = 0;
treeNodePtr.p->m_scanfrag_data.m_scan_fragconf_received = false;
@@ -2022,6 +2049,12 @@ Dbspj::scanFrag_send(Signal* signal,
treeNodePtr.p->m_scanfrag_data.m_descendant_keyrefs_received = 0;
treeNodePtr.p->m_scanfrag_data.m_descendant_keyreqs_sent = 0;
treeNodePtr.p->m_scanfrag_data.m_missing_descendant_rows = 0;
+
+ /**
+ * Save position where next-scan-req should continue or close
+ */
+ treeNodePtr.p->m_scanfrag_data.m_scan_state = ScanFragData::SF_RUNNING;
+ requestPtr.p->m_currentNodePtrI = treeNodePtr.i;
}
/** Return true if scan batch is complete. This happens when all scan
@@ -2076,9 +2109,6 @@ Dbspj::scanFrag_execSCAN_FRAGREF(Signal*
Ptr<Request> requestPtr,
Ptr<TreeNode> treeNodePtr)
{
- /**
- * TODO
- */
const ScanFragRef* const rep = reinterpret_cast<const ScanFragRef*>(signal->getDataPtr());
Uint32 errCode = rep->errorCode;
@@ -2102,9 +2132,29 @@ Dbspj::scanFrag_execSCAN_FRAGREF(Signal*
sendSignal(requestPtr.p->m_senderRef, GSN_SCAN_FRAGREF, signal,
ScanFragRef::SignalLength, JBB);
- // TODO: Cleanup operation on SPJ block
+ treeNodePtr.p->m_scanfrag_data.m_scan_fragconf_received = true;
+//treeNodePtr.p->m_scanfrag_data.m_scan_status = 2; // (2=ZSCAN_FRAG_CLOSED)
+ ndbassert (isScanComplete(treeNodePtr.p->m_scanfrag_data));
-//ndbrequire(false);
+ /**
+ * SCAN_FRAGREF implies that datanodes closed the cursor.
+ * -> Pending close is effectively a NOOP, reset it
+ */
+ if (treeNodePtr.p->m_scanfrag_data.m_pending_close)
+ {
+ jam();
+ treeNodePtr.p->m_scanfrag_data.m_pending_close = false;
+ DEBUG(" SCAN_FRAGREF, had pending close which can be ignored (is closed)");
+ }
+
+ /**
+ * Cleanup operation on SPJ block, remove all allocated resources.
+ */
+ {
+ jam();
+ treeNodePtr.p->m_scanfrag_data.m_scan_state = ScanFragData::SF_IDLE;
+ nodeFinished(signal, requestPtr, treeNodePtr);
+ }
}
@@ -2143,8 +2193,34 @@ Dbspj::scanFrag_batch_complete(Signal* s
Ptr<TreeNode> treeNodePtr)
{
DEBUG("scanFrag_batch_complete()");
- ndbrequire(treeNodePtr.p->m_scanfrag_data.m_scan_state ==
- ScanFragData::SF_RUNNING);
+
+ if (treeNodePtr.p->m_scanfrag_data.m_pending_close)
+ {
+ jam();
+ ndbrequire(treeNodePtr.p->m_scanfrag_data.m_scan_state == ScanFragData::SF_RUNNING);
+ treeNodePtr.p->m_scanfrag_data.m_scan_state = ScanFragData::SF_STARTED;
+
+ DEBUG("scanFrag_batch_complete() - has pending close, ignore this reply, request close");
+
+ ScanFragNextReq* req = reinterpret_cast<ScanFragNextReq*>(signal->getDataPtrSend());
+
+ /**
+ * SCAN_NEXTREQ(close) was requested while we where waiting for
+ * datanodes to complete this request.
+ * - Send close request to LQH now.
+ * - Suppress reply to TC/API, will reply later when close is conf'ed
+ */
+ req->closeFlag = ZTRUE;
+ req->senderData = treeNodePtr.i;
+ req->transId1 = requestPtr.p->m_transId[0];
+ req->transId2 = requestPtr.p->m_transId[1];
+ req->batch_size_rows = 0;
+ req->batch_size_bytes = 0;
+
+ treeNodePtr.p->m_scanfrag_data.m_pending_close = false;
+ scanFrag_execSCAN_NEXTREQ(signal, requestPtr, treeNodePtr);
+ return;
+ }
/**
* one batch complete...
@@ -2166,6 +2242,8 @@ Dbspj::scanFrag_batch_complete(Signal* s
if (treeNodePtr.p->m_scanfrag_data.m_scan_status == 2)
{
jam();
+ ndbrequire(treeNodePtr.p->m_scanfrag_data.m_scan_state == ScanFragData::SF_RUNNING ||
+ treeNodePtr.p->m_scanfrag_data.m_scan_state == ScanFragData::SF_CLOSING);
/**
* EOF for scan
*/
@@ -2175,11 +2253,12 @@ Dbspj::scanFrag_batch_complete(Signal* s
else
{
jam();
+ ndbrequire(treeNodePtr.p->m_scanfrag_data.m_scan_state == ScanFragData::SF_RUNNING);
/**
- * Save position where next-scan-req should continue
+ * Check position where next-scan-req should continue
*/
treeNodePtr.p->m_scanfrag_data.m_scan_state = ScanFragData::SF_STARTED;
- requestPtr.p->m_currentNodePtrI = treeNodePtr.i;
+ assert(requestPtr.p->m_currentNodePtrI == treeNodePtr.i);
}
}
@@ -2199,6 +2278,7 @@ Dbspj::scanFrag_execSCAN_NEXTREQ(Signal*
Ptr<TreeNode> treeNodePtr)
{
jamEntry();
+ ndbassert (treeNodePtr.p->m_scanfrag_data.m_scan_state == ScanFragData::SF_STARTED);
ScanFragNextReq* nextReq = reinterpret_cast<ScanFragNextReq*>(signal->getDataPtrSend());
nextReq->senderData = treeNodePtr.i;
@@ -2214,7 +2294,10 @@ Dbspj::scanFrag_execSCAN_NEXTREQ(Signal*
ScanFragNextReq::SignalLength,
JBB);
- treeNodePtr.p->m_scanfrag_data.m_scan_state = ScanFragData::SF_RUNNING;
+ treeNodePtr.p->m_scanfrag_data.m_scan_state = (nextReq->closeFlag == ZTRUE)
+ ? ScanFragData::SF_CLOSING
+ : ScanFragData::SF_RUNNING;
+
treeNodePtr.p->m_scanfrag_data.m_scan_status = 0;
treeNodePtr.p->m_scanfrag_data.m_scan_fragconf_received = false;
treeNodePtr.p->m_scanfrag_data.m_rows_received = 0;
=== modified file 'storage/ndb/src/ndbapi/NdbQueryOperation.cpp'
--- a/storage/ndb/src/ndbapi/NdbQueryOperation.cpp 2009-10-29 15:04:38 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryOperation.cpp 2009-11-03 12:21:47 +0000
@@ -469,6 +469,9 @@ NdbQueryImpl::NdbQueryImpl(NdbTransactio
const NdbQueryDefImpl& queryDef):
m_interface(*this),
m_state(Initial),
+ m_tcState(Inactive),
+ m_next(NULL),
+ m_queryDef(queryDef),
m_error(),
m_transaction(trans),
m_scanTransaction(NULL),
@@ -476,8 +479,6 @@ NdbQueryImpl::NdbQueryImpl(NdbTransactio
m_countOperations(0),
m_tcKeyConfReceived(false),
m_pendingStreams(0),
- m_next(NULL),
- m_queryDef(queryDef),
m_parallelism(0),
m_maxBatchRows(0),
m_applStreams(),
@@ -652,18 +653,21 @@ NdbQueryImpl::nextResult(bool fetchAllow
m_applStreams.pop();
}
- if (unlikely(m_applStreams.top()==NULL)) {
+ if (unlikely(m_applStreams.top()==NULL))
+ {
/* m_applStreams is empty, so we cannot get more results without
- * possibly blocking.*/
-
- if (fetchAllowed) {
+ * possibly blocking.
+ */
+ if (fetchAllowed)
+ {
/* fetchMoreResults() will either copy streams that are already
- * complete (under mutex protection), or block until more data arrives.*/
+ * complete (under mutex protection), or block until more data arrives.
+ */
const FetchResult fetchResult = fetchMoreResults(forceSend);
switch (fetchResult) {
case FetchResult_otherError:
- // FIXME: copy semantics from NdbScanOperation.
- setErrorCode(Err_NodeFailCausedAbort); // Node fail
+ assert (m_error.code != 0);
+ setErrorCode(m_error.code);
return NdbQuery::NextResult_error;
case FetchResult_sendFail:
// FIXME: copy semantics from NdbScanOperation.
@@ -758,7 +762,6 @@ NdbQueryImpl::nextResult(bool fetchAllow
}
}
- m_state = Fetching;
return NdbQuery::NextResult_gotRow;
} //NdbQueryImpl::nextResult
@@ -771,7 +774,7 @@ NdbQueryImpl::fetchMoreResults(bool forc
/* Check if there are any more completed streams available.*/
if(m_queryDef.isScanQuery()){
- assert (m_state==Executing || m_state==Fetching);
+ assert (m_state==Executing);
assert (m_scanTransaction);
Ndb* const ndb = m_transaction.getNdb();
@@ -782,14 +785,16 @@ NdbQueryImpl::fetchMoreResults(bool forc
PollGuard poll_guard(facade,
&ndb->theImpl->theWaiter,
ndb->theNdbBlockNumber);
- while(true){
+
+ while (likely(m_error.code==0))
+ {
/* m_fullStreams contains any streams that are complete (for this batch)
* but have not yet been moved (under mutex protection) to
* m_applStreams.*/
if(m_fullStreams.top()==NULL){
if(getRoot().isBatchComplete()){
// Request another scan batch, may already be at EOF
- const int sent = sendFetchMore(m_transaction.getConnectedNodeId(),false);
+ const int sent = sendFetchMore(m_transaction.getConnectedNodeId());
if (sent==0) { // EOF reached?
m_state = EndOfData;
postFetchRelease();
@@ -797,7 +802,8 @@ NdbQueryImpl::fetchMoreResults(bool forc
} else if (unlikely(sent<0)) {
return FetchResult_sendFail;
}
- }
+ } //if (isBatchComplete...
+
/* More results are on the way, so we wait for them.*/
const FetchResult waitResult = static_cast<FetchResult>
(poll_guard.wait_scan(3*facade->m_waitfor_timeout,
@@ -809,7 +815,7 @@ NdbQueryImpl::fetchMoreResults(bool forc
} // if (m_fullStreams.top()==NULL)
// Assert: No sporious wakeups w/ neither resultdata, nor EOF:
- assert (m_fullStreams.top()!=NULL || getRoot().isBatchComplete());
+ assert (m_fullStreams.top()!=NULL || getRoot().isBatchComplete() || m_error.code);
/* Move full streams from receiver thread's container to application
* thread's container.*/
@@ -823,9 +829,13 @@ NdbQueryImpl::fetchMoreResults(bool forc
}
// Only expect to end up here if another ::sendFetchMore() is required
- assert (getRoot().isBatchComplete());
+ assert (getRoot().isBatchComplete() || m_error.code);
} // while(true)
+ // 'while' terminated by m_error.code
+ assert (m_error.code);
+ return FetchResult_otherError;
+
} else { // is a Lookup query
/* The root operation is a lookup. Lookups are guaranteed to be complete
* before NdbTransaction::execute() returns. Therefore we do not set
@@ -885,90 +895,22 @@ NdbQueryImpl::closeSingletonScans()
int
NdbQueryImpl::close(bool forceSend)
{
+ int res = 0;
+
assert (m_state >= Initial && m_state < Destructed);
Ndb* const ndb = m_transaction.getNdb();
- // TODO?: Unsure if we may also need to send a close to datanodes if we
- // have an errorcondition on this query
-
- if (m_state >= Executing && m_state < EndOfData) {
- if (m_queryDef.isScanQuery()) {
- assert (m_scanTransaction != NULL);
-
- TransporterFacade* const facade = ndb->theImpl->m_transporter_facade;
-
- /* This part needs to be done under mutex due to synchronization with
- * receiver thread.
- */
- PollGuard poll_guard(facade,
- &ndb->theImpl->theWaiter,
- ndb->theNdbBlockNumber);
-
- /* Wait for outstanding scan results from current batch fetch */
- while (!getRoot().isBatchComplete())
- {
- const FetchResult waitResult = static_cast<FetchResult>
- (poll_guard.wait_scan(3*facade->m_waitfor_timeout,
- m_transaction.getConnectedNodeId(),
- forceSend));
- switch (waitResult) {
- case FetchResult_ok:
- break;
- case FetchResult_nodeFail:
- setErrorCode(Err_NodeFailCausedAbort); // Node fail
- return -1;
- case FetchResult_timeOut:
- setErrorCode(Err_ReceiveFromNdbFailed); // Timeout
- return -1;
- default:
- assert(false);
- }
- } // while
-
- m_fullStreams.clear();
- m_applStreams.clear();
-
- /* Send SCANREQ(close) */
- const int sent = sendFetchMore(m_transaction.getConnectedNodeId(), true);
- if (unlikely(sent<0))
- return -1;
-
- /* Wait for close to be confirm: */
- while (sent > 0 && !getRoot().isBatchComplete())
- {
- const FetchResult waitResult = static_cast<FetchResult>
- (poll_guard.wait_scan(3*facade->m_waitfor_timeout,
- m_transaction.getConnectedNodeId(),
- forceSend));
- switch (waitResult) {
- case FetchResult_ok:
- break;
- case FetchResult_nodeFail:
- setErrorCode(Err_NodeFailCausedAbort); // Node fail
- return -1;
- case FetchResult_timeOut:
- setErrorCode(Err_ReceiveFromNdbFailed); // Timeout
- return -1;
- default:
- assert(false);
- }
- } // while
-
- } else { // Lookup query
- // Not likely to require any action to close lookups on TC
- }
- // Throw any pending results
- m_fullStreams.clear();
- m_applStreams.clear();
- m_state = EndOfData;
-
- } else { // Not executed query, nor fetched any rows
- // Should not have any pending result rows in this state:
- assert(m_fullStreams.top() == NULL);
- assert(m_applStreams.top() == NULL);
+ if (m_tcState > Inactive && m_tcState != Completed)
+ {
+ res = closeTcCursor(forceSend);
}
- if (m_scanTransaction != NULL) {
+ // Throw any pending results
+ m_fullStreams.clear();
+ m_applStreams.clear();
+
+ if (m_scanTransaction != NULL)
+ {
assert (m_state != Closed);
assert (m_scanTransaction->m_scanningQuery == this);
m_scanTransaction->m_scanningQuery = NULL;
@@ -978,10 +920,11 @@ NdbQueryImpl::close(bool forceSend)
}
postFetchRelease();
- m_state = Closed;
- return 0;
+ m_state = Closed; // Even if it was previously 'Failed' it is closed now!
+ return res;
} //NdbQueryImpl::close
+
void
NdbQueryImpl::release()
{
@@ -1025,15 +968,17 @@ NdbQueryImpl::execTCKEYCONF()
}
void
-NdbQueryImpl::execCLOSE_SCAN_REP()
+NdbQueryImpl::execCLOSE_SCAN_REP(bool needClose)
{
- if(traceSignals){
+ if(traceSignals)
+ {
ndbout << "NdbQueryImpl::execCLOSE_SCAN_REP()" << endl;
}
- if (m_state < EndOfData)
- m_state = EndOfData;
+ assert (m_tcState < Completed);
+ m_tcState = (needClose) ? FetchMore : Completed;
}
+
bool
NdbQueryImpl::countPendingStreams(int increment)
{
@@ -1217,7 +1162,7 @@ NdbQueryImpl::doSend(int nodeId, bool la
Uint32 parallel = m_parallelism;
bool tupScan = (scan_flags & NdbScanOperation::SF_TupScan);
- bool rangeScan= false;
+ bool rangeScan = false;
bool isPruned;
Uint32 hashValue;
@@ -1331,6 +1276,7 @@ NdbQueryImpl::doSend(int nodeId, bool la
setErrorCodeAbort(Err_SendFailed); // Error: 'Send to NDB failed'
return FetchResult_sendFail;
}
+ m_tcState = Fetching;
} else { // Lookup query
@@ -1409,6 +1355,7 @@ NdbQueryImpl::doSend(int nodeId, bool la
return FetchResult_sendFail;
}
m_transaction.OpSent();
+ m_tcState = Completed;
} // if
// Shrink memory footprint by removing structures not required after ::execute()
@@ -1436,18 +1383,18 @@ Return Value: Return >0 : send was suc
Return =0 : No more rows is available -> EOF
Return -1: In all other case.
Parameters: nodeId: Receiving processor node
- closeFlag: Close the scan
Remark:
******************************************************************************/
int
-NdbQueryImpl::sendFetchMore(int nodeId, bool closeFlag)
+NdbQueryImpl::sendFetchMore(int nodeId)
{
-
- Uint32 receivers[64]; // TODO: 64 is a temp hack
Uint32 sent = 0;
NdbQueryOperationImpl& root = getRoot();
- for(unsigned i = 0; i<m_parallelism; i++) {
+ Uint32 receivers[64]; // TODO: 64 is a temp hack
+ assert (root.m_resultStreams!=NULL);
+ for(unsigned i = 0; i<m_parallelism; i++)
+ {
const Uint32 tcPtrI = root.getReceiver(i).m_tcPtrI;
if (tcPtrI != RNIL) {
receivers[sent++] = tcPtrI;
@@ -1459,10 +1406,16 @@ NdbQueryImpl::sendFetchMore(int nodeId,
//printf("::sendFetchMore, to nodeId:%d, sent:%d\n", nodeId, sent);
if (sent==0)
+ {
+ assert (m_tcState != FetchMore);
+ m_tcState = Completed;
return 0;
+ }
m_pendingStreams = 0;
m_tcKeyConfReceived = false;
+ assert (m_tcState == FetchMore);
+ m_tcState = Fetching;
Ndb& ndb = *m_transaction.getNdb();
NdbApiSignal tSignal(&ndb);
@@ -1473,7 +1426,7 @@ NdbQueryImpl::sendFetchMore(int nodeId,
const Uint64 transId = m_scanTransaction->getTransactionId();
scanNextReq->apiConnectPtr = m_scanTransaction->theTCConPtr;
- scanNextReq->stopScan = closeFlag;
+ scanNextReq->stopScan = 0;
scanNextReq->transId1 = (Uint32) transId;
scanNextReq->transId2 = (Uint32) (transId >> 32);
tSignal.setLength(ScanNextReq::SignalLength);
@@ -1492,6 +1445,109 @@ NdbQueryImpl::sendFetchMore(int nodeId,
return sent;
} // NdbQueryImpl::sendFetchMore()
+int
+NdbQueryImpl::closeTcCursor(bool forceSend)
+{
+ assert (m_queryDef.isScanQuery());
+
+ Ndb* const ndb = m_transaction.getNdb();
+ TransporterFacade* const facade = ndb->theImpl->m_transporter_facade;
+
+ /* This part needs to be done under mutex due to synchronization with
+ * receiver thread.
+ */
+ PollGuard poll_guard(facade,
+ &ndb->theImpl->theWaiter,
+ ndb->theNdbBlockNumber);
+
+ /* Wait for outstanding scan results from current batch fetch */
+ while (!getRoot().isBatchComplete() && m_error.code==0)
+ {
+ const FetchResult waitResult = static_cast<FetchResult>
+ (poll_guard.wait_scan(3*facade->m_waitfor_timeout,
+ m_transaction.getConnectedNodeId(),
+ forceSend));
+ switch (waitResult) {
+ case FetchResult_ok:
+ break;
+ case FetchResult_nodeFail:
+ setErrorCode(Err_NodeFailCausedAbort); // Node fail
+ return -1;
+ case FetchResult_timeOut:
+ setErrorCode(Err_ReceiveFromNdbFailed); // Timeout
+ return -1;
+ default:
+ assert(false);
+ }
+ } // while
+
+ m_error.code = 0; // Ignore possible errorcode caused by previous fetching
+
+ if (m_tcState == FetchMore) // TC has an open scan cursor.
+ {
+ /* Send SCANREQ(close) */
+ const int error = sendClose(m_transaction.getConnectedNodeId());
+ if (unlikely(error))
+ return error;
+
+ /* Wait for close to be confirmed: */
+ while (m_tcState != Completed)
+ {
+ const FetchResult waitResult = static_cast<FetchResult>
+ (poll_guard.wait_scan(3*facade->m_waitfor_timeout,
+ m_transaction.getConnectedNodeId(),
+ forceSend));
+ switch (waitResult) {
+ case FetchResult_ok:
+ if (unlikely(m_error.code)) // Close request itself failed, keep error
+ {
+ setErrorCode(m_error.code);
+ return -1;
+ }
+ break;
+ case FetchResult_nodeFail:
+ setErrorCode(Err_NodeFailCausedAbort); // Node fail
+ return -1;
+ case FetchResult_timeOut:
+ setErrorCode(Err_ReceiveFromNdbFailed); // Timeout
+ return -1;
+ default:
+ assert(false);
+ }
+ } // while
+ } // if
+
+ m_tcState = Completed;
+ return 0;
+} //NdbQueryImpl::closeTcCursor
+
+int
+NdbQueryImpl::sendClose(int nodeId)
+{
+ m_pendingStreams = 0;
+ m_tcKeyConfReceived = false;
+ assert (m_tcState == FetchMore);
+ m_tcState = Fetching;
+
+ Ndb& ndb = *m_transaction.getNdb();
+ NdbApiSignal tSignal(&ndb);
+ tSignal.setSignal(GSN_SCAN_NEXTREQ);
+ ScanNextReq * const scanNextReq = CAST_PTR(ScanNextReq, tSignal.getDataPtrSend());
+
+ assert (m_scanTransaction);
+ const Uint64 transId = m_scanTransaction->getTransactionId();
+
+ scanNextReq->apiConnectPtr = m_scanTransaction->theTCConPtr;
+ scanNextReq->stopScan = true;
+ scanNextReq->transId1 = (Uint32) transId;
+ scanNextReq->transId2 = (Uint32) (transId >> 32);
+ tSignal.setLength(ScanNextReq::SignalLength);
+
+ TransporterFacade* tp = ndb.theImpl->m_transporter_facade;
+ return tp->sendSignal(&tSignal, nodeId);
+
+} // NdbQueryImpl::sendClose()
+
NdbQueryImpl::StreamStack::StreamStack():
m_size(0),
@@ -1748,7 +1804,8 @@ NdbQueryOperationImpl::setResultRowRef (
}
void
-NdbQueryOperationImpl::fetchRecAttrResults(Uint32 streamNo){
+NdbQueryOperationImpl::fetchRecAttrResults(Uint32 streamNo)
+{
NdbRecAttr* recAttr = m_firstRecAttr;
Uint32 posInRow = 0;
while(recAttr != NULL){
@@ -1766,7 +1823,8 @@ NdbQueryOperationImpl::fetchRecAttrResul
}
void
-NdbQueryOperationImpl::updateChildResult(Uint32 streamNo, Uint32 rowNo){
+NdbQueryOperationImpl::updateChildResult(Uint32 streamNo, Uint32 rowNo)
+{
if (rowNo==tupleNotFound) {
/* This operation gave no result for the current parent tuple.*/
m_isRowNull = true;
@@ -2231,7 +2289,7 @@ NdbQueryOperationImpl::execTRANSID_AI(co
m_queryImpl.countPendingStreams(1);
}
return false;
- }
+ } // end lookup
} //NdbQueryOperationImpl::execTRANSID_AI
@@ -2285,6 +2343,9 @@ NdbQueryOperationImpl::execSCAN_TABCONF(
resultStream.m_pendingResults += rowCount;
resultStream.m_receiver.m_tcPtrI = tcPtrI; // Handle for SCAN_NEXTREQ, RNIL -> EOF
+ if (tcPtrI != RNIL) {
+ m_queryImpl.m_tcState = NdbQueryImpl::FetchMore;
+ }
if(traceSignals){
ndbout << " resultStream(root) {" << resultStream << "}" << endl;
@@ -2363,10 +2424,7 @@ NdbQueryOperationImpl::getIdOfReceiver()
bool
NdbQueryOperationImpl::isBatchComplete() const {
- if (m_queryImpl.m_state >= NdbQueryImpl::EndOfData) {
-// printf("Query state:%d (>= EndOfData) -> BatchCompleted\n", m_queryImpl.m_state);
- return true;
- }
+ assert(m_resultStreams!=NULL);
for(Uint32 i = 0; i < m_queryImpl.getParallelism(); i++){
if(!m_resultStreams[i]->isBatchComplete()){
return false;
@@ -2379,6 +2437,7 @@ NdbQueryOperationImpl::isBatchComplete()
const NdbReceiver&
NdbQueryOperationImpl::getReceiver(Uint32 recNo) const {
assert(recNo<getQuery().getParallelism());
+ assert(m_resultStreams!=NULL);
return m_resultStreams[recNo]->m_receiver;
}
=== modified file 'storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp'
--- a/storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp 2009-10-29 15:04:38 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp 2009-11-03 12:21:47 +0000
@@ -127,11 +127,6 @@ public:
*/
int doSend(int aNodeId, bool lastFlag);
- /** Send SCAN_NEXTREQ signal to fetch another batch from a scan query
- * @return #signals sent, -1 if error.
- */
- int sendFetchMore(int nodeId, bool lastFlag);
-
NdbQuery& getInterface()
{ return m_interface; }
@@ -154,7 +149,7 @@ public:
bool execTCKEYCONF();
/** Process SCAN_TABCONF w/ EndOfData which is a 'Close Scan Reply'. */
- void execCLOSE_SCAN_REP();
+ void execCLOSE_SCAN_REP(bool isClosed);
/** Determines if query has completed and may be garbage collected
* A query is considder complete when it either:
@@ -224,18 +219,29 @@ private:
/** The interface that is visible to the application developer.*/
NdbQuery m_interface;
- enum {
- Initial,
- Defined,
- Prepared,
- Executing,
- Fetching,
- EndOfData,
- Closed,
- Failed,
+ enum { // State of NdbQuery in API
+ Initial, // Constructed object, assiciated with a defined query
+ Defined, // Parameter values has been assigned
+ Prepared, // KeyInfo & AttrInfo prepared for execution
+ Executing, // Signal with exec. req. sent to TC
+ EndOfData, // All results rows consumed
+ Closed, // Query has been ::close()'ed
+ Failed,
Destructed
} m_state;
+ enum { // Assumed state of query cursor in TC block
+ Inactive, // Execution not started at TC
+ Fetching,
+ FetchMore,
+ Completed
+ } m_tcState;
+
+ /** Next query in same transaction.*/
+ NdbQueryImpl* m_next;
+ /** Definition of this query.*/
+ const NdbQueryDefImpl& m_queryDef;
+
/** Possible error status of this query.*/
NdbError m_error;
/** Transaction in which this query instance executes.*/
@@ -253,12 +259,9 @@ private:
/** True if a TCKEYCONF message has been received for this query.*/
bool m_tcKeyConfReceived;
+
/** Number of streams not yet completed within the current batch.*/
Uint32 m_pendingStreams;
- /** Next query in same transaction.*/
- NdbQueryImpl* m_next;
- /** Definition of this query.*/
- const NdbQueryDefImpl& m_queryDef;
/** Number of fragments to be scanned in parallel. (1 if root operation is
* a lookup)*/
@@ -296,6 +299,19 @@ private:
/** Get more scan results, ask for the next batch if necessary.*/
FetchResult fetchMoreResults(bool forceSend);
+ /** Send SCAN_NEXTREQ signal to fetch another batch from a scan query
+ * @return #signals sent, -1 if error.
+ */
+ int sendFetchMore(int nodeId);
+
+ /** Close cursor on TC */
+ int closeTcCursor(bool forceSend);
+
+ /** Send SCAN_NEXTREQ(close) signal to close cursor on TC and datanodes.
+ * @return #signals sent, -1 if error.
+ */
+ int sendClose(int nodeId);
+
/** Close scan receivers used for lookups. (Since scans and lookups should
* have the same semantics for nextResult(), lookups use scan-type
* NdbReceiver objects.)
=== modified file 'storage/ndb/src/ndbapi/NdbTransactionScan.cpp'
--- a/storage/ndb/src/ndbapi/NdbTransactionScan.cpp 2009-10-21 11:45:41 +0000
+++ b/storage/ndb/src/ndbapi/NdbTransactionScan.cpp 2009-11-03 12:21:47 +0000
@@ -47,8 +47,8 @@ NdbTransaction::receiveSCAN_TABREF(NdbAp
if (checkState_TransId(&ref->transId1)) {
if (theScanningOp) {
- theScanningOp->setErrorCode(ref->errorCode);
theScanningOp->execCLOSE_SCAN_REP();
+ theScanningOp->setErrorCode(ref->errorCode);
if(!ref->closeNeeded){
return 0;
}
@@ -63,8 +63,8 @@ NdbTransaction::receiveSCAN_TABREF(NdbAp
} else {
assert (m_scanningQuery);
+ m_scanningQuery->execCLOSE_SCAN_REP(ref->closeNeeded);
m_scanningQuery->setErrorCode(ref->errorCode);
- m_scanningQuery->execCLOSE_SCAN_REP();
if(!ref->closeNeeded){
return 0;
}
@@ -108,7 +108,7 @@ NdbTransaction::receiveSCAN_TABCONF(NdbA
theScanningOp->execCLOSE_SCAN_REP();
} else {
assert (m_scanningQuery);
- m_scanningQuery->execCLOSE_SCAN_REP();
+ m_scanningQuery->execCLOSE_SCAN_REP(false);
}
return 1; // -> Finished
}
Attachment: [text/bzr-bundle] bzr/jonas@mysql.com-20091104111217-5gs4rcrc7ggu3gie.bundle
| Thread |
|---|
| • bzr commit into mysql-5.1-telco-7.0-spj branch (jonas:2969) | Jonas Oreland | 4 Nov |