From: Jan Wedvik Date: August 22 2011 8:51am Subject: bzr push into mysql-5.1-telco-7.0-spj-scan-vs-scan branch (jan.wedvik:3541 to 3542) List-Archive: http://lists.mysql.com/commits/140744 Message-Id: <20110822085102.C383D218@fimafeng09.norway.sun.com> MIME-Version: 1.0 Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit 3542 Jan Wedvik 2011-08-22 [merge] Merged from mysql-5.1-telco-7.0 modified: mysql-test/suite/ndb/r/ndb_join_pushdown.result storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp storage/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp storage/ndb/src/ndbapi/NdbQueryOperation.cpp storage/ndb/src/ndbapi/NdbQueryOperation.hpp storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp 3541 Ole John Aske 2011-08-17 [merge] Merge 70 -> 70-spj modified: storage/ndb/src/ndbapi/NdbQueryOperation.cpp storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp === modified file 'mysql-test/suite/ndb/r/ndb_join_pushdown.result' --- a/mysql-test/suite/ndb/r/ndb_join_pushdown.result 2011-06-30 09:17:30 +0000 +++ b/mysql-test/suite/ndb/r/ndb_join_pushdown.result 2011-08-22 08:50:01 +0000 @@ -5211,7 +5211,7 @@ drop table spj_save_counts; drop table spj_counts_at_startup; drop table spj_counts_at_end; scan_count -2511 +2520 pruned_scan_count 8 sorted_scan_count === modified file 'storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp' --- a/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp 2011-07-09 11:16:31 +0000 +++ b/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp 2011-08-22 08:50:01 +0000 @@ -3880,6 +3880,8 @@ done: ndbassert(gci == restorableGCI); replicaPtr.p->m_restorable_gci = gci; Uint32 startGci = replicaPtr.p->maxGciCompleted[maxLcpIndex] + 1; + if (startGci > gci) + startGci = gci; ndbout_c("node: %d tab: %d frag: %d restore lcp: %u(idx: %u) maxGciStarted: %u maxGciCompleted: %u (restorable: %u(%u) newestRestorableGCI: %u)", takeOverPtr.p->toStartingNode, takeOverPtr.p->toCurrentTabref, @@ -17543,7 +17545,8 @@ Dbdih::execDUMP_STATE_ORD(Signal* signal } } - if(arg == 7019 && signal->getLength() == 2) + if(arg == 7019 && signal->getLength() == 2 && + signal->theData[1] < MAX_NDB_NODES) { char buf2[8+1]; NodeRecordPtr nodePtr; === modified file 'storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp' --- a/storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp 2011-07-09 11:16:31 +0000 +++ b/storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp 2011-08-22 08:50:01 +0000 @@ -530,21 +530,87 @@ public: typedef SLFifoListImpl ScanFragHandle_list; typedef LocalSLFifoListImpl Local_ScanFragHandle_list; + /** + * This class computes mean and standard deviation incrementally for a series + * of samples. + */ + class IncrementalStatistics + { + public: + /** + * We cannot have a (non-trivial) constructor, since this class is used in + * unions. + */ + void init() + { + m_mean = m_sumSquare = 0.0; + m_noOfSamples = 0; + } + + // Add another sample. + void update(double sample); + + double getMean() const { return m_mean; } + + double getStdDev() const { + return m_noOfSamples < 2 ? 0.0 : sqrt(m_sumSquare/(m_noOfSamples - 1)); + } + + private: + // Mean of all samples + double m_mean; + //Sum of square of differences from the current mean. + double m_sumSquare; + Uint32 m_noOfSamples; + }; // IncrementalStatistics + struct ScanIndexData { Uint16 m_frags_complete; Uint16 m_frags_outstanding; + /** + * The number of fragment for which we have not yet sent SCAN_FRAGREQ but + * will eventually do so. + */ + Uint16 m_frags_not_started; Uint32 m_rows_received; // #execTRANSID_AI Uint32 m_rows_expecting; // Sum(ScanFragConf) Uint32 m_batch_chunks; // #SCAN_FRAGREQ + #SCAN_NEXTREQ to retrieve batch Uint32 m_scanCookie; Uint32 m_fragCount; + // The number of fragments that we scan in parallel. + Uint32 m_parallelism; + /** + * True if this is the first instantiation of this operation. A child + * operation will be instantiated once for each batch of its parent. + */ + bool m_firstExecution; + /** + * Mean and standard deviation for the optimal parallelism for earlier + * executions of this operation. + */ + IncrementalStatistics m_parallelismStat; + // Total number of rows for the current execution of this operation. + Uint32 m_totalRows; + // Total number of bytes for the current execution of this operation. + Uint32 m_totalBytes; + ScanFragHandle_list::HeadPOD m_fragments; // ScanFrag states union { PatternStore::HeadPOD m_prunePattern; Uint32 m_constPrunePtrI; }; + /** + * Max number of rows seen in a batch. Used for calculating the number of + * rows per fragment in the next next batch when using adaptive batch size. + */ + Uint32 m_largestBatchRows; + /** + * Max number of bytes seen in a batch. Used for calculating the number of + * rows per fragment in the next next batch when using adaptive batch size. + */ + Uint32 m_largestBatchBytes; Uint32 m_scanFragReq[ScanFragReq::SignalLength + 2]; }; @@ -1164,6 +1230,13 @@ private: void scanIndex_parent_row(Signal*,Ptr,Ptr, const RowPtr&); void scanIndex_fixupBound(Ptr fragPtr, Uint32 ptrI, Uint32); void scanIndex_send(Signal*,Ptr,Ptr); + void scanIndex_send(Signal* signal, + Ptr requestPtr, + Ptr treeNodePtr, + Uint32 noOfFrags, + Uint32 bs_bytes, + Uint32 bs_rows, + Uint32& batchRange); void scanIndex_batchComplete(Signal* signal); Uint32 scanIndex_findFrag(Local_ScanFragHandle_list &, Ptr&, Uint32 fragId); === modified file 'storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp' --- a/storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp 2011-07-09 11:16:31 +0000 +++ b/storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp 2011-08-22 08:50:01 +0000 @@ -4450,6 +4450,9 @@ Dbspj::parseScanIndex(Build_context& ctx data.m_fragments.init(); data.m_frags_outstanding = 0; data.m_frags_complete = 0; + data.m_frags_not_started = 0; + data.m_parallelismStat.init(); + data.m_firstExecution = true; data.m_batch_chunks = 0; err = parseDA(ctx, requestPtr, treeNodePtr, @@ -5002,6 +5005,7 @@ Dbspj::scanIndex_parent_batch_complete(S } } } + data.m_frags_not_started = data.m_fragCount - data.m_frags_complete; if (data.m_frags_complete == data.m_fragCount) { @@ -5015,7 +5019,118 @@ Dbspj::scanIndex_parent_batch_complete(S /** * When parent's batch is complete, we send our batch */ - scanIndex_send(signal, requestPtr, treeNodePtr); + const ScanFragReq * org = (const ScanFragReq*)data.m_scanFragReq; + ndbassert(org->batch_size_rows >= data.m_fragCount - data.m_frags_complete); + + if (treeNodePtr.p->m_bits & TreeNode::T_SCAN_PARALLEL) + { + jam(); + data.m_parallelism = data.m_fragCount - data.m_frags_complete; + } + else if (data.m_firstExecution) + { + /** + * Having a high parallelism would allow us to fetch data from many + * fragments in parallel and thus reduce the number of round trips. + * On the other hand, we should set parallelism so low that we can fetch + * all data from a fragment in one batch if possible. + * Since this is the first execution, we do not know how many rows or bytes + * this operation is likely to return. Therefore we set parallelism to 1, + * since this gives the lowest penalty if our guess is wrong. + */ + jam(); + data.m_parallelism = 1; + } + else + { + jam(); + /** + * Use statistics from earlier runs of this operation to estimate the + * initial parallelism. We use the mean minus two times the standard + * deviation to have a low risk of setting parallelism to high (as erring + * in the other direction is more costly). + */ + Int32 parallelism = data.m_parallelismStat.getMean() + - 2 * data.m_parallelismStat.getStdDev(); + + if (parallelism < 1) + { + jam(); + parallelism = 1; + } + else if ((data.m_fragCount - data.m_frags_complete) % parallelism != 0) + { + jam(); + /** + * Set parallelism such that we can expect to have similar + * parallelism in each batch. For example if there are 8 remaining + * fragments, then we should fecth 2 times 4 fragments rather than + * 7+1. + */ + const Int32 roundTrips = + 1 + (data.m_fragCount - data.m_frags_complete) / parallelism; + parallelism = (data.m_fragCount - data.m_frags_complete) / roundTrips; + } + + data.m_parallelism = static_cast(parallelism); + +#ifdef DEBUG_SCAN_FRAGREQ + DEBUG("::scanIndex_send() starting index scan with parallelism=" + << data.m_parallelism); +#endif + } + ndbrequire(data.m_parallelism > 0); + + const Uint32 bs_rows = org->batch_size_rows/ data.m_parallelism; + const Uint32 bs_bytes = org->batch_size_bytes / data.m_parallelism; + ndbassert(bs_rows > 0); + ndbassert(bs_bytes > 0); + + data.m_largestBatchRows = 0; + data.m_largestBatchBytes = 0; + data.m_totalRows = 0; + data.m_totalBytes = 0; + + { + Local_ScanFragHandle_list list(m_scanfraghandle_pool, data.m_fragments); + Ptr fragPtr; + list.first(fragPtr); + + while(!fragPtr.isNull()) + { + ndbassert(fragPtr.p->m_state == ScanFragHandle::SFH_NOT_STARTED || + fragPtr.p->m_state == ScanFragHandle::SFH_COMPLETE); + fragPtr.p->m_state = ScanFragHandle::SFH_NOT_STARTED; + list.next(fragPtr); + } + } + + Uint32 batchRange = 0; + scanIndex_send(signal, + requestPtr, + treeNodePtr, + data.m_parallelism, + bs_bytes, + bs_rows, + batchRange); + + data.m_firstExecution = false; + + if (treeNodePtr.p->m_bits & TreeNode::T_SCAN_PARALLEL) + { + ndbrequire((data.m_frags_outstanding + data.m_frags_complete) == + data.m_fragCount); + } + else + { + ndbrequire((data.m_frags_outstanding + data.m_frags_complete) <= + data.m_fragCount); + } + + data.m_batch_chunks = 1; + requestPtr.p->m_cnt_active++; + requestPtr.p->m_outstanding++; + treeNodePtr.p->m_state = TreeNode::TN_ACTIVE; } void @@ -5045,77 +5160,91 @@ Dbspj::scanIndex_parent_batch_repeat(Sig } } +/** + * Ask for the first batch for a number of fragments. + */ void Dbspj::scanIndex_send(Signal* signal, Ptr requestPtr, - Ptr treeNodePtr) + Ptr treeNodePtr, + Uint32 noOfFrags, + Uint32 bs_bytes, + Uint32 bs_rows, + Uint32& batchRange) { - jam(); - - ScanIndexData& data = treeNodePtr.p->m_scanindex_data; - const ScanFragReq * org = (const ScanFragReq*)data.m_scanFragReq; - - Uint32 cnt = 1; - Uint32 bs_rows = org->batch_size_rows; - Uint32 bs_bytes = org->batch_size_bytes; - if (treeNodePtr.p->m_bits & TreeNode::T_SCAN_PARALLEL) - { - jam(); - cnt = data.m_fragCount - data.m_frags_complete; - ndbrequire(cnt > 0); - - bs_rows /= cnt; - bs_bytes /= cnt; - ndbassert(bs_rows > 0); - } - /** * if (m_bits & prunemask): * - Range keys sliced out to each ScanFragHandle * - Else, range keys kept on first (and only) ScanFragHandle */ - Uint32 prunemask = TreeNode::T_PRUNE_PATTERN | TreeNode::T_CONST_PRUNE; + const bool prune = treeNodePtr.p->m_bits & + (TreeNode::T_PRUNE_PATTERN | TreeNode::T_CONST_PRUNE); /** - * Don't release keyInfo if it may be sent multiple times, eiter: - * - Not pruned -> same keyInfo goes to all datanodes. - * - Result handling is REPEAT_SCAN_RESULT and same batch may be - * repeated multiple times due to incomplete bushy X-scans. - * (by ::scanIndex_parent_batch_repeat()) - * - * When not released, ::scanIndex_parent_batch_cleanup() will - * eventually release them when preparing arrival of a new parent batch. + * If scan is repeatable, we must make sure not to release range keys so + * that we canuse them again in the next repetition. */ - const bool release = ((treeNodePtr.p->m_bits & prunemask) != 0 && - (treeNodePtr.p->m_bits & TreeNode::T_SCAN_REPEATABLE) == 0); + const bool repeatable = + (treeNodePtr.p->m_bits & TreeNode::T_SCAN_REPEATABLE) != 0; - ScanFragReq* req = reinterpret_cast(signal->getDataPtrSend()); + ScanIndexData& data = treeNodePtr.p->m_scanindex_data; + ndbassert(noOfFrags > 0); + ndbassert(data.m_frags_not_started >= noOfFrags); + ScanFragReq* const req = + reinterpret_cast(signal->getDataPtrSend()); + const ScanFragReq * const org + = reinterpret_cast(data.m_scanFragReq); memcpy(req, org, sizeof(data.m_scanFragReq)); // req->variableData[0] // set below req->variableData[1] = requestPtr.p->m_rootResultData; req->batch_size_bytes = bs_bytes; req->batch_size_rows = bs_rows; - Ptr fragPtr; + Uint32 requestsSent = 0; Local_ScanFragHandle_list list(m_scanfraghandle_pool, data.m_fragments); - - Uint32 keyInfoPtrI = RNIL; + Ptr fragPtr; list.first(fragPtr); - if ((treeNodePtr.p->m_bits & prunemask) == 0) + Uint32 keyInfoPtrI = fragPtr.p->m_rangePtrI; + ndbrequire(prune || keyInfoPtrI != RNIL); + /** + * Iterate over the list of fragments until we have sent as many + * SCAN_FRAGREQs as we should. + */ + while (requestsSent < noOfFrags) { jam(); - keyInfoPtrI = fragPtr.p->m_rangePtrI; - ndbrequire(keyInfoPtrI != RNIL); - } + ndbassert(!fragPtr.isNull()); - Uint32 batchRange = 0; - for (Uint32 i = 0; i < cnt && !fragPtr.isNull(); list.next(fragPtr)) - { - jam(); + if (fragPtr.p->m_state != ScanFragHandle::SFH_NOT_STARTED) + { + // Skip forward to the frags that we should send. + jam(); + list.next(fragPtr); + continue; + } + + const Uint32 ref = fragPtr.p->m_ref; + + if (noOfFrags==1 && !prune && + data.m_frags_not_started == data.m_fragCount && + refToNode(ref) != getOwnNodeId() && + list.hasNext(fragPtr)) + { + /** + * If we are doing a scan with adaptive parallelism and start with + * parallelism=1 then it makes sense to fetch a batch from a fragment on + * the local data node. The reason for this is that if that fragment + * contains few rows, we may be able to read from several fragments in + * parallel. Then we minimize the total number of round trips (to remote + * data nodes) if we fetch the first fragment batch locally. + */ + jam(); + list.next(fragPtr); + continue; + } SectionHandle handle(this); - Uint32 ref = fragPtr.p->m_ref; Uint32 attrInfoPtrI = treeNodePtr.p->m_send.m_attrInfoPtrI; /** @@ -5124,26 +5253,34 @@ Dbspj::scanIndex_send(Signal* signal, req->senderData = fragPtr.i; req->fragmentNoKeyLen = fragPtr.p->m_fragId; - if ((treeNodePtr.p->m_bits & prunemask)) + if (prune) { jam(); keyInfoPtrI = fragPtr.p->m_rangePtrI; if (keyInfoPtrI == RNIL) { + /** + * Since we use pruning, we can see that no parent rows would hash + * to this fragment. + */ jam(); fragPtr.p->m_state = ScanFragHandle::SFH_COMPLETE; + list.next(fragPtr); continue; } - } - if (release) - { - /** - * If we'll use sendSignal() and we need to send the attrInfo several - * times, we need to copy them - */ - Uint32 tmp = RNIL; - ndbrequire(dupSection(tmp, attrInfoPtrI)); // TODO handle error - attrInfoPtrI = tmp; + + if (!repeatable) + { + /** + * If we'll use sendSignal() and we need to send the attrInfo several + * times, we need to copy them. (For repeatable or unpruned scans + * we use sendSignalNoRelease(), so then we do not need to copy.) + */ + jam(); + Uint32 tmp = RNIL; + ndbrequire(dupSection(tmp, attrInfoPtrI)); // TODO handle error + attrInfoPtrI = tmp; + } } req->variableData[0] = batchRange; @@ -5152,16 +5289,14 @@ Dbspj::scanIndex_send(Signal* signal, handle.m_cnt = 2; #if defined DEBUG_SCAN_FRAGREQ - { - ndbout_c("SCAN_FRAGREQ to %x", ref); - printSCAN_FRAGREQ(stdout, signal->getDataPtrSend(), - NDB_ARRAY_SIZE(treeNodePtr.p->m_scanfrag_data.m_scanFragReq), - DBLQH); - printf("ATTRINFO: "); - print(handle.m_ptr[0], stdout); - printf("KEYINFO: "); - print(handle.m_ptr[1], stdout); - } + ndbout_c("SCAN_FRAGREQ to %x", ref); + printSCAN_FRAGREQ(stdout, signal->getDataPtrSend(), + NDB_ARRAY_SIZE(treeNodePtr.p->m_scanfrag_data.m_scanFragReq), + DBLQH); + printf("ATTRINFO: "); + print(handle.m_ptr[0], stdout); + printf("KEYINFO: "); + print(handle.m_ptr[1], stdout); #endif if (refToNode(ref) == getOwnNodeId()) @@ -5173,8 +5308,13 @@ Dbspj::scanIndex_send(Signal* signal, c_Counters.incr_counter(CI_REMOTE_RANGE_SCANS_SENT, 1); } - if (release) + if (prune && !repeatable) { + /** + * For a non-repeatable pruned scan, key info is unique for each + * fragment and therefore cannot be reused, so we release key info + * right away. + */ jam(); sendSignal(ref, GSN_SCAN_FRAGREQ, signal, NDB_ARRAY_SIZE(data.m_scanFragReq), JBB, &handle); @@ -5183,32 +5323,24 @@ Dbspj::scanIndex_send(Signal* signal, } else { + /** + * Reuse key info for multiple fragments and/or multiple repetitions + * of the scan. + */ jam(); sendSignalNoRelease(ref, GSN_SCAN_FRAGREQ, signal, NDB_ARRAY_SIZE(data.m_scanFragReq), JBB, &handle); } handle.clear(); - i++; fragPtr.p->m_state = ScanFragHandle::SFH_SCANNING; // running data.m_frags_outstanding++; batchRange += bs_rows; - } + requestsSent++; + list.next(fragPtr); + } // while (requestsSent < noOfFrags) - if (treeNodePtr.p->m_bits & TreeNode::T_SCAN_PARALLEL) - { - ndbrequire(data.m_frags_outstanding == - data.m_fragCount - data.m_frags_complete); - } - else - { - ndbrequire(data.m_frags_outstanding == 1); - } - - data.m_batch_chunks = 1; - requestPtr.p->m_cnt_active++; - requestPtr.p->m_outstanding++; - treeNodePtr.p->m_state = TreeNode::TN_ACTIVE; + data.m_frags_not_started -= requestsSent; } void @@ -5282,6 +5414,10 @@ Dbspj::scanIndex_execSCAN_FRAGCONF(Signa } requestPtr.p->m_rows += rows; + data.m_totalRows += rows; + data.m_totalBytes += conf->total_len; + data.m_largestBatchRows = MAX(data.m_largestBatchRows, rows); + data.m_largestBatchBytes = MAX(data.m_largestBatchBytes, conf->total_len); if (!treeNodePtr.p->isLeaf()) { @@ -5302,7 +5438,9 @@ Dbspj::scanIndex_execSCAN_FRAGCONF(Signa ndbrequire(data.m_frags_complete < data.m_fragCount); data.m_frags_complete++; - if (data.m_frags_complete == data.m_fragCount) + if (data.m_frags_complete == data.m_fragCount || + ((requestPtr.p->m_state & Request::RS_ABORTING) != 0 && + data.m_fragCount == (data.m_frags_complete + data.m_frags_not_started))) { jam(); ndbrequire(requestPtr.p->m_cnt_active); @@ -5314,6 +5452,32 @@ Dbspj::scanIndex_execSCAN_FRAGCONF(Signa if (data.m_frags_outstanding == 0) { + const ScanFragReq * const org + = reinterpret_cast(data.m_scanFragReq); + + if (data.m_frags_complete == data.m_fragCount) + { + jam(); + /** + * Calculate what would have been the optimal parallelism for the + * scan instance that we have just completed, and update + * 'parallelismStat' with this value. We then use this statistics to set + * the initial parallelism for the next instance of this operation. + */ + double parallelism = data.m_fragCount; + if (data.m_totalRows > 0) + { + parallelism = MIN(parallelism, + double(org->batch_size_rows) / data.m_totalRows); + } + if (data.m_totalBytes > 0) + { + parallelism = MIN(parallelism, + double(org->batch_size_bytes) / data.m_totalBytes); + } + data.m_parallelismStat.update(parallelism); + } + /** * Don't reportBatchComplete to children if we're aborting... */ @@ -5364,7 +5528,7 @@ Dbspj::scanIndex_execSCAN_FRAGREF(Signal ndbrequire(data.m_frags_outstanding > 0); data.m_frags_outstanding--; - if (data.m_frags_complete == data.m_fragCount) + if (data.m_fragCount == (data.m_frags_complete + data.m_frags_not_started)) { jam(); ndbrequire(requestPtr.p->m_cnt_active); @@ -5390,21 +5554,78 @@ Dbspj::scanIndex_execSCAN_NEXTREQ(Signal jam(); ScanIndexData& data = treeNodePtr.p->m_scanindex_data; + const ScanFragReq * org = (const ScanFragReq*)data.m_scanFragReq; data.m_rows_received = 0; data.m_rows_expecting = 0; ndbassert(data.m_frags_outstanding == 0); ndbrequire(data.m_frags_complete < data.m_fragCount); - Uint32 cnt = data.m_fragCount - data.m_frags_complete; if ((treeNodePtr.p->m_bits & TreeNode::T_SCAN_PARALLEL) == 0) { jam(); - cnt = 1; + /** + * Since fetching few but large batches is more efficient, we + * set parallelism to the lowest value where we can still expect each + * batch to be full. + */ + if (data.m_largestBatchRows < org->batch_size_rows/data.m_parallelism && + data.m_largestBatchBytes < org->batch_size_bytes/data.m_parallelism) + { + jam(); + data.m_parallelism = data.m_fragCount - data.m_frags_complete; + if (data.m_largestBatchRows > 0) + { + jam(); + data.m_parallelism = + MIN(org->batch_size_rows / data.m_largestBatchRows, + data.m_parallelism); + } + if (data.m_largestBatchBytes > 0) + { + jam(); + data.m_parallelism = + MIN(data.m_parallelism, + org->batch_size_bytes/data.m_largestBatchBytes); + } + if (data.m_frags_complete == 0 && + data.m_frags_not_started % data.m_parallelism != 0) + { + jam(); + /** + * Set parallelism such that we can expect to have similar + * parallelism in each batch. For example if there are 8 remaining + * fragments, then we should fecth 2 times 4 fragments rather than + * 7+1. + */ + const Uint32 roundTrips = + 1 + data.m_frags_not_started / data.m_parallelism; + data.m_parallelism = data.m_frags_not_started / roundTrips; + } + } + else + { + jam(); + // We get full batches, so we should lower parallelism. + data.m_parallelism = MIN(data.m_fragCount - data.m_frags_complete, + MAX(1, data.m_parallelism/2)); + } + ndbassert(data.m_parallelism > 0); +#ifdef DEBUG_SCAN_FRAGREQ + DEBUG("::scanIndex_execSCAN_NEXTREQ() Asking for new batches from " << + data.m_parallelism << + " fragments with " << org->batch_size_rows/data.m_parallelism << + " rows and " << org->batch_size_bytes/data.m_parallelism << + " bytes."); +#endif + } + else + { + jam(); + data.m_parallelism = data.m_fragCount - data.m_frags_complete; } - const ScanFragReq * org = (const ScanFragReq*)data.m_scanFragReq; - const Uint32 bs_rows = org->batch_size_rows/cnt; + const Uint32 bs_rows = org->batch_size_rows/data.m_parallelism; ndbassert(bs_rows > 0); ScanFragNextReq* req = reinterpret_cast(signal->getDataPtrSend()); @@ -5413,20 +5634,27 @@ Dbspj::scanIndex_execSCAN_NEXTREQ(Signal req->transId1 = requestPtr.p->m_transId[0]; req->transId2 = requestPtr.p->m_transId[1]; req->batch_size_rows = bs_rows; - req->batch_size_bytes = org->batch_size_bytes/cnt; + req->batch_size_bytes = org->batch_size_bytes/data.m_parallelism; Uint32 batchRange = 0; Ptr fragPtr; + Uint32 sentFragCount = 0; + { + /** + * First, ask for more data from fragments that are already started. + */ Local_ScanFragHandle_list list(m_scanfraghandle_pool, data.m_fragments); list.first(fragPtr); - for (Uint32 i = 0; i < cnt && !fragPtr.isNull(); list.next(fragPtr)) + while (sentFragCount < data.m_parallelism && !fragPtr.isNull()) { jam(); + ndbassert(fragPtr.p->m_state == ScanFragHandle::SFH_WAIT_NEXTREQ || + fragPtr.p->m_state == ScanFragHandle::SFH_COMPLETE || + fragPtr.p->m_state == ScanFragHandle::SFH_NOT_STARTED); if (fragPtr.p->m_state == ScanFragHandle::SFH_WAIT_NEXTREQ) { jam(); - i++; data.m_frags_outstanding++; req->variableData[0] = batchRange; fragPtr.p->m_state = ScanFragHandle::SFH_SCANNING; @@ -5434,6 +5662,7 @@ Dbspj::scanIndex_execSCAN_NEXTREQ(Signal DEBUG("scanIndex_execSCAN_NEXTREQ to: " << hex << treeNodePtr.p->m_send.m_ref + << ", m_node_no=" << treeNodePtr.p->m_node_no << ", senderData: " << req->senderData); #ifdef DEBUG_SCAN_FRAGREQ @@ -5445,9 +5674,27 @@ Dbspj::scanIndex_execSCAN_NEXTREQ(Signal sendSignal(fragPtr.p->m_ref, GSN_SCAN_NEXTREQ, signal, ScanFragNextReq::SignalLength + 1, JBB); + sentFragCount++; + } + list.next(fragPtr); } } + if (sentFragCount < data.m_parallelism) + { + /** + * Then start new fragments until we reach data.m_parallelism. + */ + jam(); + ndbassert(data.m_frags_not_started != 0); + scanIndex_send(signal, + requestPtr, + treeNodePtr, + data.m_parallelism - sentFragCount, + org->batch_size_bytes/data.m_parallelism, + bs_rows, + batchRange); + } /** * cursor should not have been positioned here... * unless we actually had something more to send. @@ -5544,14 +5791,28 @@ Dbspj::scanIndex_abort(Signal* signal, } } - ndbrequire(cnt_waiting + cnt_scanning > 0); if (cnt_scanning == 0) { - /** - * If all were waiting...this should increase m_outstanding - */ - jam(); - requestPtr.p->m_outstanding++; + if (cnt_waiting > 0) + { + /** + * If all were waiting...this should increase m_outstanding + */ + jam(); + requestPtr.p->m_outstanding++; + } + else + { + /** + * All fragments are either complete or not yet started, so there is + * nothing to abort. + */ + jam(); + ndbassert(data.m_frags_not_started > 0); + ndbrequire(requestPtr.p->m_cnt_active); + requestPtr.p->m_cnt_active--; + treeNodePtr.p->m_state = TreeNode::TN_INACTIVE; + } } } @@ -5603,6 +5864,8 @@ Dbspj::scanIndex_execNODE_FAILREP(Signal jam(); ndbrequire(data.m_frags_complete < data.m_fragCount); data.m_frags_complete++; + ndbrequire(data.m_frags_not_started > 0); + data.m_frags_not_started--; // fall through case ScanFragHandle::SFH_COMPLETE: jam(); @@ -5637,8 +5900,8 @@ Dbspj::scanIndex_execNODE_FAILREP(Signal requestPtr.p->m_outstanding--; } - if (save1 != data.m_fragCount - && data.m_frags_complete == data.m_fragCount) + if (save1 != 0 && + data.m_fragCount == (data.m_frags_complete + data.m_frags_not_started)) { jam(); ndbrequire(requestPtr.p->m_cnt_active); @@ -5654,7 +5917,8 @@ Dbspj::scanIndex_release_rangekeys(Ptr treeNodePtr) { jam(); - DEBUG("scanIndex_release_rangekeys(), m_node_no: " << treeNodePtr.p->m_node_no); + DEBUG("scanIndex_release_rangekeys(), tree node " << treeNodePtr.i + << " m_node_no: " << treeNodePtr.p->m_node_no); ScanIndexData& data = treeNodePtr.p->m_scanindex_data; Local_ScanFragHandle_list list(m_scanfraghandle_pool, data.m_fragments); @@ -7115,3 +7379,15 @@ void Dbspj::execDBINFO_SCANREQ(Signal *s ndbinfo_send_scan_conf(signal, req, rl); } // Dbspj::execDBINFO_SCANREQ(Signal *signal) + +void Dbspj::IncrementalStatistics::update(double sample) +{ + // Prevent wrap-around + if(m_noOfSamples < 0xffffffff) + { + m_noOfSamples++; + const double delta = sample - m_mean; + m_mean += delta/m_noOfSamples; + m_sumSquare += delta * (sample - m_mean); + } +} === modified file 'storage/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp' --- a/storage/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp 2011-07-09 11:16:31 +0000 +++ b/storage/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp 2011-08-22 08:50:01 +0000 @@ -2972,26 +2972,6 @@ void Qmgr::checkStartInterface(Signal* s nodePtr.p->m_failconf_blocks[3], nodePtr.p->m_failconf_blocks[4]); warningEvent("%s", buf); - - for (Uint32 i = 0; im_failconf_blocks); - i++) - { - jam(); - if (nodePtr.p->m_failconf_blocks[i] != 0) - { - jam(); - signal->theData[0] = 7019; - signal->theData[1] = nodePtr.i; - sendSignal(numberToRef(nodePtr.p->m_failconf_blocks[i], - getOwnNodeId()), - GSN_DUMP_STATE_ORD, signal, 2, JBB); - } - else - { - jam(); - break; - } - } } } } === modified file 'storage/ndb/src/ndbapi/NdbQueryOperation.cpp' --- a/storage/ndb/src/ndbapi/NdbQueryOperation.cpp 2011-08-17 13:27:04 +0000 +++ b/storage/ndb/src/ndbapi/NdbQueryOperation.cpp 2011-08-22 08:50:01 +0000 @@ -57,6 +57,7 @@ static const int Err_FunctionNotImplemen static const int Err_UnknownColumn = 4004; static const int Err_ReceiveTimedOut = 4008; static const int Err_NodeFailCausedAbort = 4028; +static const int Err_ParameterError = 4118; static const int Err_SimpleDirtyReadFailed = 4119; static const int Err_WrongFieldLength = 4209; static const int Err_ReadTooMuch = 4257; @@ -72,6 +73,21 @@ static const Uint16 tupleNotFound = 0xff /** Set to true to trace incomming signals.*/ const bool traceSignals = false; +enum +{ + /** + * Set NdbQueryOperationImpl::m_parallelism to this value to indicate that + * scan parallelism should be adaptive. + */ + Parallelism_adaptive = 0xffff0000, + + /** + * Set NdbQueryOperationImpl::m_parallelism to this value to indicate that + * all fragments should be scanned in parallel. + */ + Parallelism_max = 0xffff0001 +}; + /** * A class for accessing the correlation data at the end of a tuple (for * scan queries). These data have the following layout: @@ -267,6 +283,10 @@ public: void postFetchRelease(); private: + /** No copying.*/ + NdbRootFragment(const NdbRootFragment&); + NdbRootFragment& operator=(const NdbRootFragment&); + STATIC_CONST( voidFragNo = 0xffffffff); /** Enclosing query.*/ @@ -1486,6 +1506,14 @@ int NdbQueryOperation::setParallelism(Ui return m_impl.setParallelism(parallelism); } +int NdbQueryOperation::setMaxParallelism(){ + return m_impl.setMaxParallelism(); +} + +int NdbQueryOperation::setAdaptiveParallelism(){ + return m_impl.setAdaptiveParallelism(); +} + int NdbQueryOperation::setBatchSize(Uint32 batchSize){ return m_impl.setBatchSize(batchSize); } @@ -2619,16 +2647,17 @@ NdbQueryImpl::prepareSend() { /* For the first batch, we read from all fragments for both ordered * and unordered scans.*/ - if (getQueryOperation(0U).m_parallelism > 0) + if (getQueryOperation(0U).m_parallelism == Parallelism_max) { m_rootFragCount - = MIN(getRoot().getQueryOperationDef().getTable().getFragmentCount(), - getQueryOperation(0U).m_parallelism); + = getRoot().getQueryOperationDef().getTable().getFragmentCount(); } else { + assert(getQueryOperation(0U).m_parallelism != Parallelism_adaptive); m_rootFragCount - = getRoot().getQueryOperationDef().getTable().getFragmentCount(); + = MIN(getRoot().getQueryOperationDef().getTable().getFragmentCount(), + getQueryOperation(0U).m_parallelism); } Ndb* const ndb = m_transaction.getNdb(); @@ -3663,7 +3692,8 @@ NdbQueryOperationImpl::NdbQueryOperation m_ordering(NdbQueryOptions::ScanOrdering_unordered), m_interpretedCode(NULL), m_diskInUserProjection(false), - m_parallelism(0), + m_parallelism(def.getQueryOperationIx() == 0 + ? Parallelism_max : Parallelism_adaptive), m_rowSize(0xffffffff) { if (errno == ENOMEM) @@ -4262,9 +4292,10 @@ NdbQueryOperationImpl m_ndbRecord, m_firstRecAttr, 0, // Key size. - getRoot().m_parallelism > 0 ? - getRoot().m_parallelism : - m_queryImpl.getRootFragCount(), + getRoot().m_parallelism + == Parallelism_max ? + m_queryImpl.getRootFragCount() : + getRoot().m_parallelism, maxBatchRows, batchByteSize, firstBatchRows); @@ -4450,7 +4481,12 @@ NdbQueryOperationImpl::prepareAttrInfo(U firstBatchRows); assert(batchRows==getMaxBatchRows()); assert(batchRows==firstBatchRows); - requestInfo |= QN_ScanIndexParameters::SIP_PARALLEL; // FIXME: SPJ always assume. SIP_PARALLEL + assert(m_parallelism == Parallelism_max || + m_parallelism == Parallelism_adaptive); + if (m_parallelism == Parallelism_max) + { + requestInfo |= QN_ScanIndexParameters::SIP_PARALLEL; + } param->requestInfo = requestInfo; param->batchSize = ((Uint16)batchByteSize << 16) | (Uint16)firstBatchRows; param->resultData = getIdOfReceiver(); @@ -4954,7 +4990,7 @@ NdbQueryOperationImpl::setOrdering(NdbQu return -1; } - if (m_parallelism != 0) + if (m_parallelism != Parallelism_max) { getQuery().setErrorCode(QRY_SEQUENTIAL_SCAN_SORTED); return -1; @@ -5049,10 +5085,40 @@ int NdbQueryOperationImpl::setParallelis getQuery().setErrorCode(Err_FunctionNotImplemented); return -1; } + else if (parallelism < 1 || parallelism > MAX_NDB_PARTITIONS) + { + getQuery().setErrorCode(Err_ParameterError); + return -1; + } m_parallelism = parallelism; return 0; } +int NdbQueryOperationImpl::setMaxParallelism(){ + if (!getQueryOperationDef().isScanOperation()) + { + getQuery().setErrorCode(QRY_WRONG_OPERATION_TYPE); + return -1; + } + m_parallelism = Parallelism_max; + return 0; +} + +int NdbQueryOperationImpl::setAdaptiveParallelism(){ + if (!getQueryOperationDef().isScanOperation()) + { + getQuery().setErrorCode(QRY_WRONG_OPERATION_TYPE); + return -1; + } + else if (getQueryOperationDef().getQueryOperationIx() == 0) + { + getQuery().setErrorCode(Err_FunctionNotImplemented); + return -1; + } + m_parallelism = Parallelism_adaptive; + return 0; +} + int NdbQueryOperationImpl::setBatchSize(Uint32 batchSize){ if (!getQueryOperationDef().isScanOperation()) { === modified file 'storage/ndb/src/ndbapi/NdbQueryOperation.hpp' --- a/storage/ndb/src/ndbapi/NdbQueryOperation.hpp 2011-06-20 13:25:48 +0000 +++ b/storage/ndb/src/ndbapi/NdbQueryOperation.hpp 2011-08-22 08:35:35 +0000 @@ -335,13 +335,29 @@ public: NdbQueryOptions::ScanOrdering getOrdering() const; /** - * Set the number of fragments to be scanned in parallel for the root - * operation of this query. This only applies to table scans and non-sorted - * scans of ordered indexes. + * Set the number of fragments to be scanned in parallel. This only applies + * to table scans and non-sorted scans of ordered indexes. This method is + * only implemented for then root scan operation. * @return 0 if ok, -1 in case of error (call getNdbError() for details.) */ int setParallelism(Uint32 parallelism); + /** + * Set the number of fragments to be scanned in parallel to the maximum + * possible value. This is the default for the root scan operation. + * @return 0 if ok, -1 in case of error (call getNdbError() for details.) + */ + int setMaxParallelism(); + + /** + * Let the system dynamically choose the number of fragments to scan in + * parallel. The system will try to choose a value that gives optimal + * performance. This is the default for all scans but the root scan. This + * method only implemented for non-root scan operations. + * @return 0 if ok, -1 in case of error (call getNdbError() for details.) + */ + int setAdaptiveParallelism(); + /** Set the batch size (max rows per batch) for this operation. This * only applies to scan operations, as lookup operations always will * have the same batch size as its parent operation, or 1 if it is the === modified file 'storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp' --- a/storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp 2011-08-17 13:16:22 +0000 +++ b/storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp 2011-08-22 08:35:35 +0000 @@ -661,14 +661,30 @@ public: NdbQueryOptions::ScanOrdering getOrdering() const { return m_ordering; } - /** - * Set the number of fragments to be scanned in parallel for the root - * operation of this query. This only applies to table scans and non-sorted - * scans of ordered indexes. + /** + * Set the number of fragments to be scanned in parallel. This only applies + * to table scans and non-sorted scans of ordered indexes. This method is + * only implemented for then root scan operation. * @return 0 if ok, -1 in case of error (call getNdbError() for details.) */ int setParallelism(Uint32 parallelism); + /** + * Set the number of fragments to be scanned in parallel to the maximum + * possible value. This is the default for the root scan operation. + * @return 0 if ok, -1 in case of error (call getNdbError() for details.) + */ + int setMaxParallelism(); + + /** + * Let the system dynamically choose the number of fragments to scan in + * parallel. The system will try to choose a value that gives optimal + * performance. This is the default for all scans but the root scan. This + * method only implemented for non-root scan operations. + * @return 0 if ok, -1 in case of error (call getNdbError() for details.) + */ + int setAdaptiveParallelism(); + /** Set the batch size (max rows per batch) for this operation. This * only applies to scan operations, as lookup operations always will * have the same batch size as its parent operation, or 1 if it is the No bundle (reason: useless for push emails).