3542 Jan Wedvik 2011-08-22 [merge]
Merged from mysql-5.1-telco-7.0
modified:
mysql-test/suite/ndb/r/ndb_join_pushdown.result
storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp
storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp
storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp
storage/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp
storage/ndb/src/ndbapi/NdbQueryOperation.cpp
storage/ndb/src/ndbapi/NdbQueryOperation.hpp
storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp
3541 Ole John Aske 2011-08-17 [merge]
Merge 70 -> 70-spj
modified:
storage/ndb/src/ndbapi/NdbQueryOperation.cpp
storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp
=== modified file 'mysql-test/suite/ndb/r/ndb_join_pushdown.result'
--- a/mysql-test/suite/ndb/r/ndb_join_pushdown.result 2011-06-30 09:17:30 +0000
+++ b/mysql-test/suite/ndb/r/ndb_join_pushdown.result 2011-08-22 08:50:01 +0000
@@ -5211,7 +5211,7 @@ drop table spj_save_counts;
drop table spj_counts_at_startup;
drop table spj_counts_at_end;
scan_count
-2511
+2520
pruned_scan_count
8
sorted_scan_count
=== modified file 'storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp 2011-07-09 11:16:31 +0000
+++ b/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp 2011-08-22 08:50:01 +0000
@@ -3880,6 +3880,8 @@ done:
ndbassert(gci == restorableGCI);
replicaPtr.p->m_restorable_gci = gci;
Uint32 startGci = replicaPtr.p->maxGciCompleted[maxLcpIndex] + 1;
+ if (startGci > gci)
+ startGci = gci;
ndbout_c("node: %d tab: %d frag: %d restore lcp: %u(idx: %u) maxGciStarted: %u maxGciCompleted: %u (restorable: %u(%u) newestRestorableGCI: %u)",
takeOverPtr.p->toStartingNode,
takeOverPtr.p->toCurrentTabref,
@@ -17543,7 +17545,8 @@ Dbdih::execDUMP_STATE_ORD(Signal* signal
}
}
- if(arg == 7019 && signal->getLength() == 2)
+ if(arg == 7019 && signal->getLength() == 2 &&
+ signal->theData[1] < MAX_NDB_NODES)
{
char buf2[8+1];
NodeRecordPtr nodePtr;
=== modified file 'storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp'
--- a/storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp 2011-07-09 11:16:31 +0000
+++ b/storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp 2011-08-22 08:50:01 +0000
@@ -530,21 +530,87 @@ public:
typedef SLFifoListImpl<ScanFragHandle_pool, ScanFragHandle> ScanFragHandle_list;
typedef LocalSLFifoListImpl<ScanFragHandle_pool, ScanFragHandle> Local_ScanFragHandle_list;
+ /**
+ * This class computes mean and standard deviation incrementally for a series
+ * of samples.
+ */
+ class IncrementalStatistics
+ {
+ public:
+ /**
+ * We cannot have a (non-trivial) constructor, since this class is used in
+ * unions.
+ */
+ void init()
+ {
+ m_mean = m_sumSquare = 0.0;
+ m_noOfSamples = 0;
+ }
+
+ // Add another sample.
+ void update(double sample);
+
+ double getMean() const { return m_mean; }
+
+ double getStdDev() const {
+ return m_noOfSamples < 2 ? 0.0 : sqrt(m_sumSquare/(m_noOfSamples - 1));
+ }
+
+ private:
+ // Mean of all samples
+ double m_mean;
+ //Sum of square of differences from the current mean.
+ double m_sumSquare;
+ Uint32 m_noOfSamples;
+ }; // IncrementalStatistics
+
struct ScanIndexData
{
Uint16 m_frags_complete;
Uint16 m_frags_outstanding;
+ /**
+ * The number of fragments for which we have not yet sent SCAN_FRAGREQ but
+ * will eventually do so.
+ */
+ Uint16 m_frags_not_started;
Uint32 m_rows_received; // #execTRANSID_AI
Uint32 m_rows_expecting; // Sum(ScanFragConf)
Uint32 m_batch_chunks; // #SCAN_FRAGREQ + #SCAN_NEXTREQ to retrieve batch
Uint32 m_scanCookie;
Uint32 m_fragCount;
+ // The number of fragments that we scan in parallel.
+ Uint32 m_parallelism;
+ /**
+ * True if this is the first instantiation of this operation. A child
+ * operation will be instantiated once for each batch of its parent.
+ */
+ bool m_firstExecution;
+ /**
+ * Mean and standard deviation for the optimal parallelism for earlier
+ * executions of this operation.
+ */
+ IncrementalStatistics m_parallelismStat;
+ // Total number of rows for the current execution of this operation.
+ Uint32 m_totalRows;
+ // Total number of bytes for the current execution of this operation.
+ Uint32 m_totalBytes;
+
ScanFragHandle_list::HeadPOD m_fragments; // ScanFrag states
union
{
PatternStore::HeadPOD m_prunePattern;
Uint32 m_constPrunePtrI;
};
+ /**
+ * Max number of rows seen in a batch. Used for calculating the number of
+ * rows per fragment in the next batch when using adaptive batch size.
+ */
+ Uint32 m_largestBatchRows;
+ /**
+ * Max number of bytes seen in a batch. Used for calculating the number of
+ * rows per fragment in the next next batch when using adaptive batch size.
+ */
+ Uint32 m_largestBatchBytes;
Uint32 m_scanFragReq[ScanFragReq::SignalLength + 2];
};
@@ -1164,6 +1230,13 @@ private:
void scanIndex_parent_row(Signal*,Ptr<Request>,Ptr<TreeNode>, const RowPtr&);
void scanIndex_fixupBound(Ptr<ScanFragHandle> fragPtr, Uint32 ptrI, Uint32);
void scanIndex_send(Signal*,Ptr<Request>,Ptr<TreeNode>);
+ void scanIndex_send(Signal* signal,
+ Ptr<Request> requestPtr,
+ Ptr<TreeNode> treeNodePtr,
+ Uint32 noOfFrags,
+ Uint32 bs_bytes,
+ Uint32 bs_rows,
+ Uint32& batchRange);
void scanIndex_batchComplete(Signal* signal);
Uint32 scanIndex_findFrag(Local_ScanFragHandle_list &, Ptr<ScanFragHandle>&,
Uint32 fragId);
=== modified file 'storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp 2011-07-09 11:16:31 +0000
+++ b/storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp 2011-08-22 08:50:01 +0000
@@ -4450,6 +4450,9 @@ Dbspj::parseScanIndex(Build_context& ctx
data.m_fragments.init();
data.m_frags_outstanding = 0;
data.m_frags_complete = 0;
+ data.m_frags_not_started = 0;
+ data.m_parallelismStat.init();
+ data.m_firstExecution = true;
data.m_batch_chunks = 0;
err = parseDA(ctx, requestPtr, treeNodePtr,
@@ -5002,6 +5005,7 @@ Dbspj::scanIndex_parent_batch_complete(S
}
}
}
+ data.m_frags_not_started = data.m_fragCount - data.m_frags_complete;
if (data.m_frags_complete == data.m_fragCount)
{
@@ -5015,7 +5019,118 @@ Dbspj::scanIndex_parent_batch_complete(S
/**
* When parent's batch is complete, we send our batch
*/
- scanIndex_send(signal, requestPtr, treeNodePtr);
+ const ScanFragReq * org = (const ScanFragReq*)data.m_scanFragReq;
+ ndbassert(org->batch_size_rows >= data.m_fragCount - data.m_frags_complete);
+
+ if (treeNodePtr.p->m_bits & TreeNode::T_SCAN_PARALLEL)
+ {
+ jam();
+ data.m_parallelism = data.m_fragCount - data.m_frags_complete;
+ }
+ else if (data.m_firstExecution)
+ {
+ /**
+ * Having a high parallelism would allow us to fetch data from many
+ * fragments in parallel and thus reduce the number of round trips.
+ * On the other hand, we should set parallelism so low that we can fetch
+ * all data from a fragment in one batch if possible.
+ * Since this is the first execution, we do not know how many rows or bytes
+ * this operation is likely to return. Therefore we set parallelism to 1,
+ * since this gives the lowest penalty if our guess is wrong.
+ */
+ jam();
+ data.m_parallelism = 1;
+ }
+ else
+ {
+ jam();
+ /**
+ * Use statistics from earlier runs of this operation to estimate the
+ * initial parallelism. We use the mean minus two times the standard
+ * deviation to have a low risk of setting parallelism too high (as erring
+ * in the other direction is more costly).
+ */
+ Int32 parallelism = data.m_parallelismStat.getMean()
+ - 2 * data.m_parallelismStat.getStdDev();
+
+ if (parallelism < 1)
+ {
+ jam();
+ parallelism = 1;
+ }
+ else if ((data.m_fragCount - data.m_frags_complete) % parallelism != 0)
+ {
+ jam();
+ /**
+ * Set parallelism such that we can expect to have similar
+ * parallelism in each batch. For example if there are 8 remaining
+ * fragments, then we should fetch 2 times 4 fragments rather than
+ * 7+1.
+ */
+ const Int32 roundTrips =
+ 1 + (data.m_fragCount - data.m_frags_complete) / parallelism;
+ parallelism = (data.m_fragCount - data.m_frags_complete) / roundTrips;
+ }
+
+ data.m_parallelism = static_cast<Uint32>(parallelism);
+
+#ifdef DEBUG_SCAN_FRAGREQ
+ DEBUG("::scanIndex_send() starting index scan with parallelism="
+ << data.m_parallelism);
+#endif
+ }
+ ndbrequire(data.m_parallelism > 0);
+
+ const Uint32 bs_rows = org->batch_size_rows/ data.m_parallelism;
+ const Uint32 bs_bytes = org->batch_size_bytes / data.m_parallelism;
+ ndbassert(bs_rows > 0);
+ ndbassert(bs_bytes > 0);
+
+ data.m_largestBatchRows = 0;
+ data.m_largestBatchBytes = 0;
+ data.m_totalRows = 0;
+ data.m_totalBytes = 0;
+
+ {
+ Local_ScanFragHandle_list list(m_scanfraghandle_pool, data.m_fragments);
+ Ptr<ScanFragHandle> fragPtr;
+ list.first(fragPtr);
+
+ while(!fragPtr.isNull())
+ {
+ ndbassert(fragPtr.p->m_state == ScanFragHandle::SFH_NOT_STARTED ||
+ fragPtr.p->m_state == ScanFragHandle::SFH_COMPLETE);
+ fragPtr.p->m_state = ScanFragHandle::SFH_NOT_STARTED;
+ list.next(fragPtr);
+ }
+ }
+
+ Uint32 batchRange = 0;
+ scanIndex_send(signal,
+ requestPtr,
+ treeNodePtr,
+ data.m_parallelism,
+ bs_bytes,
+ bs_rows,
+ batchRange);
+
+ data.m_firstExecution = false;
+
+ if (treeNodePtr.p->m_bits & TreeNode::T_SCAN_PARALLEL)
+ {
+ ndbrequire((data.m_frags_outstanding + data.m_frags_complete) ==
+ data.m_fragCount);
+ }
+ else
+ {
+ ndbrequire((data.m_frags_outstanding + data.m_frags_complete) <=
+ data.m_fragCount);
+ }
+
+ data.m_batch_chunks = 1;
+ requestPtr.p->m_cnt_active++;
+ requestPtr.p->m_outstanding++;
+ treeNodePtr.p->m_state = TreeNode::TN_ACTIVE;
}
void
@@ -5045,77 +5160,91 @@ Dbspj::scanIndex_parent_batch_repeat(Sig
}
}
+/**
+ * Ask for the first batch for a number of fragments.
+ */
void
Dbspj::scanIndex_send(Signal* signal,
Ptr<Request> requestPtr,
- Ptr<TreeNode> treeNodePtr)
+ Ptr<TreeNode> treeNodePtr,
+ Uint32 noOfFrags,
+ Uint32 bs_bytes,
+ Uint32 bs_rows,
+ Uint32& batchRange)
{
- jam();
-
- ScanIndexData& data = treeNodePtr.p->m_scanindex_data;
- const ScanFragReq * org = (const ScanFragReq*)data.m_scanFragReq;
-
- Uint32 cnt = 1;
- Uint32 bs_rows = org->batch_size_rows;
- Uint32 bs_bytes = org->batch_size_bytes;
- if (treeNodePtr.p->m_bits & TreeNode::T_SCAN_PARALLEL)
- {
- jam();
- cnt = data.m_fragCount - data.m_frags_complete;
- ndbrequire(cnt > 0);
-
- bs_rows /= cnt;
- bs_bytes /= cnt;
- ndbassert(bs_rows > 0);
- }
-
/**
* if (m_bits & prunemask):
* - Range keys sliced out to each ScanFragHandle
* - Else, range keys kept on first (and only) ScanFragHandle
*/
- Uint32 prunemask = TreeNode::T_PRUNE_PATTERN | TreeNode::T_CONST_PRUNE;
+ const bool prune = treeNodePtr.p->m_bits &
+ (TreeNode::T_PRUNE_PATTERN | TreeNode::T_CONST_PRUNE);
/**
- * Don't release keyInfo if it may be sent multiple times, eiter:
- * - Not pruned -> same keyInfo goes to all datanodes.
- * - Result handling is REPEAT_SCAN_RESULT and same batch may be
- * repeated multiple times due to incomplete bushy X-scans.
- * (by ::scanIndex_parent_batch_repeat())
- *
- * When not released, ::scanIndex_parent_batch_cleanup() will
- * eventually release them when preparing arrival of a new parent batch.
+ * If scan is repeatable, we must make sure not to release range keys so
+ * that we can use them again in the next repetition.
*/
- const bool release = ((treeNodePtr.p->m_bits & prunemask) != 0 &&
- (treeNodePtr.p->m_bits & TreeNode::T_SCAN_REPEATABLE) == 0);
+ const bool repeatable =
+ (treeNodePtr.p->m_bits & TreeNode::T_SCAN_REPEATABLE) != 0;
- ScanFragReq* req = reinterpret_cast<ScanFragReq*>(signal->getDataPtrSend());
+ ScanIndexData& data = treeNodePtr.p->m_scanindex_data;
+ ndbassert(noOfFrags > 0);
+ ndbassert(data.m_frags_not_started >= noOfFrags);
+ ScanFragReq* const req =
+ reinterpret_cast<ScanFragReq*>(signal->getDataPtrSend());
+ const ScanFragReq * const org
+ = reinterpret_cast<ScanFragReq*>(data.m_scanFragReq);
memcpy(req, org, sizeof(data.m_scanFragReq));
// req->variableData[0] // set below
req->variableData[1] = requestPtr.p->m_rootResultData;
req->batch_size_bytes = bs_bytes;
req->batch_size_rows = bs_rows;
- Ptr<ScanFragHandle> fragPtr;
+ Uint32 requestsSent = 0;
Local_ScanFragHandle_list list(m_scanfraghandle_pool, data.m_fragments);
-
- Uint32 keyInfoPtrI = RNIL;
+ Ptr<ScanFragHandle> fragPtr;
list.first(fragPtr);
- if ((treeNodePtr.p->m_bits & prunemask) == 0)
+ Uint32 keyInfoPtrI = fragPtr.p->m_rangePtrI;
+ ndbrequire(prune || keyInfoPtrI != RNIL);
+ /**
+ * Iterate over the list of fragments until we have sent as many
+ * SCAN_FRAGREQs as we should.
+ */
+ while (requestsSent < noOfFrags)
{
jam();
- keyInfoPtrI = fragPtr.p->m_rangePtrI;
- ndbrequire(keyInfoPtrI != RNIL);
- }
+ ndbassert(!fragPtr.isNull());
- Uint32 batchRange = 0;
- for (Uint32 i = 0; i < cnt && !fragPtr.isNull(); list.next(fragPtr))
- {
- jam();
+ if (fragPtr.p->m_state != ScanFragHandle::SFH_NOT_STARTED)
+ {
+ // Skip forward to the frags that we should send.
+ jam();
+ list.next(fragPtr);
+ continue;
+ }
+
+ const Uint32 ref = fragPtr.p->m_ref;
+
+ if (noOfFrags==1 && !prune &&
+ data.m_frags_not_started == data.m_fragCount &&
+ refToNode(ref) != getOwnNodeId() &&
+ list.hasNext(fragPtr))
+ {
+ /**
+ * If we are doing a scan with adaptive parallelism and start with
+ * parallelism=1 then it makes sense to fetch a batch from a fragment on
+ * the local data node. The reason for this is that if that fragment
+ * contains few rows, we may be able to read from several fragments in
+ * parallel. Then we minimize the total number of round trips (to remote
+ * data nodes) if we fetch the first fragment batch locally.
+ */
+ jam();
+ list.next(fragPtr);
+ continue;
+ }
SectionHandle handle(this);
- Uint32 ref = fragPtr.p->m_ref;
Uint32 attrInfoPtrI = treeNodePtr.p->m_send.m_attrInfoPtrI;
/**
@@ -5124,26 +5253,34 @@ Dbspj::scanIndex_send(Signal* signal,
req->senderData = fragPtr.i;
req->fragmentNoKeyLen = fragPtr.p->m_fragId;
- if ((treeNodePtr.p->m_bits & prunemask))
+ if (prune)
{
jam();
keyInfoPtrI = fragPtr.p->m_rangePtrI;
if (keyInfoPtrI == RNIL)
{
+ /**
+ * Since we use pruning, we can see that no parent rows would hash
+ * to this fragment.
+ */
jam();
fragPtr.p->m_state = ScanFragHandle::SFH_COMPLETE;
+ list.next(fragPtr);
continue;
}
- }
- if (release)
- {
- /**
- * If we'll use sendSignal() and we need to send the attrInfo several
- * times, we need to copy them
- */
- Uint32 tmp = RNIL;
- ndbrequire(dupSection(tmp, attrInfoPtrI)); // TODO handle error
- attrInfoPtrI = tmp;
+
+ if (!repeatable)
+ {
+ /**
+ * If we'll use sendSignal() and we need to send the attrInfo several
+ * times, we need to copy them. (For repeatable or unpruned scans
+ * we use sendSignalNoRelease(), so then we do not need to copy.)
+ */
+ jam();
+ Uint32 tmp = RNIL;
+ ndbrequire(dupSection(tmp, attrInfoPtrI)); // TODO handle error
+ attrInfoPtrI = tmp;
+ }
}
req->variableData[0] = batchRange;
@@ -5152,16 +5289,14 @@ Dbspj::scanIndex_send(Signal* signal,
handle.m_cnt = 2;
#if defined DEBUG_SCAN_FRAGREQ
- {
- ndbout_c("SCAN_FRAGREQ to %x", ref);
- printSCAN_FRAGREQ(stdout, signal->getDataPtrSend(),
- NDB_ARRAY_SIZE(treeNodePtr.p->m_scanfrag_data.m_scanFragReq),
- DBLQH);
- printf("ATTRINFO: ");
- print(handle.m_ptr[0], stdout);
- printf("KEYINFO: ");
- print(handle.m_ptr[1], stdout);
- }
+ ndbout_c("SCAN_FRAGREQ to %x", ref);
+ printSCAN_FRAGREQ(stdout, signal->getDataPtrSend(),
+ NDB_ARRAY_SIZE(treeNodePtr.p->m_scanfrag_data.m_scanFragReq),
+ DBLQH);
+ printf("ATTRINFO: ");
+ print(handle.m_ptr[0], stdout);
+ printf("KEYINFO: ");
+ print(handle.m_ptr[1], stdout);
#endif
if (refToNode(ref) == getOwnNodeId())
@@ -5173,8 +5308,13 @@ Dbspj::scanIndex_send(Signal* signal,
c_Counters.incr_counter(CI_REMOTE_RANGE_SCANS_SENT, 1);
}
- if (release)
+ if (prune && !repeatable)
{
+ /**
+ * For a non-repeatable pruned scan, key info is unique for each
+ * fragment and therefore cannot be reused, so we release key info
+ * right away.
+ */
jam();
sendSignal(ref, GSN_SCAN_FRAGREQ, signal,
NDB_ARRAY_SIZE(data.m_scanFragReq), JBB, &handle);
@@ -5183,32 +5323,24 @@ Dbspj::scanIndex_send(Signal* signal,
}
else
{
+ /**
+ * Reuse key info for multiple fragments and/or multiple repetitions
+ * of the scan.
+ */
jam();
sendSignalNoRelease(ref, GSN_SCAN_FRAGREQ, signal,
NDB_ARRAY_SIZE(data.m_scanFragReq), JBB, &handle);
}
handle.clear();
- i++;
fragPtr.p->m_state = ScanFragHandle::SFH_SCANNING; // running
data.m_frags_outstanding++;
batchRange += bs_rows;
- }
+ requestsSent++;
+ list.next(fragPtr);
+ } // while (requestsSent < noOfFrags)
- if (treeNodePtr.p->m_bits & TreeNode::T_SCAN_PARALLEL)
- {
- ndbrequire(data.m_frags_outstanding ==
- data.m_fragCount - data.m_frags_complete);
- }
- else
- {
- ndbrequire(data.m_frags_outstanding == 1);
- }
-
- data.m_batch_chunks = 1;
- requestPtr.p->m_cnt_active++;
- requestPtr.p->m_outstanding++;
- treeNodePtr.p->m_state = TreeNode::TN_ACTIVE;
+ data.m_frags_not_started -= requestsSent;
}
void
@@ -5282,6 +5414,10 @@ Dbspj::scanIndex_execSCAN_FRAGCONF(Signa
}
requestPtr.p->m_rows += rows;
+ data.m_totalRows += rows;
+ data.m_totalBytes += conf->total_len;
+ data.m_largestBatchRows = MAX(data.m_largestBatchRows, rows);
+ data.m_largestBatchBytes = MAX(data.m_largestBatchBytes, conf->total_len);
if (!treeNodePtr.p->isLeaf())
{
@@ -5302,7 +5438,9 @@ Dbspj::scanIndex_execSCAN_FRAGCONF(Signa
ndbrequire(data.m_frags_complete < data.m_fragCount);
data.m_frags_complete++;
- if (data.m_frags_complete == data.m_fragCount)
+ if (data.m_frags_complete == data.m_fragCount ||
+ ((requestPtr.p->m_state & Request::RS_ABORTING) != 0 &&
+ data.m_fragCount == (data.m_frags_complete + data.m_frags_not_started)))
{
jam();
ndbrequire(requestPtr.p->m_cnt_active);
@@ -5314,6 +5452,32 @@ Dbspj::scanIndex_execSCAN_FRAGCONF(Signa
if (data.m_frags_outstanding == 0)
{
+ const ScanFragReq * const org
+ = reinterpret_cast<const ScanFragReq*>(data.m_scanFragReq);
+
+ if (data.m_frags_complete == data.m_fragCount)
+ {
+ jam();
+ /**
+ * Calculate what would have been the optimal parallelism for the
+ * scan instance that we have just completed, and update
+ * 'parallelismStat' with this value. We then use this statistics to set
+ * the initial parallelism for the next instance of this operation.
+ */
+ double parallelism = data.m_fragCount;
+ if (data.m_totalRows > 0)
+ {
+ parallelism = MIN(parallelism,
+ double(org->batch_size_rows) / data.m_totalRows);
+ }
+ if (data.m_totalBytes > 0)
+ {
+ parallelism = MIN(parallelism,
+ double(org->batch_size_bytes) / data.m_totalBytes);
+ }
+ data.m_parallelismStat.update(parallelism);
+ }
+
/**
* Don't reportBatchComplete to children if we're aborting...
*/
@@ -5364,7 +5528,7 @@ Dbspj::scanIndex_execSCAN_FRAGREF(Signal
ndbrequire(data.m_frags_outstanding > 0);
data.m_frags_outstanding--;
- if (data.m_frags_complete == data.m_fragCount)
+ if (data.m_fragCount == (data.m_frags_complete + data.m_frags_not_started))
{
jam();
ndbrequire(requestPtr.p->m_cnt_active);
@@ -5390,21 +5554,78 @@ Dbspj::scanIndex_execSCAN_NEXTREQ(Signal
jam();
ScanIndexData& data = treeNodePtr.p->m_scanindex_data;
+ const ScanFragReq * org = (const ScanFragReq*)data.m_scanFragReq;
data.m_rows_received = 0;
data.m_rows_expecting = 0;
ndbassert(data.m_frags_outstanding == 0);
ndbrequire(data.m_frags_complete < data.m_fragCount);
- Uint32 cnt = data.m_fragCount - data.m_frags_complete;
if ((treeNodePtr.p->m_bits & TreeNode::T_SCAN_PARALLEL) == 0)
{
jam();
- cnt = 1;
+ /**
+ * Since fetching few but large batches is more efficient, we
+ * set parallelism to the lowest value where we can still expect each
+ * batch to be full.
+ */
+ if (data.m_largestBatchRows < org->batch_size_rows/data.m_parallelism &&
+ data.m_largestBatchBytes < org->batch_size_bytes/data.m_parallelism)
+ {
+ jam();
+ data.m_parallelism = data.m_fragCount - data.m_frags_complete;
+ if (data.m_largestBatchRows > 0)
+ {
+ jam();
+ data.m_parallelism =
+ MIN(org->batch_size_rows / data.m_largestBatchRows,
+ data.m_parallelism);
+ }
+ if (data.m_largestBatchBytes > 0)
+ {
+ jam();
+ data.m_parallelism =
+ MIN(data.m_parallelism,
+ org->batch_size_bytes/data.m_largestBatchBytes);
+ }
+ if (data.m_frags_complete == 0 &&
+ data.m_frags_not_started % data.m_parallelism != 0)
+ {
+ jam();
+ /**
+ * Set parallelism such that we can expect to have similar
+ * parallelism in each batch. For example if there are 8 remaining
+ * fragments, then we should fetch 2 times 4 fragments rather than
+ * 7+1.
+ */
+ const Uint32 roundTrips =
+ 1 + data.m_frags_not_started / data.m_parallelism;
+ data.m_parallelism = data.m_frags_not_started / roundTrips;
+ }
+ }
+ else
+ {
+ jam();
+ // We get full batches, so we should lower parallelism.
+ data.m_parallelism = MIN(data.m_fragCount - data.m_frags_complete,
+ MAX(1, data.m_parallelism/2));
+ }
+ ndbassert(data.m_parallelism > 0);
+#ifdef DEBUG_SCAN_FRAGREQ
+ DEBUG("::scanIndex_execSCAN_NEXTREQ() Asking for new batches from " <<
+ data.m_parallelism <<
+ " fragments with " << org->batch_size_rows/data.m_parallelism <<
+ " rows and " << org->batch_size_bytes/data.m_parallelism <<
+ " bytes.");
+#endif
+ }
+ else
+ {
+ jam();
+ data.m_parallelism = data.m_fragCount - data.m_frags_complete;
}
- const ScanFragReq * org = (const ScanFragReq*)data.m_scanFragReq;
- const Uint32 bs_rows = org->batch_size_rows/cnt;
+ const Uint32 bs_rows = org->batch_size_rows/data.m_parallelism;
ndbassert(bs_rows > 0);
ScanFragNextReq* req =
reinterpret_cast<ScanFragNextReq*>(signal->getDataPtrSend());
@@ -5413,20 +5634,27 @@ Dbspj::scanIndex_execSCAN_NEXTREQ(Signal
req->transId1 = requestPtr.p->m_transId[0];
req->transId2 = requestPtr.p->m_transId[1];
req->batch_size_rows = bs_rows;
- req->batch_size_bytes = org->batch_size_bytes/cnt;
+ req->batch_size_bytes = org->batch_size_bytes/data.m_parallelism;
Uint32 batchRange = 0;
Ptr<ScanFragHandle> fragPtr;
+ Uint32 sentFragCount = 0;
+ {
+ /**
+ * First, ask for more data from fragments that are already started.
+ */
Local_ScanFragHandle_list list(m_scanfraghandle_pool, data.m_fragments);
list.first(fragPtr);
- for (Uint32 i = 0; i < cnt && !fragPtr.isNull(); list.next(fragPtr))
+ while (sentFragCount < data.m_parallelism && !fragPtr.isNull())
{
jam();
+ ndbassert(fragPtr.p->m_state == ScanFragHandle::SFH_WAIT_NEXTREQ ||
+ fragPtr.p->m_state == ScanFragHandle::SFH_COMPLETE ||
+ fragPtr.p->m_state == ScanFragHandle::SFH_NOT_STARTED);
if (fragPtr.p->m_state == ScanFragHandle::SFH_WAIT_NEXTREQ)
{
jam();
- i++;
data.m_frags_outstanding++;
req->variableData[0] = batchRange;
fragPtr.p->m_state = ScanFragHandle::SFH_SCANNING;
@@ -5434,6 +5662,7 @@ Dbspj::scanIndex_execSCAN_NEXTREQ(Signal
DEBUG("scanIndex_execSCAN_NEXTREQ to: " << hex
<< treeNodePtr.p->m_send.m_ref
+ << ", m_node_no=" << treeNodePtr.p->m_node_no
<< ", senderData: " << req->senderData);
#ifdef DEBUG_SCAN_FRAGREQ
@@ -5445,9 +5674,27 @@ Dbspj::scanIndex_execSCAN_NEXTREQ(Signal
sendSignal(fragPtr.p->m_ref, GSN_SCAN_NEXTREQ, signal,
ScanFragNextReq::SignalLength + 1,
JBB);
+ sentFragCount++;
+ }
+ list.next(fragPtr);
}
}
+ if (sentFragCount < data.m_parallelism)
+ {
+ /**
+ * Then start new fragments until we reach data.m_parallelism.
+ */
+ jam();
+ ndbassert(data.m_frags_not_started != 0);
+ scanIndex_send(signal,
+ requestPtr,
+ treeNodePtr,
+ data.m_parallelism - sentFragCount,
+ org->batch_size_bytes/data.m_parallelism,
+ bs_rows,
+ batchRange);
+ }
/**
* cursor should not have been positioned here...
* unless we actually had something more to send.
@@ -5544,14 +5791,28 @@ Dbspj::scanIndex_abort(Signal* signal,
}
}
- ndbrequire(cnt_waiting + cnt_scanning > 0);
if (cnt_scanning == 0)
{
- /**
- * If all were waiting...this should increase m_outstanding
- */
- jam();
- requestPtr.p->m_outstanding++;
+ if (cnt_waiting > 0)
+ {
+ /**
+ * If all were waiting...this should increase m_outstanding
+ */
+ jam();
+ requestPtr.p->m_outstanding++;
+ }
+ else
+ {
+ /**
+ * All fragments are either complete or not yet started, so there is
+ * nothing to abort.
+ */
+ jam();
+ ndbassert(data.m_frags_not_started > 0);
+ ndbrequire(requestPtr.p->m_cnt_active);
+ requestPtr.p->m_cnt_active--;
+ treeNodePtr.p->m_state = TreeNode::TN_INACTIVE;
+ }
}
}
@@ -5603,6 +5864,8 @@ Dbspj::scanIndex_execNODE_FAILREP(Signal
jam();
ndbrequire(data.m_frags_complete < data.m_fragCount);
data.m_frags_complete++;
+ ndbrequire(data.m_frags_not_started > 0);
+ data.m_frags_not_started--;
// fall through
case ScanFragHandle::SFH_COMPLETE:
jam();
@@ -5637,8 +5900,8 @@ Dbspj::scanIndex_execNODE_FAILREP(Signal
requestPtr.p->m_outstanding--;
}
- if (save1 != data.m_fragCount
- && data.m_frags_complete == data.m_fragCount)
+ if (save1 != 0 &&
+ data.m_fragCount == (data.m_frags_complete + data.m_frags_not_started))
{
jam();
ndbrequire(requestPtr.p->m_cnt_active);
@@ -5654,7 +5917,8 @@ Dbspj::scanIndex_release_rangekeys(Ptr<R
Ptr<TreeNode> treeNodePtr)
{
jam();
- DEBUG("scanIndex_release_rangekeys(), m_node_no: " << treeNodePtr.p->m_node_no);
+ DEBUG("scanIndex_release_rangekeys(), tree node " << treeNodePtr.i
+ << " m_node_no: " << treeNodePtr.p->m_node_no);
ScanIndexData& data = treeNodePtr.p->m_scanindex_data;
Local_ScanFragHandle_list list(m_scanfraghandle_pool, data.m_fragments);
@@ -7115,3 +7379,15 @@ void Dbspj::execDBINFO_SCANREQ(Signal *s
ndbinfo_send_scan_conf(signal, req, rl);
} // Dbspj::execDBINFO_SCANREQ(Signal *signal)
+
+void Dbspj::IncrementalStatistics::update(double sample)
+{
+ // Prevent wrap-around
+ if(m_noOfSamples < 0xffffffff)
+ {
+ m_noOfSamples++;
+ const double delta = sample - m_mean;
+ m_mean += delta/m_noOfSamples;
+ m_sumSquare += delta * (sample - m_mean);
+ }
+}
=== modified file 'storage/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp'
--- a/storage/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp 2011-07-09 11:16:31 +0000
+++ b/storage/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp 2011-08-22 08:50:01 +0000
@@ -2972,26 +2972,6 @@ void Qmgr::checkStartInterface(Signal* s
nodePtr.p->m_failconf_blocks[3],
nodePtr.p->m_failconf_blocks[4]);
warningEvent("%s", buf);
-
- for (Uint32 i = 0; i<NDB_ARRAY_SIZE(nodePtr.p->m_failconf_blocks);
- i++)
- {
- jam();
- if (nodePtr.p->m_failconf_blocks[i] != 0)
- {
- jam();
- signal->theData[0] = 7019;
- signal->theData[1] = nodePtr.i;
- sendSignal(numberToRef(nodePtr.p->m_failconf_blocks[i],
- getOwnNodeId()),
- GSN_DUMP_STATE_ORD, signal, 2, JBB);
- }
- else
- {
- jam();
- break;
- }
- }
}
}
}
=== modified file 'storage/ndb/src/ndbapi/NdbQueryOperation.cpp'
--- a/storage/ndb/src/ndbapi/NdbQueryOperation.cpp 2011-08-17 13:27:04 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryOperation.cpp 2011-08-22 08:50:01 +0000
@@ -57,6 +57,7 @@ static const int Err_FunctionNotImplemen
static const int Err_UnknownColumn = 4004;
static const int Err_ReceiveTimedOut = 4008;
static const int Err_NodeFailCausedAbort = 4028;
+static const int Err_ParameterError = 4118;
static const int Err_SimpleDirtyReadFailed = 4119;
static const int Err_WrongFieldLength = 4209;
static const int Err_ReadTooMuch = 4257;
@@ -72,6 +73,21 @@ static const Uint16 tupleNotFound = 0xff
/** Set to true to trace incomming signals.*/
const bool traceSignals = false;
+enum
+{
+ /**
+ * Set NdbQueryOperationImpl::m_parallelism to this value to indicate that
+ * scan parallelism should be adaptive.
+ */
+ Parallelism_adaptive = 0xffff0000,
+
+ /**
+ * Set NdbQueryOperationImpl::m_parallelism to this value to indicate that
+ * all fragments should be scanned in parallel.
+ */
+ Parallelism_max = 0xffff0001
+};
+
/**
* A class for accessing the correlation data at the end of a tuple (for
* scan queries). These data have the following layout:
@@ -267,6 +283,10 @@ public:
void postFetchRelease();
private:
+ /** No copying.*/
+ NdbRootFragment(const NdbRootFragment&);
+ NdbRootFragment& operator=(const NdbRootFragment&);
+
STATIC_CONST( voidFragNo = 0xffffffff);
/** Enclosing query.*/
@@ -1486,6 +1506,14 @@ int NdbQueryOperation::setParallelism(Ui
return m_impl.setParallelism(parallelism);
}
+int NdbQueryOperation::setMaxParallelism(){
+ return m_impl.setMaxParallelism();
+}
+
+int NdbQueryOperation::setAdaptiveParallelism(){
+ return m_impl.setAdaptiveParallelism();
+}
+
int NdbQueryOperation::setBatchSize(Uint32 batchSize){
return m_impl.setBatchSize(batchSize);
}
@@ -2619,16 +2647,17 @@ NdbQueryImpl::prepareSend()
{
/* For the first batch, we read from all fragments for both ordered
* and unordered scans.*/
- if (getQueryOperation(0U).m_parallelism > 0)
+ if (getQueryOperation(0U).m_parallelism == Parallelism_max)
{
m_rootFragCount
- = MIN(getRoot().getQueryOperationDef().getTable().getFragmentCount(),
- getQueryOperation(0U).m_parallelism);
+ = getRoot().getQueryOperationDef().getTable().getFragmentCount();
}
else
{
+ assert(getQueryOperation(0U).m_parallelism != Parallelism_adaptive);
m_rootFragCount
- = getRoot().getQueryOperationDef().getTable().getFragmentCount();
+ = MIN(getRoot().getQueryOperationDef().getTable().getFragmentCount(),
+ getQueryOperation(0U).m_parallelism);
}
Ndb* const ndb = m_transaction.getNdb();
@@ -3663,7 +3692,8 @@ NdbQueryOperationImpl::NdbQueryOperation
m_ordering(NdbQueryOptions::ScanOrdering_unordered),
m_interpretedCode(NULL),
m_diskInUserProjection(false),
- m_parallelism(0),
+ m_parallelism(def.getQueryOperationIx() == 0
+ ? Parallelism_max : Parallelism_adaptive),
m_rowSize(0xffffffff)
{
if (errno == ENOMEM)
@@ -4262,9 +4292,10 @@ NdbQueryOperationImpl
m_ndbRecord,
m_firstRecAttr,
0, // Key size.
- getRoot().m_parallelism > 0 ?
- getRoot().m_parallelism :
- m_queryImpl.getRootFragCount(),
+ getRoot().m_parallelism
+ == Parallelism_max ?
+ m_queryImpl.getRootFragCount() :
+ getRoot().m_parallelism,
maxBatchRows,
batchByteSize,
firstBatchRows);
@@ -4450,7 +4481,12 @@ NdbQueryOperationImpl::prepareAttrInfo(U
firstBatchRows);
assert(batchRows==getMaxBatchRows());
assert(batchRows==firstBatchRows);
- requestInfo |= QN_ScanIndexParameters::SIP_PARALLEL; // FIXME: SPJ always assume. SIP_PARALLEL
+ assert(m_parallelism == Parallelism_max ||
+ m_parallelism == Parallelism_adaptive);
+ if (m_parallelism == Parallelism_max)
+ {
+ requestInfo |= QN_ScanIndexParameters::SIP_PARALLEL;
+ }
param->requestInfo = requestInfo;
param->batchSize = ((Uint16)batchByteSize << 16) | (Uint16)firstBatchRows;
param->resultData = getIdOfReceiver();
@@ -4954,7 +4990,7 @@ NdbQueryOperationImpl::setOrdering(NdbQu
return -1;
}
- if (m_parallelism != 0)
+ if (m_parallelism != Parallelism_max)
{
getQuery().setErrorCode(QRY_SEQUENTIAL_SCAN_SORTED);
return -1;
@@ -5049,10 +5085,40 @@ int NdbQueryOperationImpl::setParallelis
getQuery().setErrorCode(Err_FunctionNotImplemented);
return -1;
}
+ else if (parallelism < 1 || parallelism > MAX_NDB_PARTITIONS)
+ {
+ getQuery().setErrorCode(Err_ParameterError);
+ return -1;
+ }
m_parallelism = parallelism;
return 0;
}
+int NdbQueryOperationImpl::setMaxParallelism(){
+ if (!getQueryOperationDef().isScanOperation())
+ {
+ getQuery().setErrorCode(QRY_WRONG_OPERATION_TYPE);
+ return -1;
+ }
+ m_parallelism = Parallelism_max;
+ return 0;
+}
+
+int NdbQueryOperationImpl::setAdaptiveParallelism(){
+ if (!getQueryOperationDef().isScanOperation())
+ {
+ getQuery().setErrorCode(QRY_WRONG_OPERATION_TYPE);
+ return -1;
+ }
+ else if (getQueryOperationDef().getQueryOperationIx() == 0)
+ {
+ getQuery().setErrorCode(Err_FunctionNotImplemented);
+ return -1;
+ }
+ m_parallelism = Parallelism_adaptive;
+ return 0;
+}
+
int NdbQueryOperationImpl::setBatchSize(Uint32 batchSize){
if (!getQueryOperationDef().isScanOperation())
{
=== modified file 'storage/ndb/src/ndbapi/NdbQueryOperation.hpp'
--- a/storage/ndb/src/ndbapi/NdbQueryOperation.hpp 2011-06-20 13:25:48 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryOperation.hpp 2011-08-22 08:35:35 +0000
@@ -335,13 +335,29 @@ public:
NdbQueryOptions::ScanOrdering getOrdering() const;
/**
- * Set the number of fragments to be scanned in parallel for the root
- * operation of this query. This only applies to table scans and non-sorted
- * scans of ordered indexes.
+ * Set the number of fragments to be scanned in parallel. This only applies
+ * to table scans and non-sorted scans of ordered indexes. This method is
+ * only implemented for the root scan operation.
* @return 0 if ok, -1 in case of error (call getNdbError() for details.)
*/
int setParallelism(Uint32 parallelism);
+ /**
+ * Set the number of fragments to be scanned in parallel to the maximum
+ * possible value. This is the default for the root scan operation.
+ * @return 0 if ok, -1 in case of error (call getNdbError() for details.)
+ */
+ int setMaxParallelism();
+
+ /**
+ * Let the system dynamically choose the number of fragments to scan in
+ * parallel. The system will try to choose a value that gives optimal
+ * performance. This is the default for all scans but the root scan. This
+ * method is only implemented for non-root scan operations.
+ * @return 0 if ok, -1 in case of error (call getNdbError() for details.)
+ */
+ int setAdaptiveParallelism();
+
/** Set the batch size (max rows per batch) for this operation. This
* only applies to scan operations, as lookup operations always will
* have the same batch size as its parent operation, or 1 if it is the
=== modified file 'storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp'
--- a/storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp 2011-08-17 13:16:22 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp 2011-08-22 08:35:35 +0000
@@ -661,14 +661,30 @@ public:
NdbQueryOptions::ScanOrdering getOrdering() const
{ return m_ordering; }
- /**
- * Set the number of fragments to be scanned in parallel for the root
- * operation of this query. This only applies to table scans and non-sorted
- * scans of ordered indexes.
+ /**
+ * Set the number of fragments to be scanned in parallel. This only applies
+ * to table scans and non-sorted scans of ordered indexes. This method is
+ * only implemented for the root scan operation.
* @return 0 if ok, -1 in case of error (call getNdbError() for details.)
*/
int setParallelism(Uint32 parallelism);
+ /**
+ * Set the number of fragments to be scanned in parallel to the maximum
+ * possible value. This is the default for the root scan operation.
+ * @return 0 if ok, -1 in case of error (call getNdbError() for details.)
+ */
+ int setMaxParallelism();
+
+ /**
+ * Let the system dynamically choose the number of fragments to scan in
+ * parallel. The system will try to choose a value that gives optimal
+ * performance. This is the default for all scans but the root scan. This
+ * method is only implemented for non-root scan operations.
+ * @return 0 if ok, -1 in case of error (call getNdbError() for details.)
+ */
+ int setAdaptiveParallelism();
+
/** Set the batch size (max rows per batch) for this operation. This
* only applies to scan operations, as lookup operations always will
* have the same batch size as its parent operation, or 1 if it is the
No bundle (reason: useless for push emails).
| Thread |
|---|
| • bzr push into mysql-5.1-telco-7.0-spj-scan-vs-scan branch (jan.wedvik:3541to 3542) | Jan Wedvik | 22 Aug |