3438 Jonas Oreland 2011-08-25 [merge]
ndb - merge 71 to 55
modified:
sql/ha_ndbcluster.cc
storage/ndb/include/ndbapi/NdbReceiver.hpp
storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp
storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp
storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp
storage/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp
storage/ndb/src/ndbapi/NdbQueryOperation.cpp
storage/ndb/src/ndbapi/NdbQueryOperation.hpp
storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp
storage/ndb/src/ndbapi/NdbReceiver.cpp
3437 Jonas Oreland 2011-08-17
ndb - The last crusade of the trailing share - remove dependency on LOCK_open in ndbcluster_create_binlog_setup, allow it to be used from anywhere. Use share->mutex to serialize the actual binlog setup. Patch for 5.5, plan to backport after test results
modified:
sql/ha_ndbcluster.cc
sql/ha_ndbcluster_binlog.cc
sql/ha_ndbcluster_binlog.h
=== modified file 'sql/ha_ndbcluster.cc'
--- a/sql/ha_ndbcluster.cc 2011-08-17 12:22:49 +0000
+++ b/sql/ha_ndbcluster.cc 2011-08-25 06:35:39 +0000
@@ -160,6 +160,11 @@ static MYSQL_THDVAR_ULONG(
0 /* block */
);
+#if NDB_VERSION_D < NDB_MAKE_VERSION(7,2,0)
+#define DEFAULT_NDB_INDEX_STAT_ENABLE FALSE
+#else
+#define DEFAULT_NDB_INDEX_STAT_ENABLE TRUE
+#endif
static MYSQL_THDVAR_BOOL(
index_stat_enable, /* name */
@@ -167,7 +172,7 @@ static MYSQL_THDVAR_BOOL(
"Use ndb index statistics in query optimization.",
NULL, /* check func. */
NULL, /* update func. */
- FALSE /* default */
+ DEFAULT_NDB_INDEX_STAT_ENABLE /* default */
);
=== modified file 'storage/ndb/include/ndbapi/NdbReceiver.hpp'
--- a/storage/ndb/include/ndbapi/NdbReceiver.hpp 2011-07-05 12:46:07 +0000
+++ b/storage/ndb/include/ndbapi/NdbReceiver.hpp 2011-08-25 06:35:39 +0000
@@ -41,7 +41,6 @@ class NdbReceiver
friend class NdbIndexScanOperation;
friend class NdbTransaction;
friend class NdbRootFragment;
- friend class ReceiverIdIterator;
friend int compare_ndbrecord(const NdbReceiver *r1,
const NdbReceiver *r2,
const NdbRecord *key_record,
@@ -83,6 +82,12 @@ public:
void setErrorCode(int);
+ /* Prepare for receiving of rows into specified buffer */
+ void prepareReceive(char *buf);
+
+ /* Prepare for reading of rows from specified buffer */
+ void prepareRead(char *buf, Uint32 rows);
+
private:
Uint32 theMagicNumber;
Ndb* m_ndb;
@@ -141,8 +146,9 @@ private:
/* members used for NdbRecord operation. */
struct {
const NdbRecord *m_ndb_record;
- char *m_row;
- /* Block of memory used to receive all rows in a batch during scan. */
+ /* Destination to receive next row into. */
+ char *m_row_recv;
+ /* Block of memory used to read all rows in a batch during scan. */
char *m_row_buffer;
/*
Offsets between two rows in m_row_buffer.
@@ -227,7 +233,7 @@ private:
const char * & data_ptr) const;
int getScanAttrData(const char * & data, Uint32 & size, Uint32 & pos) const;
/** Used by NdbQueryOperationImpl, where random access to rows is needed.*/
- void setCurrentRow(Uint32 currentRow);
+ void setCurrentRow(char* buffer, Uint32 row);
/** Used by NdbQueryOperationImpl.*/
Uint32 getCurrentRow() const { return m_current_row; }
};
@@ -260,7 +266,7 @@ NdbReceiver::prepareSend(){
if (m_using_ndb_record)
{
if (m_type==NDB_SCANRECEIVER || m_type==NDB_QUERY_OPERATION)
- m_record.m_row= m_record.m_row_buffer;
+ m_record.m_row_recv= m_record.m_row_buffer;
}
theCurrentRecAttr = theFirstRecAttr;
}
@@ -288,9 +294,13 @@ NdbReceiver::execSCANOPCONF(Uint32 tcPtr
inline
void
-NdbReceiver::setCurrentRow(Uint32 currentRow)
+NdbReceiver::setCurrentRow(char* buffer, Uint32 row)
{
- m_current_row = currentRow;
+ m_record.m_row_buffer = buffer;
+ m_current_row = row;
+#ifdef assert
+ assert(m_current_row < m_result_rows);
+#endif
}
inline
=== modified file 'storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp 2011-07-05 12:46:07 +0000
+++ b/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp 2011-08-25 06:35:39 +0000
@@ -3880,6 +3880,8 @@ done:
ndbassert(gci == restorableGCI);
replicaPtr.p->m_restorable_gci = gci;
Uint32 startGci = replicaPtr.p->maxGciCompleted[maxLcpIndex] + 1;
+ if (startGci > gci)
+ startGci = gci;
ndbout_c("node: %d tab: %d frag: %d restore lcp: %u(idx: %u) maxGciStarted: %u maxGciCompleted: %u (restorable: %u(%u) newestRestorableGCI: %u)",
takeOverPtr.p->toStartingNode,
takeOverPtr.p->toCurrentTabref,
@@ -17543,7 +17545,8 @@ Dbdih::execDUMP_STATE_ORD(Signal* signal
}
}
- if(arg == 7019 && signal->getLength() == 2)
+ if(arg == 7019 && signal->getLength() == 2 &&
+ signal->theData[1] < MAX_NDB_NODES)
{
char buf2[8+1];
NodeRecordPtr nodePtr;
=== modified file 'storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp'
--- a/storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp 2011-07-05 12:46:07 +0000
+++ b/storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp 2011-08-25 06:35:39 +0000
@@ -530,21 +530,87 @@ public:
typedef SLFifoListImpl<ScanFragHandle_pool, ScanFragHandle> ScanFragHandle_list;
typedef LocalSLFifoListImpl<ScanFragHandle_pool, ScanFragHandle> Local_ScanFragHandle_list;
+ /**
+ * This class computes mean and standard deviation incrementally for a series
+ * of samples.
+ */
+ class IncrementalStatistics
+ {
+ public:
+ /**
+ * We cannot have a (non-trivial) constructor, since this class is used in
+ * unions.
+ */
+ void init()
+ {
+ m_mean = m_sumSquare = 0.0;
+ m_noOfSamples = 0;
+ }
+
+ // Add another sample.
+ void update(double sample);
+
+ double getMean() const { return m_mean; }
+
+ double getStdDev() const {
+ return m_noOfSamples < 2 ? 0.0 : sqrt(m_sumSquare/(m_noOfSamples - 1));
+ }
+
+ private:
+ // Mean of all samples
+ double m_mean;
+ //Sum of square of differences from the current mean.
+ double m_sumSquare;
+ Uint32 m_noOfSamples;
+ }; // IncrementalStatistics
+
struct ScanIndexData
{
Uint16 m_frags_complete;
Uint16 m_frags_outstanding;
+ /**
+ * The number of fragment for which we have not yet sent SCAN_FRAGREQ but
+ * will eventually do so.
+ */
+ Uint16 m_frags_not_started;
Uint32 m_rows_received; // #execTRANSID_AI
Uint32 m_rows_expecting; // Sum(ScanFragConf)
Uint32 m_batch_chunks; // #SCAN_FRAGREQ + #SCAN_NEXTREQ to retrieve batch
Uint32 m_scanCookie;
Uint32 m_fragCount;
+ // The number of fragments that we scan in parallel.
+ Uint32 m_parallelism;
+ /**
+ * True if this is the first instantiation of this operation. A child
+ * operation will be instantiated once for each batch of its parent.
+ */
+ bool m_firstExecution;
+ /**
+ * Mean and standard deviation for the optimal parallelism for earlier
+ * executions of this operation.
+ */
+ IncrementalStatistics m_parallelismStat;
+ // Total number of rows for the current execution of this operation.
+ Uint32 m_totalRows;
+ // Total number of bytes for the current execution of this operation.
+ Uint32 m_totalBytes;
+
ScanFragHandle_list::HeadPOD m_fragments; // ScanFrag states
union
{
PatternStore::HeadPOD m_prunePattern;
Uint32 m_constPrunePtrI;
};
+ /**
+ * Max number of rows seen in a batch. Used for calculating the number of
+ * rows per fragment in the next next batch when using adaptive batch size.
+ */
+ Uint32 m_largestBatchRows;
+ /**
+ * Max number of bytes seen in a batch. Used for calculating the number of
+ * rows per fragment in the next next batch when using adaptive batch size.
+ */
+ Uint32 m_largestBatchBytes;
Uint32 m_scanFragReq[ScanFragReq::SignalLength + 2];
};
@@ -1164,6 +1230,13 @@ private:
void scanIndex_parent_row(Signal*,Ptr<Request>,Ptr<TreeNode>, const RowPtr&);
void scanIndex_fixupBound(Ptr<ScanFragHandle> fragPtr, Uint32 ptrI, Uint32);
void scanIndex_send(Signal*,Ptr<Request>,Ptr<TreeNode>);
+ void scanIndex_send(Signal* signal,
+ Ptr<Request> requestPtr,
+ Ptr<TreeNode> treeNodePtr,
+ Uint32 noOfFrags,
+ Uint32 bs_bytes,
+ Uint32 bs_rows,
+ Uint32& batchRange);
void scanIndex_batchComplete(Signal* signal);
Uint32 scanIndex_findFrag(Local_ScanFragHandle_list &, Ptr<ScanFragHandle>&,
Uint32 fragId);
=== modified file 'storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp 2011-07-05 12:46:07 +0000
+++ b/storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp 2011-08-25 06:35:39 +0000
@@ -4450,6 +4450,9 @@ Dbspj::parseScanIndex(Build_context& ctx
data.m_fragments.init();
data.m_frags_outstanding = 0;
data.m_frags_complete = 0;
+ data.m_frags_not_started = 0;
+ data.m_parallelismStat.init();
+ data.m_firstExecution = true;
data.m_batch_chunks = 0;
err = parseDA(ctx, requestPtr, treeNodePtr,
@@ -5002,6 +5005,7 @@ Dbspj::scanIndex_parent_batch_complete(S
}
}
}
+ data.m_frags_not_started = data.m_fragCount - data.m_frags_complete;
if (data.m_frags_complete == data.m_fragCount)
{
@@ -5015,7 +5019,120 @@ Dbspj::scanIndex_parent_batch_complete(S
/**
* When parent's batch is complete, we send our batch
*/
- scanIndex_send(signal, requestPtr, treeNodePtr);
+ const ScanFragReq * org = (const ScanFragReq*)data.m_scanFragReq;
+ ndbassert(org->batch_size_rows >= data.m_fragCount - data.m_frags_complete);
+
+ if (treeNodePtr.p->m_bits & TreeNode::T_SCAN_PARALLEL)
+ {
+ jam();
+ data.m_parallelism = data.m_fragCount - data.m_frags_complete;
+ }
+ else if (data.m_firstExecution)
+ {
+ /**
+ * Having a high parallelism would allow us to fetch data from many
+ * fragments in parallel and thus reduce the number of round trips.
+ * On the other hand, we should set parallelism so low that we can fetch
+ * all data from a fragment in one batch if possible.
+ * Since this is the first execution, we do not know how many rows or bytes
+ * this operation is likely to return. Therefore we set parallelism to 1,
+ * since this gives the lowest penalty if our guess is wrong.
+ */
+ jam();
+ data.m_parallelism = 1;
+ }
+ else
+ {
+ jam();
+ /**
+ * Use statistics from earlier runs of this operation to estimate the
+ * initial parallelism. We use the mean minus two times the standard
+ * deviation to have a low risk of setting parallelism to high (as erring
+ * in the other direction is more costly).
+ */
+ Int32 parallelism =
+ static_cast<Int32>(data.m_parallelismStat.getMean()
+ - 2 * data.m_parallelismStat.getStdDev());
+
+ if (parallelism < 1)
+ {
+ jam();
+ parallelism = 1;
+ }
+ else if ((data.m_fragCount - data.m_frags_complete) % parallelism != 0)
+ {
+ jam();
+ /**
+ * Set parallelism such that we can expect to have similar
+ * parallelism in each batch. For example if there are 8 remaining
+ * fragments, then we should fecth 2 times 4 fragments rather than
+ * 7+1.
+ */
+ const Int32 roundTrips =
+ 1 + (data.m_fragCount - data.m_frags_complete) / parallelism;
+ parallelism = (data.m_fragCount - data.m_frags_complete) / roundTrips;
+ }
+
+ data.m_parallelism = static_cast<Uint32>(parallelism);
+
+#ifdef DEBUG_SCAN_FRAGREQ
+ DEBUG("::scanIndex_send() starting index scan with parallelism="
+ << data.m_parallelism);
+#endif
+ }
+ ndbrequire(data.m_parallelism > 0);
+
+ const Uint32 bs_rows = org->batch_size_rows/ data.m_parallelism;
+ const Uint32 bs_bytes = org->batch_size_bytes / data.m_parallelism;
+ ndbassert(bs_rows > 0);
+ ndbassert(bs_bytes > 0);
+
+ data.m_largestBatchRows = 0;
+ data.m_largestBatchBytes = 0;
+ data.m_totalRows = 0;
+ data.m_totalBytes = 0;
+
+ {
+ Local_ScanFragHandle_list list(m_scanfraghandle_pool, data.m_fragments);
+ Ptr<ScanFragHandle> fragPtr;
+ list.first(fragPtr);
+
+ while(!fragPtr.isNull())
+ {
+ ndbassert(fragPtr.p->m_state == ScanFragHandle::SFH_NOT_STARTED ||
+ fragPtr.p->m_state == ScanFragHandle::SFH_COMPLETE);
+ fragPtr.p->m_state = ScanFragHandle::SFH_NOT_STARTED;
+ list.next(fragPtr);
+ }
+ }
+
+ Uint32 batchRange = 0;
+ scanIndex_send(signal,
+ requestPtr,
+ treeNodePtr,
+ data.m_parallelism,
+ bs_bytes,
+ bs_rows,
+ batchRange);
+
+ data.m_firstExecution = false;
+
+ if (treeNodePtr.p->m_bits & TreeNode::T_SCAN_PARALLEL)
+ {
+ ndbrequire((data.m_frags_outstanding + data.m_frags_complete) ==
+ data.m_fragCount);
+ }
+ else
+ {
+ ndbrequire(static_cast<Uint32>(data.m_frags_outstanding +
+ data.m_frags_complete) <=
+ data.m_fragCount);
+ }
+
+ data.m_batch_chunks = 1;
+ requestPtr.p->m_cnt_active++;
+ requestPtr.p->m_outstanding++;
+ treeNodePtr.p->m_state = TreeNode::TN_ACTIVE;
}
void
@@ -5045,77 +5162,91 @@ Dbspj::scanIndex_parent_batch_repeat(Sig
}
}
+/**
+ * Ask for the first batch for a number of fragments.
+ */
void
Dbspj::scanIndex_send(Signal* signal,
Ptr<Request> requestPtr,
- Ptr<TreeNode> treeNodePtr)
+ Ptr<TreeNode> treeNodePtr,
+ Uint32 noOfFrags,
+ Uint32 bs_bytes,
+ Uint32 bs_rows,
+ Uint32& batchRange)
{
- jam();
-
- ScanIndexData& data = treeNodePtr.p->m_scanindex_data;
- const ScanFragReq * org = (const ScanFragReq*)data.m_scanFragReq;
-
- Uint32 cnt = 1;
- Uint32 bs_rows = org->batch_size_rows;
- Uint32 bs_bytes = org->batch_size_bytes;
- if (treeNodePtr.p->m_bits & TreeNode::T_SCAN_PARALLEL)
- {
- jam();
- cnt = data.m_fragCount - data.m_frags_complete;
- ndbrequire(cnt > 0);
-
- bs_rows /= cnt;
- bs_bytes /= cnt;
- ndbassert(bs_rows > 0);
- }
-
/**
* if (m_bits & prunemask):
* - Range keys sliced out to each ScanFragHandle
* - Else, range keys kept on first (and only) ScanFragHandle
*/
- Uint32 prunemask = TreeNode::T_PRUNE_PATTERN | TreeNode::T_CONST_PRUNE;
+ const bool prune = treeNodePtr.p->m_bits &
+ (TreeNode::T_PRUNE_PATTERN | TreeNode::T_CONST_PRUNE);
/**
- * Don't release keyInfo if it may be sent multiple times, eiter:
- * - Not pruned -> same keyInfo goes to all datanodes.
- * - Result handling is REPEAT_SCAN_RESULT and same batch may be
- * repeated multiple times due to incomplete bushy X-scans.
- * (by ::scanIndex_parent_batch_repeat())
- *
- * When not released, ::scanIndex_parent_batch_cleanup() will
- * eventually release them when preparing arrival of a new parent batch.
+ * If scan is repeatable, we must make sure not to release range keys so
+ * that we canuse them again in the next repetition.
*/
- const bool release = ((treeNodePtr.p->m_bits & prunemask) != 0 &&
- (treeNodePtr.p->m_bits & TreeNode::T_SCAN_REPEATABLE) == 0);
+ const bool repeatable =
+ (treeNodePtr.p->m_bits & TreeNode::T_SCAN_REPEATABLE) != 0;
- ScanFragReq* req = reinterpret_cast<ScanFragReq*>(signal->getDataPtrSend());
+ ScanIndexData& data = treeNodePtr.p->m_scanindex_data;
+ ndbassert(noOfFrags > 0);
+ ndbassert(data.m_frags_not_started >= noOfFrags);
+ ScanFragReq* const req =
+ reinterpret_cast<ScanFragReq*>(signal->getDataPtrSend());
+ const ScanFragReq * const org
+ = reinterpret_cast<ScanFragReq*>(data.m_scanFragReq);
memcpy(req, org, sizeof(data.m_scanFragReq));
// req->variableData[0] // set below
req->variableData[1] = requestPtr.p->m_rootResultData;
req->batch_size_bytes = bs_bytes;
req->batch_size_rows = bs_rows;
- Ptr<ScanFragHandle> fragPtr;
+ Uint32 requestsSent = 0;
Local_ScanFragHandle_list list(m_scanfraghandle_pool, data.m_fragments);
-
- Uint32 keyInfoPtrI = RNIL;
+ Ptr<ScanFragHandle> fragPtr;
list.first(fragPtr);
- if ((treeNodePtr.p->m_bits & prunemask) == 0)
+ Uint32 keyInfoPtrI = fragPtr.p->m_rangePtrI;
+ ndbrequire(prune || keyInfoPtrI != RNIL);
+ /**
+ * Iterate over the list of fragments until we have sent as many
+ * SCAN_FRAGREQs as we should.
+ */
+ while (requestsSent < noOfFrags)
{
jam();
- keyInfoPtrI = fragPtr.p->m_rangePtrI;
- ndbrequire(keyInfoPtrI != RNIL);
- }
+ ndbassert(!fragPtr.isNull());
- Uint32 batchRange = 0;
- for (Uint32 i = 0; i < cnt && !fragPtr.isNull(); list.next(fragPtr))
- {
- jam();
+ if (fragPtr.p->m_state != ScanFragHandle::SFH_NOT_STARTED)
+ {
+ // Skip forward to the frags that we should send.
+ jam();
+ list.next(fragPtr);
+ continue;
+ }
+
+ const Uint32 ref = fragPtr.p->m_ref;
+
+ if (noOfFrags==1 && !prune &&
+ data.m_frags_not_started == data.m_fragCount &&
+ refToNode(ref) != getOwnNodeId() &&
+ list.hasNext(fragPtr))
+ {
+ /**
+ * If we are doing a scan with adaptive parallelism and start with
+ * parallelism=1 then it makes sense to fetch a batch from a fragment on
+ * the local data node. The reason for this is that if that fragment
+ * contains few rows, we may be able to read from several fragments in
+ * parallel. Then we minimize the total number of round trips (to remote
+ * data nodes) if we fetch the first fragment batch locally.
+ */
+ jam();
+ list.next(fragPtr);
+ continue;
+ }
SectionHandle handle(this);
- Uint32 ref = fragPtr.p->m_ref;
Uint32 attrInfoPtrI = treeNodePtr.p->m_send.m_attrInfoPtrI;
/**
@@ -5124,26 +5255,34 @@ Dbspj::scanIndex_send(Signal* signal,
req->senderData = fragPtr.i;
req->fragmentNoKeyLen = fragPtr.p->m_fragId;
- if ((treeNodePtr.p->m_bits & prunemask))
+ if (prune)
{
jam();
keyInfoPtrI = fragPtr.p->m_rangePtrI;
if (keyInfoPtrI == RNIL)
{
+ /**
+ * Since we use pruning, we can see that no parent rows would hash
+ * to this fragment.
+ */
jam();
fragPtr.p->m_state = ScanFragHandle::SFH_COMPLETE;
+ list.next(fragPtr);
continue;
}
- }
- if (release)
- {
- /**
- * If we'll use sendSignal() and we need to send the attrInfo several
- * times, we need to copy them
- */
- Uint32 tmp = RNIL;
- ndbrequire(dupSection(tmp, attrInfoPtrI)); // TODO handle error
- attrInfoPtrI = tmp;
+
+ if (!repeatable)
+ {
+ /**
+ * If we'll use sendSignal() and we need to send the attrInfo several
+ * times, we need to copy them. (For repeatable or unpruned scans
+ * we use sendSignalNoRelease(), so then we do not need to copy.)
+ */
+ jam();
+ Uint32 tmp = RNIL;
+ ndbrequire(dupSection(tmp, attrInfoPtrI)); // TODO handle error
+ attrInfoPtrI = tmp;
+ }
}
req->variableData[0] = batchRange;
@@ -5152,16 +5291,14 @@ Dbspj::scanIndex_send(Signal* signal,
handle.m_cnt = 2;
#if defined DEBUG_SCAN_FRAGREQ
- {
- ndbout_c("SCAN_FRAGREQ to %x", ref);
- printSCAN_FRAGREQ(stdout, signal->getDataPtrSend(),
- NDB_ARRAY_SIZE(treeNodePtr.p->m_scanfrag_data.m_scanFragReq),
- DBLQH);
- printf("ATTRINFO: ");
- print(handle.m_ptr[0], stdout);
- printf("KEYINFO: ");
- print(handle.m_ptr[1], stdout);
- }
+ ndbout_c("SCAN_FRAGREQ to %x", ref);
+ printSCAN_FRAGREQ(stdout, signal->getDataPtrSend(),
+ NDB_ARRAY_SIZE(treeNodePtr.p->m_scanfrag_data.m_scanFragReq),
+ DBLQH);
+ printf("ATTRINFO: ");
+ print(handle.m_ptr[0], stdout);
+ printf("KEYINFO: ");
+ print(handle.m_ptr[1], stdout);
#endif
if (refToNode(ref) == getOwnNodeId())
@@ -5173,8 +5310,13 @@ Dbspj::scanIndex_send(Signal* signal,
c_Counters.incr_counter(CI_REMOTE_RANGE_SCANS_SENT, 1);
}
- if (release)
+ if (prune && !repeatable)
{
+ /**
+ * For a non-repeatable pruned scan, key info is unique for each
+ * fragment and therefore cannot be reused, so we release key info
+ * right away.
+ */
jam();
sendSignal(ref, GSN_SCAN_FRAGREQ, signal,
NDB_ARRAY_SIZE(data.m_scanFragReq), JBB, &handle);
@@ -5183,32 +5325,24 @@ Dbspj::scanIndex_send(Signal* signal,
}
else
{
+ /**
+ * Reuse key info for multiple fragments and/or multiple repetitions
+ * of the scan.
+ */
jam();
sendSignalNoRelease(ref, GSN_SCAN_FRAGREQ, signal,
NDB_ARRAY_SIZE(data.m_scanFragReq), JBB, &handle);
}
handle.clear();
- i++;
fragPtr.p->m_state = ScanFragHandle::SFH_SCANNING; // running
data.m_frags_outstanding++;
batchRange += bs_rows;
- }
+ requestsSent++;
+ list.next(fragPtr);
+ } // while (requestsSent < noOfFrags)
- if (treeNodePtr.p->m_bits & TreeNode::T_SCAN_PARALLEL)
- {
- ndbrequire(data.m_frags_outstanding ==
- data.m_fragCount - data.m_frags_complete);
- }
- else
- {
- ndbrequire(data.m_frags_outstanding == 1);
- }
-
- data.m_batch_chunks = 1;
- requestPtr.p->m_cnt_active++;
- requestPtr.p->m_outstanding++;
- treeNodePtr.p->m_state = TreeNode::TN_ACTIVE;
+ data.m_frags_not_started -= requestsSent;
}
void
@@ -5282,6 +5416,10 @@ Dbspj::scanIndex_execSCAN_FRAGCONF(Signa
}
requestPtr.p->m_rows += rows;
+ data.m_totalRows += rows;
+ data.m_totalBytes += conf->total_len;
+ data.m_largestBatchRows = MAX(data.m_largestBatchRows, rows);
+ data.m_largestBatchBytes = MAX(data.m_largestBatchBytes, conf->total_len);
if (!treeNodePtr.p->isLeaf())
{
@@ -5302,7 +5440,9 @@ Dbspj::scanIndex_execSCAN_FRAGCONF(Signa
ndbrequire(data.m_frags_complete < data.m_fragCount);
data.m_frags_complete++;
- if (data.m_frags_complete == data.m_fragCount)
+ if (data.m_frags_complete == data.m_fragCount ||
+ ((requestPtr.p->m_state & Request::RS_ABORTING) != 0 &&
+ data.m_fragCount == (data.m_frags_complete + data.m_frags_not_started)))
{
jam();
ndbrequire(requestPtr.p->m_cnt_active);
@@ -5314,6 +5454,32 @@ Dbspj::scanIndex_execSCAN_FRAGCONF(Signa
if (data.m_frags_outstanding == 0)
{
+ const ScanFragReq * const org
+ = reinterpret_cast<const ScanFragReq*>(data.m_scanFragReq);
+
+ if (data.m_frags_complete == data.m_fragCount)
+ {
+ jam();
+ /**
+ * Calculate what would have been the optimal parallelism for the
+ * scan instance that we have just completed, and update
+ * 'parallelismStat' with this value. We then use this statistics to set
+ * the initial parallelism for the next instance of this operation.
+ */
+ double parallelism = data.m_fragCount;
+ if (data.m_totalRows > 0)
+ {
+ parallelism = MIN(parallelism,
+ double(org->batch_size_rows) / data.m_totalRows);
+ }
+ if (data.m_totalBytes > 0)
+ {
+ parallelism = MIN(parallelism,
+ double(org->batch_size_bytes) / data.m_totalBytes);
+ }
+ data.m_parallelismStat.update(parallelism);
+ }
+
/**
* Don't reportBatchComplete to children if we're aborting...
*/
@@ -5364,7 +5530,7 @@ Dbspj::scanIndex_execSCAN_FRAGREF(Signal
ndbrequire(data.m_frags_outstanding > 0);
data.m_frags_outstanding--;
- if (data.m_frags_complete == data.m_fragCount)
+ if (data.m_fragCount == (data.m_frags_complete + data.m_frags_not_started))
{
jam();
ndbrequire(requestPtr.p->m_cnt_active);
@@ -5390,21 +5556,78 @@ Dbspj::scanIndex_execSCAN_NEXTREQ(Signal
jam();
ScanIndexData& data = treeNodePtr.p->m_scanindex_data;
+ const ScanFragReq * org = (const ScanFragReq*)data.m_scanFragReq;
data.m_rows_received = 0;
data.m_rows_expecting = 0;
ndbassert(data.m_frags_outstanding == 0);
ndbrequire(data.m_frags_complete < data.m_fragCount);
- Uint32 cnt = data.m_fragCount - data.m_frags_complete;
if ((treeNodePtr.p->m_bits & TreeNode::T_SCAN_PARALLEL) == 0)
{
jam();
- cnt = 1;
+ /**
+ * Since fetching few but large batches is more efficient, we
+ * set parallelism to the lowest value where we can still expect each
+ * batch to be full.
+ */
+ if (data.m_largestBatchRows < org->batch_size_rows/data.m_parallelism &&
+ data.m_largestBatchBytes < org->batch_size_bytes/data.m_parallelism)
+ {
+ jam();
+ data.m_parallelism = data.m_fragCount - data.m_frags_complete;
+ if (data.m_largestBatchRows > 0)
+ {
+ jam();
+ data.m_parallelism =
+ MIN(org->batch_size_rows / data.m_largestBatchRows,
+ data.m_parallelism);
+ }
+ if (data.m_largestBatchBytes > 0)
+ {
+ jam();
+ data.m_parallelism =
+ MIN(data.m_parallelism,
+ org->batch_size_bytes/data.m_largestBatchBytes);
+ }
+ if (data.m_frags_complete == 0 &&
+ data.m_frags_not_started % data.m_parallelism != 0)
+ {
+ jam();
+ /**
+ * Set parallelism such that we can expect to have similar
+ * parallelism in each batch. For example if there are 8 remaining
+ * fragments, then we should fecth 2 times 4 fragments rather than
+ * 7+1.
+ */
+ const Uint32 roundTrips =
+ 1 + data.m_frags_not_started / data.m_parallelism;
+ data.m_parallelism = data.m_frags_not_started / roundTrips;
+ }
+ }
+ else
+ {
+ jam();
+ // We get full batches, so we should lower parallelism.
+ data.m_parallelism = MIN(data.m_fragCount - data.m_frags_complete,
+ MAX(1, data.m_parallelism/2));
+ }
+ ndbassert(data.m_parallelism > 0);
+#ifdef DEBUG_SCAN_FRAGREQ
+ DEBUG("::scanIndex_execSCAN_NEXTREQ() Asking for new batches from " <<
+ data.m_parallelism <<
+ " fragments with " << org->batch_size_rows/data.m_parallelism <<
+ " rows and " << org->batch_size_bytes/data.m_parallelism <<
+ " bytes.");
+#endif
+ }
+ else
+ {
+ jam();
+ data.m_parallelism = data.m_fragCount - data.m_frags_complete;
}
- const ScanFragReq * org = (const ScanFragReq*)data.m_scanFragReq;
- const Uint32 bs_rows = org->batch_size_rows/cnt;
+ const Uint32 bs_rows = org->batch_size_rows/data.m_parallelism;
ndbassert(bs_rows > 0);
ScanFragNextReq* req =
reinterpret_cast<ScanFragNextReq*>(signal->getDataPtrSend());
@@ -5413,20 +5636,27 @@ Dbspj::scanIndex_execSCAN_NEXTREQ(Signal
req->transId1 = requestPtr.p->m_transId[0];
req->transId2 = requestPtr.p->m_transId[1];
req->batch_size_rows = bs_rows;
- req->batch_size_bytes = org->batch_size_bytes/cnt;
+ req->batch_size_bytes = org->batch_size_bytes/data.m_parallelism;
Uint32 batchRange = 0;
Ptr<ScanFragHandle> fragPtr;
+ Uint32 sentFragCount = 0;
+ {
+ /**
+ * First, ask for more data from fragments that are already started.
+ */
Local_ScanFragHandle_list list(m_scanfraghandle_pool, data.m_fragments);
list.first(fragPtr);
- for (Uint32 i = 0; i < cnt && !fragPtr.isNull(); list.next(fragPtr))
+ while (sentFragCount < data.m_parallelism && !fragPtr.isNull())
{
jam();
+ ndbassert(fragPtr.p->m_state == ScanFragHandle::SFH_WAIT_NEXTREQ ||
+ fragPtr.p->m_state == ScanFragHandle::SFH_COMPLETE ||
+ fragPtr.p->m_state == ScanFragHandle::SFH_NOT_STARTED);
if (fragPtr.p->m_state == ScanFragHandle::SFH_WAIT_NEXTREQ)
{
jam();
- i++;
data.m_frags_outstanding++;
req->variableData[0] = batchRange;
fragPtr.p->m_state = ScanFragHandle::SFH_SCANNING;
@@ -5434,6 +5664,7 @@ Dbspj::scanIndex_execSCAN_NEXTREQ(Signal
DEBUG("scanIndex_execSCAN_NEXTREQ to: " << hex
<< treeNodePtr.p->m_send.m_ref
+ << ", m_node_no=" << treeNodePtr.p->m_node_no
<< ", senderData: " << req->senderData);
#ifdef DEBUG_SCAN_FRAGREQ
@@ -5445,9 +5676,27 @@ Dbspj::scanIndex_execSCAN_NEXTREQ(Signal
sendSignal(fragPtr.p->m_ref, GSN_SCAN_NEXTREQ, signal,
ScanFragNextReq::SignalLength + 1,
JBB);
+ sentFragCount++;
+ }
+ list.next(fragPtr);
}
}
+ if (sentFragCount < data.m_parallelism)
+ {
+ /**
+ * Then start new fragments until we reach data.m_parallelism.
+ */
+ jam();
+ ndbassert(data.m_frags_not_started != 0);
+ scanIndex_send(signal,
+ requestPtr,
+ treeNodePtr,
+ data.m_parallelism - sentFragCount,
+ org->batch_size_bytes/data.m_parallelism,
+ bs_rows,
+ batchRange);
+ }
/**
* cursor should not have been positioned here...
* unless we actually had something more to send.
@@ -5544,14 +5793,28 @@ Dbspj::scanIndex_abort(Signal* signal,
}
}
- ndbrequire(cnt_waiting + cnt_scanning > 0);
if (cnt_scanning == 0)
{
- /**
- * If all were waiting...this should increase m_outstanding
- */
- jam();
- requestPtr.p->m_outstanding++;
+ if (cnt_waiting > 0)
+ {
+ /**
+ * If all were waiting...this should increase m_outstanding
+ */
+ jam();
+ requestPtr.p->m_outstanding++;
+ }
+ else
+ {
+ /**
+ * All fragments are either complete or not yet started, so there is
+ * nothing to abort.
+ */
+ jam();
+ ndbassert(data.m_frags_not_started > 0);
+ ndbrequire(requestPtr.p->m_cnt_active);
+ requestPtr.p->m_cnt_active--;
+ treeNodePtr.p->m_state = TreeNode::TN_INACTIVE;
+ }
}
}
@@ -5603,6 +5866,8 @@ Dbspj::scanIndex_execNODE_FAILREP(Signal
jam();
ndbrequire(data.m_frags_complete < data.m_fragCount);
data.m_frags_complete++;
+ ndbrequire(data.m_frags_not_started > 0);
+ data.m_frags_not_started--;
// fall through
case ScanFragHandle::SFH_COMPLETE:
jam();
@@ -5637,8 +5902,8 @@ Dbspj::scanIndex_execNODE_FAILREP(Signal
requestPtr.p->m_outstanding--;
}
- if (save1 != data.m_fragCount
- && data.m_frags_complete == data.m_fragCount)
+ if (save1 != 0 &&
+ data.m_fragCount == (data.m_frags_complete + data.m_frags_not_started))
{
jam();
ndbrequire(requestPtr.p->m_cnt_active);
@@ -5654,7 +5919,8 @@ Dbspj::scanIndex_release_rangekeys(Ptr<R
Ptr<TreeNode> treeNodePtr)
{
jam();
- DEBUG("scanIndex_release_rangekeys(), m_node_no: " << treeNodePtr.p->m_node_no);
+ DEBUG("scanIndex_release_rangekeys(), tree node " << treeNodePtr.i
+ << " m_node_no: " << treeNodePtr.p->m_node_no);
ScanIndexData& data = treeNodePtr.p->m_scanindex_data;
Local_ScanFragHandle_list list(m_scanfraghandle_pool, data.m_fragments);
@@ -7115,3 +7381,15 @@ void Dbspj::execDBINFO_SCANREQ(Signal *s
ndbinfo_send_scan_conf(signal, req, rl);
} // Dbspj::execDBINFO_SCANREQ(Signal *signal)
+
+void Dbspj::IncrementalStatistics::update(double sample)
+{
+ // Prevent wrap-around
+ if(m_noOfSamples < 0xffffffff)
+ {
+ m_noOfSamples++;
+ const double delta = sample - m_mean;
+ m_mean += delta/m_noOfSamples;
+ m_sumSquare += delta * (sample - m_mean);
+ }
+}
=== modified file 'storage/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp'
--- a/storage/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp 2011-07-05 12:46:07 +0000
+++ b/storage/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp 2011-08-25 06:35:39 +0000
@@ -2972,26 +2972,6 @@ void Qmgr::checkStartInterface(Signal* s
nodePtr.p->m_failconf_blocks[3],
nodePtr.p->m_failconf_blocks[4]);
warningEvent("%s", buf);
-
- for (Uint32 i = 0; i<NDB_ARRAY_SIZE(nodePtr.p->m_failconf_blocks);
- i++)
- {
- jam();
- if (nodePtr.p->m_failconf_blocks[i] != 0)
- {
- jam();
- signal->theData[0] = 7019;
- signal->theData[1] = nodePtr.i;
- sendSignal(numberToRef(nodePtr.p->m_failconf_blocks[i],
- getOwnNodeId()),
- GSN_DUMP_STATE_ORD, signal, 2, JBB);
- }
- else
- {
- jam();
- break;
- }
- }
}
}
}
=== modified file 'storage/ndb/src/ndbapi/NdbQueryOperation.cpp'
--- a/storage/ndb/src/ndbapi/NdbQueryOperation.cpp 2011-08-17 10:36:01 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryOperation.cpp 2011-08-25 06:35:39 +0000
@@ -57,6 +57,7 @@ static const int Err_FunctionNotImplemen
static const int Err_UnknownColumn = 4004;
static const int Err_ReceiveTimedOut = 4008;
static const int Err_NodeFailCausedAbort = 4028;
+static const int Err_ParameterError = 4118;
static const int Err_SimpleDirtyReadFailed = 4119;
static const int Err_WrongFieldLength = 4209;
static const int Err_ReadTooMuch = 4257;
@@ -72,6 +73,21 @@ static const Uint16 tupleNotFound = 0xff
/** Set to true to trace incomming signals.*/
const bool traceSignals = false;
+enum
+{
+ /**
+ * Set NdbQueryOperationImpl::m_parallelism to this value to indicate that
+ * scan parallelism should be adaptive.
+ */
+ Parallelism_adaptive = 0xffff0000,
+
+ /**
+ * Set NdbQueryOperationImpl::m_parallelism to this value to indicate that
+ * all fragments should be scanned in parallel.
+ */
+ Parallelism_max = 0xffff0001
+};
+
/**
* A class for accessing the correlation data at the end of a tuple (for
* scan queries). These data have the following layout:
@@ -181,18 +197,24 @@ public:
*/
void init(NdbQueryImpl& query, Uint32 fragNo);
+ static void clear(NdbRootFragment* frags, Uint32 noOfFrags);
+
Uint32 getFragNo() const
{ return m_fragNo; }
/**
- * Prepare for receiving another batch.
+ * Prepare for receiving another batch of results.
*/
- void reset();
+ void prepareNextReceiveSet();
/**
* Prepare for reading another batch of results.
*/
- void prepareResultSet();
+ void grabNextResultSet(); // Need mutex lock
+
+ bool hasReceivedMore() const; // Need mutex lock
+
+ void setReceivedMore(); // Need mutex lock
void incrOutstandingResults(Int32 delta)
{
@@ -261,6 +283,10 @@ public:
void postFetchRelease();
private:
+ /** No copying.*/
+ NdbRootFragment(const NdbRootFragment&);
+ NdbRootFragment& operator=(const NdbRootFragment&);
+
STATIC_CONST( voidFragNo = 0xffffffff);
/** Enclosing query.*/
@@ -273,12 +299,19 @@ private:
NdbResultStream* m_resultStreams;
/**
- * The number of outstanding TCKEYREF or TRANSID_AI
- * messages for the fragment. This includes both messages related to the
+ * Number of available prefetched ResultSets which are completely
+ * received. Will be made available for reading by calling
+ * ::grabNextResultSet()
+ */
+ Uint32 m_availResultSets; // Need mutex
+
+ /**
+ * The number of outstanding TCKEYREF or TRANSID_AI messages to receive
+ * for the fragment. This includes both messages related to the
* root operation and any descendant operation that was instantiated as
* a consequence of tuples found by the root operation.
- * This number may temporarily be negative if e.g. TRANSID_AI arrives before
- * SCAN_TABCONF.
+ * This number may temporarily be negative if e.g. TRANSID_AI arrives
+ * before SCAN_TABCONF.
*/
Int32 m_outstandingResults;
@@ -287,7 +320,8 @@ private:
* operation accesses (i.e. one for a lookup, all for a table scan).
*
* Each element is true iff a SCAN_TABCONF (for that fragment) or
- * TCKEYCONF message has been received */
+ * TCKEYCONF message has been received
+ */
bool m_confReceived;
/**
@@ -311,6 +345,55 @@ private:
int m_idMapNext;
}; //NdbRootFragment
+/**
+ * 'class NdbResultSet' is a helper for 'class NdbResultStream'.
+ * It manages the buffers which rows are received into and
+ * read from.
+ */
+class NdbResultSet
+{
+ friend class NdbResultStream;
+
+public:
+ explicit NdbResultSet();
+
+ void init(NdbQueryImpl& query,
+ Uint32 maxRows, Uint32 rowSize);
+
+ void prepareReceive(NdbReceiver& receiver)
+ {
+ m_rowCount = 0;
+ receiver.prepareReceive(m_buffer);
+ }
+
+ void prepareRead(NdbReceiver& receiver)
+ {
+ receiver.prepareRead(m_buffer,m_rowCount);
+ }
+
+ Uint32 getRowCount() const
+ { return m_rowCount; }
+
+private:
+ /** No copying.*/
+ NdbResultSet(const NdbResultSet&);
+ NdbResultSet& operator=(const NdbResultSet&);
+
+ /** The buffers which we receive the results into */
+ char* m_buffer;
+
+ /** Used for checking if buffer overrun occurred. */
+ Uint32* m_batchOverflowCheck;
+
+ /** Array of TupleCorrelations for all rows in m_buffer */
+ TupleCorrelation* m_correlations;
+
+ Uint32 m_rowSize;
+
+ /** The current #rows in 'm_buffer'.*/
+ Uint32 m_rowCount;
+
+}; // class NdbResultSet
/**
* This class manages the subset of result data for one operation that is
@@ -340,7 +423,7 @@ public:
void prepare();
/** Prepare for receiving next batch of scan results. */
- void reset();
+ void prepareNextReceiveSet();
NdbReceiver& getReceiver()
{ return m_receiver; }
@@ -348,11 +431,8 @@ public:
const NdbReceiver& getReceiver() const
{ return m_receiver; }
- Uint32 getRowCount() const
- { return m_rowCount; }
-
- char* getRow(Uint32 tupleNo) const
- { return (m_buffer + (tupleNo*m_rowSize)); }
+ const char* getCurrentRow()
+ { return m_receiver.get_row(); }
/**
* Process an incomming tuple for this stream. Extract parent and own tuple
@@ -372,9 +452,9 @@ public:
bool prepareResultSet(Uint32 remainingScans);
/**
- * Navigate within the current result batch to resp. first and next row.
+ * Navigate within the current ResultSet to resp. first and next row.
* For non-parent operations in the pushed query, navigation is with respect
- * to any preceding parents which results in this NdbResultStream depends on.
+ * to any preceding parents which results in this ResultSet depends on.
* Returns either the tupleNo within TupleSet[] which we navigated to, or
* tupleNotFound().
*/
@@ -490,25 +570,22 @@ private:
/** The receiver object that unpacks transid_AI messages.*/
NdbReceiver m_receiver;
- /** The buffers which we receive the results into */
- char* m_buffer;
-
- /** Used for checking if buffer overrun occurred. */
- Uint32* m_batchOverflowCheck;
-
- Uint32 m_rowSize;
-
- /** The number of transid_AI messages received.*/
- Uint32 m_rowCount;
+ /**
+ * ResultSets are received into and read from this stream,
+ * intended to be extended into double buffered ResultSet later.
+ */
+ NdbResultSet m_resultSets[1];
+ Uint32 m_read; // We read from m_resultSets[m_read]
+ Uint32 m_recv; // We receive into m_resultSets[m_recv]
/** This is the state of the iterator used by firstResult(), nextResult().*/
enum
{
/** The first row has not been fetched yet.*/
Iter_notStarted,
- /** Is iterating the resultset, (implies 'm_currentRow!=tupleNotFound').*/
+ /** Is iterating the ResultSet, (implies 'm_currentRow!=tupleNotFound').*/
Iter_started,
- /** Last row for current batch has been returned.*/
+ /** Last row for current ResultSet has been returned.*/
Iter_finished
} m_iterState;
@@ -586,6 +663,41 @@ void* NdbBulkAllocator::allocObjMem(Uint
return m_nextObjNo > m_maxObjs ? NULL : result;
}
+///////////////////////////////////////////
+///////// NdbResultSet methods ///////////
+///////////////////////////////////////////
+NdbResultSet::NdbResultSet() :
+ m_buffer(NULL),
+ m_batchOverflowCheck(NULL),
+ m_correlations(NULL),
+ m_rowSize(0),
+ m_rowCount(0)
+{}
+
+void
+NdbResultSet::init(NdbQueryImpl& query,
+ Uint32 maxRows,
+ Uint32 rowSize)
+{
+ m_rowSize = rowSize;
+ {
+ const int bufferSize = rowSize * maxRows;
+ NdbBulkAllocator& bufferAlloc = query.getRowBufferAlloc();
+ m_buffer = reinterpret_cast<char*>(bufferAlloc.allocObjMem(bufferSize));
+
+ // So that we can test for buffer overrun.
+ m_batchOverflowCheck =
+ reinterpret_cast<Uint32*>(bufferAlloc.allocObjMem(sizeof(Uint32)));
+ *m_batchOverflowCheck = 0xacbd1234;
+
+ if (query.getQueryDef().isScanQuery())
+ {
+ m_correlations = reinterpret_cast<TupleCorrelation*>
+ (bufferAlloc.allocObjMem(maxRows*sizeof(TupleCorrelation)));
+ }
+ }
+}
+
//////////////////////////////////////////////
///////// NdbResultStream methods ///////////
//////////////////////////////////////////////
@@ -607,10 +719,7 @@ NdbResultStream::NdbResultStream(NdbQuer
| (operation.getQueryOperationDef().getMatchType() != NdbQueryOptions::MatchAll
? Is_Inner_Join : 0))),
m_receiver(operation.getQuery().getNdbTransaction().getNdb()),
- m_buffer(NULL),
- m_batchOverflowCheck(NULL),
- m_rowSize(0),
- m_rowCount(0),
+ m_resultSets(), m_read(0xffffffff), m_recv(0),
m_iterState(Iter_notStarted),
m_currentRow(tupleNotFound),
m_maxRows(0),
@@ -631,12 +740,8 @@ NdbResultStream::prepare()
const Uint32 rowSize = m_operation.getRowSize();
NdbQueryImpl &query = m_operation.getQuery();
- m_rowSize = rowSize;
-
/* Parent / child correlation is only relevant for scan type queries
- * Don't create m_parentTupleId[] and m_childTupleIdx[] for lookups!
- * Neither is these structures required for operations not having respective
- * child or parent operations.
+ * Don't create a m_tupleSet with these correlation id's for lookups!
*/
if (isScanQuery())
{
@@ -648,14 +753,7 @@ NdbResultStream::prepare()
else
m_maxRows = 1;
- const int bufferSize = rowSize * m_maxRows;
- NdbBulkAllocator& bufferAlloc = query.getRowBufferAlloc();
- m_buffer = reinterpret_cast<char*>(bufferAlloc.allocObjMem(bufferSize));
-
- // So that we can test for buffer overrun.
- m_batchOverflowCheck =
- reinterpret_cast<Uint32*>(bufferAlloc.allocObjMem(sizeof(Uint32)));
- *m_batchOverflowCheck = 0xacbd1234;
+ m_resultSets[0].init(query, m_maxRows, rowSize);
m_receiver.init(NdbReceiver::NDB_QUERY_OPERATION, false, &m_operation);
m_receiver.do_setup_ndbrecord(
@@ -664,32 +762,9 @@ NdbResultStream::prepare()
0 /*key_size*/,
0 /*read_range_no*/,
rowSize,
- m_buffer);
+ m_resultSets[m_recv].m_buffer);
} //NdbResultStream::prepare
-void
-NdbResultStream::reset()
-{
- assert (isScanQuery());
-
- // Root scan-operation need a ScanTabConf to complete
- m_rowCount = 0;
- m_iterState = Iter_notStarted;
- m_currentRow = tupleNotFound;
-
- m_receiver.prepareSend();
- /**
- * If this stream will get new rows in the next batch, then so will
- * all of its descendants.
- */
- for (Uint32 childNo = 0; childNo < m_operation.getNoOfChildOperations();
- childNo++)
- {
- NdbQueryOperationImpl& child = m_operation.getChildOperation(childNo);
- m_rootFrag.getResultStream(child).reset();
- }
-} //NdbResultStream::reset
-
/** Locate, and return 'tupleNo', of first tuple with specified parentId.
* parentId == tupleNotFound is use as a special value for iterating results
* from the root operation in the order which they was inserted by
@@ -703,11 +778,11 @@ NdbResultStream::findTupleWithParentId(U
{
assert ((parentId==tupleNotFound) == (m_parent==NULL));
- if (likely(m_rowCount>0))
+ if (likely(m_resultSets[m_read].m_rowCount>0))
{
if (m_tupleSet==NULL)
{
- assert (m_rowCount <= 1);
+ assert (m_resultSets[m_read].m_rowCount <= 1);
return 0;
}
@@ -774,7 +849,7 @@ NdbResultStream::firstResult()
if ((m_currentRow=findTupleWithParentId(parentId)) != tupleNotFound)
{
m_iterState = Iter_started;
- m_receiver.setCurrentRow(m_currentRow);
+ m_receiver.setCurrentRow(m_resultSets[m_read].m_buffer, m_currentRow);
return m_currentRow;
}
@@ -782,7 +857,6 @@ NdbResultStream::firstResult()
return tupleNotFound;
} //NdbResultStream::firstResult()
-
Uint16
NdbResultStream::nextResult()
{
@@ -791,14 +865,13 @@ NdbResultStream::nextResult()
(m_currentRow=findNextTuple(m_currentRow)) != tupleNotFound)
{
m_iterState = Iter_started;
- m_receiver.setCurrentRow(m_currentRow);
+ m_receiver.setCurrentRow(m_resultSets[m_read].m_buffer, m_currentRow);
return m_currentRow;
}
m_iterState = Iter_finished;
return tupleNotFound;
} //NdbResultStream::nextResult()
-
/**
* Callback when a TRANSID_AI signal (receive row) is processed.
*/
@@ -806,21 +879,46 @@ void
NdbResultStream::execTRANSID_AI(const Uint32 *ptr, Uint32 len,
TupleCorrelation correlation)
{
- m_receiver.execTRANSID_AI(ptr, len);
- m_rowCount++;
-
+ NdbResultSet& receiveSet = m_resultSets[m_recv];
if (isScanQuery())
{
/**
- * Store TupleCorrelation as hidden value imm. after received row
- * (NdbQueryOperationImpl::getRowSize() has reserved space for it)
+ * Store TupleCorrelation.
*/
- Uint32* row_recv = reinterpret_cast<Uint32*>(m_receiver.m_record.m_row);
- row_recv[-1] = correlation.toUint32();
+ receiveSet.m_correlations[receiveSet.m_rowCount] = correlation;
}
+
+ m_receiver.execTRANSID_AI(ptr, len);
+ receiveSet.m_rowCount++;
} // NdbResultStream::execTRANSID_AI()
/**
+ * Make preparation for another batch of results to be received.
+ * This NdbResultStream, and all its sibling will receive a batch
+ * of results from the datanodes.
+ */
+void
+NdbResultStream::prepareNextReceiveSet()
+{
+ assert (isScanQuery());
+
+ m_iterState = Iter_notStarted;
+ m_currentRow = tupleNotFound;
+ m_resultSets[m_recv].prepareReceive(m_receiver);
+
+ /**
+ * If this stream will get new rows in the next batch, then so will
+ * all of its descendants.
+ */
+ for (Uint32 childNo = 0; childNo < m_operation.getNoOfChildOperations();
+ childNo++)
+ {
+ NdbQueryOperationImpl& child = m_operation.getChildOperation(childNo);
+ m_rootFrag.getResultStream(child).prepareNextReceiveSet();
+ }
+} //NdbResultStream::prepareNextReceiveSet
+
+/**
* Make preparations for another batch of result to be read:
* - Fill in parent/child result correlations in m_tupleSet[]
* - ... or reset m_tupleSet[] if we reuse the previous.
@@ -833,12 +931,16 @@ NdbResultStream::prepareResultSet(Uint32
bool isComplete = isSubScanComplete(remainingScans); //Childs with more rows
assert(isComplete || isScanResult()); //Lookups always 'complete'
- // Set correct #rows received in the NdbReceiver.
- getReceiver().m_result_rows = getRowCount();
+ m_read = m_recv;
+ NdbResultSet& readResult = m_resultSets[m_read];
+
+ // Set correct buffer and #rows received by this ResultSet.
+ readResult.prepareRead(m_receiver);
/**
- * Prepare NdbResultStream for reading - either the next received
- * from datanodes or reuse current.
+ * Prepare NdbResultSet for reading - either the next received
+ * from datanodes or reuse the last as has been determined by
+ * ::prepareNextReceiveSet()
*/
if (m_tupleSet!=NULL)
{
@@ -850,7 +952,7 @@ NdbResultStream::prepareResultSet(Uint32
else
{
// Makes all rows in 'TupleSet' available (clear 'm_skip' flag)
- for (Uint32 tupleNo=0; tupleNo<getRowCount(); tupleNo++)
+ for (Uint32 tupleNo=0; tupleNo<readResult.getRowCount(); tupleNo++)
{
m_tupleSet[tupleNo].m_skip = false;
}
@@ -877,7 +979,7 @@ NdbResultStream::prepareResultSet(Uint32
if (m_tupleSet!=NULL)
{
- for (Uint32 tupleNo=0; tupleNo<getRowCount(); tupleNo++)
+ for (Uint32 tupleNo=0; tupleNo<readResult.getRowCount(); tupleNo++)
{
if (!m_tupleSet[tupleNo].m_skip)
{
@@ -913,21 +1015,24 @@ NdbResultStream::prepareResultSet(Uint32
/**
* Fill m_tupleSet[] with correlation data between parent
- * and child tuples. The 'TupleCorrelation' is stored as
- * and extra Uint32 after each row received
+ * and child tuples. The 'TupleCorrelation' is stored
+ * in an array of TupleCorrelations in each ResultSet
* by execTRANSID_AI().
*
* NOTE: In order to reduce work done when holding the
* transporter mutex, the 'TupleCorrelation' is only stored
* in the buffer when it arrives. Later (here) we build the
* correlation hashMap immediately before we prepare to
- * read the NdbResultStream.
+ * read the NdbResultSet.
*/
void
NdbResultStream::buildResultCorrelations()
{
+ const NdbResultSet& readResult = m_resultSets[m_read];
+
// Buffer overrun check.
- assert(m_batchOverflowCheck==NULL || *m_batchOverflowCheck==0xacbd1234);
+ assert(readResult.m_batchOverflowCheck==NULL ||
+ *readResult.m_batchOverflowCheck==0xacbd1234);
//if (m_tupleSet!=NULL)
{
@@ -937,15 +1042,12 @@ NdbResultStream::buildResultCorrelations
m_tupleSet[i].m_hash_head = tupleNotFound;
}
- /* Rebuild correlation & hashmap from received buffers */
- for (Uint32 tupleNo=0; tupleNo<m_rowCount; tupleNo++)
+ /* Rebuild correlation & hashmap from 'readResult' */
+ for (Uint32 tupleNo=0; tupleNo<readResult.m_rowCount; tupleNo++)
{
- const Uint32* row = (Uint32*)getRow(tupleNo+1);
- const TupleCorrelation correlation(row[-1]);
-
- const Uint16 tupleId = correlation.getTupleId();
+ const Uint16 tupleId = readResult.m_correlations[tupleNo].getTupleId();
const Uint16 parentId = (m_parent!=NULL)
- ? correlation.getParentTupleId()
+ ? readResult.m_correlations[tupleNo].getParentTupleId()
: tupleNotFound;
m_tupleSet[tupleNo].m_skip = false;
@@ -1034,6 +1136,7 @@ NdbRootFragment::NdbRootFragment():
m_query(NULL),
m_fragNo(voidFragNo),
m_resultStreams(NULL),
+ m_availResultSets(0),
m_outstandingResults(0),
m_confReceived(false),
m_remainingScans(0),
@@ -1094,7 +1197,46 @@ NdbRootFragment::getResultStream(Uint32
return m_resultStreams[operationNo];
}
-void NdbRootFragment::reset()
+/**
+ * Throw any pending ResultSets from specified rootFrags[]
+ */
+//static
+void NdbRootFragment::clear(NdbRootFragment* rootFrags, Uint32 noOfFrags)
+{
+ if (rootFrags != NULL)
+ {
+ for (Uint32 fragNo = 0; fragNo < noOfFrags; fragNo++)
+ {
+ rootFrags[fragNo].m_availResultSets = 0;
+ }
+ }
+}
+
+/**
+ * Signal that another complete ResultSet is available for
+ * this NdbRootFragment.
+ * Need mutex lock as 'm_availResultSets' is accesed both from
+ * receiver and application thread.
+ */
+void NdbRootFragment::setReceivedMore()
+{
+ assert(m_availResultSets==0);
+ m_availResultSets++;
+}
+
+/**
+ * Check if another ResultSets has been received and is available
+ * for reading. It will be given to the application thread when it
+ * call ::grabNextResultSet().
+ * Need mutex lock as 'm_availResultSets' is accesed both from
+ * receiver and application thread.
+ */
+bool NdbRootFragment::hasReceivedMore() const
+{
+ return (m_availResultSets > 0);
+}
+
+void NdbRootFragment::prepareNextReceiveSet()
{
assert(m_fragNo!=voidFragNo);
assert(m_outstandingResults == 0);
@@ -1102,20 +1244,30 @@ void NdbRootFragment::reset()
for (unsigned opNo=0; opNo<m_query->getNoOfOperations(); opNo++)
{
- if (!m_resultStreams[opNo].isSubScanComplete(m_remainingScans))
+ NdbResultStream& resultStream = getResultStream(opNo);
+ if (!resultStream.isSubScanComplete(m_remainingScans))
{
/**
- * Reset m_resultStreams[] and all its descendants, since all these
+ * Reset resultStream and all its descendants, since all these
* streams will get a new set of rows in the next batch.
*/
- m_resultStreams[opNo].reset();
+ resultStream.prepareNextReceiveSet();
}
}
m_confReceived = false;
}
-void NdbRootFragment::prepareResultSet()
+/**
+ * Let the application thread takes ownership of an available
+ * ResultSet, prepare it for reading first row.
+ * Need mutex lock as 'm_availResultSets' is accesed both from
+ * receiver and application thread.
+ */
+void NdbRootFragment::grabNextResultSet()
{
+ assert(m_availResultSets>0);
+ m_availResultSets--;
+
NdbResultStream& rootStream = getResultStream(0);
rootStream.prepareResultSet(m_remainingScans);
@@ -1137,7 +1289,7 @@ void NdbRootFragment::setConfReceived(Ui
bool NdbRootFragment::finalBatchReceived() const
{
- return getReceiverTcPtrI()==RNIL;
+ return m_confReceived && getReceiverTcPtrI()==RNIL;
}
bool NdbRootFragment::isEmpty() const
@@ -1356,6 +1508,14 @@ int NdbQueryOperation::setParallelism(Ui
return m_impl.setParallelism(parallelism);
}
+int NdbQueryOperation::setMaxParallelism(){
+ return m_impl.setMaxParallelism();
+}
+
+int NdbQueryOperation::setAdaptiveParallelism(){
+ return m_impl.setAdaptiveParallelism();
+}
+
int NdbQueryOperation::setBatchSize(Uint32 batchSize){
return m_impl.setBatchSize(batchSize);
}
@@ -1587,6 +1747,7 @@ NdbQueryImpl::NdbQueryImpl(NdbTransactio
m_next(NULL),
m_queryDef(&queryDef),
m_error(),
+ m_errorReceived(0),
m_transaction(trans),
m_scanTransaction(NULL),
m_operations(0),
@@ -1596,7 +1757,6 @@ NdbQueryImpl::NdbQueryImpl(NdbTransactio
m_rootFragCount(0),
m_rootFrags(NULL),
m_applFrags(),
- m_fullFrags(),
m_finalBatchFrags(0),
m_num_bounds(0),
m_shortestBound(0xffffffff),
@@ -2060,10 +2220,9 @@ NdbQueryImpl::nextResult(bool fetchAllow
NdbQuery::NextResultOutcome
NdbQueryImpl::nextRootResult(bool fetchAllowed, bool forceSend)
{
- /* To minimize lock contention, each query has two separate root fragment
- * conatiners (m_fullFrags and m_applFrags). m_applFrags is only
- * accessed by the application thread, so it is safe to use it without
- * locks.
+ /* To minimize lock contention, each query has the separate root fragment
+ * conatiner 'm_applFrags'. m_applFrags is only accessed by the application
+ * thread, so it is safe to use it without locks.
*/
while (m_state != EndOfData) // Or likely: return when 'gotRow' or error
{
@@ -2179,23 +2338,17 @@ NdbQueryImpl::awaitMoreResults(bool forc
*/
while (likely(!hasReceivedError()))
{
- /* m_fullFrags contains any fragments that are complete (for this batch)
- * but have not yet been moved (under mutex protection) to
- * m_applFrags.
+ /* Scan m_rootFrags (under mutex protection) for fragments
+ * which has received a complete batch. Add these to m_applFrags.
*/
- NdbRootFragment* frag;
- while ((frag=m_fullFrags.pop()) != NULL)
- {
- frag->prepareResultSet();
- m_applFrags.add(*frag);
- }
+ m_applFrags.prepareMoreResults(m_rootFrags,m_rootFragCount);
if (m_applFrags.getCurrent() != NULL)
{
return FetchResult_ok;
}
- /* There are noe more available frament results available without
- * first waiting for more to be received from datanodes
+ /* There are no more available fragment results available without
+ * first waiting for more to be received from the datanodes
*/
if (m_pendingFrags == 0)
{
@@ -2236,16 +2389,9 @@ NdbQueryImpl::awaitMoreResults(bool forc
/* The root operation is a lookup. Lookups are guaranteed to be complete
* before NdbTransaction::execute() returns. Therefore we do not set
* the lock, because we know that the signal receiver thread will not
- * be accessing m_fullFrags at this time.
+ * be accessing m_rootFrags at this time.
*/
- NdbRootFragment* frag;
- if ((frag=m_fullFrags.pop()) != NULL)
- {
- frag->prepareResultSet();
- m_applFrags.add(*frag);
- }
- assert(m_fullFrags.pop()==NULL); // Only one stream for lookups.
-
+ m_applFrags.prepareMoreResults(m_rootFrags,m_rootFragCount);
if (m_applFrags.getCurrent() != NULL)
{
return FetchResult_ok;
@@ -2266,8 +2412,8 @@ NdbQueryImpl::awaitMoreResults(bool forc
/*
::handleBatchComplete() is intended to be called when receiving signals only.
- The PollGuard mutex is then set and the shared 'm_pendingFrags',
- 'm_finalBatchFrags' and 'm_fullFrags' can safely be updated.
+ The PollGuard mutex is then set and the shared 'm_pendingFrags' and
+ 'm_finalBatchFrags' can safely be updated and ::setReceivedMore() signaled.
returns: 'true' when application thread should be resumed.
*/
@@ -2286,7 +2432,7 @@ NdbQueryImpl::handleBatchComplete(NdbRoo
* terminated the scan. We are about to close this query,
* and didn't expect any more data - ignore it!
*/
- if (likely(m_fullFrags.m_errorCode == 0))
+ if (likely(m_errorReceived == 0))
{
assert(rootFrag.isFragBatchComplete());
@@ -2300,10 +2446,10 @@ NdbQueryImpl::handleBatchComplete(NdbRoo
assert(m_finalBatchFrags <= m_rootFragCount);
}
- /* When application thread ::awaitMoreResults() it will later be moved
- * from m_fullFrags to m_applFrags under mutex protection.
+ /* When application thread ::awaitMoreResults() it will later be
+ * added to m_applFrags under mutex protection.
*/
- m_fullFrags.push(rootFrag);
+ rootFrag.setReceivedMore();
return true;
}
@@ -2327,7 +2473,7 @@ NdbQueryImpl::close(bool forceSend)
}
// Throw any pending results
- m_fullFrags.clear();
+ NdbRootFragment::clear(m_rootFrags,m_rootFragCount);
m_applFrags.clear();
Ndb* const ndb = m_transaction.getNdb();
@@ -2423,7 +2569,7 @@ NdbQueryImpl::setFetchTerminated(int err
}
if (errorCode!=0)
{
- m_fullFrags.m_errorCode = errorCode;
+ m_errorReceived = errorCode;
}
m_pendingFrags = 0;
} // NdbQueryImpl::setFetchTerminated()
@@ -2436,9 +2582,9 @@ NdbQueryImpl::setFetchTerminated(int err
bool
NdbQueryImpl::hasReceivedError()
{
- if (unlikely(m_fullFrags.m_errorCode))
+ if (unlikely(m_errorReceived))
{
- setErrorCode(m_fullFrags.m_errorCode);
+ setErrorCode(m_errorReceived);
return true;
}
return false;
@@ -2503,16 +2649,17 @@ NdbQueryImpl::prepareSend()
{
/* For the first batch, we read from all fragments for both ordered
* and unordered scans.*/
- if (getQueryOperation(0U).m_parallelism > 0)
+ if (getQueryOperation(0U).m_parallelism == Parallelism_max)
{
m_rootFragCount
- = MIN(getRoot().getQueryOperationDef().getTable().getFragmentCount(),
- getQueryOperation(0U).m_parallelism);
+ = getRoot().getQueryOperationDef().getTable().getFragmentCount();
}
else
{
+ assert(getQueryOperation(0U).m_parallelism != Parallelism_adaptive);
m_rootFragCount
- = getRoot().getQueryOperationDef().getTable().getFragmentCount();
+ = MIN(getRoot().getQueryOperationDef().getTable().getFragmentCount(),
+ getQueryOperation(0U).m_parallelism);
}
Ndb* const ndb = m_transaction.getNdb();
@@ -2543,8 +2690,7 @@ NdbQueryImpl::prepareSend()
}
// Allocate space for ptrs to NdbResultStream and NdbRootFragment objects.
error = m_pointerAlloc.init(m_rootFragCount *
- (SharedFragStack::pointersPerFragment +
- OrderedFragSet::pointersPerFragment));
+ (OrderedFragSet::pointersPerFragment));
if (error != 0)
{
setErrorCode(error);
@@ -2563,14 +2709,13 @@ NdbQueryImpl::prepareSend()
for (Uint32 opNo = 0; opNo < getNoOfOperations(); opNo++)
{
const NdbQueryOperationImpl& op = getQueryOperation(opNo);
+ // Add space for m_correlations, m_buffer & m_batchOverflowCheck
+ totalBuffSize += (sizeof(TupleCorrelation) * op.getMaxBatchRows());
totalBuffSize += (op.getRowSize() * op.getMaxBatchRows());
+ totalBuffSize += sizeof(Uint32); // Overflow check
}
- /**
- * Add one word per ResultStream for buffer overrun check. We add a word
- * rather than a byte to avoid possible alignment problems.
- */
- m_rowBufferAlloc.init(m_rootFragCount *
- (totalBuffSize + (sizeof(Uint32) * getNoOfOperations())) );
+
+ m_rowBufferAlloc.init(m_rootFragCount * totalBuffSize);
if (getQueryDef().isScanQuery())
{
Uint32 totalRows = 0;
@@ -2631,21 +2776,17 @@ NdbQueryImpl::prepareSend()
keyRec = getRoot().getQueryOperationDef().getIndex()->getDefaultRecord();
assert(keyRec!=NULL);
}
- if (unlikely((error = m_applFrags.prepare(m_pointerAlloc,
- getRoot().getOrdering(),
- m_rootFragCount,
- keyRec,
- getRoot().m_ndbRecord)) != 0)
- || (error = m_fullFrags.prepare(m_pointerAlloc,
- m_rootFragCount)) != 0) {
- setErrorCode(error);
- return -1;
- }
+ m_applFrags.prepare(m_pointerAlloc,
+ getRoot().getOrdering(),
+ m_rootFragCount,
+ keyRec,
+ getRoot().m_ndbRecord);
if (getQueryDef().isScanQuery())
{
NdbRootFragment::buildReciverIdMap(m_rootFrags, m_rootFragCount);
}
+
#ifdef TRACE_SERIALIZATION
ndbout << "Serialized ATTRINFO : ";
for(Uint32 i = 0; i < m_attrInfo.getSize(); i++){
@@ -3088,7 +3229,7 @@ NdbQueryImpl::sendFetchMore(NdbRootFragm
NdbRootFragment* rootFrag = rootFrags[i];
assert(rootFrag->isFragBatchComplete());
assert(!rootFrag->finalBatchReceived());
- rootFrag->reset();
+ rootFrag->prepareNextReceiveSet();
}
Ndb& ndb = *getNdbTransaction().getNdb();
@@ -3183,8 +3324,8 @@ NdbQueryImpl::closeTcCursor(bool forceSe
} // while
assert(m_pendingFrags==0);
- m_fullFrags.clear(); // Throw any unhandled results
- m_fullFrags.m_errorCode = 0; // Clear errors caused by previous fetching
+ NdbRootFragment::clear(m_rootFrags,m_rootFragCount);
+ m_errorReceived = 0; // Clear errors caused by previous fetching
m_error.code = 0;
if (m_finalBatchFrags < getRootFragCount()) // TC has an open scan cursor.
@@ -3271,63 +3412,35 @@ int NdbQueryImpl::isPrunable(bool& pruna
/****************
- * NdbQueryImpl::SharedFragStack methods.
- ***************/
-
-NdbQueryImpl::SharedFragStack::SharedFragStack():
- m_errorCode(0),
- m_capacity(0),
- m_current(-1),
- m_array(NULL)
-{}
-
-int
-NdbQueryImpl::SharedFragStack::prepare(NdbBulkAllocator& allocator,
- int capacity)
-{
- assert(m_array==NULL);
- assert(m_capacity==0);
- if (capacity > 0)
- { m_capacity = capacity;
- m_array =
- reinterpret_cast<NdbRootFragment**>(allocator.allocObjMem(capacity));
- }
- return 0;
-}
-
-void
-NdbQueryImpl::SharedFragStack::push(NdbRootFragment& frag)
-{
- m_current++;
- assert(m_current<m_capacity);
- m_array[m_current] = &frag;
-}
-
-/****************
* NdbQueryImpl::OrderedFragSet methods.
***************/
NdbQueryImpl::OrderedFragSet::OrderedFragSet():
m_capacity(0),
m_activeFragCount(0),
- m_emptiedFragCount(0),
+ m_fetchMoreFragCount(0),
m_finalFragCount(0),
m_ordering(NdbQueryOptions::ScanOrdering_void),
m_keyRecord(NULL),
m_resultRecord(NULL),
m_activeFrags(NULL),
- m_emptiedFrags(NULL)
+ m_fetchMoreFrags(NULL)
{
}
NdbQueryImpl::OrderedFragSet::~OrderedFragSet()
{
m_activeFrags = NULL;
- m_emptiedFrags= NULL;
+ m_fetchMoreFrags = NULL;
}
+void NdbQueryImpl::OrderedFragSet::clear()
+{
+ m_activeFragCount = 0;
+ m_fetchMoreFragCount = 0;
+}
-int
+void
NdbQueryImpl::OrderedFragSet::prepare(NdbBulkAllocator& allocator,
NdbQueryOptions::ScanOrdering ordering,
int capacity,
@@ -3345,14 +3458,14 @@ NdbQueryImpl::OrderedFragSet::prepare(Nd
m_activeFrags =
reinterpret_cast<NdbRootFragment**>(allocator.allocObjMem(capacity));
bzero(m_activeFrags, capacity * sizeof(NdbRootFragment*));
- m_emptiedFrags =
+
+ m_fetchMoreFrags =
reinterpret_cast<NdbRootFragment**>(allocator.allocObjMem(capacity));
- bzero(m_emptiedFrags, capacity * sizeof(NdbRootFragment*));
+ bzero(m_fetchMoreFrags, capacity * sizeof(NdbRootFragment*));
}
m_ordering = ordering;
m_keyRecord = keyRecord;
m_resultRecord = resultRecord;
- return 0;
} // OrderedFragSet::prepare()
@@ -3368,8 +3481,6 @@ NdbQueryImpl::OrderedFragSet::getCurrent
{
if (m_ordering!=NdbQueryOptions::ScanOrdering_unordered)
{
- // Results should be ordered.
- assert(verifySortOrder());
/**
* Must have tuples for each (non-completed) fragment when doing ordered
* scan.
@@ -3413,10 +3524,10 @@ NdbQueryImpl::OrderedFragSet::reorganize
}
else
{
- m_emptiedFrags[m_emptiedFragCount++] = frag;
+ m_fetchMoreFrags[m_fetchMoreFragCount++] = frag;
}
m_activeFragCount--;
- assert(m_activeFragCount + m_emptiedFragCount + m_finalFragCount
+ assert(m_activeFragCount + m_fetchMoreFragCount + m_finalFragCount
<= m_capacity);
return; // Remaining m_activeFrags[] are sorted
@@ -3466,7 +3577,7 @@ NdbQueryImpl::OrderedFragSet::reorganize
}
assert(verifySortOrder());
}
- assert(m_activeFragCount + m_emptiedFragCount + m_finalFragCount
+ assert(m_activeFragCount + m_fetchMoreFragCount + m_finalFragCount
<= m_capacity);
} // OrderedFragSet::reorganize()
@@ -3479,19 +3590,29 @@ NdbQueryImpl::OrderedFragSet::add(NdbRoo
reorganize(); // Move into position
} // OrderedFragSet::add()
-void NdbQueryImpl::OrderedFragSet::clear()
-{
- m_activeFragCount = 0;
- m_emptiedFragCount = 0;
- m_finalFragCount = 0;
-}
+void
+NdbQueryImpl::OrderedFragSet::prepareMoreResults(NdbRootFragment rootFrags[], Uint32 cnt)
+{
+ for (Uint32 fragNo = 0; fragNo < cnt; fragNo++)
+ {
+ NdbRootFragment& rootFrag = rootFrags[fragNo];
+ if (rootFrag.hasReceivedMore()) // Another ResultSet is available
+ {
+ rootFrag.grabNextResultSet(); // Get new ResultSet.
+ add(rootFrag); // Make avail. to appl. thread
+ }
+ } // for all 'rootFrags[]'
+
+ assert(m_activeFragCount + m_fetchMoreFragCount + m_finalFragCount
+ <= m_capacity);
+} // OrderedFragSet::prepareMoreResults()
Uint32
NdbQueryImpl::OrderedFragSet::getFetchMore(NdbRootFragment** &frags)
{
- const Uint32 cnt = m_emptiedFragCount;
- frags = m_emptiedFrags;
- m_emptiedFragCount = 0;
+ const int cnt = m_fetchMoreFragCount;
+ frags = m_fetchMoreFrags;
+ m_fetchMoreFragCount = 0;
return cnt;
}
@@ -3570,7 +3691,8 @@ NdbQueryOperationImpl::NdbQueryOperation
m_ordering(NdbQueryOptions::ScanOrdering_unordered),
m_interpretedCode(NULL),
m_diskInUserProjection(false),
- m_parallelism(0),
+ m_parallelism(def.getQueryOperationIx() == 0
+ ? Parallelism_max : Parallelism_adaptive),
m_rowSize(0xffffffff)
{
if (errno == ENOMEM)
@@ -3912,7 +4034,7 @@ NdbQueryOperationImpl::nextResult(bool f
void
NdbQueryOperationImpl::fetchRow(NdbResultStream& resultStream)
{
- const char* buff = resultStream.getReceiver().peek_row();
+ const char* buff = resultStream.getCurrentRow();
assert(buff!=NULL || (m_firstRecAttr==NULL && m_ndbRecord==NULL));
m_isRowNull = false;
@@ -4169,9 +4291,10 @@ NdbQueryOperationImpl
m_ndbRecord,
m_firstRecAttr,
0, // Key size.
- getRoot().m_parallelism > 0 ?
- getRoot().m_parallelism :
- m_queryImpl.getRootFragCount(),
+ getRoot().m_parallelism
+ == Parallelism_max ?
+ m_queryImpl.getRootFragCount() :
+ getRoot().m_parallelism,
maxBatchRows,
batchByteSize,
firstBatchRows);
@@ -4357,7 +4480,12 @@ NdbQueryOperationImpl::prepareAttrInfo(U
firstBatchRows);
assert(batchRows==getMaxBatchRows());
assert(batchRows==firstBatchRows);
- requestInfo |= QN_ScanIndexParameters::SIP_PARALLEL; // FIXME: SPJ always assume. SIP_PARALLEL
+ assert(m_parallelism == Parallelism_max ||
+ m_parallelism == Parallelism_adaptive);
+ if (m_parallelism == Parallelism_max)
+ {
+ requestInfo |= QN_ScanIndexParameters::SIP_PARALLEL;
+ }
param->requestInfo = requestInfo;
param->batchSize = ((Uint16)batchByteSize << 16) | (Uint16)firstBatchRows;
param->resultData = getIdOfReceiver();
@@ -4689,7 +4817,7 @@ NdbQueryOperationImpl::execTRANSID_AI(co
if (getQueryDef().isScanQuery())
{
const CorrelationData correlData(ptr, len);
- const Uint32 receiverId = CorrelationData(ptr, len).getRootReceiverId();
+ const Uint32 receiverId = correlData.getRootReceiverId();
/** receiverId holds the Id of the receiver of the corresponding stream
* of the root operation. We can thus find the correct root fragment
@@ -4861,7 +4989,7 @@ NdbQueryOperationImpl::setOrdering(NdbQu
return -1;
}
- if (m_parallelism != 0)
+ if (m_parallelism != Parallelism_max)
{
getQuery().setErrorCode(QRY_SEQUENTIAL_SCAN_SORTED);
return -1;
@@ -4956,10 +5084,40 @@ int NdbQueryOperationImpl::setParallelis
getQuery().setErrorCode(Err_FunctionNotImplemented);
return -1;
}
+ else if (parallelism < 1 || parallelism > MAX_NDB_PARTITIONS)
+ {
+ getQuery().setErrorCode(Err_ParameterError);
+ return -1;
+ }
m_parallelism = parallelism;
return 0;
}
+int NdbQueryOperationImpl::setMaxParallelism(){
+ if (!getQueryOperationDef().isScanOperation())
+ {
+ getQuery().setErrorCode(QRY_WRONG_OPERATION_TYPE);
+ return -1;
+ }
+ m_parallelism = Parallelism_max;
+ return 0;
+}
+
+int NdbQueryOperationImpl::setAdaptiveParallelism(){
+ if (!getQueryOperationDef().isScanOperation())
+ {
+ getQuery().setErrorCode(QRY_WRONG_OPERATION_TYPE);
+ return -1;
+ }
+ else if (getQueryOperationDef().getQueryOperationIx() == 0)
+ {
+ getQuery().setErrorCode(Err_FunctionNotImplemented);
+ return -1;
+ }
+ m_parallelism = Parallelism_adaptive;
+ return 0;
+}
+
int NdbQueryOperationImpl::setBatchSize(Uint32 batchSize){
if (!getQueryOperationDef().isScanOperation())
{
@@ -5027,12 +5185,6 @@ Uint32 NdbQueryOperationImpl::getRowSize
{
m_rowSize =
NdbReceiver::ndbrecord_rowsize(m_ndbRecord, m_firstRecAttr, 0, false);
-
- const bool withCorrelation = getRoot().getQueryDef().isScanQuery();
- if (withCorrelation)
- {
- m_rowSize += TupleCorrelation::wordCount*sizeof(Uint32);
- }
}
return m_rowSize;
}
@@ -5061,7 +5213,7 @@ NdbOut& operator<<(NdbOut& out, const Nd
}
NdbOut& operator<<(NdbOut& out, const NdbResultStream& stream){
- out << " m_rowCount: " << stream.m_rowCount;
+ out << " received rows: " << stream.m_resultSets[stream.m_recv].getRowCount();
return out;
}
=== modified file 'storage/ndb/src/ndbapi/NdbQueryOperation.hpp'
--- a/storage/ndb/src/ndbapi/NdbQueryOperation.hpp 2011-06-20 13:25:48 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryOperation.hpp 2011-08-22 08:35:35 +0000
@@ -335,13 +335,29 @@ public:
NdbQueryOptions::ScanOrdering getOrdering() const;
/**
- * Set the number of fragments to be scanned in parallel for the root
- * operation of this query. This only applies to table scans and non-sorted
- * scans of ordered indexes.
+ * Set the number of fragments to be scanned in parallel. This only applies
+ * to table scans and non-sorted scans of ordered indexes. This method is
+ * only implemented for then root scan operation.
* @return 0 if ok, -1 in case of error (call getNdbError() for details.)
*/
int setParallelism(Uint32 parallelism);
+ /**
+ * Set the number of fragments to be scanned in parallel to the maximum
+ * possible value. This is the default for the root scan operation.
+ * @return 0 if ok, -1 in case of error (call getNdbError() for details.)
+ */
+ int setMaxParallelism();
+
+ /**
+ * Let the system dynamically choose the number of fragments to scan in
+ * parallel. The system will try to choose a value that gives optimal
+ * performance. This is the default for all scans but the root scan. This
+ * method only implemented for non-root scan operations.
+ * @return 0 if ok, -1 in case of error (call getNdbError() for details.)
+ */
+ int setAdaptiveParallelism();
+
/** Set the batch size (max rows per batch) for this operation. This
* only applies to scan operations, as lookup operations always will
* have the same batch size as its parent operation, or 1 if it is the
=== modified file 'storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp'
--- a/storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp 2011-08-17 08:10:27 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp 2011-08-22 08:35:35 +0000
@@ -271,58 +271,13 @@ private:
FetchResult_noMoreCache = 2
};
- /** A stack of NdbRootFragment pointers.
- * NdbRootFragments which are 'BatchComplete' are pushed on this stack by
- * the receiver thread, and later pop'ed into the application thread when
- * it need more results to process.
- * Due to this shared usage, the PollGuard mutex must be set before
- * accessing SharedFragStack.
+ /**
+ * Container of root fragments that the application is currently
+ * iterating over. 'Owned' by application thread and can be accesed
+ * without requiring a mutex lock.
+ * RootFragments are appended to a OrderedFragSet by ::prepareMoreResults()
+ *
*/
- class SharedFragStack{
- public:
- // For calculating need for dynamically allocated memory.
- static const Uint32 pointersPerFragment = 1;
-
- explicit SharedFragStack();
-
- /**
- * Prepare internal datastructures.
- * param[in] allocator For allocating arrays of pointers.
- * param[in] capacity Max no of root fragments.
- * @return 0 if ok, else errorcode
- */
- int prepare(NdbBulkAllocator& allocator, int capacity);
-
- NdbRootFragment* pop() {
- return m_current>=0 ? m_array[m_current--] : NULL;
- }
-
- void push(NdbRootFragment& frag);
-
- int size() const {
- return (m_current+1);
- }
-
- void clear() {
- m_current = -1;
- }
-
- /** Possible error received from TC / datanodes. */
- int m_errorCode;
-
- private:
- /** Capacity of stack.*/
- int m_capacity;
-
- /** Index of current top of stack.*/
- int m_current;
- NdbRootFragment** m_array;
-
- // No copying.
- SharedFragStack(const SharedFragStack&);
- SharedFragStack& operator=(const SharedFragStack&);
- }; // class SharedFragStack
-
class OrderedFragSet{
public:
// For calculating need for dynamically allocated memory.
@@ -339,11 +294,20 @@ private:
* param[in] capacity Max no of root fragments.
* @return 0 if ok, else errorcode
*/
- int prepare(NdbBulkAllocator& allocator,
- NdbQueryOptions::ScanOrdering ordering,
- int capacity,
- const NdbRecord* keyRecord,
- const NdbRecord* resultRecord);
+ void prepare(NdbBulkAllocator& allocator,
+ NdbQueryOptions::ScanOrdering ordering,
+ int capacity,
+ const NdbRecord* keyRecord,
+ const NdbRecord* resultRecord);
+
+ /**
+ * Add root fragments with completed ResultSets to this OrderedFragSet.
+ * The PollGuard mutex must locked, and under its protection
+ * completed root fragments are 'consumed' from rootFrags[] and
+ * added to OrderedFragSet where it become available for the
+ * application thread.
+ */
+ void prepareMoreResults(NdbRootFragment rootFrags[], Uint32 cnt); // Need mutex lock
/** Get the root fragment from which to read the next row.*/
NdbRootFragment* getCurrent() const;
@@ -355,9 +319,6 @@ private:
*/
void reorganize();
- /** Add a complete fragment that has been received.*/
- void add(NdbRootFragment& frag);
-
/** Reset object to an empty state.*/
void clear();
@@ -372,14 +333,16 @@ private:
private:
- /** Max no of fragments.*/
+ /** No of fragments to read until '::finalBatchReceived()'.*/
int m_capacity;
/** Number of fragments in 'm_activeFrags'.*/
int m_activeFragCount;
- /** Number of fragments in 'm_emptiedFrags'. */
- int m_emptiedFragCount;
- /** Number of fragments where the final batch has been received
- * and consumed.*/
+ /** Number of fragments in 'm_fetchMoreFrags'. */
+ int m_fetchMoreFragCount;
+ /**
+ * Number of fragments where the final batch has been received
+ * and consumed.
+ */
int m_finalFragCount;
/** Ordering of index scan result.*/
NdbQueryOptions::ScanOrdering m_ordering;
@@ -387,15 +350,21 @@ private:
const NdbRecord* m_keyRecord;
/** Needed for comparing records when ordering results.*/
const NdbRecord* m_resultRecord;
- /** Fragments where some tuples in the current batch has not yet been
- * consumed.*/
+ /**
+ * Fragments where some tuples in the current ResultSet has not
+ * yet been consumed.
+ */
NdbRootFragment** m_activeFrags;
- /** Fragments where all tuples in the current batch have been consumed,
- * but where there are more batches to fetch.*/
- NdbRootFragment** m_emptiedFrags;
- // No copying.
- OrderedFragSet(const OrderedFragSet&);
- OrderedFragSet& operator=(const OrderedFragSet&);
+ /**
+ * Fragments from which we should request more ResultSets.
+ * Either due to the current batch has been consumed, or double buffering
+ * of result sets allows us to request another batch before the current
+ * has been consumed.
+ */
+ NdbRootFragment** m_fetchMoreFrags;
+
+ /** Add a complete fragment that has been received.*/
+ void add(NdbRootFragment& frag);
/** For sorting fragment reads according to index value of first record.
* Also f1<f2 if f2 has reached end of data and f1 has not.
@@ -405,6 +374,10 @@ private:
/** For debugging purposes.*/
bool verifySortOrder() const;
+
+ // No copying.
+ OrderedFragSet(const OrderedFragSet&);
+ OrderedFragSet& operator=(const OrderedFragSet&);
}; // class OrderedFragSet
/** The interface that is visible to the application developer.*/
@@ -433,6 +406,14 @@ private:
/** Possible error status of this query.*/
NdbError m_error;
+
+ /**
+ * Possible error received from TC / datanodes.
+ * Only access w/ PollGuard mutex as it is set by receiver thread.
+ * Checked and moved into 'm_error' with ::hasReceivedError().
+ */
+ int m_errorReceived; // BEWARE: protect with PollGuard mutex
+
/** Transaction in which this query instance executes.*/
NdbTransaction& m_transaction;
@@ -452,7 +433,7 @@ private:
Uint32 m_globalCursor;
/** Number of root fragments not yet completed within the current batch.
- * Only access w/ PollGuard mutex as it is also updated by receiver threa
+ * Only access w/ PollGuard mutex as it is also updated by receiver thread
*/
Uint32 m_pendingFrags; // BEWARE: protect with PollGuard mutex
@@ -473,11 +454,6 @@ private:
*/
OrderedFragSet m_applFrags;
- /** Root frgaments that have received a complete batch. Shared between
- * application thread and receiving thread. Access should be mutex protected.
- */
- SharedFragStack m_fullFrags; // BEWARE: protect with PollGuard mutex
-
/** Number of root fragments for which confirmation for the final batch
* (with tcPtrI=RNIL) has been received. Observe that even if
* m_finalBatchFrags==m_rootFragCount, all tuples for the final batches may
@@ -557,7 +533,8 @@ private:
bool forceSend);
/** Wait for more scan results which already has been REQuested to arrive.
- * @return 0 if some rows did arrive, a negative value if there are errors (in m_error.code),
+ * @return 0 if some rows did arrive, a negative value if there are errors
+ * (in m_error.code),
* and 1 of there are no more rows to receive.
*/
FetchResult awaitMoreResults(bool forceSend);
@@ -684,14 +661,30 @@ public:
NdbQueryOptions::ScanOrdering getOrdering() const
{ return m_ordering; }
- /**
- * Set the number of fragments to be scanned in parallel for the root
- * operation of this query. This only applies to table scans and non-sorted
- * scans of ordered indexes.
+ /**
+ * Set the number of fragments to be scanned in parallel. This only applies
+ * to table scans and non-sorted scans of ordered indexes. This method is
+ * only implemented for then root scan operation.
* @return 0 if ok, -1 in case of error (call getNdbError() for details.)
*/
int setParallelism(Uint32 parallelism);
+ /**
+ * Set the number of fragments to be scanned in parallel to the maximum
+ * possible value. This is the default for the root scan operation.
+ * @return 0 if ok, -1 in case of error (call getNdbError() for details.)
+ */
+ int setMaxParallelism();
+
+ /**
+ * Let the system dynamically choose the number of fragments to scan in
+ * parallel. The system will try to choose a value that gives optimal
+ * performance. This is the default for all scans but the root scan. This
+ * method only implemented for non-root scan operations.
+ * @return 0 if ok, -1 in case of error (call getNdbError() for details.)
+ */
+ int setAdaptiveParallelism();
+
/** Set the batch size (max rows per batch) for this operation. This
* only applies to scan operations, as lookup operations always will
* have the same batch size as its parent operation, or 1 if it is the
=== modified file 'storage/ndb/src/ndbapi/NdbReceiver.cpp'
--- a/storage/ndb/src/ndbapi/NdbReceiver.cpp 2011-07-05 12:46:07 +0000
+++ b/storage/ndb/src/ndbapi/NdbReceiver.cpp 2011-08-25 06:35:39 +0000
@@ -56,7 +56,7 @@ NdbReceiver::init(ReceiverType type, boo
if (useRec)
{
m_record.m_ndb_record= NULL;
- m_record.m_row= NULL;
+ m_record.m_row_recv= NULL;
m_record.m_row_buffer= NULL;
m_record.m_row_offset= 0;
m_record.m_read_range_no= false;
@@ -118,11 +118,38 @@ NdbReceiver::getValues(const NdbRecord*
assert(m_using_ndb_record);
m_record.m_ndb_record= rec;
- m_record.m_row= row_ptr;
+ m_record.m_row_recv= row_ptr;
m_record.m_row_offset= rec->m_row_size;
}
-#define KEY_ATTR_ID (~(Uint32)0)
+void
+NdbReceiver::prepareReceive(char *buf)
+{
+ /* Set pointers etc. to prepare for receiving the first row of the batch. */
+ assert(theMagicNumber == 0x11223344);
+ m_received_result_length = 0;
+ m_expected_result_length = 0;
+ if (m_using_ndb_record)
+ {
+ m_record.m_row_recv= buf;
+ }
+ theCurrentRecAttr = theFirstRecAttr;
+}
+
+void
+NdbReceiver::prepareRead(char *buf, Uint32 rows)
+{
+ /* Set pointers etc. to prepare for reading the first row of the batch. */
+ assert(theMagicNumber == 0x11223344);
+ m_current_row = 0;
+ m_result_rows = rows;
+ if (m_using_ndb_record)
+ {
+ m_record.m_row_buffer = buf;
+ }
+}
+
+ #define KEY_ATTR_ID (~(Uint32)0)
/*
Compute the batch size (rows between each NEXT_TABREQ / SCAN_TABCONF) to
@@ -219,7 +246,7 @@ NdbReceiver::do_setup_ndbrecord(const Nd
{
m_using_ndb_record= true;
m_record.m_ndb_record= ndb_record;
- m_record.m_row= row_buffer;
+ m_record.m_row_recv= row_buffer;
m_record.m_row_buffer= row_buffer;
m_record.m_row_offset= rowsize;
m_record.m_read_range_no= read_range_no;
@@ -524,7 +551,7 @@ NdbReceiver::receive_packed_ndbrecord(Ui
{
if (BitmaskImpl::get(bmlen, aDataPtr, ++i))
{
- setRecToNULL(col, m_record.m_row);
+ setRecToNULL(col, m_record.m_row_recv);
// Next column...
continue;
@@ -668,7 +695,8 @@ NdbReceiver::execTRANSID_AI(const Uint32
assert(m_record.m_read_range_no);
assert(attrSize==4);
assert (m_record.m_row_offset >= m_record.m_ndb_record->m_row_size+attrSize);
- memcpy(m_record.m_row+m_record.m_ndb_record->m_row_size, aDataPtr++, 4);
+ memcpy(m_record.m_row_recv+m_record.m_ndb_record->m_row_size,
+ aDataPtr++, 4);
aLength--;
continue; // Next
}
@@ -682,7 +710,7 @@ NdbReceiver::execTRANSID_AI(const Uint32
assert (m_record.m_row_offset >= m_record.m_ndb_record->m_row_size);
Uint32 len= receive_packed_ndbrecord(attrSize >> 2, // Bitmap length
aDataPtr,
- m_record.m_row);
+ m_record.m_row_recv);
aDataPtr+= len;
aLength-= len;
continue; // Next
@@ -709,13 +737,13 @@ NdbReceiver::execTRANSID_AI(const Uint32
/* Save this extra getValue */
save_pos+= sizeof(Uint32);
- memcpy(m_record.m_row + m_record.m_row_offset - save_pos,
+ memcpy(m_record.m_row_recv + m_record.m_row_offset - save_pos,
&attrSize, sizeof(Uint32));
if (attrSize > 0)
{
save_pos+= attrSize;
assert (save_pos<=m_record.m_row_offset);
- memcpy(m_record.m_row + m_record.m_row_offset - save_pos,
+ memcpy(m_record.m_row_recv + m_record.m_row_offset - save_pos,
aDataPtr, attrSize);
}
@@ -803,7 +831,7 @@ NdbReceiver::execTRANSID_AI(const Uint32
if (m_using_ndb_record) {
/* Move onto next row in scan buffer */
- m_record.m_row+= m_record.m_row_offset;
+ m_record.m_row_recv+= m_record.m_row_offset;
}
return (tmp == exp || (exp > TcKeyConf::DirtyReadBit) ? 1 : 0);
}
No bundle (reason: useless for push emails).
| Thread |
|---|
| • bzr push into mysql-5.5-cluster branch (jonas.oreland:3437 to 3438) | Jonas Oreland | 25 Aug |