4473 Pekka Nousiainen 2011-08-29 [merge]
merge telco-7.0 to wl4124-new2
added:
storage/ndb/src/kernel/vm/mt_thr_config.cpp
storage/ndb/src/kernel/vm/mt_thr_config.hpp
modified:
.bzr-mysql/default.conf
mysql-test/suite/ndb/r/ndb_index_stat.result
mysql-test/suite/ndb/r/ndb_statistics1.result
mysql-test/suite/ndb/t/ndb_index_stat.test
sql/ha_ndbcluster.cc
sql/ha_ndbcluster_binlog.cc
sql/ha_ndbinfo.cc
storage/ndb/include/util/SparseBitmask.hpp
storage/ndb/src/kernel/SimBlockList.cpp
storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp
storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp
storage/ndb/src/kernel/vm/Configuration.cpp
storage/ndb/src/kernel/vm/Configuration.hpp
storage/ndb/src/kernel/vm/Makefile.am
storage/ndb/src/kernel/vm/SimulatedBlock.cpp
storage/ndb/src/kernel/vm/SimulatedBlock.hpp
storage/ndb/src/kernel/vm/dummy_nonmt.cpp
storage/ndb/src/kernel/vm/mt.cpp
storage/ndb/src/kernel/vm/mt.hpp
storage/ndb/src/ndbapi/NdbQueryOperation.cpp
storage/ndb/src/ndbapi/NdbQueryOperation.hpp
storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp
4472 Pekka Nousiainen 2011-08-21
wl#4124 g01_enable.diff
MTR: index-stat-enable=1 (but AutoCreate OFF)
modified:
mysql-test/suite/ndb/my.cnf
mysql-test/suite/ndb/r/ndb_index.result
mysql-test/suite/ndb/r/ndb_index_stat.result
mysql-test/suite/ndb/r/ndb_statistics0.result
mysql-test/suite/ndb/r/ndb_statistics1.result
mysql-test/suite/ndb/t/ndb_index_stat_enable.inc
mysql-test/suite/ndb/t/ndb_share.cnf
=== modified file '.bzr-mysql/default.conf'
--- a/.bzr-mysql/default.conf 2011-08-17 09:09:22 +0000
+++ b/.bzr-mysql/default.conf 2011-08-29 14:35:32 +0000
@@ -1,4 +1,4 @@
[MYSQL]
post_commit_to = commits@stripped
post_push_to = commits@stripped
-tree_name = mysql-5.1-telco-7.0
+tree_name = mysql-5.1-telco-7.0-wl4124-new2
=== modified file 'mysql-test/suite/ndb/r/ndb_index_stat.result'
--- a/mysql-test/suite/ndb/r/ndb_index_stat.result 2011-08-21 08:48:41 +0000
+++ b/mysql-test/suite/ndb/r/ndb_index_stat.result 2011-08-29 14:35:32 +0000
@@ -473,6 +473,18 @@ select count(*) from t1 where f > '222';
count(*)
1
drop table t1;
+create table t1 (a1 int, b1 int, primary key(b1), key(a1)) engine=ndbcluster partition by key() partitions 1;
+create table t2 (b2 int, c2 int, primary key(b2,c2)) engine=ndbcluster partition by key() partitions 1;
+# table t1 is only for forcing record by key count for table t2 that should be near 50 (not 1)
+analyze table t1, t2;
+Table Op Msg_type Msg_text
+test.t1 analyze status OK
+test.t2 analyze status OK
+explain select * from t1, t2 where b2 = b1 and a1 = 1;
+id select_type table type possible_keys key key_len ref rows Extra
+1 SIMPLE t1 ref PRIMARY,a1 a1 5 const 2 #
+1 SIMPLE t2 ref PRIMARY PRIMARY 4 test.t1.b1 50 #
+drop table t1, t2;
set @is_enable = @is_enable_default;
set @is_enable = NULL;
# is_enable_on=0 is_enable_off=0
=== modified file 'mysql-test/suite/ndb/r/ndb_statistics1.result'
--- a/mysql-test/suite/ndb/r/ndb_statistics1.result 2011-08-21 08:48:41 +0000
+++ b/mysql-test/suite/ndb/r/ndb_statistics1.result 2011-08-29 14:35:32 +0000
@@ -75,7 +75,7 @@ SELECT * FROM t10000 AS X JOIN t10000 AS
ON Y.I=X.I AND Y.J = X.I;
id select_type table type possible_keys key key_len ref rows Extra
1 SIMPLE X ALL I NULL NULL NULL 10000
-1 SIMPLE Y ref J,I I 10 test.X.I,test.X.I 1 Using where
+1 SIMPLE Y ref J,I J 5 test.X.I 1 Using where
EXPLAIN
SELECT * FROM t100 WHERE k < 42;
id select_type table type possible_keys key key_len ref rows Extra
@@ -143,11 +143,11 @@ id select_type table type possible_keys
EXPLAIN
SELECT * FROM t10000 WHERE J = 0 AND K < 1;
id select_type table type possible_keys key key_len ref rows Extra
-1 SIMPLE t10000 range PRIMARY,J PRIMARY 4 NULL 2 Using where with pushed condition
+1 SIMPLE t10000 ref PRIMARY,J J 5 const 2 Using where with pushed condition
EXPLAIN
SELECT * FROM t10000 WHERE J = 0 AND K BETWEEN 1 AND 10;
id select_type table type possible_keys key key_len ref rows Extra
-1 SIMPLE t10000 range PRIMARY,J PRIMARY 4 NULL 2 Using where with pushed condition
+1 SIMPLE t10000 ref PRIMARY,J J 5 const 2 Using where with pushed condition
EXPLAIN
SELECT * FROM t10000 WHERE J = 0 AND K = 1;
id select_type table type possible_keys key key_len ref rows Extra
=== modified file 'mysql-test/suite/ndb/t/ndb_index_stat.test'
--- a/mysql-test/suite/ndb/t/ndb_index_stat.test 2011-07-23 14:35:37 +0000
+++ b/mysql-test/suite/ndb/t/ndb_index_stat.test 2011-08-28 13:29:22 +0000
@@ -309,5 +309,30 @@ while ($i)
}
drop table t1;
+#
+# Check estimates of records per key for partial keys using unique/primary ordered index
+#
+
+create table t1 (a1 int, b1 int, primary key(b1), key(a1)) engine=ndbcluster partition by key() partitions 1;
+create table t2 (b2 int, c2 int, primary key(b2,c2)) engine=ndbcluster partition by key() partitions 1;
+
+--disable_query_log
+let $i = 100;
+while ($i)
+{
+ eval insert into t1 (a1,b1) values ($i,$i);
+ eval insert into t2 (b2,c2) values ($i mod 2, $i div 2);
+ dec $i;
+}
+--enable_query_log
+
+--echo # table t1 is only for forcing record by key count for table t2 that should be near 50 (not 1)
+analyze table t1, t2;
+# Hide Extra column
+--replace_column 10 #
+explain select * from t1, t2 where b2 = b1 and a1 = 1;
+
+drop table t1, t2;
+
set @is_enable = @is_enable_default;
source ndb_index_stat_enable.inc;
=== modified file 'sql/ha_ndbcluster.cc'
--- a/sql/ha_ndbcluster.cc 2011-07-23 14:38:08 +0000
+++ b/sql/ha_ndbcluster.cc 2011-08-28 13:29:22 +0000
@@ -164,6 +164,11 @@ static MYSQL_THDVAR_ULONG(
0 /* block */
);
+#if NDB_VERSION_D < NDB_MAKE_VERSION(7,2,0)
+#define DEFAULT_NDB_INDEX_STAT_ENABLE FALSE
+#else
+#define DEFAULT_NDB_INDEX_STAT_ENABLE TRUE
+#endif
static MYSQL_THDVAR_BOOL(
index_stat_enable, /* name */
@@ -171,7 +176,7 @@ static MYSQL_THDVAR_BOOL(
"Use ndb index statistics in query optimization.",
NULL, /* check func. */
NULL, /* update func. */
- FALSE /* default */
+ DEFAULT_NDB_INDEX_STAT_ENABLE /* default */
);
@@ -1353,19 +1358,22 @@ void ha_ndbcluster::set_rec_per_key()
*/
for (uint i=0 ; i < table_share->keys ; i++)
{
+ bool is_unique_index= false;
KEY* key_info= table->key_info + i;
switch (get_index_type(i))
{
- case UNIQUE_ORDERED_INDEX:
- case PRIMARY_KEY_ORDERED_INDEX:
case UNIQUE_INDEX:
case PRIMARY_KEY_INDEX:
{
// Index is unique when all 'key_parts' are specified,
// else distribution is unknown and not specified here.
- key_info->rec_per_key[key_info->key_parts-1]= 1;
+ is_unique_index= true;
break;
}
+ case UNIQUE_ORDERED_INDEX:
+ case PRIMARY_KEY_ORDERED_INDEX:
+ is_unique_index= true;
+ // intentional fall thru to logic for ordered index
case ORDERED_INDEX:
// 'Records pr. key' are unknown for non-unique indexes.
// (May change when we get better index statistics.)
@@ -1395,6 +1403,11 @@ void ha_ndbcluster::set_rec_per_key()
default:
DBUG_ASSERT(false);
}
+ // set rows per key to 1 for complete key given for unique/primary index
+ if (is_unique_index)
+ {
+ key_info->rec_per_key[key_info->key_parts-1]= 1;
+ }
}
DBUG_VOID_RETURN;
}
=== modified file 'sql/ha_ndbcluster_binlog.cc'
--- a/sql/ha_ndbcluster_binlog.cc 2011-07-08 12:28:37 +0000
+++ b/sql/ha_ndbcluster_binlog.cc 2011-08-27 12:12:27 +0000
@@ -6299,8 +6299,8 @@ ndb_binlog_thread_handle_data_event(Ndb
MY_BITMAP b;
/* Potential buffer for the bitmap */
uint32 bitbuf[128 / (sizeof(uint32) * 8)];
- bitmap_init(&b, n_fields <= sizeof(bitbuf) * 8 ? bitbuf : NULL,
- n_fields, FALSE);
+ const bool own_buffer = n_fields <= sizeof(bitbuf) * 8;
+ bitmap_init(&b, own_buffer ? bitbuf : NULL, n_fields, FALSE);
bitmap_set_all(&b);
/*
@@ -6463,6 +6463,11 @@ ndb_binlog_thread_handle_data_event(Ndb
my_free(blobs_buffer[1], MYF(MY_ALLOW_ZERO_PTR));
}
+ if (!own_buffer)
+ {
+ bitmap_free(&b);
+ }
+
return 0;
}
=== modified file 'sql/ha_ndbinfo.cc'
--- a/sql/ha_ndbinfo.cc 2011-06-30 15:59:25 +0000
+++ b/sql/ha_ndbinfo.cc 2011-08-27 09:54:26 +0000
@@ -367,6 +367,7 @@ int ha_ndbinfo::open(const char *name, i
int err = g_ndbinfo->openTable(name, &m_impl.m_table);
if (err)
{
+ assert(m_impl.m_table == 0);
if (err == NdbInfo::ERR_NoSuchTable)
DBUG_RETURN(HA_ERR_NO_SUCH_TABLE);
DBUG_RETURN(err2mysql(err));
@@ -390,6 +391,7 @@ int ha_ndbinfo::open(const char *name, i
warn_incompatible(ndb_tab, true,
"column '%s' is NOT NULL",
field->field_name);
+ delete m_impl.m_table; m_impl.m_table= 0;
DBUG_RETURN(ERR_INCOMPAT_TABLE_DEF);
}
@@ -427,6 +429,7 @@ int ha_ndbinfo::open(const char *name, i
warn_incompatible(ndb_tab, true,
"column '%s' is not compatible",
field->field_name);
+ delete m_impl.m_table; m_impl.m_table= 0;
DBUG_RETURN(ERR_INCOMPAT_TABLE_DEF);
}
}
=== modified file 'storage/ndb/include/util/SparseBitmask.hpp'
--- a/storage/ndb/include/util/SparseBitmask.hpp 2010-08-28 09:37:09 +0000
+++ b/storage/ndb/include/util/SparseBitmask.hpp 2011-08-26 09:20:08 +0000
@@ -20,6 +20,7 @@
#include <ndb_global.h>
#include <util/Vector.hpp>
+#include <util/BaseString.hpp>
class SparseBitmask {
unsigned m_max_size;
@@ -102,6 +103,11 @@ public:
bool isclear() const { return count() == 0; }
+ unsigned getBitNo(unsigned n) const {
+ assert(n < m_vec.size());
+ return m_vec[n];
+ }
+
void print(void) const {
for (unsigned i = 0; i < m_vec.size(); i++)
{
@@ -110,6 +116,38 @@ public:
}
}
+ bool equal(const SparseBitmask& obj) const {
+ if (obj.count() != count())
+ return false;
+
+ for (unsigned i = 0; i<count(); i++)
+ if (!obj.get(m_vec[i]))
+ return false;
+
+ return true;
+ }
+
+ // Returns true when this mask and obj have at least one set bit in
+ // common (non-empty intersection), false otherwise.
+ bool overlaps(const SparseBitmask& obj) const {
+ for (unsigned i = 0; i<count(); i++)
+ if (obj.get(m_vec[i]))
+ return true;
+ // Intersection is symmetric: any bit shared with obj is also one of
+ // ours, so a single pass over our own bits is sufficient.
+ return false;
+ }
+
+ BaseString str() const {
+ BaseString tmp;
+ const char* sep="";
+ for (unsigned i = 0; i<m_vec.size(); i++)
+ {
+ tmp.appfmt("%s%u", sep, m_vec[i]);
+ sep=",";
+ }
+ return tmp;
+ }
};
#endif
=== modified file 'storage/ndb/src/kernel/SimBlockList.cpp'
--- a/storage/ndb/src/kernel/SimBlockList.cpp 2011-06-30 15:59:25 +0000
+++ b/storage/ndb/src/kernel/SimBlockList.cpp 2011-08-27 06:06:02 +0000
@@ -160,6 +160,7 @@ SimBlockList::load(EmulatorData& data){
for (int i = 0; i < noOfBlocks; i++)
theList[i]->loadWorkers();
}
+ finalize_thr_map();
}
// Check that all blocks could be created
=== modified file 'storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp'
--- a/storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp 2011-07-04 13:37:56 +0000
+++ b/storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp 2011-08-22 08:35:35 +0000
@@ -530,21 +530,87 @@ public:
typedef SLFifoListImpl<ScanFragHandle_pool, ScanFragHandle> ScanFragHandle_list;
typedef LocalSLFifoListImpl<ScanFragHandle_pool, ScanFragHandle> Local_ScanFragHandle_list;
+ /**
+ * Incrementally tracks the mean and the sample standard deviation of a
+ * series of samples; getStdDev() yields 0.0 until two samples exist.
+ */
+ class IncrementalStatistics
+ {
+ public:
+ /**
+ * Reset all accumulators to zero. This is a plain method because the class
+ * is used in unions and so cannot have a (non-trivial) constructor.
+ */
+ void init()
+ {
+ m_mean = m_sumSquare = 0.0;
+ m_noOfSamples = 0;
+ }
+
+ // Add another sample (defined out of line, not in this header).
+ void update(double sample);
+
+ double getMean() const { return m_mean; }
+
+ double getStdDev() const {
+ return m_noOfSamples < 2 ? 0.0 : sqrt(m_sumSquare/(m_noOfSamples - 1));
+ }
+
+ private:
+ // Mean of all samples seen so far.
+ double m_mean;
+ // Sum of squared differences from the current mean.
+ double m_sumSquare;
+ Uint32 m_noOfSamples; // Sample count; divisor (minus one) in getStdDev().
+ }; // IncrementalStatistics
+
struct ScanIndexData
{
Uint16 m_frags_complete;
Uint16 m_frags_outstanding;
+ /**
+ * The number of fragments for which we have not yet sent SCAN_FRAGREQ but
+ * will eventually do so.
+ */
+ Uint16 m_frags_not_started;
Uint32 m_rows_received; // #execTRANSID_AI
Uint32 m_rows_expecting; // Sum(ScanFragConf)
Uint32 m_batch_chunks; // #SCAN_FRAGREQ + #SCAN_NEXTREQ to retrieve batch
Uint32 m_scanCookie;
Uint32 m_fragCount;
+ // The number of fragments that we scan in parallel.
+ Uint32 m_parallelism;
+ /**
+ * True if this is the first instantiation of this operation. A child
+ * operation will be instantiated once for each batch of its parent.
+ */
+ bool m_firstExecution;
+ /**
+ * Mean and standard deviation for the optimal parallelism for earlier
+ * executions of this operation.
+ */
+ IncrementalStatistics m_parallelismStat;
+ // Total number of rows for the current execution of this operation.
+ Uint32 m_totalRows;
+ // Total number of bytes for the current execution of this operation.
+ Uint32 m_totalBytes;
+
ScanFragHandle_list::HeadPOD m_fragments; // ScanFrag states
union
{
PatternStore::HeadPOD m_prunePattern;
Uint32 m_constPrunePtrI;
};
+ /**
+ * Max number of rows seen in a batch. Used for calculating the number of
+ * rows per fragment in the next batch when using adaptive batch size.
+ */
+ Uint32 m_largestBatchRows;
+ /**
+ * Max number of bytes seen in a batch. Used for calculating the number of
+ * rows per fragment in the next batch when using adaptive batch size.
+ */
+ Uint32 m_largestBatchBytes;
Uint32 m_scanFragReq[ScanFragReq::SignalLength + 2];
};
@@ -1164,6 +1230,13 @@ private:
void scanIndex_parent_row(Signal*,Ptr<Request>,Ptr<TreeNode>, const RowPtr&);
void scanIndex_fixupBound(Ptr<ScanFragHandle> fragPtr, Uint32 ptrI, Uint32);
void scanIndex_send(Signal*,Ptr<Request>,Ptr<TreeNode>);
+ void scanIndex_send(Signal* signal,
+ Ptr<Request> requestPtr,
+ Ptr<TreeNode> treeNodePtr,
+ Uint32 noOfFrags,
+ Uint32 bs_bytes,
+ Uint32 bs_rows,
+ Uint32& batchRange);
void scanIndex_batchComplete(Signal* signal);
Uint32 scanIndex_findFrag(Local_ScanFragHandle_list &, Ptr<ScanFragHandle>&,
Uint32 fragId);
=== modified file 'storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp 2011-07-04 13:37:56 +0000
+++ b/storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp 2011-08-22 11:50:41 +0000
@@ -4450,6 +4450,9 @@ Dbspj::parseScanIndex(Build_context& ctx
data.m_fragments.init();
data.m_frags_outstanding = 0;
data.m_frags_complete = 0;
+ data.m_frags_not_started = 0;
+ data.m_parallelismStat.init();
+ data.m_firstExecution = true;
data.m_batch_chunks = 0;
err = parseDA(ctx, requestPtr, treeNodePtr,
@@ -5002,6 +5005,7 @@ Dbspj::scanIndex_parent_batch_complete(S
}
}
}
+ data.m_frags_not_started = data.m_fragCount - data.m_frags_complete;
if (data.m_frags_complete == data.m_fragCount)
{
@@ -5015,7 +5019,120 @@ Dbspj::scanIndex_parent_batch_complete(S
/**
* When parent's batch is complete, we send our batch
*/
- scanIndex_send(signal, requestPtr, treeNodePtr);
+ const ScanFragReq * org = (const ScanFragReq*)data.m_scanFragReq;
+ ndbassert(org->batch_size_rows >= data.m_fragCount - data.m_frags_complete);
+
+ if (treeNodePtr.p->m_bits & TreeNode::T_SCAN_PARALLEL)
+ {
+ jam();
+ data.m_parallelism = data.m_fragCount - data.m_frags_complete;
+ }
+ else if (data.m_firstExecution)
+ {
+ /**
+ * Having a high parallelism would allow us to fetch data from many
+ * fragments in parallel and thus reduce the number of round trips.
+ * On the other hand, we should set parallelism so low that we can fetch
+ * all data from a fragment in one batch if possible.
+ * Since this is the first execution, we do not know how many rows or bytes
+ * this operation is likely to return. Therefore we set parallelism to 1,
+ * since this gives the lowest penalty if our guess is wrong.
+ */
+ jam();
+ data.m_parallelism = 1;
+ }
+ else
+ {
+ jam();
+ /**
+ * Use statistics from earlier runs of this operation to estimate the
+ * initial parallelism. We use the mean minus two times the standard
+ * deviation to have a low risk of setting parallelism too high (as erring
+ * in the other direction is more costly).
+ */
+ Int32 parallelism =
+ static_cast<Int32>(data.m_parallelismStat.getMean()
+ - 2 * data.m_parallelismStat.getStdDev());
+
+ if (parallelism < 1)
+ {
+ jam();
+ parallelism = 1;
+ }
+ else if ((data.m_fragCount - data.m_frags_complete) % parallelism != 0)
+ {
+ jam();
+ /**
+ * Set parallelism such that we can expect to have similar
+ * parallelism in each batch. For example if there are 8 remaining
+ * fragments, then we should fetch 2 times 4 fragments rather than
+ * 7+1.
+ */
+ const Int32 roundTrips =
+ 1 + (data.m_fragCount - data.m_frags_complete) / parallelism;
+ parallelism = (data.m_fragCount - data.m_frags_complete) / roundTrips;
+ }
+
+ data.m_parallelism = static_cast<Uint32>(parallelism);
+
+#ifdef DEBUG_SCAN_FRAGREQ
+ DEBUG("::scanIndex_send() starting index scan with parallelism="
+ << data.m_parallelism);
+#endif
+ }
+ ndbrequire(data.m_parallelism > 0);
+
+ const Uint32 bs_rows = org->batch_size_rows/ data.m_parallelism;
+ const Uint32 bs_bytes = org->batch_size_bytes / data.m_parallelism;
+ ndbassert(bs_rows > 0);
+ ndbassert(bs_bytes > 0);
+
+ data.m_largestBatchRows = 0;
+ data.m_largestBatchBytes = 0;
+ data.m_totalRows = 0;
+ data.m_totalBytes = 0;
+
+ {
+ Local_ScanFragHandle_list list(m_scanfraghandle_pool, data.m_fragments);
+ Ptr<ScanFragHandle> fragPtr;
+ list.first(fragPtr);
+
+ while(!fragPtr.isNull())
+ {
+ ndbassert(fragPtr.p->m_state == ScanFragHandle::SFH_NOT_STARTED ||
+ fragPtr.p->m_state == ScanFragHandle::SFH_COMPLETE);
+ fragPtr.p->m_state = ScanFragHandle::SFH_NOT_STARTED;
+ list.next(fragPtr);
+ }
+ }
+
+ Uint32 batchRange = 0;
+ scanIndex_send(signal,
+ requestPtr,
+ treeNodePtr,
+ data.m_parallelism,
+ bs_bytes,
+ bs_rows,
+ batchRange);
+
+ data.m_firstExecution = false;
+
+ if (treeNodePtr.p->m_bits & TreeNode::T_SCAN_PARALLEL)
+ {
+ ndbrequire((data.m_frags_outstanding + data.m_frags_complete) ==
+ data.m_fragCount);
+ }
+ else
+ {
+ ndbrequire(static_cast<Uint32>(data.m_frags_outstanding +
+ data.m_frags_complete) <=
+ data.m_fragCount);
+ }
+
+ data.m_batch_chunks = 1;
+ requestPtr.p->m_cnt_active++;
+ requestPtr.p->m_outstanding++;
+ treeNodePtr.p->m_state = TreeNode::TN_ACTIVE;
}
void
@@ -5045,77 +5162,91 @@ Dbspj::scanIndex_parent_batch_repeat(Sig
}
}
+/**
+ * Ask for the first batch for a number of fragments.
+ */
void
Dbspj::scanIndex_send(Signal* signal,
Ptr<Request> requestPtr,
- Ptr<TreeNode> treeNodePtr)
+ Ptr<TreeNode> treeNodePtr,
+ Uint32 noOfFrags,
+ Uint32 bs_bytes,
+ Uint32 bs_rows,
+ Uint32& batchRange)
{
- jam();
-
- ScanIndexData& data = treeNodePtr.p->m_scanindex_data;
- const ScanFragReq * org = (const ScanFragReq*)data.m_scanFragReq;
-
- Uint32 cnt = 1;
- Uint32 bs_rows = org->batch_size_rows;
- Uint32 bs_bytes = org->batch_size_bytes;
- if (treeNodePtr.p->m_bits & TreeNode::T_SCAN_PARALLEL)
- {
- jam();
- cnt = data.m_fragCount - data.m_frags_complete;
- ndbrequire(cnt > 0);
-
- bs_rows /= cnt;
- bs_bytes /= cnt;
- ndbassert(bs_rows > 0);
- }
-
/**
* if (m_bits & prunemask):
* - Range keys sliced out to each ScanFragHandle
* - Else, range keys kept on first (and only) ScanFragHandle
*/
- Uint32 prunemask = TreeNode::T_PRUNE_PATTERN | TreeNode::T_CONST_PRUNE;
+ const bool prune = treeNodePtr.p->m_bits &
+ (TreeNode::T_PRUNE_PATTERN | TreeNode::T_CONST_PRUNE);
/**
- * Don't release keyInfo if it may be sent multiple times, eiter:
- * - Not pruned -> same keyInfo goes to all datanodes.
- * - Result handling is REPEAT_SCAN_RESULT and same batch may be
- * repeated multiple times due to incomplete bushy X-scans.
- * (by ::scanIndex_parent_batch_repeat())
- *
- * When not released, ::scanIndex_parent_batch_cleanup() will
- * eventually release them when preparing arrival of a new parent batch.
+ * If scan is repeatable, we must make sure not to release range keys so
+ * that we can use them again in the next repetition.
*/
- const bool release = ((treeNodePtr.p->m_bits & prunemask) != 0 &&
- (treeNodePtr.p->m_bits & TreeNode::T_SCAN_REPEATABLE) == 0);
+ const bool repeatable =
+ (treeNodePtr.p->m_bits & TreeNode::T_SCAN_REPEATABLE) != 0;
- ScanFragReq* req = reinterpret_cast<ScanFragReq*>(signal->getDataPtrSend());
+ ScanIndexData& data = treeNodePtr.p->m_scanindex_data;
+ ndbassert(noOfFrags > 0);
+ ndbassert(data.m_frags_not_started >= noOfFrags);
+ ScanFragReq* const req =
+ reinterpret_cast<ScanFragReq*>(signal->getDataPtrSend());
+ const ScanFragReq * const org
+ = reinterpret_cast<ScanFragReq*>(data.m_scanFragReq);
memcpy(req, org, sizeof(data.m_scanFragReq));
// req->variableData[0] // set below
req->variableData[1] = requestPtr.p->m_rootResultData;
req->batch_size_bytes = bs_bytes;
req->batch_size_rows = bs_rows;
- Ptr<ScanFragHandle> fragPtr;
+ Uint32 requestsSent = 0;
Local_ScanFragHandle_list list(m_scanfraghandle_pool, data.m_fragments);
-
- Uint32 keyInfoPtrI = RNIL;
+ Ptr<ScanFragHandle> fragPtr;
list.first(fragPtr);
- if ((treeNodePtr.p->m_bits & prunemask) == 0)
+ Uint32 keyInfoPtrI = fragPtr.p->m_rangePtrI;
+ ndbrequire(prune || keyInfoPtrI != RNIL);
+ /**
+ * Iterate over the list of fragments until we have sent as many
+ * SCAN_FRAGREQs as we should.
+ */
+ while (requestsSent < noOfFrags)
{
jam();
- keyInfoPtrI = fragPtr.p->m_rangePtrI;
- ndbrequire(keyInfoPtrI != RNIL);
- }
+ ndbassert(!fragPtr.isNull());
- Uint32 batchRange = 0;
- for (Uint32 i = 0; i < cnt && !fragPtr.isNull(); list.next(fragPtr))
- {
- jam();
+ if (fragPtr.p->m_state != ScanFragHandle::SFH_NOT_STARTED)
+ {
+ // Skip forward to the frags that we should send.
+ jam();
+ list.next(fragPtr);
+ continue;
+ }
+
+ const Uint32 ref = fragPtr.p->m_ref;
+
+ if (noOfFrags==1 && !prune &&
+ data.m_frags_not_started == data.m_fragCount &&
+ refToNode(ref) != getOwnNodeId() &&
+ list.hasNext(fragPtr))
+ {
+ /**
+ * If we are doing a scan with adaptive parallelism and start with
+ * parallelism=1 then it makes sense to fetch a batch from a fragment on
+ * the local data node. The reason for this is that if that fragment
+ * contains few rows, we may be able to read from several fragments in
+ * parallel. Then we minimize the total number of round trips (to remote
+ * data nodes) if we fetch the first fragment batch locally.
+ */
+ jam();
+ list.next(fragPtr);
+ continue;
+ }
SectionHandle handle(this);
- Uint32 ref = fragPtr.p->m_ref;
Uint32 attrInfoPtrI = treeNodePtr.p->m_send.m_attrInfoPtrI;
/**
@@ -5124,26 +5255,34 @@ Dbspj::scanIndex_send(Signal* signal,
req->senderData = fragPtr.i;
req->fragmentNoKeyLen = fragPtr.p->m_fragId;
- if ((treeNodePtr.p->m_bits & prunemask))
+ if (prune)
{
jam();
keyInfoPtrI = fragPtr.p->m_rangePtrI;
if (keyInfoPtrI == RNIL)
{
+ /**
+ * Since we use pruning, we can see that no parent rows would hash
+ * to this fragment.
+ */
jam();
fragPtr.p->m_state = ScanFragHandle::SFH_COMPLETE;
+ list.next(fragPtr);
continue;
}
- }
- if (release)
- {
- /**
- * If we'll use sendSignal() and we need to send the attrInfo several
- * times, we need to copy them
- */
- Uint32 tmp = RNIL;
- ndbrequire(dupSection(tmp, attrInfoPtrI)); // TODO handle error
- attrInfoPtrI = tmp;
+
+ if (!repeatable)
+ {
+ /**
+ * If we'll use sendSignal() and we need to send the attrInfo several
+ * times, we need to copy them. (For repeatable or unpruned scans
+ * we use sendSignalNoRelease(), so then we do not need to copy.)
+ */
+ jam();
+ Uint32 tmp = RNIL;
+ ndbrequire(dupSection(tmp, attrInfoPtrI)); // TODO handle error
+ attrInfoPtrI = tmp;
+ }
}
req->variableData[0] = batchRange;
@@ -5152,16 +5291,14 @@ Dbspj::scanIndex_send(Signal* signal,
handle.m_cnt = 2;
#if defined DEBUG_SCAN_FRAGREQ
- {
- ndbout_c("SCAN_FRAGREQ to %x", ref);
- printSCAN_FRAGREQ(stdout, signal->getDataPtrSend(),
- NDB_ARRAY_SIZE(treeNodePtr.p->m_scanfrag_data.m_scanFragReq),
- DBLQH);
- printf("ATTRINFO: ");
- print(handle.m_ptr[0], stdout);
- printf("KEYINFO: ");
- print(handle.m_ptr[1], stdout);
- }
+ ndbout_c("SCAN_FRAGREQ to %x", ref);
+ printSCAN_FRAGREQ(stdout, signal->getDataPtrSend(),
+ NDB_ARRAY_SIZE(treeNodePtr.p->m_scanfrag_data.m_scanFragReq),
+ DBLQH);
+ printf("ATTRINFO: ");
+ print(handle.m_ptr[0], stdout);
+ printf("KEYINFO: ");
+ print(handle.m_ptr[1], stdout);
#endif
if (refToNode(ref) == getOwnNodeId())
@@ -5173,8 +5310,13 @@ Dbspj::scanIndex_send(Signal* signal,
c_Counters.incr_counter(CI_REMOTE_RANGE_SCANS_SENT, 1);
}
- if (release)
+ if (prune && !repeatable)
{
+ /**
+ * For a non-repeatable pruned scan, key info is unique for each
+ * fragment and therefore cannot be reused, so we release key info
+ * right away.
+ */
jam();
sendSignal(ref, GSN_SCAN_FRAGREQ, signal,
NDB_ARRAY_SIZE(data.m_scanFragReq), JBB, &handle);
@@ -5183,32 +5325,24 @@ Dbspj::scanIndex_send(Signal* signal,
}
else
{
+ /**
+ * Reuse key info for multiple fragments and/or multiple repetitions
+ * of the scan.
+ */
jam();
sendSignalNoRelease(ref, GSN_SCAN_FRAGREQ, signal,
NDB_ARRAY_SIZE(data.m_scanFragReq), JBB, &handle);
}
handle.clear();
- i++;
fragPtr.p->m_state = ScanFragHandle::SFH_SCANNING; // running
data.m_frags_outstanding++;
batchRange += bs_rows;
- }
+ requestsSent++;
+ list.next(fragPtr);
+ } // while (requestsSent < noOfFrags)
- if (treeNodePtr.p->m_bits & TreeNode::T_SCAN_PARALLEL)
- {
- ndbrequire(data.m_frags_outstanding ==
- data.m_fragCount - data.m_frags_complete);
- }
- else
- {
- ndbrequire(data.m_frags_outstanding == 1);
- }
-
- data.m_batch_chunks = 1;
- requestPtr.p->m_cnt_active++;
- requestPtr.p->m_outstanding++;
- treeNodePtr.p->m_state = TreeNode::TN_ACTIVE;
+ data.m_frags_not_started -= requestsSent;
}
void
@@ -5282,6 +5416,10 @@ Dbspj::scanIndex_execSCAN_FRAGCONF(Signa
}
requestPtr.p->m_rows += rows;
+ data.m_totalRows += rows;
+ data.m_totalBytes += conf->total_len;
+ data.m_largestBatchRows = MAX(data.m_largestBatchRows, rows);
+ data.m_largestBatchBytes = MAX(data.m_largestBatchBytes, conf->total_len);
if (!treeNodePtr.p->isLeaf())
{
@@ -5302,7 +5440,9 @@ Dbspj::scanIndex_execSCAN_FRAGCONF(Signa
ndbrequire(data.m_frags_complete < data.m_fragCount);
data.m_frags_complete++;
- if (data.m_frags_complete == data.m_fragCount)
+ if (data.m_frags_complete == data.m_fragCount ||
+ ((requestPtr.p->m_state & Request::RS_ABORTING) != 0 &&
+ data.m_fragCount == (data.m_frags_complete + data.m_frags_not_started)))
{
jam();
ndbrequire(requestPtr.p->m_cnt_active);
@@ -5314,6 +5454,32 @@ Dbspj::scanIndex_execSCAN_FRAGCONF(Signa
if (data.m_frags_outstanding == 0)
{
+ const ScanFragReq * const org
+ = reinterpret_cast<const ScanFragReq*>(data.m_scanFragReq);
+
+ if (data.m_frags_complete == data.m_fragCount)
+ {
+ jam();
+ /**
+ * Calculate what would have been the optimal parallelism for the
+ * scan instance that we have just completed, and update
+ * 'parallelismStat' with this value. We then use this statistics to set
+ * the initial parallelism for the next instance of this operation.
+ */
+ double parallelism = data.m_fragCount;
+ if (data.m_totalRows > 0)
+ {
+ parallelism = MIN(parallelism,
+ double(org->batch_size_rows) / data.m_totalRows);
+ }
+ if (data.m_totalBytes > 0)
+ {
+ parallelism = MIN(parallelism,
+ double(org->batch_size_bytes) / data.m_totalBytes);
+ }
+ data.m_parallelismStat.update(parallelism);
+ }
+
/**
* Don't reportBatchComplete to children if we're aborting...
*/
@@ -5364,7 +5530,7 @@ Dbspj::scanIndex_execSCAN_FRAGREF(Signal
ndbrequire(data.m_frags_outstanding > 0);
data.m_frags_outstanding--;
- if (data.m_frags_complete == data.m_fragCount)
+ if (data.m_fragCount == (data.m_frags_complete + data.m_frags_not_started))
{
jam();
ndbrequire(requestPtr.p->m_cnt_active);
@@ -5390,21 +5556,78 @@ Dbspj::scanIndex_execSCAN_NEXTREQ(Signal
jam();
ScanIndexData& data = treeNodePtr.p->m_scanindex_data;
+ const ScanFragReq * org = (const ScanFragReq*)data.m_scanFragReq;
data.m_rows_received = 0;
data.m_rows_expecting = 0;
ndbassert(data.m_frags_outstanding == 0);
ndbrequire(data.m_frags_complete < data.m_fragCount);
- Uint32 cnt = data.m_fragCount - data.m_frags_complete;
if ((treeNodePtr.p->m_bits & TreeNode::T_SCAN_PARALLEL) == 0)
{
jam();
- cnt = 1;
+ /**
+ * Since fetching few but large batches is more efficient, we
+ * set parallelism to the lowest value where we can still expect each
+ * batch to be full.
+ */
+ if (data.m_largestBatchRows < org->batch_size_rows/data.m_parallelism &&
+ data.m_largestBatchBytes < org->batch_size_bytes/data.m_parallelism)
+ {
+ jam();
+ data.m_parallelism = data.m_fragCount - data.m_frags_complete;
+ if (data.m_largestBatchRows > 0)
+ {
+ jam();
+ data.m_parallelism =
+ MIN(org->batch_size_rows / data.m_largestBatchRows,
+ data.m_parallelism);
+ }
+ if (data.m_largestBatchBytes > 0)
+ {
+ jam();
+ data.m_parallelism =
+ MIN(data.m_parallelism,
+ org->batch_size_bytes/data.m_largestBatchBytes);
+ }
+ if (data.m_frags_complete == 0 &&
+ data.m_frags_not_started % data.m_parallelism != 0)
+ {
+ jam();
+ /**
+ * Set parallelism such that we can expect to have similar
+ * parallelism in each batch. For example if there are 8 remaining
+ * fragments, then we should fetch 2 times 4 fragments rather than
+ * 7+1.
+ */
+ const Uint32 roundTrips =
+ 1 + data.m_frags_not_started / data.m_parallelism;
+ data.m_parallelism = data.m_frags_not_started / roundTrips;
+ }
+ }
+ else
+ {
+ jam();
+ // We get full batches, so we should lower parallelism.
+ data.m_parallelism = MIN(data.m_fragCount - data.m_frags_complete,
+ MAX(1, data.m_parallelism/2));
+ }
+ ndbassert(data.m_parallelism > 0);
+#ifdef DEBUG_SCAN_FRAGREQ
+ DEBUG("::scanIndex_execSCAN_NEXTREQ() Asking for new batches from " <<
+ data.m_parallelism <<
+ " fragments with " << org->batch_size_rows/data.m_parallelism <<
+ " rows and " << org->batch_size_bytes/data.m_parallelism <<
+ " bytes.");
+#endif
+ }
+ else
+ {
+ jam();
+ data.m_parallelism = data.m_fragCount - data.m_frags_complete;
}
- const ScanFragReq * org = (const ScanFragReq*)data.m_scanFragReq;
- const Uint32 bs_rows = org->batch_size_rows/cnt;
+ const Uint32 bs_rows = org->batch_size_rows/data.m_parallelism;
ndbassert(bs_rows > 0);
ScanFragNextReq* req =
reinterpret_cast<ScanFragNextReq*>(signal->getDataPtrSend());
@@ -5413,20 +5636,27 @@ Dbspj::scanIndex_execSCAN_NEXTREQ(Signal
req->transId1 = requestPtr.p->m_transId[0];
req->transId2 = requestPtr.p->m_transId[1];
req->batch_size_rows = bs_rows;
- req->batch_size_bytes = org->batch_size_bytes/cnt;
+ req->batch_size_bytes = org->batch_size_bytes/data.m_parallelism;
Uint32 batchRange = 0;
Ptr<ScanFragHandle> fragPtr;
+ Uint32 sentFragCount = 0;
+ {
+ /**
+ * First, ask for more data from fragments that are already started.
+ */
Local_ScanFragHandle_list list(m_scanfraghandle_pool, data.m_fragments);
list.first(fragPtr);
- for (Uint32 i = 0; i < cnt && !fragPtr.isNull(); list.next(fragPtr))
+ while (sentFragCount < data.m_parallelism && !fragPtr.isNull())
{
jam();
+ ndbassert(fragPtr.p->m_state == ScanFragHandle::SFH_WAIT_NEXTREQ ||
+ fragPtr.p->m_state == ScanFragHandle::SFH_COMPLETE ||
+ fragPtr.p->m_state == ScanFragHandle::SFH_NOT_STARTED);
if (fragPtr.p->m_state == ScanFragHandle::SFH_WAIT_NEXTREQ)
{
jam();
- i++;
data.m_frags_outstanding++;
req->variableData[0] = batchRange;
fragPtr.p->m_state = ScanFragHandle::SFH_SCANNING;
@@ -5434,6 +5664,7 @@ Dbspj::scanIndex_execSCAN_NEXTREQ(Signal
DEBUG("scanIndex_execSCAN_NEXTREQ to: " << hex
<< treeNodePtr.p->m_send.m_ref
+ << ", m_node_no=" << treeNodePtr.p->m_node_no
<< ", senderData: " << req->senderData);
#ifdef DEBUG_SCAN_FRAGREQ
@@ -5445,9 +5676,27 @@ Dbspj::scanIndex_execSCAN_NEXTREQ(Signal
sendSignal(fragPtr.p->m_ref, GSN_SCAN_NEXTREQ, signal,
ScanFragNextReq::SignalLength + 1,
JBB);
+ sentFragCount++;
+ }
+ list.next(fragPtr);
}
}
+ if (sentFragCount < data.m_parallelism)
+ {
+ /**
+ * Then start new fragments until we reach data.m_parallelism.
+ */
+ jam();
+ ndbassert(data.m_frags_not_started != 0);
+ scanIndex_send(signal,
+ requestPtr,
+ treeNodePtr,
+ data.m_parallelism - sentFragCount,
+ org->batch_size_bytes/data.m_parallelism,
+ bs_rows,
+ batchRange);
+ }
/**
* cursor should not have been positioned here...
* unless we actually had something more to send.
@@ -5544,14 +5793,28 @@ Dbspj::scanIndex_abort(Signal* signal,
}
}
- ndbrequire(cnt_waiting + cnt_scanning > 0);
if (cnt_scanning == 0)
{
- /**
- * If all were waiting...this should increase m_outstanding
- */
- jam();
- requestPtr.p->m_outstanding++;
+ if (cnt_waiting > 0)
+ {
+ /**
+ * If all were waiting...this should increase m_outstanding
+ */
+ jam();
+ requestPtr.p->m_outstanding++;
+ }
+ else
+ {
+ /**
+ * All fragments are either complete or not yet started, so there is
+ * nothing to abort.
+ */
+ jam();
+ ndbassert(data.m_frags_not_started > 0);
+ ndbrequire(requestPtr.p->m_cnt_active);
+ requestPtr.p->m_cnt_active--;
+ treeNodePtr.p->m_state = TreeNode::TN_INACTIVE;
+ }
}
}
@@ -5603,6 +5866,8 @@ Dbspj::scanIndex_execNODE_FAILREP(Signal
jam();
ndbrequire(data.m_frags_complete < data.m_fragCount);
data.m_frags_complete++;
+ ndbrequire(data.m_frags_not_started > 0);
+ data.m_frags_not_started--;
// fall through
case ScanFragHandle::SFH_COMPLETE:
jam();
@@ -5637,8 +5902,8 @@ Dbspj::scanIndex_execNODE_FAILREP(Signal
requestPtr.p->m_outstanding--;
}
- if (save1 != data.m_fragCount
- && data.m_frags_complete == data.m_fragCount)
+ if (save1 != 0 &&
+ data.m_fragCount == (data.m_frags_complete + data.m_frags_not_started))
{
jam();
ndbrequire(requestPtr.p->m_cnt_active);
@@ -5654,7 +5919,8 @@ Dbspj::scanIndex_release_rangekeys(Ptr<R
Ptr<TreeNode> treeNodePtr)
{
jam();
- DEBUG("scanIndex_release_rangekeys(), m_node_no: " << treeNodePtr.p->m_node_no);
+ DEBUG("scanIndex_release_rangekeys(), tree node " << treeNodePtr.i
+ << " m_node_no: " << treeNodePtr.p->m_node_no);
ScanIndexData& data = treeNodePtr.p->m_scanindex_data;
Local_ScanFragHandle_list list(m_scanfraghandle_pool, data.m_fragments);
@@ -7115,3 +7381,15 @@ void Dbspj::execDBINFO_SCANREQ(Signal *s
ndbinfo_send_scan_conf(signal, req, rl);
} // Dbspj::execDBINFO_SCANREQ(Signal *signal)
+
+void Dbspj::IncrementalStatistics::update(double sample)
+{
+ // Prevent wrap-around: once the sample counter has reached 0xffffffff, further samples are ignored
+ if(m_noOfSamples < 0xffffffff)
+ {
+ m_noOfSamples++;
+ const double delta = sample - m_mean; // deviation from the mean as it was before this sample
+ m_mean += delta/m_noOfSamples; // incremental update of the running mean
+ m_sumSquare += delta * (sample - m_mean); // old deviation times new deviation: running sum of squared deviations
+ }
+}
=== modified file 'storage/ndb/src/kernel/vm/Configuration.cpp'
--- a/storage/ndb/src/kernel/vm/Configuration.cpp 2011-07-04 13:37:56 +0000
+++ b/storage/ndb/src/kernel/vm/Configuration.cpp 2011-08-25 09:40:27 +0000
@@ -930,13 +930,16 @@ Configuration::setAllLockCPU(bool exec_t
Uint32 i;
for (i = 0; i < threadInfo.size(); i++)
{
- if (threadInfo[i].type != NotInUse)
+ if (threadInfo[i].type == NotInUse)
+ continue;
+
+ bool run =
+ (exec_thread && threadInfo[i].type == MainThread) ||
+ (!exec_thread && threadInfo[i].type != MainThread);
+
+ if (run)
{
- if (setLockCPU(threadInfo[i].pThread,
- threadInfo[i].type,
- exec_thread,
- FALSE))
- return;
+ setLockCPU(threadInfo[i].pThread, threadInfo[i].type);
}
}
}
@@ -966,11 +969,8 @@ Configuration::setRealtimeScheduler(NdbT
int
Configuration::setLockCPU(NdbThread * pThread,
- enum ThreadTypes type,
- bool exec_thread,
- bool init)
+ enum ThreadTypes type)
{
- (void)init;
Uint32 cpu_id;
int tid = NdbThread_GetTid(pThread);
if (tid == -1)
@@ -981,9 +981,6 @@ Configuration::setLockCPU(NdbThread * pT
We only set new lock CPU characteristics for the threads for which
it has changed
*/
- if ((exec_thread && type != MainThread) ||
- (!exec_thread && type == MainThread))
- return 0;
if (type == MainThread)
cpu_id = executeLockCPU();
else
@@ -1029,7 +1026,7 @@ Configuration::addThread(struct NdbThrea
* main threads are set in ThreadConfig::ipControlLoop
* as it's handled differently with mt
*/
- setLockCPU(pThread, type, (type == MainThread), TRUE);
+ setLockCPU(pThread, type);
}
return i;
}
=== modified file 'storage/ndb/src/kernel/vm/Configuration.hpp'
--- a/storage/ndb/src/kernel/vm/Configuration.hpp 2011-07-04 13:37:56 +0000
+++ b/storage/ndb/src/kernel/vm/Configuration.hpp 2011-08-25 09:40:27 +0000
@@ -86,10 +86,7 @@ public:
void setAllRealtimeScheduler();
void setAllLockCPU(bool exec_thread);
- int setLockCPU(NdbThread*,
- enum ThreadTypes type,
- bool exec_thread,
- bool init);
+ int setLockCPU(NdbThread*, enum ThreadTypes type);
int setRealtimeScheduler(NdbThread*,
enum ThreadTypes type,
bool real_time,
=== modified file 'storage/ndb/src/kernel/vm/Makefile.am'
--- a/storage/ndb/src/kernel/vm/Makefile.am 2011-06-30 15:59:25 +0000
+++ b/storage/ndb/src/kernel/vm/Makefile.am 2011-08-26 09:57:03 +0000
@@ -121,4 +121,10 @@ testSafeMutex_LDFLAGS = @ndb_bin_am_ldfl
$(top_builddir)/strings/libmystringslt.la \
@readline_link@ @TERMCAP_LIB@
+mt_thr_config_t_SOURCES = mt_thr_config.cpp
+mt_thr_config_t_CXXFLAGS = -DTEST_MT_THR_CONFIG
+mt_thr_config_t_LDADD = \
+ $(top_builddir)/storage/ndb/src/common/util/libgeneral.la \
+ $(top_builddir)/mysys/libmysyslt.la
+noinst_PROGRAMS = mt_thr_config-t
=== modified file 'storage/ndb/src/kernel/vm/SimulatedBlock.cpp'
--- a/storage/ndb/src/kernel/vm/SimulatedBlock.cpp 2011-06-30 15:59:25 +0000
+++ b/storage/ndb/src/kernel/vm/SimulatedBlock.cpp 2011-08-27 06:06:02 +0000
@@ -102,16 +102,7 @@ SimulatedBlock::SimulatedBlock(BlockNumb
globalData.setBlock(blockNumber, mainBlock);
} else {
ndbrequire(mainBlock != 0);
- if (mainBlock->theInstanceList == 0) {
- mainBlock->theInstanceList = new SimulatedBlock* [MaxInstances];
- ndbrequire(mainBlock->theInstanceList != 0);
- Uint32 i;
- for (i = 0; i < MaxInstances; i++)
- mainBlock->theInstanceList[i] = 0;
- }
- ndbrequire(theInstance < MaxInstances);
- ndbrequire(mainBlock->theInstanceList[theInstance] == 0);
- mainBlock->theInstanceList[theInstance] = this;
+ mainBlock->addInstance(this, theInstance);
}
theMainInstance = mainBlock;
@@ -139,6 +130,23 @@ SimulatedBlock::SimulatedBlock(BlockNumb
}
void
+SimulatedBlock::addInstance(SimulatedBlock* b, Uint32 theInstance)
+{
+ ndbrequire(theMainInstance == this);
+ ndbrequire(number() == b->number());
+ if (theInstanceList == 0)
+ {
+ theInstanceList = new SimulatedBlock* [MaxInstances];
+ ndbrequire(theInstanceList != 0);
+ for (Uint32 i = 0; i < MaxInstances; i++)
+ theInstanceList[i] = 0;
+ }
+ ndbrequire(theInstance < MaxInstances);
+ ndbrequire(theInstanceList[theInstance] == 0);
+ theInstanceList[theInstance] = b;
+}
+
+void
SimulatedBlock::initCommon()
{
Uint32 count = 10;
=== modified file 'storage/ndb/src/kernel/vm/SimulatedBlock.hpp'
--- a/storage/ndb/src/kernel/vm/SimulatedBlock.hpp 2011-05-17 23:29:55 +0000
+++ b/storage/ndb/src/kernel/vm/SimulatedBlock.hpp 2011-08-27 06:06:02 +0000
@@ -168,6 +168,7 @@ public:
return theInstanceList[instanceNumber];
return 0;
}
+ void addInstance(SimulatedBlock* b, Uint32 theInstanceNo);
virtual void loadWorkers() {}
struct ThreadContext
=== modified file 'storage/ndb/src/kernel/vm/dummy_nonmt.cpp'
--- a/storage/ndb/src/kernel/vm/dummy_nonmt.cpp 2011-06-30 15:59:25 +0000
+++ b/storage/ndb/src/kernel/vm/dummy_nonmt.cpp 2011-08-27 06:06:02 +0000
@@ -43,6 +43,12 @@ add_extra_worker_thr_map(Uint32, Uint32)
assert(false);
}
+void
+finalize_thr_map()
+{
+ assert(false);
+}
+
Uint32
compute_jb_pages(struct EmulatorData*)
{
=== modified file 'storage/ndb/src/kernel/vm/mt.cpp'
--- a/storage/ndb/src/kernel/vm/mt.cpp 2011-05-16 12:24:55 +0000
+++ b/storage/ndb/src/kernel/vm/mt.cpp 2011-08-27 06:06:02 +0000
@@ -2381,49 +2381,6 @@ check_queues_empty(thr_data *selfptr)
}
/*
- * Map instance number to real instance on this node. Used in
- * sendlocal/sendprioa to find right thread and in execute_signals
- * to find right block instance. SignalHeader is not modified.
- */
-
-static Uint8 g_map_instance[MAX_BLOCK_INSTANCES];
-
-static void
-map_instance_init()
-{
- g_map_instance[0] = 0;
- Uint32 ino;
- for (ino = 1; ino < MAX_BLOCK_INSTANCES; ino++)
- {
- if (!globalData.isNdbMtLqh)
- {
- g_map_instance[ino] = 0;
- }
- else
- {
- require(num_lqh_workers != 0);
- if (ino <= MAX_NDBMT_LQH_WORKERS)
- {
- g_map_instance[ino] = 1 + (ino - 1) % num_lqh_workers;
- }
- else
- {
- /* Extra workers are not mapped. */
- g_map_instance[ino] = ino;
- }
- }
- }
-}
-
-static inline Uint32
-map_instance(const SignalHeader *s)
-{
- Uint32 ino = blockToInstance(s->theReceiversBlockNumber);
- assert(ino < MAX_BLOCK_INSTANCES);
- return g_map_instance[ino];
-}
-
-/*
* Execute at most MAX_SIGNALS signals from one job queue, updating local read
* state as appropriate.
*
@@ -2489,7 +2446,7 @@ execute_signals(thr_data *selfptr, thr_j
NDB_PREFETCH_READ (read_buffer->m_data + read_pos + 32);
}
Uint32 bno = blockToMain(s->theReceiversBlockNumber);
- Uint32 ino = map_instance(s);
+ Uint32 ino = blockToInstance(s->theReceiversBlockNumber);
SimulatedBlock* block = globalData.mt_getBlock(bno, ino);
assert(block != 0);
@@ -2572,8 +2529,8 @@ run_job_buffers(thr_data *selfptr, Signa
}
struct thr_map_entry {
- enum { NULL_THR_NO = 0xFFFF };
- Uint32 thr_no;
+ enum { NULL_THR_NO = 0xFF };
+ Uint8 thr_no;
thr_map_entry() : thr_no(NULL_THR_NO) {}
};
@@ -2681,6 +2638,50 @@ add_extra_worker_thr_map(Uint32 block, U
add_thr_map(block, instance, thr_no);
}
+/**
+ * Create the duplicate entries needed so that
+ * a sender doesn't need to know how many instances there
+ * actually are in this node.
+ *
+ * For every block, count the consecutive instances that are already
+ * mapped, starting at instance 0, then fill each remaining unmapped slot:
+ *
+ * - if only 1 instance is mapped, duplicate that one for all slots
+ * - else assume instance 0 is the proxy and duplicate the workers
+ *   (instances 1..cnt-1) over the remaining slots (modulo)
+ *
+ * A filled slot gets both the thread-map entry and the block instance
+ * pointer of the instance it duplicates.
+ *
+ * NOTE: extra pgman worker is instance 5
+ */
+void
+finalize_thr_map()
+{
+ for (Uint32 b = 0; b < NO_OF_BLOCKS; b++)
+ {
+ Uint32 bno = b + MIN_BLOCK_NO;
+ Uint32 cnt = 0;
+ while (cnt < MAX_BLOCK_INSTANCES &&
+ thr_map[b][cnt].thr_no != thr_map_entry::NULL_THR_NO)
+ cnt++; // cnt = number of consecutive mapped instances, counted from instance 0
+
+ if (cnt != MAX_BLOCK_INSTANCES)
+ {
+ SimulatedBlock * main = globalData.getBlock(bno, 0);
+ for (Uint32 i = cnt; i < MAX_BLOCK_INSTANCES; i++)
+ {
+ Uint32 dup = (cnt == 1) ? 0 : 1 + ((i - 1) % (cnt - 1)); // existing instance that slot i will reuse
+ if (thr_map[b][i].thr_no == thr_map_entry::NULL_THR_NO)
+ {
+ thr_map[b][i] = thr_map[b][dup];
+ main->addInstance(globalData.getBlock(bno, dup), i);
+ }
+ else
+ {
+ /**
+ * slot beyond the consecutive range is already mapped:
+ * only the extra pgman instance is allowed to be here
+ */
+ require(bno == PGMAN);
+ }
+ }
+ }
+ }
+}
+
static void reportSignalStats(Uint32 self, Uint32 a_count, Uint32 a_size,
Uint32 b_count, Uint32 b_size)
{
@@ -3120,7 +3121,7 @@ sendlocal(Uint32 self, const SignalHeade
const Uint32 secPtr[3])
{
Uint32 block = blockToMain(s->theReceiversBlockNumber);
- Uint32 instance = map_instance(s);
+ Uint32 instance = blockToInstance(s->theReceiversBlockNumber);
/*
* Max number of signals to put into job buffer before flushing the buffer
@@ -3156,7 +3157,7 @@ sendprioa(Uint32 self, const SignalHeade
const Uint32 secPtr[3])
{
Uint32 block = blockToMain(s->theReceiversBlockNumber);
- Uint32 instance = map_instance(s);
+ Uint32 instance = blockToInstance(s->theReceiversBlockNumber);
Uint32 dst = block2ThreadId(block, instance);
struct thr_repository* rep = &g_thr_repository;
@@ -3507,7 +3508,6 @@ ThreadConfig::init()
ndbout << "NDBMT: num_threads=" << num_threads << endl;
- ::map_instance_init();
::rep_init(&g_thr_repository, num_threads,
globalEmulatorData.m_mem_manager);
}
=== modified file 'storage/ndb/src/kernel/vm/mt.hpp'
--- a/storage/ndb/src/kernel/vm/mt.hpp 2011-06-30 15:59:25 +0000
+++ b/storage/ndb/src/kernel/vm/mt.hpp 2011-08-27 06:06:02 +0000
@@ -34,6 +34,7 @@ void add_thr_map(Uint32 block, Uint32 in
void add_main_thr_map();
void add_lqh_worker_thr_map(Uint32 block, Uint32 instance);
void add_extra_worker_thr_map(Uint32 block, Uint32 instance);
+void finalize_thr_map();
void sendlocal(Uint32 self, const struct SignalHeader *s,
const Uint32 *data, const Uint32 secPtr[3]);
=== added file 'storage/ndb/src/kernel/vm/mt_thr_config.cpp'
--- a/storage/ndb/src/kernel/vm/mt_thr_config.cpp 1970-01-01 00:00:00 +0000
+++ b/storage/ndb/src/kernel/vm/mt_thr_config.cpp 2011-08-26 09:57:03 +0000
@@ -0,0 +1,966 @@
+/*
+ Copyright (c) 2011, Oracle and/or its affiliates. All rights reserved.
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+*/
+
+#include "mt_thr_config.hpp"
+#include <kernel/ndb_limits.h>
+#include "../../common/util/parse_mask.hpp"
+
+
+static const struct THRConfig::Entries m_entries[] =
+{
+ // name type min max
+ { "main", THRConfig::T_MAIN, 1, 1 },
+ { "ldm", THRConfig::T_LDM, 1, MAX_NDBMT_LQH_THREADS },
+ { "recv", THRConfig::T_RECV, 1, 1 },
+ { "rep", THRConfig::T_REP, 1, 1 },
+ { "maint", THRConfig::T_MAINT, 1, 1 }
+};
+
+static const struct THRConfig::Param m_params[] =
+{
+ { "count", THRConfig::Param::S_UNSIGNED },
+ { "cpubind", THRConfig::Param::S_BITMASK },
+ { "cpuset", THRConfig::Param::S_BITMASK }
+};
+
+#define IX_COUNT 0
+#define IX_CPUBOUND 1
+#define IX_CPUSET 2
+
+static
+unsigned
+getMaxEntries(Uint32 type)
+{
+ for (Uint32 i = 0; i<NDB_ARRAY_SIZE(m_entries); i++)
+ {
+ if (m_entries[i].m_type == type)
+ return m_entries[i].m_max_cnt;
+ }
+ return 0;
+}
+
+static
+const char *
+getEntryName(Uint32 type)
+{
+ for (Uint32 i = 0; i<NDB_ARRAY_SIZE(m_entries); i++)
+ {
+ if (m_entries[i].m_type == type)
+ return m_entries[i].m_name;
+ }
+ return 0;
+}
+
+/**
+ * Look up a thread type by its configuration name (case-insensitive).
+ *
+ * @param type  name as written in the thread configuration, e.g. "ldm"
+ * @return the m_type of the matching m_entries row, or THRConfig::T_END
+ *         if no row has that name
+ */
+static
+Uint32
+getEntryType(const char * type)
+{
+  for (Uint32 i = 0; i<NDB_ARRAY_SIZE(m_entries); i++)
+  {
+    /* return the row's type rather than its index, so the result stays
+       correct even if m_entries is not ordered like the type enum */
+    if (strcasecmp(type, m_entries[i].m_name) == 0)
+      return m_entries[i].m_type;
+  }
+
+  return THRConfig::T_END;
+}
+
+THRConfig::THRConfig()
+{
+ m_classic = false;
+}
+
+THRConfig::~THRConfig()
+{
+}
+
+int
+THRConfig::setLockExecuteThreadToCPU(const char * mask)
+{
+ int res = parse_mask(mask, m_LockExecuteThreadToCPU);
+ if (res < 0)
+ {
+ m_err_msg.assfmt("failed to parse 'LockExecuteThreadToCPU=%s' "
+ "(error: %d)",
+ mask, res);
+ return -1;
+ }
+ return 0;
+}
+
+int
+THRConfig::setLockMaintThreadsToCPU(unsigned val)
+{
+ m_LockMaintThreadsToCPU.set(val);
+ return 0;
+}
+
+void
+THRConfig::add(T_Type t)
+{
+ T_Thread tmp;
+ tmp.m_type = t;
+ tmp.m_bind_type = T_Thread::B_UNBOUND;
+ tmp.m_no = m_threads[t].size();
+ m_threads[t].push_back(tmp);
+}
+
+int
+THRConfig::do_parse(unsigned MaxNoOfExecutionThreads,
+ unsigned __ndbmt_lqh_threads,
+ unsigned __ndbmt_classic)
+{
+ /**
+ * This is old ndbd.cpp : get_multithreaded_config
+ */
+ if (__ndbmt_classic)
+ {
+ m_classic = true;
+ add(T_LDM);
+ add(T_MAIN);
+ add(T_MAINT);
+ return 0;
+ }
+
+ Uint32 lqhthreads = 0;
+ switch(MaxNoOfExecutionThreads){
+ case 0:
+ case 1:
+ case 2:
+ case 3:
+ lqhthreads = 1; // TC + receiver + SUMA + LQH
+ break;
+ case 4:
+ case 5:
+ case 6:
+ lqhthreads = 2; // TC + receiver + SUMA + 2 * LQH
+ break;
+ default:
+ lqhthreads = 4; // TC + receiver + SUMA + 4 * LQH
+ }
+
+ if (__ndbmt_lqh_threads)
+ {
+ lqhthreads = __ndbmt_lqh_threads;
+ }
+
+ add(T_MAIN);
+ add(T_REP);
+ add(T_RECV);
+ add(T_MAINT);
+ for(Uint32 i = 0; i < lqhthreads; i++)
+ {
+ add(T_LDM);
+ }
+
+ return do_bindings() || do_validate();
+}
+
+int
+THRConfig::do_bindings()
+{
+ if (m_LockMaintThreadsToCPU.count() == 1)
+ {
+ m_threads[T_MAINT][0].m_bind_type = T_Thread::B_CPU_BOUND;
+ m_threads[T_MAINT][0].m_bind_no = m_LockMaintThreadsToCPU.getBitNo(0);
+ }
+ else if (m_LockMaintThreadsToCPU.count() > 1)
+ {
+ unsigned no = createCpuSet(m_LockMaintThreadsToCPU);
+ m_threads[T_MAINT][0].m_bind_type = T_Thread::B_CPUSET_BOUND;
+ m_threads[T_MAINT][0].m_bind_no = no;
+ }
+
+ /**
+ * Check that no cpu_sets overlap
+ */
+ for (unsigned i = 0; i<m_cpu_sets.size(); i++)
+ {
+ for (unsigned j = i + 1; j < m_cpu_sets.size(); j++)
+ {
+ if (m_cpu_sets[i].overlaps(m_cpu_sets[j]))
+ {
+ m_err_msg.assfmt("Overlapping cpuset's [ %s ] and [ %s ]",
+ m_cpu_sets[i].str().c_str(),
+ m_cpu_sets[j].str().c_str());
+ return -1;
+ }
+ }
+ }
+
+ /**
+ * Check that no cpubind targets a cpu that is part of a cpuset
+ */
+ for (unsigned i = 0; i < NDB_ARRAY_SIZE(m_threads); i++)
+ {
+ for (unsigned j = 0; j < m_threads[i].size(); j++)
+ {
+ if (m_threads[i][j].m_bind_type == T_Thread::B_CPU_BOUND)
+ {
+ unsigned cpu = m_threads[i][j].m_bind_no;
+ for (unsigned k = 0; k<m_cpu_sets.size(); k++)
+ {
+ if (m_cpu_sets[k].get(cpu))
+ {
+ m_err_msg.assfmt("Overlapping cpubind %u with cpuset [ %s ]",
+ cpu,
+ m_cpu_sets[k].str().c_str());
+
+ return -1;
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Remove all already bound threads from LockExecuteThreadToCPU-mask
+ */
+ for (unsigned i = 0; i<m_cpu_sets.size(); i++)
+ {
+ for (unsigned j = 0; j < m_cpu_sets[i].count(); j++)
+ {
+ m_LockExecuteThreadToCPU.clear(m_cpu_sets[i].getBitNo(j));
+ }
+ }
+
+ unsigned cnt_unbound = 0;
+ for (unsigned i = 0; i < NDB_ARRAY_SIZE(m_threads); i++)
+ {
+ for (unsigned j = 0; j < m_threads[i].size(); j++)
+ {
+ if (m_threads[i][j].m_bind_type == T_Thread::B_CPU_BOUND)
+ {
+ unsigned cpu = m_threads[i][j].m_bind_no;
+ m_LockExecuteThreadToCPU.clear(cpu);
+ }
+ else if (m_threads[i][j].m_bind_type == T_Thread::B_UNBOUND)
+ {
+ cnt_unbound ++;
+ }
+ }
+ }
+
+ if (m_threads[T_MAINT][0].m_bind_type == T_Thread::B_UNBOUND)
+ {
+ /**
+ * don't count this one...
+ */
+ cnt_unbound--;
+ }
+
+ if (m_LockExecuteThreadToCPU.count())
+ {
+ /**
+ * This is old mt.cpp : setcpuaffinity
+ */
+ SparseBitmask& mask = m_LockExecuteThreadToCPU;
+ unsigned cnt = mask.count();
+ unsigned num_threads = cnt_unbound;
+ bool isMtLqh = !m_classic;
+
+ if (cnt < num_threads)
+ {
+ m_info_msg.assfmt("WARNING: Too few CPU's specified with "
+ "LockExecuteThreadToCPU. Only %u specified "
+ " but %u was needed, this may cause contention.\n",
+ cnt, num_threads);
+ }
+
+ if (cnt >= num_threads)
+ {
+ m_info_msg.appfmt("Assigning each thread its own CPU\n");
+ unsigned no = 0;
+ for (unsigned i = 0; i < NDB_ARRAY_SIZE(m_threads); i++)
+ {
+ if (i == T_MAINT)
+ continue;
+ for (unsigned j = 0; j < m_threads[i].size(); j++)
+ {
+ if (m_threads[i][j].m_bind_type == T_Thread::B_UNBOUND)
+ {
+ m_threads[i][j].m_bind_type = T_Thread::B_CPU_BOUND;
+ m_threads[i][j].m_bind_no = mask.getBitNo(no);
+ no++;
+ }
+ }
+ }
+ }
+ else if (cnt == 1)
+ {
+ unsigned cpu = mask.getBitNo(0);
+ m_info_msg.appfmt("Assigning all threads to CPU %u\n", cpu);
+ for (unsigned i = 0; i < NDB_ARRAY_SIZE(m_threads); i++)
+ {
+ if (i == T_MAINT)
+ continue;
+ bind_unbound(m_threads[i], cpu);
+ }
+ }
+ else if (isMtLqh)
+ {
+ unsigned unbound_ldm = count_unbound(m_threads[T_LDM]);
+ if (cnt > unbound_ldm)
+ {
+ /**
+ * let each LQH have its own CPU and rest share...
+ */
+ m_info_msg.append("Assigning LQH threads to dedicated CPU(s) and "
+ "other threads will share remaining\n");
+ unsigned cpu = mask.find(0);
+ for (unsigned i = 0; i < m_threads[T_LDM].size(); i++)
+ {
+ if (m_threads[T_LDM][i].m_bind_type == T_Thread::B_UNBOUND)
+ {
+ m_threads[T_LDM][i].m_bind_type = T_Thread::B_CPU_BOUND;
+ m_threads[T_LDM][i].m_bind_no = cpu;
+ mask.clear(cpu);
+ cpu = mask.find(cpu + 1);
+ }
+ }
+
+ cpu = mask.find(0);
+ bind_unbound(m_threads[T_MAIN], cpu);
+ bind_unbound(m_threads[T_REP], cpu);
+ if ((cpu = mask.find(cpu + 1)) == mask.NotFound)
+ {
+ cpu = mask.find(0);
+ }
+ bind_unbound(m_threads[T_RECV], cpu);
+ }
+ else
+ {
+ // put receiver, tc, backup/suma in 1 thread,
+ // and round robin LQH for rest
+ unsigned cpu = mask.find(0);
+ m_info_msg.appfmt("Assigning LQH threads round robin to CPU(s) and "
+ "other threads will share CPU %u\n", cpu);
+ bind_unbound(m_threads[T_MAIN], cpu); // TC
+ bind_unbound(m_threads[T_REP], cpu);
+ bind_unbound(m_threads[T_RECV], cpu);
+ mask.clear(cpu);
+
+ cpu = mask.find(0);
+ for (unsigned i = 0; i < m_threads[T_LDM].size(); i++)
+ {
+ if (m_threads[T_LDM][i].m_bind_type == T_Thread::B_UNBOUND)
+ {
+ m_threads[T_LDM][i].m_bind_type = T_Thread::B_CPU_BOUND;
+ m_threads[T_LDM][i].m_bind_no = cpu;
+ if ((cpu = mask.find(cpu + 1)) == mask.NotFound)
+ {
+ cpu = mask.find(0);
+ }
+ }
+ }
+ }
+ }
+ else
+ {
+ unsigned cpu = mask.find(0);
+ m_info_msg.appfmt("Assigning LQH thread to CPU %u and "
+ "other threads will share\n", cpu);
+ bind_unbound(m_threads[T_LDM], cpu);
+ cpu = mask.find(cpu + 1);
+ bind_unbound(m_threads[T_MAIN], cpu);
+ bind_unbound(m_threads[T_RECV], cpu);
+ }
+ }
+
+ return 0;
+}
+
+unsigned
+THRConfig::count_unbound(const Vector<T_Thread>& vec) const
+{
+ unsigned cnt = 0;
+ for (unsigned i = 0; i < vec.size(); i++)
+ {
+ if (vec[i].m_bind_type == T_Thread::B_UNBOUND)
+ cnt ++;
+ }
+ return cnt;
+}
+
+void
+THRConfig::bind_unbound(Vector<T_Thread>& vec, unsigned cpu)
+{
+ for (unsigned i = 0; i < vec.size(); i++)
+ {
+ if (vec[i].m_bind_type == T_Thread::B_UNBOUND)
+ {
+ vec[i].m_bind_type = T_Thread::B_CPU_BOUND;
+ vec[i].m_bind_no = cpu;
+ }
+ }
+}
+
+int
+THRConfig::do_validate()
+{
+ /**
+ * Check that there aren't too many of any thread type
+ */
+ for (unsigned i = 0; i< NDB_ARRAY_SIZE(m_threads); i++)
+ {
+ if (m_threads[i].size() > getMaxEntries(i))
+ {
+ m_err_msg.assfmt("Too many instances(%u) of %s max supported: %u",
+ m_threads[i].size(),
+ getEntryName(i),
+ getMaxEntries(i));
+ return -1;
+ }
+ }
+
+ /**
+ * LDM can be 1 2 4
+ */
+ if (m_threads[T_LDM].size() == 3)
+ {
+ m_err_msg.assfmt("No of LDM-instances can be 1,2,4. Specified: %u",
+ m_threads[T_LDM].size());
+ return -1;
+ }
+
+ return 0;
+}
+
+const char *
+THRConfig::getConfigString()
+{
+ m_cfg_string.clear();
+ const char * sep = "";
+ for (unsigned i = 0; i < NDB_ARRAY_SIZE(m_threads); i++)
+ {
+ if (m_threads[i].size())
+ {
+ const char * name = getEntryName(i);
+ if (i != T_MAINT)
+ {
+ for (unsigned j = 0; j < m_threads[i].size(); j++)
+ {
+ m_cfg_string.append(sep);
+ sep=",";
+ m_cfg_string.append(name);
+ if (m_threads[i][j].m_bind_type != T_Thread::B_UNBOUND)
+ {
+ m_cfg_string.append("={");
+ if (m_threads[i][j].m_bind_type == T_Thread::B_CPU_BOUND)
+ {
+ m_cfg_string.appfmt("cpubind=%u", m_threads[i][j].m_bind_no);
+ }
+ else if (m_threads[i][j].m_bind_type == T_Thread::B_CPUSET_BOUND)
+ {
+ m_cfg_string.appfmt("cpuset=%s",
+ m_cpu_sets[m_threads[i][j].m_bind_no].str().c_str());
+ }
+ m_cfg_string.append("}");
+ }
+ }
+ }
+ else
+ {
+ for (unsigned j = 0; j < m_threads[i].size(); j++)
+ {
+ if (m_threads[i][j].m_bind_type != T_Thread::B_UNBOUND)
+ {
+ m_cfg_string.append(sep);
+ sep=",";
+ m_cfg_string.append(name);
+ m_cfg_string.append("={");
+ if (m_threads[i][j].m_bind_type == T_Thread::B_CPU_BOUND)
+ {
+ m_cfg_string.appfmt("cpubind=%u", m_threads[i][j].m_bind_no);
+ }
+ else if (m_threads[i][j].m_bind_type == T_Thread::B_CPUSET_BOUND)
+ {
+ m_cfg_string.appfmt("cpuset=%s",
+ m_cpu_sets[m_threads[i][j].m_bind_no].str().c_str());
+ }
+ m_cfg_string.append("}");
+ }
+ }
+ }
+ }
+ }
+ return m_cfg_string.c_str();
+}
+
+const char *
+THRConfig::getErrorMessage() const
+{
+ return m_err_msg.c_str();
+}
+
+const char *
+THRConfig::getInfoMessage() const
+{
+ return m_info_msg.c_str();
+}
+
+/**
+ * Advance past leading blank characters (as classified by isblank()).
+ *
+ * @param str  NUL-terminated string
+ * @return pointer to the first non-blank character of str, which may be
+ *         the terminating NUL
+ */
+static
+char *
+skipblank(char * str)
+{
+  /* <ctype.h> classifiers need a value representable as unsigned char;
+     passing a negative plain char is undefined behaviour */
+  while (isblank((unsigned char)* str))
+    str++;
+  return str;
+}
+
+/**
+ * Parse the thread type name at the start of str.
+ *
+ * Skips leading blanks, reads a run of alphabetic characters and looks it
+ * up with getEntryType(). When a name was read, str is left pointing just
+ * past it.
+ *
+ * @param str  in/out cursor into a writable, NUL-terminated buffer
+ * @return the thread type, or T_END (with m_err_msg set) if the
+ *         specification is empty or the name is unknown
+ */
+Uint32
+THRConfig::find_type(char *& str)
+{
+  str = skipblank(str);
+
+  char * name = str;
+  if (* name == 0)
+  {
+    m_err_msg.assfmt("empty thread specification");
+    /* T_END is the failure value the caller tests for; returning 0 would
+       be taken as T_MAIN and the error would go unnoticed */
+    return T_END;
+  }
+  char * end = name;
+  while(isalpha((unsigned char)* end))
+    end++;
+
+  /* temporarily NUL-terminate the name for the lookup, then restore */
+  char save = * end;
+  * end = 0;
+  Uint32 t = getEntryType(name);
+  if (t == T_END)
+  {
+    m_err_msg.assfmt("unknown thread type '%s'", name);
+  }
+  * end = save;
+  str = end;
+  return t;
+}
+
+struct ParamValue
+{
+ ParamValue() { found = false;}
+ bool found;
+ const char * string_val;
+ unsigned unsigned_val;
+ SparseBitmask mask_val;
+};
+
+/**
+ * Parse an unsigned integer; the base is auto-detected by strtoll()
+ * (base argument 0).
+ *
+ * @param str  in/out cursor; advanced past the number on success
+ * @param dst  receives the parsed value on success
+ * @return 0 on success, -1 if no digits were found, the value is negative,
+ *         or it does not fit in 32 bits
+ */
+static
+int
+parseUnsigned(char *& str, unsigned * dst)
+{
+  str = skipblank(str);
+  char * endptr = 0;
+  errno = 0;
+  /* keep the full strtoll() result: a plain long may be only 32 bits wide
+     and would truncate the value before the range check below */
+  long long val = strtoll(str, &endptr, 0);
+  if (errno == ERANGE)
+    return -1;
+  if (val < 0 || Int64(val) > 0xFFFFFFFF)
+    return -1;
+  if (endptr == str)
+    return -1;
+  str = endptr;
+  *dst = (unsigned)val;
+  return 0;
+}
+
+/**
+ * Parse a cpu list such as "1-2,5" into mask.
+ *
+ * Consumes the longest prefix made of digits, '-', ',' and spaces, minus
+ * trailing blanks and one trailing ',' (so a following parameter's
+ * separator is left in place).
+ *
+ * @param str   in/out cursor into a writable buffer; advanced past the
+ *              consumed text
+ * @param mask  receives the parsed bits
+ * @return -1 on failure, otherwise the non-negative parse_mask() result
+ */
+static
+int
+parseBitmask(char *& str, SparseBitmask * mask)
+{
+  str = skipblank(str);
+  size_t len = strspn(str, "0123456789-, ");
+  if (len == 0)
+    return -1;
+
+  while (isblank((unsigned char)str[len-1]))
+    len--;
+  if (str[len-1] == ',')
+    len--;
+  /* temporarily NUL-terminate the list for parse_mask(), then restore */
+  char save = str[len];
+  str[len] = 0;
+  int res = parse_mask(str, *mask);
+  str[len] = save;
+  str = str + len;
+  /* the caller only recognises -1 as failure, so fold every negative
+     parse_mask() result into it (setLockExecuteThreadToCPU likewise
+     treats any res < 0 as an error) */
+  return (res < 0) ? -1 : res;
+}
+
+static
+int
+parseParams(char * str, ParamValue values[], BaseString& err)
+{
+ const char * const save = str;
+ while (* str)
+ {
+ str = skipblank(str);
+
+ unsigned idx = 0;
+ for (; idx < NDB_ARRAY_SIZE(m_params); idx++)
+ {
+ if (strncasecmp(str, m_params[idx].name, strlen(m_params[idx].name)) == 0)
+ {
+ str += strlen(m_params[idx].name);
+ break;
+ }
+ }
+
+ if (idx == NDB_ARRAY_SIZE(m_params))
+ {
+ err.assfmt("Unknown param near: '%s'", str);
+ return -1;
+ }
+
+ if (values[idx].found == true)
+ {
+ err.assfmt("Param '%s' found twice", m_params[idx].name);
+ return -1;
+ }
+
+ str = skipblank(str);
+ if (* str != '=')
+ {
+ err.assfmt("Missing '=' after %s in '%s'", m_params[idx].name, save);
+ return -1;
+ }
+ str++;
+ str = skipblank(str);
+
+ int res = 0;
+ switch(m_params[idx].type){
+ case THRConfig::Param::S_UNSIGNED:
+ res = parseUnsigned(str, &values[idx].unsigned_val);
+ break;
+ case THRConfig::Param::S_BITMASK:
+ res = parseBitmask(str, &values[idx].mask_val);
+ break;
+ default:
+ err.assfmt("Internal error, unknown type for param: '%s'",
+ m_params[idx].name);
+ return -1;
+ }
+ if (res == -1)
+ {
+ err.assfmt("Unable to parse %s=%s", m_params[idx].name, str);
+ return -1;
+ }
+ values[idx].found = true;
+ str = skipblank(str);
+
+ if (* str == 0)
+ break;
+
+ if (* str != ',')
+ {
+ err.assfmt("Unable to parse near '%s'", str);
+ return -1;
+ }
+ str++;
+ }
+ return 0;
+}
+
+int
+THRConfig::find_spec(char *& str, T_Type type)
+{
+ str = skipblank(str);
+
+ switch(* str){
+ case ',':
+ case 0:
+ add(type);
+ return 0;
+ }
+
+ if (* str != '=')
+ {
+err:
+ int len = (int)strlen(str);
+ m_err_msg.assfmt("Invalid format near: '%.*s'",
+ (len > 10) ? 10 : len, str);
+ return -1;
+ }
+
+ str++; // skip over =
+ str = skipblank(str);
+
+ if (* str != '{')
+ {
+ goto err;
+ }
+
+ str++;
+ char * start = str;
+
+ /**
+ * Find end
+ */
+ while (* str && (* str) != '}')
+ str++;
+
+ if (* str != '}')
+ {
+ goto err;
+ }
+
+ char * end = str;
+ char save = * end;
+ * end = 0;
+
+ ParamValue values[NDB_ARRAY_SIZE(m_params)];
+ values[IX_COUNT].unsigned_val = 1;
+ int res = parseParams(start, values, m_err_msg);
+ * end = save;
+
+ if (res != 0)
+ {
+ return -1;
+ }
+
+ if (values[IX_CPUBOUND].found && values[IX_CPUSET].found)
+ {
+ m_err_msg.assfmt("Both cpuset and cpubind specified!");
+ return -1;
+ }
+
+ unsigned cnt = values[IX_COUNT].unsigned_val;
+ const int index = m_threads[type].size();
+ for (unsigned i = 0; i < cnt; i++)
+ {
+ add(type);
+ }
+
+ assert(m_threads[type].size() == index + cnt);
+ if (values[IX_CPUSET].found)
+ {
+ SparseBitmask & mask = values[IX_CPUSET].mask_val;
+ unsigned no = createCpuSet(mask);
+ for (unsigned i = 0; i < cnt; i++)
+ {
+ m_threads[type][index+i].m_bind_type = T_Thread::B_CPUSET_BOUND;
+ m_threads[type][index+i].m_bind_no = no;
+ }
+ }
+ else if (values[IX_CPUBOUND].found)
+ {
+ SparseBitmask & mask = values[IX_CPUBOUND].mask_val;
+ if (mask.count() < cnt)
+ {
+ m_err_msg.assfmt("%s: trying to bind %u threads to %u cpus [%s]",
+ getEntryName(type),
+ cnt,
+ mask.count(),
+ mask.str().c_str());
+ return -1;
+ }
+ for (unsigned i = 0; i < cnt; i++)
+ {
+ m_threads[type][index+i].m_bind_type = T_Thread::B_CPU_BOUND;
+ m_threads[type][index+i].m_bind_no = mask.getBitNo(i % mask.count());
+ }
+ }
+
+ str++; // skip over }
+ return 0;
+}
+
+int
+THRConfig::find_next(char *& str)
+{
+ str = skipblank(str);
+
+ if (* str == 0)
+ {
+ return 0;
+ }
+ else if (* str == ',')
+ {
+ str++;
+ return 1;
+ }
+
+ int len = (int)strlen(str);
+ m_err_msg.assfmt("Invalid format near: '%.*s'",
+ (len > 10) ? 10 : len, str);
+ return -1;
+}
+
+int
+THRConfig::do_parse(const char * ThreadConfig)
+{
+ BaseString str(ThreadConfig);
+ char * ptr = (char*)str.c_str();
+ while (* ptr)
+ {
+ Uint32 type = find_type(ptr);
+ if (type == T_END)
+ return -1;
+
+ if (find_spec(ptr, (T_Type)type) < 0)
+ return -1;
+
+ int ret = find_next(ptr);
+ if (ret < 0)
+ return ret;
+
+ if (ret == 0)
+ break;
+ }
+
+ for (Uint32 i = 0; i < T_END; i++)
+ {
+ while (m_threads[i].size() < m_entries[i].m_min_cnt)
+ add((T_Type)i);
+ }
+
+ return do_bindings() || do_validate();
+}
+
+unsigned
+THRConfig::createCpuSet(const SparseBitmask& mask)
+{
+ for (size_t i = 0; i < m_cpu_sets.size(); i++)
+ if (m_cpu_sets[i].equal(mask))
+ return i;
+
+ m_cpu_sets.push_back(mask);
+ return m_cpu_sets.size() - 1;
+}
+
+template class Vector<SparseBitmask>;
+template class Vector<THRConfig::T_Thread>;
+
+#ifdef TEST_MT_THR_CONFIG
+
+#include <NdbTap.hpp>
+
+TAPTEST(mt_thr_config)
+{
+ {
+ THRConfig tmp;
+ OK(tmp.do_parse(8, 0, 0) == 0);
+ }
+
+ /**
+ * BASIC test
+ */
+ {
+ const char * ok[] =
+ {
+ "ldm,ldm",
+ "ldm={count=3},ldm",
+ "ldm={cpubind=1-2,5,count=3},ldm",
+ "ldm={ cpubind = 1- 2, 5 , count = 3 },ldm",
+ "ldm={count=3,cpubind=1-2,5 }, ldm",
+ "ldm={cpuset=1-3,count=3 },ldm",
+ "main,ldm={},ldm",
+ 0
+ };
+
+ const char * fail [] =
+ {
+ "ldm,ldm,ldm",
+ "ldm={cpubind= 1 , cpuset=2 },ldm",
+ "ldm={count=4,cpubind=1-3},ldm",
+ "main,main,ldm,ldm",
+ "main={ keso=88, count=23},ldm,ldm",
+ "main={ cpuset=1-3 }, ldm={cpuset=3-4}",
+ "main={ cpuset=1-3 }, ldm={cpubind=2}",
+ 0
+ };
+
+ for (Uint32 i = 0; ok[i]; i++)
+ {
+ THRConfig tmp;
+ int res = tmp.do_parse(ok[i]);
+ printf("do_parse(%s) => %s - %s\n", ok[i],
+ res == 0 ? "OK" : "FAIL",
+ res == 0 ? "" : tmp.getErrorMessage());
+ OK(res == 0);
+ {
+ BaseString out(tmp.getConfigString());
+ THRConfig check;
+ OK(check.do_parse(out.c_str()) == 0);
+ OK(strcmp(out.c_str(), check.getConfigString()) == 0);
+ }
+ }
+
+ for (Uint32 i = 0; fail[i]; i++)
+ {
+ THRConfig tmp;
+ int res = tmp.do_parse(fail[i]);
+ printf("do_parse(%s) => %s - %s\n", fail[i],
+ res == 0 ? "OK" : "FAIL",
+ res == 0 ? "" : tmp.getErrorMessage());
+ OK(res != 0);
+ }
+ }
+
+ {
+ /**
+ * Test interaction with LockExecuteThreadToCPU
+ */
+ const char * t[] =
+ {
+ /** LockExecuteThreadToCPU, threads, answer */
+ "1-8",
+ "ldm={count=4}",
+ "main={cpubind=1},ldm={cpubind=2},ldm={cpubind=3},ldm={cpubind=4},ldm={cpubind=5},recv={cpubind=6},rep={cpubind=7}",
+
+ "1-5",
+ "ldm={count=4}",
+ "main={cpubind=5},ldm={cpubind=1},ldm={cpubind=2},ldm={cpubind=3},ldm={cpubind=4},recv={cpubind=5},rep={cpubind=5}",
+
+ "1-3",
+ "ldm={count=4}",
+ "main={cpubind=1},ldm={cpubind=2},ldm={cpubind=3},ldm={cpubind=2},ldm={cpubind=3},recv={cpubind=1},rep={cpubind=1}",
+
+ "1-4",
+ "ldm={count=4}",
+ "main={cpubind=1},ldm={cpubind=2},ldm={cpubind=3},ldm={cpubind=4},ldm={cpubind=2},recv={cpubind=1},rep={cpubind=1}",
+
+ "1-8",
+ "ldm={count=4},maint={cpubind=8}",
+ "main={cpubind=1},ldm={cpubind=2},ldm={cpubind=3},ldm={cpubind=4},ldm={cpubind=5},recv={cpubind=6},rep={cpubind=7},maint={cpubind=8}",
+
+ "1-8",
+ "ldm={count=4,cpubind=1,4,5,6}",
+ "main={cpubind=2},ldm={cpubind=1},ldm={cpubind=4},ldm={cpubind=5},ldm={cpubind=6},recv={cpubind=3},rep={cpubind=7}",
+
+ // END
+ 0
+ };
+
+ for (unsigned i = 0; t[i]; i+= 3)
+ {
+ THRConfig tmp;
+ tmp.setLockExecuteThreadToCPU(t[i+0]);
+ int res = tmp.do_parse(t[i+1]);
+ int ok = strcmp(tmp.getConfigString(), t[i+2]) == 0;
+ printf("mask: %s conf: %s => %s(%s) - %s - %s\n",
+ t[i+0],
+ t[i+1],
+ res == 0 ? "OK" : "FAIL",
+ res == 0 ? "" : tmp.getErrorMessage(),
+ tmp.getConfigString(),
+ ok == 1 ? "CORRECT" : "INCORRECT");
+ OK(res == 0);
+ OK(ok == 1);
+ }
+ }
+
+ return 1;
+}
+
+#endif
=== added file 'storage/ndb/src/kernel/vm/mt_thr_config.hpp'
--- a/storage/ndb/src/kernel/vm/mt_thr_config.hpp 1970-01-01 00:00:00 +0000
+++ b/storage/ndb/src/kernel/vm/mt_thr_config.hpp 2011-08-26 09:57:03 +0000
@@ -0,0 +1,125 @@
+/*
+ Copyright (c) 2011, Oracle and/or its affiliates. All rights reserved.
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+*/
+
+#ifndef THRConfig_H
+#define THRConfig_H
+
+struct NdbThread;
+#include <Vector.hpp>
+#include <SparseBitmask.hpp>
+#include <BaseString.hpp>
+
+/**
+ * This class contains thread configuration
+ * it supports parsing the ThreadConfig parameter
+ * and handling LockExecuteThreadToCPU etc...
+ *
+ * This is used in ndb_mgmd when verifying configuration
+ * and by ndbmtd
+ *
+ * TAP-TESTS are provided in mt_thr_config.cpp
+ */
+class THRConfig
+{
+public:
+ enum T_Type
+ {
+ T_MAIN = 0, /* DIH/QMGR/TC/SPJ etc */
+ T_LDM = 1, /* LQH/ACC/TUP/TUX etc */
+ T_RECV = 2, /* CMVMI */
+ T_REP = 3, /* SUMA */
+ T_MAINT = 4, /* FS, SocketServer etc */
+
+ T_END = 5
+ };
+
+ THRConfig();
+ ~THRConfig();
+
+ // NOTE: needs to be called before do_parse
+ int setLockExecuteThreadToCPU(const char * val);
+ int setLockMaintThreadsToCPU(unsigned val);
+
+ int do_parse(const char * ThreadConfig);
+ int do_parse(unsigned MaxNoOfExecutionThreads,
+ unsigned __ndbmt_lqh_workers,
+ unsigned __ndbmt_classic);
+
+ const char * getConfigString();
+
+ const char * getErrorMessage() const;
+ const char * getInfoMessage() const;
+
+private:
+ struct T_Thread
+ {
+ unsigned m_type;
+ unsigned m_no; // within type
+ enum BType { B_UNBOUND, B_CPU_BOUND, B_CPUSET_BOUND } m_bind_type;
+ unsigned m_bind_no; // cpu_no/cpuset_no
+ };
+ bool m_classic;
+ SparseBitmask m_LockExecuteThreadToCPU;
+ SparseBitmask m_LockMaintThreadsToCPU;
+ Vector<SparseBitmask> m_cpu_sets;
+ Vector<T_Thread> m_threads[T_END];
+
+ BaseString m_err_msg;
+ BaseString m_info_msg;
+ BaseString m_cfg_string;
+ BaseString m_print_string;
+
+ void add(T_Type);
+ Uint32 find_type(char *&);
+ int find_spec(char *&, T_Type);
+ int find_next(char *&);
+
+ unsigned createCpuSet(const SparseBitmask&);
+ int do_bindings();
+ int do_validate();
+
+ unsigned count_unbound(const Vector<T_Thread>& vec) const;
+ void bind_unbound(Vector<T_Thread> & vec, unsigned cpu);
+
+public:
+ struct Entries
+ {
+ const char * m_name;
+ unsigned m_type;
+ unsigned m_min_cnt;
+ unsigned m_max_cnt;
+ };
+
+ struct Param
+ {
+ const char * name;
+ enum { S_UNSIGNED, S_BITMASK } type;
+ };
+};
+
+/**
+ * This class is used by ndbmtd
+ * when setting up threads (and locking)
+ */
+class THRConfigApplier : public THRConfig
+{
+public:
+ int create_cpusets();
+ int do_bind(unsigned t_type, unsigned no, NdbThread*);
+};
+
+#endif // THRConfig_H
=== modified file 'storage/ndb/src/ndbapi/NdbQueryOperation.cpp'
--- a/storage/ndb/src/ndbapi/NdbQueryOperation.cpp 2011-08-18 11:47:22 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryOperation.cpp 2011-08-22 12:56:56 +0000
@@ -57,6 +57,7 @@ static const int Err_FunctionNotImplemen
static const int Err_UnknownColumn = 4004;
static const int Err_ReceiveTimedOut = 4008;
static const int Err_NodeFailCausedAbort = 4028;
+static const int Err_ParameterError = 4118;
static const int Err_SimpleDirtyReadFailed = 4119;
static const int Err_WrongFieldLength = 4209;
static const int Err_ReadTooMuch = 4257;
@@ -72,6 +73,21 @@ static const Uint16 tupleNotFound = 0xff
/** Set to true to trace incomming signals.*/
const bool traceSignals = false;
+enum
+{
+ /**
+ * Set NdbQueryOperationImpl::m_parallelism to this value to indicate that
+ * scan parallelism should be adaptive.
+ */
+ Parallelism_adaptive = 0xffff0000,
+
+ /**
+ * Set NdbQueryOperationImpl::m_parallelism to this value to indicate that
+ * all fragments should be scanned in parallel.
+ */
+ Parallelism_max = 0xffff0001
+};
+
/**
* A class for accessing the correlation data at the end of a tuple (for
* scan queries). These data have the following layout:
@@ -358,9 +374,6 @@ public:
Uint32 getRowCount() const
{ return m_rowCount; }
- char* getRow(Uint32 tupleNo) const
- { return (m_buffer + (tupleNo*m_rowSize)); }
-
private:
/** No copying.*/
NdbResultSet(const NdbResultSet&);
@@ -372,6 +385,9 @@ private:
/** Used for checking if buffer overrun occurred. */
Uint32* m_batchOverflowCheck;
+ /** Array of TupleCorrelations for all rows in m_buffer */
+ TupleCorrelation* m_correlations;
+
Uint32 m_rowSize;
/** The current #rows in 'm_buffer'.*/
@@ -415,8 +431,8 @@ public:
const NdbReceiver& getReceiver() const
{ return m_receiver; }
- const char* getCurrentRow() const
- { return m_receiver.peek_row(); }
+ const char* getCurrentRow()
+ { return m_receiver.get_row(); }
/**
* Process an incomming tuple for this stream. Extract parent and own tuple
@@ -653,6 +669,7 @@ void* NdbBulkAllocator::allocObjMem(Uint
NdbResultSet::NdbResultSet() :
m_buffer(NULL),
m_batchOverflowCheck(NULL),
+ m_correlations(NULL),
m_rowSize(0),
m_rowCount(0)
{}
@@ -672,6 +689,12 @@ NdbResultSet::init(NdbQueryImpl& query,
m_batchOverflowCheck =
reinterpret_cast<Uint32*>(bufferAlloc.allocObjMem(sizeof(Uint32)));
*m_batchOverflowCheck = 0xacbd1234;
+
+ if (query.getQueryDef().isScanQuery())
+ {
+ m_correlations = reinterpret_cast<TupleCorrelation*>
+ (bufferAlloc.allocObjMem(maxRows*sizeof(TupleCorrelation)));
+ }
}
}
@@ -856,19 +879,17 @@ void
NdbResultStream::execTRANSID_AI(const Uint32 *ptr, Uint32 len,
TupleCorrelation correlation)
{
- m_receiver.execTRANSID_AI(ptr, len);
- m_resultSets[m_recv].m_rowCount++;
-
+ NdbResultSet& receiveSet = m_resultSets[m_recv];
if (isScanQuery())
{
/**
- * Store TupleCorrelation as hidden value imm. after received row
- * (NdbQueryOperationImpl::getRowSize() has reserved space for it)
+ * Store TupleCorrelation.
*/
- Uint32* row_recv = reinterpret_cast<Uint32*>
- (m_receiver.m_record.m_row_recv);
- row_recv[-1] = correlation.toUint32();
+ receiveSet.m_correlations[receiveSet.m_rowCount] = correlation;
}
+
+ m_receiver.execTRANSID_AI(ptr, len);
+ receiveSet.m_rowCount++;
} // NdbResultStream::execTRANSID_AI()
/**
@@ -994,8 +1015,8 @@ NdbResultStream::prepareResultSet(Uint32
/**
* Fill m_tupleSet[] with correlation data between parent
- * and child tuples. The 'TupleCorrelation' is stored as
- * and extra Uint32 after each row in the NdbResultSet
+ * and child tuples. The 'TupleCorrelation' is stored
+ * in an array of TupleCorrelations in each ResultSet
* by execTRANSID_AI().
*
* NOTE: In order to reduce work done when holding the
@@ -1024,12 +1045,9 @@ NdbResultStream::buildResultCorrelations
/* Rebuild correlation & hashmap from 'readResult' */
for (Uint32 tupleNo=0; tupleNo<readResult.m_rowCount; tupleNo++)
{
- const Uint32* row = (Uint32*)readResult.getRow(tupleNo+1);
- const TupleCorrelation correlation(row[-1]);
-
- const Uint16 tupleId = correlation.getTupleId();
+ const Uint16 tupleId = readResult.m_correlations[tupleNo].getTupleId();
const Uint16 parentId = (m_parent!=NULL)
- ? correlation.getParentTupleId()
+ ? readResult.m_correlations[tupleNo].getParentTupleId()
: tupleNotFound;
m_tupleSet[tupleNo].m_skip = false;
@@ -1490,6 +1508,14 @@ int NdbQueryOperation::setParallelism(Ui
return m_impl.setParallelism(parallelism);
}
+int NdbQueryOperation::setMaxParallelism(){
+ return m_impl.setMaxParallelism();
+}
+
+int NdbQueryOperation::setAdaptiveParallelism(){
+ return m_impl.setAdaptiveParallelism();
+}
+
int NdbQueryOperation::setBatchSize(Uint32 batchSize){
return m_impl.setBatchSize(batchSize);
}
@@ -2623,16 +2649,17 @@ NdbQueryImpl::prepareSend()
{
/* For the first batch, we read from all fragments for both ordered
* and unordered scans.*/
- if (getQueryOperation(0U).m_parallelism > 0)
+ if (getQueryOperation(0U).m_parallelism == Parallelism_max)
{
m_rootFragCount
- = MIN(getRoot().getQueryOperationDef().getTable().getFragmentCount(),
- getQueryOperation(0U).m_parallelism);
+ = getRoot().getQueryOperationDef().getTable().getFragmentCount();
}
else
{
+ assert(getQueryOperation(0U).m_parallelism != Parallelism_adaptive);
m_rootFragCount
- = getRoot().getQueryOperationDef().getTable().getFragmentCount();
+ = MIN(getRoot().getQueryOperationDef().getTable().getFragmentCount(),
+ getQueryOperation(0U).m_parallelism);
}
Ndb* const ndb = m_transaction.getNdb();
@@ -2682,14 +2709,13 @@ NdbQueryImpl::prepareSend()
for (Uint32 opNo = 0; opNo < getNoOfOperations(); opNo++)
{
const NdbQueryOperationImpl& op = getQueryOperation(opNo);
+ // Add space for m_correlations, m_buffer & m_batchOverflowCheck
+ totalBuffSize += (sizeof(TupleCorrelation) * op.getMaxBatchRows());
totalBuffSize += (op.getRowSize() * op.getMaxBatchRows());
+ totalBuffSize += sizeof(Uint32); // Overflow check
}
- /**
- * Add one word per ResultStream for buffer overrun check. We add a word
- * rather than a byte to avoid possible alignment problems.
- */
- m_rowBufferAlloc.init(m_rootFragCount *
- (totalBuffSize + (sizeof(Uint32) * getNoOfOperations())) );
+
+ m_rowBufferAlloc.init(m_rootFragCount * totalBuffSize);
if (getQueryDef().isScanQuery())
{
Uint32 totalRows = 0;
@@ -3455,8 +3481,6 @@ NdbQueryImpl::OrderedFragSet::getCurrent
{
if (m_ordering!=NdbQueryOptions::ScanOrdering_unordered)
{
- // Results should be ordered.
- assert(verifySortOrder());
/**
* Must have tuples for each (non-completed) fragment when doing ordered
* scan.
@@ -3667,7 +3691,8 @@ NdbQueryOperationImpl::NdbQueryOperation
m_ordering(NdbQueryOptions::ScanOrdering_unordered),
m_interpretedCode(NULL),
m_diskInUserProjection(false),
- m_parallelism(0),
+ m_parallelism(def.getQueryOperationIx() == 0
+ ? Parallelism_max : Parallelism_adaptive),
m_rowSize(0xffffffff)
{
if (errno == ENOMEM)
@@ -4266,9 +4291,10 @@ NdbQueryOperationImpl
m_ndbRecord,
m_firstRecAttr,
0, // Key size.
- getRoot().m_parallelism > 0 ?
- getRoot().m_parallelism :
- m_queryImpl.getRootFragCount(),
+ getRoot().m_parallelism
+ == Parallelism_max ?
+ m_queryImpl.getRootFragCount() :
+ getRoot().m_parallelism,
maxBatchRows,
batchByteSize,
firstBatchRows);
@@ -4454,7 +4480,12 @@ NdbQueryOperationImpl::prepareAttrInfo(U
firstBatchRows);
assert(batchRows==getMaxBatchRows());
assert(batchRows==firstBatchRows);
- requestInfo |= QN_ScanIndexParameters::SIP_PARALLEL; // FIXME: SPJ always assume. SIP_PARALLEL
+ assert(m_parallelism == Parallelism_max ||
+ m_parallelism == Parallelism_adaptive);
+ if (m_parallelism == Parallelism_max)
+ {
+ requestInfo |= QN_ScanIndexParameters::SIP_PARALLEL;
+ }
param->requestInfo = requestInfo;
param->batchSize = ((Uint16)batchByteSize << 16) | (Uint16)firstBatchRows;
param->resultData = getIdOfReceiver();
@@ -4786,7 +4817,7 @@ NdbQueryOperationImpl::execTRANSID_AI(co
if (getQueryDef().isScanQuery())
{
const CorrelationData correlData(ptr, len);
- const Uint32 receiverId = CorrelationData(ptr, len).getRootReceiverId();
+ const Uint32 receiverId = correlData.getRootReceiverId();
/** receiverId holds the Id of the receiver of the corresponding stream
* of the root operation. We can thus find the correct root fragment
@@ -4958,7 +4989,7 @@ NdbQueryOperationImpl::setOrdering(NdbQu
return -1;
}
- if (m_parallelism != 0)
+ if (m_parallelism != Parallelism_max)
{
getQuery().setErrorCode(QRY_SEQUENTIAL_SCAN_SORTED);
return -1;
@@ -5053,10 +5084,40 @@ int NdbQueryOperationImpl::setParallelis
getQuery().setErrorCode(Err_FunctionNotImplemented);
return -1;
}
+ else if (parallelism < 1 || parallelism > MAX_NDB_PARTITIONS)
+ {
+ getQuery().setErrorCode(Err_ParameterError);
+ return -1;
+ }
m_parallelism = parallelism;
return 0;
}
+int NdbQueryOperationImpl::setMaxParallelism(){
+ if (!getQueryOperationDef().isScanOperation())
+ {
+ getQuery().setErrorCode(QRY_WRONG_OPERATION_TYPE);
+ return -1;
+ }
+ m_parallelism = Parallelism_max;
+ return 0;
+}
+
+int NdbQueryOperationImpl::setAdaptiveParallelism(){
+ if (!getQueryOperationDef().isScanOperation())
+ {
+ getQuery().setErrorCode(QRY_WRONG_OPERATION_TYPE);
+ return -1;
+ }
+ else if (getQueryOperationDef().getQueryOperationIx() == 0)
+ {
+ getQuery().setErrorCode(Err_FunctionNotImplemented);
+ return -1;
+ }
+ m_parallelism = Parallelism_adaptive;
+ return 0;
+}
+
int NdbQueryOperationImpl::setBatchSize(Uint32 batchSize){
if (!getQueryOperationDef().isScanOperation())
{
@@ -5124,12 +5185,6 @@ Uint32 NdbQueryOperationImpl::getRowSize
{
m_rowSize =
NdbReceiver::ndbrecord_rowsize(m_ndbRecord, m_firstRecAttr, 0, false);
-
- const bool withCorrelation = getRoot().getQueryDef().isScanQuery();
- if (withCorrelation)
- {
- m_rowSize += TupleCorrelation::wordCount*sizeof(Uint32);
- }
}
return m_rowSize;
}
=== modified file 'storage/ndb/src/ndbapi/NdbQueryOperation.hpp'
--- a/storage/ndb/src/ndbapi/NdbQueryOperation.hpp 2011-06-20 13:25:48 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryOperation.hpp 2011-08-22 08:35:35 +0000
@@ -335,13 +335,29 @@ public:
NdbQueryOptions::ScanOrdering getOrdering() const;
/**
- * Set the number of fragments to be scanned in parallel for the root
- * operation of this query. This only applies to table scans and non-sorted
- * scans of ordered indexes.
+ * Set the number of fragments to be scanned in parallel. This only applies
+ * to table scans and non-sorted scans of ordered indexes. This method is
+ * only implemented for the root scan operation.
* @return 0 if ok, -1 in case of error (call getNdbError() for details.)
*/
int setParallelism(Uint32 parallelism);
+ /**
+ * Set the number of fragments to be scanned in parallel to the maximum
+ * possible value. This is the default for the root scan operation.
+ * @return 0 if ok, -1 in case of error (call getNdbError() for details.)
+ */
+ int setMaxParallelism();
+
+ /**
+ * Let the system dynamically choose the number of fragments to scan in
+ * parallel. The system will try to choose a value that gives optimal
+ * performance. This is the default for all scans but the root scan. This
+ * method is only implemented for non-root scan operations.
+ * @return 0 if ok, -1 in case of error (call getNdbError() for details.)
+ */
+ int setAdaptiveParallelism();
+
/** Set the batch size (max rows per batch) for this operation. This
* only applies to scan operations, as lookup operations always will
* have the same batch size as its parent operation, or 1 if it is the
=== modified file 'storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp'
--- a/storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp 2011-08-17 13:16:22 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp 2011-08-22 08:35:35 +0000
@@ -661,14 +661,30 @@ public:
NdbQueryOptions::ScanOrdering getOrdering() const
{ return m_ordering; }
- /**
- * Set the number of fragments to be scanned in parallel for the root
- * operation of this query. This only applies to table scans and non-sorted
- * scans of ordered indexes.
+ /**
+ * Set the number of fragments to be scanned in parallel. This only applies
+ * to table scans and non-sorted scans of ordered indexes. This method is
+ * only implemented for the root scan operation.
* @return 0 if ok, -1 in case of error (call getNdbError() for details.)
*/
int setParallelism(Uint32 parallelism);
+ /**
+ * Set the number of fragments to be scanned in parallel to the maximum
+ * possible value. This is the default for the root scan operation.
+ * @return 0 if ok, -1 in case of error (call getNdbError() for details.)
+ */
+ int setMaxParallelism();
+
+ /**
+ * Let the system dynamically choose the number of fragments to scan in
+ * parallel. The system will try to choose a value that gives optimal
+ * performance. This is the default for all scans but the root scan. This
+ * method is only implemented for non-root scan operations.
+ * @return 0 if ok, -1 in case of error (call getNdbError() for details.)
+ */
+ int setAdaptiveParallelism();
+
/** Set the batch size (max rows per batch) for this operation. This
* only applies to scan operations, as lookup operations always will
* have the same batch size as its parent operation, or 1 if it is the
No bundle (reason: useless for push emails).
| Thread |
|---|
| • bzr push into mysql-5.1-telco-7.0-wl4124-new2 branch (pekka.nousiainen:4472 to 4473) | Pekka Nousiainen | 29 Aug |