From: Pekka Nousiainen Date: August 29 2011 2:37pm Subject: bzr push into mysql-5.1-telco-7.0-wl4124-new2 branch (pekka.nousiainen:4472 to 4473) List-Archive: http://lists.mysql.com/commits/140839 Message-Id: <20110829143751.BF5165587A@sama.localdomain> MIME-Version: 1.0 Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit 4473 Pekka Nousiainen 2011-08-29 [merge] merge telco-7.0 to wl4124-new2 added: storage/ndb/src/kernel/vm/mt_thr_config.cpp storage/ndb/src/kernel/vm/mt_thr_config.hpp modified: .bzr-mysql/default.conf mysql-test/suite/ndb/r/ndb_index_stat.result mysql-test/suite/ndb/r/ndb_statistics1.result mysql-test/suite/ndb/t/ndb_index_stat.test sql/ha_ndbcluster.cc sql/ha_ndbcluster_binlog.cc sql/ha_ndbinfo.cc storage/ndb/include/util/SparseBitmask.hpp storage/ndb/src/kernel/SimBlockList.cpp storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp storage/ndb/src/kernel/vm/Configuration.cpp storage/ndb/src/kernel/vm/Configuration.hpp storage/ndb/src/kernel/vm/Makefile.am storage/ndb/src/kernel/vm/SimulatedBlock.cpp storage/ndb/src/kernel/vm/SimulatedBlock.hpp storage/ndb/src/kernel/vm/dummy_nonmt.cpp storage/ndb/src/kernel/vm/mt.cpp storage/ndb/src/kernel/vm/mt.hpp storage/ndb/src/ndbapi/NdbQueryOperation.cpp storage/ndb/src/ndbapi/NdbQueryOperation.hpp storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp 4472 Pekka Nousiainen 2011-08-21 wl#4124 g01_enable.diff MTR: index-stat-enable=1 (but AutoCreate OFF) modified: mysql-test/suite/ndb/my.cnf mysql-test/suite/ndb/r/ndb_index.result mysql-test/suite/ndb/r/ndb_index_stat.result mysql-test/suite/ndb/r/ndb_statistics0.result mysql-test/suite/ndb/r/ndb_statistics1.result mysql-test/suite/ndb/t/ndb_index_stat_enable.inc mysql-test/suite/ndb/t/ndb_share.cnf === modified file '.bzr-mysql/default.conf' --- a/.bzr-mysql/default.conf 2011-08-17 09:09:22 +0000 +++ b/.bzr-mysql/default.conf 2011-08-29 14:35:32 +0000 @@ -1,4 +1,4 @@ [MYSQL] post_commit_to = commits@stripped post_push_to = commits@stripped -tree_name = mysql-5.1-telco-7.0 +tree_name = mysql-5.1-telco-7.0-wl4124-new2 === modified file 'mysql-test/suite/ndb/r/ndb_index_stat.result' --- a/mysql-test/suite/ndb/r/ndb_index_stat.result 2011-08-21 08:48:41 +0000 +++ b/mysql-test/suite/ndb/r/ndb_index_stat.result 2011-08-29 14:35:32 +0000 @@ -473,6 +473,18 @@ select count(*) from t1 where f > '222'; count(*) 1 drop table t1; +create table t1 (a1 int, b1 int, primary key(b1), key(a1)) engine=ndbcluster partition by key() partitions 1; +create table t2 (b2 int, c2 int, primary key(b2,c2)) engine=ndbcluster partition by key() partitions 1; +# table t1 is only for forcing record by key count for table t2 that should be near 50 (not 1) +analyze table t1, t2; +Table Op Msg_type Msg_text +test.t1 analyze status OK +test.t2 analyze status OK +explain select * from t1, t2 where b2 = b1 and a1 = 1; +id select_type table type possible_keys key key_len ref rows Extra +1 SIMPLE t1 ref PRIMARY,a1 a1 5 const 2 # +1 SIMPLE t2 ref PRIMARY PRIMARY 4 test.t1.b1 50 # +drop table t1, t2; set @is_enable = @is_enable_default; set @is_enable = NULL; # is_enable_on=0 is_enable_off=0 === modified file 'mysql-test/suite/ndb/r/ndb_statistics1.result' --- a/mysql-test/suite/ndb/r/ndb_statistics1.result 2011-08-21 08:48:41 +0000 +++ b/mysql-test/suite/ndb/r/ndb_statistics1.result 2011-08-29 14:35:32 +0000 @@ -75,7 +75,7 @@ SELECT * FROM t10000 AS X JOIN t10000 AS ON Y.I=X.I AND Y.J = X.I; id select_type table type possible_keys key key_len ref rows Extra 1 SIMPLE X ALL I NULL NULL NULL 10000 -1 SIMPLE Y ref J,I I 10 test.X.I,test.X.I 1 Using where +1 SIMPLE Y ref J,I J 5 test.X.I 1 Using where EXPLAIN SELECT * FROM t100 WHERE k < 42; id select_type table type possible_keys key key_len ref rows Extra @@ -143,11 +143,11 @@ id select_type table type possible_keys EXPLAIN SELECT * FROM t10000 WHERE J = 0 AND K < 1; id select_type table type possible_keys key key_len ref rows Extra -1 SIMPLE t10000 range PRIMARY,J PRIMARY 4 NULL 2 Using where with pushed condition +1 SIMPLE t10000 ref PRIMARY,J J 5 const 2 Using where with pushed condition EXPLAIN SELECT * FROM t10000 WHERE J = 0 AND K BETWEEN 1 AND 10; id select_type table type possible_keys key key_len ref rows Extra -1 SIMPLE t10000 range PRIMARY,J PRIMARY 4 NULL 2 Using where with pushed condition +1 SIMPLE t10000 ref PRIMARY,J J 5 const 2 Using where with pushed condition EXPLAIN SELECT * FROM t10000 WHERE J = 0 AND K = 1; id select_type table type possible_keys key key_len ref rows Extra === modified file 'mysql-test/suite/ndb/t/ndb_index_stat.test' --- a/mysql-test/suite/ndb/t/ndb_index_stat.test 2011-07-23 14:35:37 +0000 +++ b/mysql-test/suite/ndb/t/ndb_index_stat.test 2011-08-28 13:29:22 +0000 @@ -309,5 +309,30 @@ while ($i) } drop table t1; +# +# Check estimates of records per key for partial keys using unique/primary ordered index +# + +create table t1 (a1 int, b1 int, primary key(b1), key(a1)) engine=ndbcluster partition by key() partitions 1; +create table t2 (b2 int, c2 int, primary key(b2,c2)) engine=ndbcluster partition by key() partitions 1; + +--disable_query_log +let $i = 100; +while ($i) +{ + eval insert into t1 (a1,b1) values ($i,$i); + eval insert into t2 (b2,c2) values ($i mod 2, $i div 2); + dec $i; +} +--enable_query_log + +--echo # table t1 is only for forcing record by key count for table t2 that should be near 50 (not 1) +analyze table t1, t2; +# Hide Extra column +--replace_column 10 # +explain select * from t1, t2 where b2 = b1 and a1 = 1; + +drop table t1, t2; + set @is_enable = @is_enable_default; source ndb_index_stat_enable.inc; === modified file 'sql/ha_ndbcluster.cc' --- a/sql/ha_ndbcluster.cc 2011-07-23 14:38:08 +0000 +++ b/sql/ha_ndbcluster.cc 2011-08-28 13:29:22 +0000 @@ -164,6 +164,11 @@ static MYSQL_THDVAR_ULONG( 0 /* block */ ); +#if NDB_VERSION_D < NDB_MAKE_VERSION(7,2,0) +#define DEFAULT_NDB_INDEX_STAT_ENABLE FALSE +#else +#define DEFAULT_NDB_INDEX_STAT_ENABLE TRUE +#endif static MYSQL_THDVAR_BOOL( index_stat_enable, /* name */ @@ -171,7 +176,7 @@ static MYSQL_THDVAR_BOOL( "Use ndb index statistics in query optimization.", NULL, /* check func. */ NULL, /* update func. */ - FALSE /* default */ + DEFAULT_NDB_INDEX_STAT_ENABLE /* default */ ); @@ -1353,19 +1358,22 @@ void ha_ndbcluster::set_rec_per_key() */ for (uint i=0 ; i < table_share->keys ; i++) { + bool is_unique_index= false; KEY* key_info= table->key_info + i; switch (get_index_type(i)) { - case UNIQUE_ORDERED_INDEX: - case PRIMARY_KEY_ORDERED_INDEX: case UNIQUE_INDEX: case PRIMARY_KEY_INDEX: { // Index is unique when all 'key_parts' are specified, // else distribution is unknown and not specified here. - key_info->rec_per_key[key_info->key_parts-1]= 1; + is_unique_index= true; break; } + case UNIQUE_ORDERED_INDEX: + case PRIMARY_KEY_ORDERED_INDEX: + is_unique_index= true; + // intentional fall thru to logic for ordered index case ORDERED_INDEX: // 'Records pr. key' are unknown for non-unique indexes. // (May change when we get better index statistics.) @@ -1395,6 +1403,11 @@ void ha_ndbcluster::set_rec_per_key() default: DBUG_ASSERT(false); } + // set rows per key to 1 for complete key given for unique/primary index + if (is_unique_index) + { + key_info->rec_per_key[key_info->key_parts-1]= 1; + } } DBUG_VOID_RETURN; } === modified file 'sql/ha_ndbcluster_binlog.cc' --- a/sql/ha_ndbcluster_binlog.cc 2011-07-08 12:28:37 +0000 +++ b/sql/ha_ndbcluster_binlog.cc 2011-08-27 12:12:27 +0000 @@ -6299,8 +6299,8 @@ ndb_binlog_thread_handle_data_event(Ndb MY_BITMAP b; /* Potential buffer for the bitmap */ uint32 bitbuf[128 / (sizeof(uint32) * 8)]; - bitmap_init(&b, n_fields <= sizeof(bitbuf) * 8 ? bitbuf : NULL, - n_fields, FALSE); + const bool own_buffer = n_fields <= sizeof(bitbuf) * 8; + bitmap_init(&b, own_buffer ? bitbuf : NULL, n_fields, FALSE); bitmap_set_all(&b); /* @@ -6463,6 +6463,11 @@ ndb_binlog_thread_handle_data_event(Ndb my_free(blobs_buffer[1], MYF(MY_ALLOW_ZERO_PTR)); } + if (!own_buffer) + { + bitmap_free(&b); + } + return 0; } === modified file 'sql/ha_ndbinfo.cc' --- a/sql/ha_ndbinfo.cc 2011-06-30 15:59:25 +0000 +++ b/sql/ha_ndbinfo.cc 2011-08-27 09:54:26 +0000 @@ -367,6 +367,7 @@ int ha_ndbinfo::open(const char *name, i int err = g_ndbinfo->openTable(name, &m_impl.m_table); if (err) { + assert(m_impl.m_table == 0); if (err == NdbInfo::ERR_NoSuchTable) DBUG_RETURN(HA_ERR_NO_SUCH_TABLE); DBUG_RETURN(err2mysql(err)); @@ -390,6 +391,7 @@ int ha_ndbinfo::open(const char *name, i warn_incompatible(ndb_tab, true, "column '%s' is NOT NULL", field->field_name); + delete m_impl.m_table; m_impl.m_table= 0; DBUG_RETURN(ERR_INCOMPAT_TABLE_DEF); } @@ -427,6 +429,7 @@ int ha_ndbinfo::open(const char *name, i warn_incompatible(ndb_tab, true, "column '%s' is not compatible", field->field_name); + delete m_impl.m_table; m_impl.m_table= 0; DBUG_RETURN(ERR_INCOMPAT_TABLE_DEF); } } === modified file 'storage/ndb/include/util/SparseBitmask.hpp' --- a/storage/ndb/include/util/SparseBitmask.hpp 2010-08-28 09:37:09 +0000 +++ b/storage/ndb/include/util/SparseBitmask.hpp 2011-08-26 09:20:08 +0000 @@ -20,6 +20,7 @@ #include #include +#include class SparseBitmask { unsigned m_max_size; @@ -102,6 +103,11 @@ public: bool isclear() const { return count() == 0; } + unsigned getBitNo(unsigned n) const { + assert(n < m_vec.size()); + return m_vec[n]; + } + void print(void) const { for (unsigned i = 0; i < m_vec.size(); i++) { @@ -110,6 +116,38 @@ public: } } + bool equal(const SparseBitmask& obj) const { + if (obj.count() != count()) + return false; + + for (unsigned i = 0; iloadWorkers(); } + finalize_thr_map(); } // Check that all blocks could be created === modified file 'storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp' --- a/storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp 2011-07-04 13:37:56 +0000 +++ b/storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp 2011-08-22 08:35:35 +0000 @@ -530,21 +530,87 @@ public: typedef SLFifoListImpl ScanFragHandle_list; typedef LocalSLFifoListImpl Local_ScanFragHandle_list; + /** + * This class computes mean and standard deviation incrementally for a series + * of samples. + */ + class IncrementalStatistics + { + public: + /** + * We cannot have a (non-trivial) constructor, since this class is used in + * unions. + */ + void init() + { + m_mean = m_sumSquare = 0.0; + m_noOfSamples = 0; + } + + // Add another sample. + void update(double sample); + + double getMean() const { return m_mean; } + + double getStdDev() const { + return m_noOfSamples < 2 ? 0.0 : sqrt(m_sumSquare/(m_noOfSamples - 1)); + } + + private: + // Mean of all samples + double m_mean; + //Sum of square of differences from the current mean. + double m_sumSquare; + Uint32 m_noOfSamples; + }; // IncrementalStatistics + struct ScanIndexData { Uint16 m_frags_complete; Uint16 m_frags_outstanding; + /** + * The number of fragment for which we have not yet sent SCAN_FRAGREQ but + * will eventually do so. + */ + Uint16 m_frags_not_started; Uint32 m_rows_received; // #execTRANSID_AI Uint32 m_rows_expecting; // Sum(ScanFragConf) Uint32 m_batch_chunks; // #SCAN_FRAGREQ + #SCAN_NEXTREQ to retrieve batch Uint32 m_scanCookie; Uint32 m_fragCount; + // The number of fragments that we scan in parallel. + Uint32 m_parallelism; + /** + * True if this is the first instantiation of this operation. A child + * operation will be instantiated once for each batch of its parent. + */ + bool m_firstExecution; + /** + * Mean and standard deviation for the optimal parallelism for earlier + * executions of this operation. + */ + IncrementalStatistics m_parallelismStat; + // Total number of rows for the current execution of this operation. + Uint32 m_totalRows; + // Total number of bytes for the current execution of this operation. + Uint32 m_totalBytes; + ScanFragHandle_list::HeadPOD m_fragments; // ScanFrag states union { PatternStore::HeadPOD m_prunePattern; Uint32 m_constPrunePtrI; }; + /** + * Max number of rows seen in a batch. Used for calculating the number of + * rows per fragment in the next next batch when using adaptive batch size. + */ + Uint32 m_largestBatchRows; + /** + * Max number of bytes seen in a batch. Used for calculating the number of + * rows per fragment in the next next batch when using adaptive batch size. + */ + Uint32 m_largestBatchBytes; Uint32 m_scanFragReq[ScanFragReq::SignalLength + 2]; }; @@ -1164,6 +1230,13 @@ private: void scanIndex_parent_row(Signal*,Ptr,Ptr, const RowPtr&); void scanIndex_fixupBound(Ptr fragPtr, Uint32 ptrI, Uint32); void scanIndex_send(Signal*,Ptr,Ptr); + void scanIndex_send(Signal* signal, + Ptr requestPtr, + Ptr treeNodePtr, + Uint32 noOfFrags, + Uint32 bs_bytes, + Uint32 bs_rows, + Uint32& batchRange); void scanIndex_batchComplete(Signal* signal); Uint32 scanIndex_findFrag(Local_ScanFragHandle_list &, Ptr&, Uint32 fragId); === modified file 'storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp' --- a/storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp 2011-07-04 13:37:56 +0000 +++ b/storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp 2011-08-22 11:50:41 +0000 @@ -4450,6 +4450,9 @@ Dbspj::parseScanIndex(Build_context& ctx data.m_fragments.init(); data.m_frags_outstanding = 0; data.m_frags_complete = 0; + data.m_frags_not_started = 0; + data.m_parallelismStat.init(); + data.m_firstExecution = true; data.m_batch_chunks = 0; err = parseDA(ctx, requestPtr, treeNodePtr, @@ -5002,6 +5005,7 @@ Dbspj::scanIndex_parent_batch_complete(S } } } + data.m_frags_not_started = data.m_fragCount - data.m_frags_complete; if (data.m_frags_complete == data.m_fragCount) { @@ -5015,7 +5019,120 @@ Dbspj::scanIndex_parent_batch_complete(S /** * When parent's batch is complete, we send our batch */ - scanIndex_send(signal, requestPtr, treeNodePtr); + const ScanFragReq * org = (const ScanFragReq*)data.m_scanFragReq; + ndbassert(org->batch_size_rows >= data.m_fragCount - data.m_frags_complete); + + if (treeNodePtr.p->m_bits & TreeNode::T_SCAN_PARALLEL) + { + jam(); + data.m_parallelism = data.m_fragCount - data.m_frags_complete; + } + else if (data.m_firstExecution) + { + /** + * Having a high parallelism would allow us to fetch data from many + * fragments in parallel and thus reduce the number of round trips. + * On the other hand, we should set parallelism so low that we can fetch + * all data from a fragment in one batch if possible. + * Since this is the first execution, we do not know how many rows or bytes + * this operation is likely to return. Therefore we set parallelism to 1, + * since this gives the lowest penalty if our guess is wrong. + */ + jam(); + data.m_parallelism = 1; + } + else + { + jam(); + /** + * Use statistics from earlier runs of this operation to estimate the + * initial parallelism. We use the mean minus two times the standard + * deviation to have a low risk of setting parallelism to high (as erring + * in the other direction is more costly). + */ + Int32 parallelism = + static_cast(data.m_parallelismStat.getMean() + - 2 * data.m_parallelismStat.getStdDev()); + + if (parallelism < 1) + { + jam(); + parallelism = 1; + } + else if ((data.m_fragCount - data.m_frags_complete) % parallelism != 0) + { + jam(); + /** + * Set parallelism such that we can expect to have similar + * parallelism in each batch. For example if there are 8 remaining + * fragments, then we should fecth 2 times 4 fragments rather than + * 7+1. + */ + const Int32 roundTrips = + 1 + (data.m_fragCount - data.m_frags_complete) / parallelism; + parallelism = (data.m_fragCount - data.m_frags_complete) / roundTrips; + } + + data.m_parallelism = static_cast(parallelism); + +#ifdef DEBUG_SCAN_FRAGREQ + DEBUG("::scanIndex_send() starting index scan with parallelism=" + << data.m_parallelism); +#endif + } + ndbrequire(data.m_parallelism > 0); + + const Uint32 bs_rows = org->batch_size_rows/ data.m_parallelism; + const Uint32 bs_bytes = org->batch_size_bytes / data.m_parallelism; + ndbassert(bs_rows > 0); + ndbassert(bs_bytes > 0); + + data.m_largestBatchRows = 0; + data.m_largestBatchBytes = 0; + data.m_totalRows = 0; + data.m_totalBytes = 0; + + { + Local_ScanFragHandle_list list(m_scanfraghandle_pool, data.m_fragments); + Ptr fragPtr; + list.first(fragPtr); + + while(!fragPtr.isNull()) + { + ndbassert(fragPtr.p->m_state == ScanFragHandle::SFH_NOT_STARTED || + fragPtr.p->m_state == ScanFragHandle::SFH_COMPLETE); + fragPtr.p->m_state = ScanFragHandle::SFH_NOT_STARTED; + list.next(fragPtr); + } + } + + Uint32 batchRange = 0; + scanIndex_send(signal, + requestPtr, + treeNodePtr, + data.m_parallelism, + bs_bytes, + bs_rows, + batchRange); + + data.m_firstExecution = false; + + if (treeNodePtr.p->m_bits & TreeNode::T_SCAN_PARALLEL) + { + ndbrequire((data.m_frags_outstanding + data.m_frags_complete) == + data.m_fragCount); + } + else + { + ndbrequire(static_cast(data.m_frags_outstanding + + data.m_frags_complete) <= + data.m_fragCount); + } + + data.m_batch_chunks = 1; + requestPtr.p->m_cnt_active++; + requestPtr.p->m_outstanding++; + treeNodePtr.p->m_state = TreeNode::TN_ACTIVE; } void @@ -5045,77 +5162,91 @@ Dbspj::scanIndex_parent_batch_repeat(Sig } } +/** + * Ask for the first batch for a number of fragments. + */ void Dbspj::scanIndex_send(Signal* signal, Ptr requestPtr, - Ptr treeNodePtr) + Ptr treeNodePtr, + Uint32 noOfFrags, + Uint32 bs_bytes, + Uint32 bs_rows, + Uint32& batchRange) { - jam(); - - ScanIndexData& data = treeNodePtr.p->m_scanindex_data; - const ScanFragReq * org = (const ScanFragReq*)data.m_scanFragReq; - - Uint32 cnt = 1; - Uint32 bs_rows = org->batch_size_rows; - Uint32 bs_bytes = org->batch_size_bytes; - if (treeNodePtr.p->m_bits & TreeNode::T_SCAN_PARALLEL) - { - jam(); - cnt = data.m_fragCount - data.m_frags_complete; - ndbrequire(cnt > 0); - - bs_rows /= cnt; - bs_bytes /= cnt; - ndbassert(bs_rows > 0); - } - /** * if (m_bits & prunemask): * - Range keys sliced out to each ScanFragHandle * - Else, range keys kept on first (and only) ScanFragHandle */ - Uint32 prunemask = TreeNode::T_PRUNE_PATTERN | TreeNode::T_CONST_PRUNE; + const bool prune = treeNodePtr.p->m_bits & + (TreeNode::T_PRUNE_PATTERN | TreeNode::T_CONST_PRUNE); /** - * Don't release keyInfo if it may be sent multiple times, eiter: - * - Not pruned -> same keyInfo goes to all datanodes. - * - Result handling is REPEAT_SCAN_RESULT and same batch may be - * repeated multiple times due to incomplete bushy X-scans. - * (by ::scanIndex_parent_batch_repeat()) - * - * When not released, ::scanIndex_parent_batch_cleanup() will - * eventually release them when preparing arrival of a new parent batch. + * If scan is repeatable, we must make sure not to release range keys so + * that we canuse them again in the next repetition. */ - const bool release = ((treeNodePtr.p->m_bits & prunemask) != 0 && - (treeNodePtr.p->m_bits & TreeNode::T_SCAN_REPEATABLE) == 0); + const bool repeatable = + (treeNodePtr.p->m_bits & TreeNode::T_SCAN_REPEATABLE) != 0; - ScanFragReq* req = reinterpret_cast(signal->getDataPtrSend()); + ScanIndexData& data = treeNodePtr.p->m_scanindex_data; + ndbassert(noOfFrags > 0); + ndbassert(data.m_frags_not_started >= noOfFrags); + ScanFragReq* const req = + reinterpret_cast(signal->getDataPtrSend()); + const ScanFragReq * const org + = reinterpret_cast(data.m_scanFragReq); memcpy(req, org, sizeof(data.m_scanFragReq)); // req->variableData[0] // set below req->variableData[1] = requestPtr.p->m_rootResultData; req->batch_size_bytes = bs_bytes; req->batch_size_rows = bs_rows; - Ptr fragPtr; + Uint32 requestsSent = 0; Local_ScanFragHandle_list list(m_scanfraghandle_pool, data.m_fragments); - - Uint32 keyInfoPtrI = RNIL; + Ptr fragPtr; list.first(fragPtr); - if ((treeNodePtr.p->m_bits & prunemask) == 0) + Uint32 keyInfoPtrI = fragPtr.p->m_rangePtrI; + ndbrequire(prune || keyInfoPtrI != RNIL); + /** + * Iterate over the list of fragments until we have sent as many + * SCAN_FRAGREQs as we should. + */ + while (requestsSent < noOfFrags) { jam(); - keyInfoPtrI = fragPtr.p->m_rangePtrI; - ndbrequire(keyInfoPtrI != RNIL); - } + ndbassert(!fragPtr.isNull()); - Uint32 batchRange = 0; - for (Uint32 i = 0; i < cnt && !fragPtr.isNull(); list.next(fragPtr)) - { - jam(); + if (fragPtr.p->m_state != ScanFragHandle::SFH_NOT_STARTED) + { + // Skip forward to the frags that we should send. + jam(); + list.next(fragPtr); + continue; + } + + const Uint32 ref = fragPtr.p->m_ref; + + if (noOfFrags==1 && !prune && + data.m_frags_not_started == data.m_fragCount && + refToNode(ref) != getOwnNodeId() && + list.hasNext(fragPtr)) + { + /** + * If we are doing a scan with adaptive parallelism and start with + * parallelism=1 then it makes sense to fetch a batch from a fragment on + * the local data node. The reason for this is that if that fragment + * contains few rows, we may be able to read from several fragments in + * parallel. Then we minimize the total number of round trips (to remote + * data nodes) if we fetch the first fragment batch locally. + */ + jam(); + list.next(fragPtr); + continue; + } SectionHandle handle(this); - Uint32 ref = fragPtr.p->m_ref; Uint32 attrInfoPtrI = treeNodePtr.p->m_send.m_attrInfoPtrI; /** @@ -5124,26 +5255,34 @@ Dbspj::scanIndex_send(Signal* signal, req->senderData = fragPtr.i; req->fragmentNoKeyLen = fragPtr.p->m_fragId; - if ((treeNodePtr.p->m_bits & prunemask)) + if (prune) { jam(); keyInfoPtrI = fragPtr.p->m_rangePtrI; if (keyInfoPtrI == RNIL) { + /** + * Since we use pruning, we can see that no parent rows would hash + * to this fragment. + */ jam(); fragPtr.p->m_state = ScanFragHandle::SFH_COMPLETE; + list.next(fragPtr); continue; } - } - if (release) - { - /** - * If we'll use sendSignal() and we need to send the attrInfo several - * times, we need to copy them - */ - Uint32 tmp = RNIL; - ndbrequire(dupSection(tmp, attrInfoPtrI)); // TODO handle error - attrInfoPtrI = tmp; + + if (!repeatable) + { + /** + * If we'll use sendSignal() and we need to send the attrInfo several + * times, we need to copy them. (For repeatable or unpruned scans + * we use sendSignalNoRelease(), so then we do not need to copy.) + */ + jam(); + Uint32 tmp = RNIL; + ndbrequire(dupSection(tmp, attrInfoPtrI)); // TODO handle error + attrInfoPtrI = tmp; + } } req->variableData[0] = batchRange; @@ -5152,16 +5291,14 @@ Dbspj::scanIndex_send(Signal* signal, handle.m_cnt = 2; #if defined DEBUG_SCAN_FRAGREQ - { - ndbout_c("SCAN_FRAGREQ to %x", ref); - printSCAN_FRAGREQ(stdout, signal->getDataPtrSend(), - NDB_ARRAY_SIZE(treeNodePtr.p->m_scanfrag_data.m_scanFragReq), - DBLQH); - printf("ATTRINFO: "); - print(handle.m_ptr[0], stdout); - printf("KEYINFO: "); - print(handle.m_ptr[1], stdout); - } + ndbout_c("SCAN_FRAGREQ to %x", ref); + printSCAN_FRAGREQ(stdout, signal->getDataPtrSend(), + NDB_ARRAY_SIZE(treeNodePtr.p->m_scanfrag_data.m_scanFragReq), + DBLQH); + printf("ATTRINFO: "); + print(handle.m_ptr[0], stdout); + printf("KEYINFO: "); + print(handle.m_ptr[1], stdout); #endif if (refToNode(ref) == getOwnNodeId()) @@ -5173,8 +5310,13 @@ Dbspj::scanIndex_send(Signal* signal, c_Counters.incr_counter(CI_REMOTE_RANGE_SCANS_SENT, 1); } - if (release) + if (prune && !repeatable) { + /** + * For a non-repeatable pruned scan, key info is unique for each + * fragment and therefore cannot be reused, so we release key info + * right away. + */ jam(); sendSignal(ref, GSN_SCAN_FRAGREQ, signal, NDB_ARRAY_SIZE(data.m_scanFragReq), JBB, &handle); @@ -5183,32 +5325,24 @@ Dbspj::scanIndex_send(Signal* signal, } else { + /** + * Reuse key info for multiple fragments and/or multiple repetitions + * of the scan. + */ jam(); sendSignalNoRelease(ref, GSN_SCAN_FRAGREQ, signal, NDB_ARRAY_SIZE(data.m_scanFragReq), JBB, &handle); } handle.clear(); - i++; fragPtr.p->m_state = ScanFragHandle::SFH_SCANNING; // running data.m_frags_outstanding++; batchRange += bs_rows; - } + requestsSent++; + list.next(fragPtr); + } // while (requestsSent < noOfFrags) - if (treeNodePtr.p->m_bits & TreeNode::T_SCAN_PARALLEL) - { - ndbrequire(data.m_frags_outstanding == - data.m_fragCount - data.m_frags_complete); - } - else - { - ndbrequire(data.m_frags_outstanding == 1); - } - - data.m_batch_chunks = 1; - requestPtr.p->m_cnt_active++; - requestPtr.p->m_outstanding++; - treeNodePtr.p->m_state = TreeNode::TN_ACTIVE; + data.m_frags_not_started -= requestsSent; } void @@ -5282,6 +5416,10 @@ Dbspj::scanIndex_execSCAN_FRAGCONF(Signa } requestPtr.p->m_rows += rows; + data.m_totalRows += rows; + data.m_totalBytes += conf->total_len; + data.m_largestBatchRows = MAX(data.m_largestBatchRows, rows); + data.m_largestBatchBytes = MAX(data.m_largestBatchBytes, conf->total_len); if (!treeNodePtr.p->isLeaf()) { @@ -5302,7 +5440,9 @@ Dbspj::scanIndex_execSCAN_FRAGCONF(Signa ndbrequire(data.m_frags_complete < data.m_fragCount); data.m_frags_complete++; - if (data.m_frags_complete == data.m_fragCount) + if (data.m_frags_complete == data.m_fragCount || + ((requestPtr.p->m_state & Request::RS_ABORTING) != 0 && + data.m_fragCount == (data.m_frags_complete + data.m_frags_not_started))) { jam(); ndbrequire(requestPtr.p->m_cnt_active); @@ -5314,6 +5454,32 @@ Dbspj::scanIndex_execSCAN_FRAGCONF(Signa if (data.m_frags_outstanding == 0) { + const ScanFragReq * const org + = reinterpret_cast(data.m_scanFragReq); + + if (data.m_frags_complete == data.m_fragCount) + { + jam(); + /** + * Calculate what would have been the optimal parallelism for the + * scan instance that we have just completed, and update + * 'parallelismStat' with this value. We then use this statistics to set + * the initial parallelism for the next instance of this operation. + */ + double parallelism = data.m_fragCount; + if (data.m_totalRows > 0) + { + parallelism = MIN(parallelism, + double(org->batch_size_rows) / data.m_totalRows); + } + if (data.m_totalBytes > 0) + { + parallelism = MIN(parallelism, + double(org->batch_size_bytes) / data.m_totalBytes); + } + data.m_parallelismStat.update(parallelism); + } + /** * Don't reportBatchComplete to children if we're aborting... */ @@ -5364,7 +5530,7 @@ Dbspj::scanIndex_execSCAN_FRAGREF(Signal ndbrequire(data.m_frags_outstanding > 0); data.m_frags_outstanding--; - if (data.m_frags_complete == data.m_fragCount) + if (data.m_fragCount == (data.m_frags_complete + data.m_frags_not_started)) { jam(); ndbrequire(requestPtr.p->m_cnt_active); @@ -5390,21 +5556,78 @@ Dbspj::scanIndex_execSCAN_NEXTREQ(Signal jam(); ScanIndexData& data = treeNodePtr.p->m_scanindex_data; + const ScanFragReq * org = (const ScanFragReq*)data.m_scanFragReq; data.m_rows_received = 0; data.m_rows_expecting = 0; ndbassert(data.m_frags_outstanding == 0); ndbrequire(data.m_frags_complete < data.m_fragCount); - Uint32 cnt = data.m_fragCount - data.m_frags_complete; if ((treeNodePtr.p->m_bits & TreeNode::T_SCAN_PARALLEL) == 0) { jam(); - cnt = 1; + /** + * Since fetching few but large batches is more efficient, we + * set parallelism to the lowest value where we can still expect each + * batch to be full. + */ + if (data.m_largestBatchRows < org->batch_size_rows/data.m_parallelism && + data.m_largestBatchBytes < org->batch_size_bytes/data.m_parallelism) + { + jam(); + data.m_parallelism = data.m_fragCount - data.m_frags_complete; + if (data.m_largestBatchRows > 0) + { + jam(); + data.m_parallelism = + MIN(org->batch_size_rows / data.m_largestBatchRows, + data.m_parallelism); + } + if (data.m_largestBatchBytes > 0) + { + jam(); + data.m_parallelism = + MIN(data.m_parallelism, + org->batch_size_bytes/data.m_largestBatchBytes); + } + if (data.m_frags_complete == 0 && + data.m_frags_not_started % data.m_parallelism != 0) + { + jam(); + /** + * Set parallelism such that we can expect to have similar + * parallelism in each batch. For example if there are 8 remaining + * fragments, then we should fecth 2 times 4 fragments rather than + * 7+1. + */ + const Uint32 roundTrips = + 1 + data.m_frags_not_started / data.m_parallelism; + data.m_parallelism = data.m_frags_not_started / roundTrips; + } + } + else + { + jam(); + // We get full batches, so we should lower parallelism. + data.m_parallelism = MIN(data.m_fragCount - data.m_frags_complete, + MAX(1, data.m_parallelism/2)); + } + ndbassert(data.m_parallelism > 0); +#ifdef DEBUG_SCAN_FRAGREQ + DEBUG("::scanIndex_execSCAN_NEXTREQ() Asking for new batches from " << + data.m_parallelism << + " fragments with " << org->batch_size_rows/data.m_parallelism << + " rows and " << org->batch_size_bytes/data.m_parallelism << + " bytes."); +#endif + } + else + { + jam(); + data.m_parallelism = data.m_fragCount - data.m_frags_complete; } - const ScanFragReq * org = (const ScanFragReq*)data.m_scanFragReq; - const Uint32 bs_rows = org->batch_size_rows/cnt; + const Uint32 bs_rows = org->batch_size_rows/data.m_parallelism; ndbassert(bs_rows > 0); ScanFragNextReq* req = reinterpret_cast(signal->getDataPtrSend()); @@ -5413,20 +5636,27 @@ Dbspj::scanIndex_execSCAN_NEXTREQ(Signal req->transId1 = requestPtr.p->m_transId[0]; req->transId2 = requestPtr.p->m_transId[1]; req->batch_size_rows = bs_rows; - req->batch_size_bytes = org->batch_size_bytes/cnt; + req->batch_size_bytes = org->batch_size_bytes/data.m_parallelism; Uint32 batchRange = 0; Ptr fragPtr; + Uint32 sentFragCount = 0; + { + /** + * First, ask for more data from fragments that are already started. + */ Local_ScanFragHandle_list list(m_scanfraghandle_pool, data.m_fragments); list.first(fragPtr); - for (Uint32 i = 0; i < cnt && !fragPtr.isNull(); list.next(fragPtr)) + while (sentFragCount < data.m_parallelism && !fragPtr.isNull()) { jam(); + ndbassert(fragPtr.p->m_state == ScanFragHandle::SFH_WAIT_NEXTREQ || + fragPtr.p->m_state == ScanFragHandle::SFH_COMPLETE || + fragPtr.p->m_state == ScanFragHandle::SFH_NOT_STARTED); if (fragPtr.p->m_state == ScanFragHandle::SFH_WAIT_NEXTREQ) { jam(); - i++; data.m_frags_outstanding++; req->variableData[0] = batchRange; fragPtr.p->m_state = ScanFragHandle::SFH_SCANNING; @@ -5434,6 +5664,7 @@ Dbspj::scanIndex_execSCAN_NEXTREQ(Signal DEBUG("scanIndex_execSCAN_NEXTREQ to: " << hex << treeNodePtr.p->m_send.m_ref + << ", m_node_no=" << treeNodePtr.p->m_node_no << ", senderData: " << req->senderData); #ifdef DEBUG_SCAN_FRAGREQ @@ -5445,9 +5676,27 @@ Dbspj::scanIndex_execSCAN_NEXTREQ(Signal sendSignal(fragPtr.p->m_ref, GSN_SCAN_NEXTREQ, signal, ScanFragNextReq::SignalLength + 1, JBB); + sentFragCount++; + } + list.next(fragPtr); } } + if (sentFragCount < data.m_parallelism) + { + /** + * Then start new fragments until we reach data.m_parallelism. + */ + jam(); + ndbassert(data.m_frags_not_started != 0); + scanIndex_send(signal, + requestPtr, + treeNodePtr, + data.m_parallelism - sentFragCount, + org->batch_size_bytes/data.m_parallelism, + bs_rows, + batchRange); + } /** * cursor should not have been positioned here... * unless we actually had something more to send. @@ -5544,14 +5793,28 @@ Dbspj::scanIndex_abort(Signal* signal, } } - ndbrequire(cnt_waiting + cnt_scanning > 0); if (cnt_scanning == 0) { - /** - * If all were waiting...this should increase m_outstanding - */ - jam(); - requestPtr.p->m_outstanding++; + if (cnt_waiting > 0) + { + /** + * If all were waiting...this should increase m_outstanding + */ + jam(); + requestPtr.p->m_outstanding++; + } + else + { + /** + * All fragments are either complete or not yet started, so there is + * nothing to abort. + */ + jam(); + ndbassert(data.m_frags_not_started > 0); + ndbrequire(requestPtr.p->m_cnt_active); + requestPtr.p->m_cnt_active--; + treeNodePtr.p->m_state = TreeNode::TN_INACTIVE; + } } } @@ -5603,6 +5866,8 @@ Dbspj::scanIndex_execNODE_FAILREP(Signal jam(); ndbrequire(data.m_frags_complete < data.m_fragCount); data.m_frags_complete++; + ndbrequire(data.m_frags_not_started > 0); + data.m_frags_not_started--; // fall through case ScanFragHandle::SFH_COMPLETE: jam(); @@ -5637,8 +5902,8 @@ Dbspj::scanIndex_execNODE_FAILREP(Signal requestPtr.p->m_outstanding--; } - if (save1 != data.m_fragCount - && data.m_frags_complete == data.m_fragCount) + if (save1 != 0 && + data.m_fragCount == (data.m_frags_complete + data.m_frags_not_started)) { jam(); ndbrequire(requestPtr.p->m_cnt_active); @@ -5654,7 +5919,8 @@ Dbspj::scanIndex_release_rangekeys(Ptr treeNodePtr) { jam(); - DEBUG("scanIndex_release_rangekeys(), m_node_no: " << treeNodePtr.p->m_node_no); + DEBUG("scanIndex_release_rangekeys(), tree node " << treeNodePtr.i + << " m_node_no: " << treeNodePtr.p->m_node_no); ScanIndexData& data = treeNodePtr.p->m_scanindex_data; Local_ScanFragHandle_list list(m_scanfraghandle_pool, data.m_fragments); @@ -7115,3 +7381,15 @@ void Dbspj::execDBINFO_SCANREQ(Signal *s ndbinfo_send_scan_conf(signal, req, rl); } // Dbspj::execDBINFO_SCANREQ(Signal *signal) + +void Dbspj::IncrementalStatistics::update(double sample) +{ + // Prevent wrap-around + if(m_noOfSamples < 0xffffffff) + { + m_noOfSamples++; + const double delta = sample - m_mean; + m_mean += delta/m_noOfSamples; + m_sumSquare += delta * (sample - m_mean); + } +} === modified file 'storage/ndb/src/kernel/vm/Configuration.cpp' --- a/storage/ndb/src/kernel/vm/Configuration.cpp 2011-07-04 13:37:56 +0000 +++ b/storage/ndb/src/kernel/vm/Configuration.cpp 2011-08-25 09:40:27 +0000 @@ -930,13 +930,16 @@ Configuration::setAllLockCPU(bool exec_t Uint32 i; for (i = 0; i < threadInfo.size(); i++) { - if (threadInfo[i].type != NotInUse) + if (threadInfo[i].type == NotInUse) + continue; + + bool run = + (exec_thread && threadInfo[i].type == MainThread) || + (!exec_thread && threadInfo[i].type != MainThread); + + if (run) { - if (setLockCPU(threadInfo[i].pThread, - threadInfo[i].type, - exec_thread, - FALSE)) - return; + setLockCPU(threadInfo[i].pThread, threadInfo[i].type); } } } @@ -966,11 +969,8 @@ Configuration::setRealtimeScheduler(NdbT int Configuration::setLockCPU(NdbThread * pThread, - enum ThreadTypes type, - bool exec_thread, - bool init) + enum ThreadTypes type) { - (void)init; Uint32 cpu_id; int tid = NdbThread_GetTid(pThread); if (tid == -1) @@ -981,9 +981,6 @@ Configuration::setLockCPU(NdbThread * pT We only set new lock CPU characteristics for the threads for which it has changed */ - if ((exec_thread && type != MainThread) || - (!exec_thread && type == MainThread)) - return 0; if (type == MainThread) cpu_id = executeLockCPU(); else @@ -1029,7 +1026,7 @@ Configuration::addThread(struct NdbThrea * main threads are set in ThreadConfig::ipControlLoop * as it's handled differently with mt */ - setLockCPU(pThread, type, (type == MainThread), TRUE); + setLockCPU(pThread, type); } return i; } === modified file 'storage/ndb/src/kernel/vm/Configuration.hpp' --- a/storage/ndb/src/kernel/vm/Configuration.hpp 2011-07-04 13:37:56 +0000 +++ b/storage/ndb/src/kernel/vm/Configuration.hpp 2011-08-25 09:40:27 +0000 @@ -86,10 +86,7 @@ public: void setAllRealtimeScheduler(); void setAllLockCPU(bool exec_thread); - int setLockCPU(NdbThread*, - enum ThreadTypes type, - bool exec_thread, - bool init); + int setLockCPU(NdbThread*, enum ThreadTypes type); int setRealtimeScheduler(NdbThread*, enum ThreadTypes type, bool real_time, === modified file 'storage/ndb/src/kernel/vm/Makefile.am' --- a/storage/ndb/src/kernel/vm/Makefile.am 2011-06-30 15:59:25 +0000 +++ b/storage/ndb/src/kernel/vm/Makefile.am 2011-08-26 09:57:03 +0000 @@ -121,4 +121,10 @@ testSafeMutex_LDFLAGS = @ndb_bin_am_ldfl $(top_builddir)/strings/libmystringslt.la \ @readline_link@ @TERMCAP_LIB@ +mt_thr_config_t_SOURCES = mt_thr_config.cpp +mt_thr_config_t_CXXFLAGS = -DTEST_MT_THR_CONFIG +mt_thr_config_t_LDADD = \ + $(top_builddir)/storage/ndb/src/common/util/libgeneral.la \ + $(top_builddir)/mysys/libmysyslt.la +noinst_PROGRAMS = mt_thr_config-t === modified file 'storage/ndb/src/kernel/vm/SimulatedBlock.cpp' --- a/storage/ndb/src/kernel/vm/SimulatedBlock.cpp 2011-06-30 15:59:25 +0000 +++ b/storage/ndb/src/kernel/vm/SimulatedBlock.cpp 2011-08-27 06:06:02 +0000 @@ -102,16 +102,7 @@ SimulatedBlock::SimulatedBlock(BlockNumb globalData.setBlock(blockNumber, mainBlock); } else { ndbrequire(mainBlock != 0); - if (mainBlock->theInstanceList == 0) { - mainBlock->theInstanceList = new SimulatedBlock* [MaxInstances]; - ndbrequire(mainBlock->theInstanceList != 0); - Uint32 i; - for (i = 0; i < MaxInstances; i++) - mainBlock->theInstanceList[i] = 0; - } - ndbrequire(theInstance < MaxInstances); - ndbrequire(mainBlock->theInstanceList[theInstance] == 0); - mainBlock->theInstanceList[theInstance] = this; + mainBlock->addInstance(this, theInstance); } theMainInstance = mainBlock; @@ -139,6 +130,23 @@ SimulatedBlock::SimulatedBlock(BlockNumb } void +SimulatedBlock::addInstance(SimulatedBlock* b, Uint32 theInstance) +{ + ndbrequire(theMainInstance == this); + ndbrequire(number() == b->number()); + if (theInstanceList == 0) + { + theInstanceList = new SimulatedBlock* [MaxInstances]; + ndbrequire(theInstanceList != 0); + for (Uint32 i = 0; i < MaxInstances; i++) + theInstanceList[i] = 0; + } + ndbrequire(theInstance < MaxInstances); + ndbrequire(theInstanceList[theInstance] == 0); + theInstanceList[theInstance] = b; +} + +void SimulatedBlock::initCommon() { Uint32 count = 10; === modified file 'storage/ndb/src/kernel/vm/SimulatedBlock.hpp' --- a/storage/ndb/src/kernel/vm/SimulatedBlock.hpp 2011-05-17 23:29:55 +0000 +++ b/storage/ndb/src/kernel/vm/SimulatedBlock.hpp 2011-08-27 06:06:02 +0000 @@ -168,6 +168,7 @@ public: return theInstanceList[instanceNumber]; return 0; } + void addInstance(SimulatedBlock* b, Uint32 theInstanceNo); virtual void loadWorkers() {} struct ThreadContext === modified file 'storage/ndb/src/kernel/vm/dummy_nonmt.cpp' --- a/storage/ndb/src/kernel/vm/dummy_nonmt.cpp 2011-06-30 15:59:25 +0000 +++ b/storage/ndb/src/kernel/vm/dummy_nonmt.cpp 2011-08-27 06:06:02 +0000 @@ -43,6 +43,12 @@ add_extra_worker_thr_map(Uint32, Uint32) assert(false); } +void +finalize_thr_map() +{ + assert(false); +} + Uint32 compute_jb_pages(struct EmulatorData*) { === modified file 'storage/ndb/src/kernel/vm/mt.cpp' --- a/storage/ndb/src/kernel/vm/mt.cpp 2011-05-16 12:24:55 +0000 +++ b/storage/ndb/src/kernel/vm/mt.cpp 2011-08-27 06:06:02 +0000 @@ -2381,49 +2381,6 @@ check_queues_empty(thr_data *selfptr) } /* - * Map instance number to real instance on this node. Used in - * sendlocal/sendprioa to find right thread and in execute_signals - * to find right block instance. SignalHeader is not modified. - */ - -static Uint8 g_map_instance[MAX_BLOCK_INSTANCES]; - -static void -map_instance_init() -{ - g_map_instance[0] = 0; - Uint32 ino; - for (ino = 1; ino < MAX_BLOCK_INSTANCES; ino++) - { - if (!globalData.isNdbMtLqh) - { - g_map_instance[ino] = 0; - } - else - { - require(num_lqh_workers != 0); - if (ino <= MAX_NDBMT_LQH_WORKERS) - { - g_map_instance[ino] = 1 + (ino - 1) % num_lqh_workers; - } - else - { - /* Extra workers are not mapped. */ - g_map_instance[ino] = ino; - } - } - } -} - -static inline Uint32 -map_instance(const SignalHeader *s) -{ - Uint32 ino = blockToInstance(s->theReceiversBlockNumber); - assert(ino < MAX_BLOCK_INSTANCES); - return g_map_instance[ino]; -} - -/* * Execute at most MAX_SIGNALS signals from one job queue, updating local read * state as appropriate. * @@ -2489,7 +2446,7 @@ execute_signals(thr_data *selfptr, thr_j NDB_PREFETCH_READ (read_buffer->m_data + read_pos + 32); } Uint32 bno = blockToMain(s->theReceiversBlockNumber); - Uint32 ino = map_instance(s); + Uint32 ino = blockToInstance(s->theReceiversBlockNumber); SimulatedBlock* block = globalData.mt_getBlock(bno, ino); assert(block != 0); @@ -2572,8 +2529,8 @@ run_job_buffers(thr_data *selfptr, Signa } struct thr_map_entry { - enum { NULL_THR_NO = 0xFFFF }; - Uint32 thr_no; + enum { NULL_THR_NO = 0xFF }; + Uint8 thr_no; thr_map_entry() : thr_no(NULL_THR_NO) {} }; @@ -2681,6 +2638,50 @@ add_extra_worker_thr_map(Uint32 block, U add_thr_map(block, instance, thr_no); } +/** + * create the duplicate entries needed so that + * sender doesnt need to know how many instances there + * actually are in this node... + * + * if only 1 instance...then duplicate that for all slots + * else assume instance 0 is proxy...and duplicate workers (modulo) + * + * NOTE: extra pgman worker is instance 5 + */ +void +finalize_thr_map() +{ + for (Uint32 b = 0; b < NO_OF_BLOCKS; b++) + { + Uint32 bno = b + MIN_BLOCK_NO; + Uint32 cnt = 0; + while (cnt < MAX_BLOCK_INSTANCES && + thr_map[b][cnt].thr_no != thr_map_entry::NULL_THR_NO) + cnt++; + + if (cnt != MAX_BLOCK_INSTANCES) + { + SimulatedBlock * main = globalData.getBlock(bno, 0); + for (Uint32 i = cnt; i < MAX_BLOCK_INSTANCES; i++) + { + Uint32 dup = (cnt == 1) ? 0 : 1 + ((i - 1) % (cnt - 1)); + if (thr_map[b][i].thr_no == thr_map_entry::NULL_THR_NO) + { + thr_map[b][i] = thr_map[b][dup]; + main->addInstance(globalData.getBlock(bno, dup), i); + } + else + { + /** + * extra pgman instance + */ + require(bno == PGMAN); + } + } + } + } +} + static void reportSignalStats(Uint32 self, Uint32 a_count, Uint32 a_size, Uint32 b_count, Uint32 b_size) { @@ -3120,7 +3121,7 @@ sendlocal(Uint32 self, const SignalHeade const Uint32 secPtr[3]) { Uint32 block = blockToMain(s->theReceiversBlockNumber); - Uint32 instance = map_instance(s); + Uint32 instance = blockToInstance(s->theReceiversBlockNumber); /* * Max number of signals to put into job buffer before flushing the buffer @@ -3156,7 +3157,7 @@ sendprioa(Uint32 self, const SignalHeade const Uint32 secPtr[3]) { Uint32 block = blockToMain(s->theReceiversBlockNumber); - Uint32 instance = map_instance(s); + Uint32 instance = blockToInstance(s->theReceiversBlockNumber); Uint32 dst = block2ThreadId(block, instance); struct thr_repository* rep = &g_thr_repository; @@ -3507,7 +3508,6 @@ ThreadConfig::init() ndbout << "NDBMT: num_threads=" << num_threads << endl; - ::map_instance_init(); ::rep_init(&g_thr_repository, num_threads, globalEmulatorData.m_mem_manager); } === modified file 'storage/ndb/src/kernel/vm/mt.hpp' --- a/storage/ndb/src/kernel/vm/mt.hpp 2011-06-30 15:59:25 +0000 +++ b/storage/ndb/src/kernel/vm/mt.hpp 2011-08-27 06:06:02 +0000 @@ -34,6 +34,7 @@ void add_thr_map(Uint32 block, Uint32 in void add_main_thr_map(); void add_lqh_worker_thr_map(Uint32 block, Uint32 instance); void add_extra_worker_thr_map(Uint32 block, Uint32 instance); +void finalize_thr_map(); void sendlocal(Uint32 self, const struct SignalHeader *s, const Uint32 *data, const Uint32 secPtr[3]); === added file 'storage/ndb/src/kernel/vm/mt_thr_config.cpp' --- a/storage/ndb/src/kernel/vm/mt_thr_config.cpp 1970-01-01 00:00:00 +0000 +++ b/storage/ndb/src/kernel/vm/mt_thr_config.cpp 2011-08-26 09:57:03 +0000 @@ -0,0 +1,966 @@ +/* + Copyright (c) 2011, Oracle and/or its affiliates. All rights reserved. + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA +*/ + +#include "mt_thr_config.hpp" +#include +#include "../../common/util/parse_mask.hpp" + + +static const struct THRConfig::Entries m_entries[] = +{ + // name type min max + { "main", THRConfig::T_MAIN, 1, 1 }, + { "ldm", THRConfig::T_LDM, 1, MAX_NDBMT_LQH_THREADS }, + { "recv", THRConfig::T_RECV, 1, 1 }, + { "rep", THRConfig::T_REP, 1, 1 }, + { "maint", THRConfig::T_MAINT, 1, 1 } +}; + +static const struct THRConfig::Param m_params[] = +{ + { "count", THRConfig::Param::S_UNSIGNED }, + { "cpubind", THRConfig::Param::S_BITMASK }, + { "cpuset", THRConfig::Param::S_BITMASK } +}; + +#define IX_COUNT 0 +#define IX_CPUBOUND 1 +#define IX_CPUSET 2 + +static +unsigned +getMaxEntries(Uint32 type) +{ + for (Uint32 i = 0; i 1) + { + unsigned no = createCpuSet(m_LockMaintThreadsToCPU); + m_threads[T_MAINT][0].m_bind_type = T_Thread::B_CPUSET_BOUND; + m_threads[T_MAINT][0].m_bind_no = no; + } + + /** + * Check that no cpu_sets overlap + */ + for (unsigned i = 0; i= num_threads) + { + m_info_msg.appfmt("Assigning each thread its own CPU\n"); + unsigned no = 0; + for (unsigned i = 0; i < NDB_ARRAY_SIZE(m_threads); i++) + { + if (i == T_MAINT) + continue; + for (unsigned j = 0; j < m_threads[i].size(); j++) + { + if (m_threads[i][j].m_bind_type == T_Thread::B_UNBOUND) + { + m_threads[i][j].m_bind_type = T_Thread::B_CPU_BOUND; + m_threads[i][j].m_bind_no = mask.getBitNo(no); + no++; + } + } + } + } + else if (cnt == 1) + { + unsigned cpu = mask.getBitNo(0); + m_info_msg.appfmt("Assigning all threads to CPU %u\n", cpu); + for (unsigned i = 0; i < NDB_ARRAY_SIZE(m_threads); i++) + { + if (i == T_MAINT) + continue; + bind_unbound(m_threads[i], cpu); + } + } + else if (isMtLqh) + { + unsigned unbound_ldm = count_unbound(m_threads[T_LDM]); + if (cnt > unbound_ldm) + { + /** + * let each LQH have it's own CPU and rest share... + */ + m_info_msg.append("Assigning LQH threads to dedicated CPU(s) and " + "other threads will share remaining\n"); + unsigned cpu = mask.find(0); + for (unsigned i = 0; i < m_threads[T_LDM].size(); i++) + { + if (m_threads[T_LDM][i].m_bind_type == T_Thread::B_UNBOUND) + { + m_threads[T_LDM][i].m_bind_type = T_Thread::B_CPU_BOUND; + m_threads[T_LDM][i].m_bind_no = cpu; + mask.clear(cpu); + cpu = mask.find(cpu + 1); + } + } + + cpu = mask.find(0); + bind_unbound(m_threads[T_MAIN], cpu); + bind_unbound(m_threads[T_REP], cpu); + if ((cpu = mask.find(cpu + 1)) == mask.NotFound) + { + cpu = mask.find(0); + } + bind_unbound(m_threads[T_RECV], cpu); + } + else + { + // put receiver, tc, backup/suma in 1 thread, + // and round robin LQH for rest + unsigned cpu = mask.find(0); + m_info_msg.appfmt("Assigning LQH threads round robin to CPU(s) and " + "other threads will share CPU %u\n", cpu); + bind_unbound(m_threads[T_MAIN], cpu); // TC + bind_unbound(m_threads[T_REP], cpu); + bind_unbound(m_threads[T_RECV], cpu); + mask.clear(cpu); + + cpu = mask.find(0); + for (unsigned i = 0; i < m_threads[T_LDM].size(); i++) + { + if (m_threads[T_LDM][i].m_bind_type == T_Thread::B_UNBOUND) + { + m_threads[T_LDM][i].m_bind_type = T_Thread::B_CPU_BOUND; + m_threads[T_LDM][i].m_bind_no = cpu; + if ((cpu = mask.find(cpu + 1)) == mask.NotFound) + { + cpu = mask.find(0); + } + } + } + } + } + else + { + unsigned cpu = mask.find(0); + m_info_msg.appfmt("Assigning LQH thread to CPU %u and " + "other threads will share\n", cpu); + bind_unbound(m_threads[T_LDM], cpu); + cpu = mask.find(cpu + 1); + bind_unbound(m_threads[T_MAIN], cpu); + bind_unbound(m_threads[T_RECV], cpu); + } + } + + return 0; +} + +unsigned +THRConfig::count_unbound(const Vector& vec) const +{ + unsigned cnt = 0; + for (unsigned i = 0; i < vec.size(); i++) + { + if (vec[i].m_bind_type == T_Thread::B_UNBOUND) + cnt ++; + } + return cnt; +} + +void +THRConfig::bind_unbound(Vector& vec, unsigned cpu) +{ + for (unsigned i = 0; i < vec.size(); i++) + { + if (vec[i].m_bind_type == T_Thread::B_UNBOUND) + { + vec[i].m_bind_type = T_Thread::B_CPU_BOUND; + vec[i].m_bind_no = cpu; + } + } +} + +int +THRConfig::do_validate() +{ + /** + * Check that there aren't too many of any thread type + */ + for (unsigned i = 0; i< NDB_ARRAY_SIZE(m_threads); i++) + { + if (m_threads[i].size() > getMaxEntries(i)) + { + m_err_msg.assfmt("Too many instances(%u) of %s max supported: %u", + m_threads[i].size(), + getEntryName(i), + getMaxEntries(i)); + return -1; + } + } + + /** + * LDM can be 1 2 4 + */ + if (m_threads[T_LDM].size() == 3) + { + m_err_msg.assfmt("No of LDM-instances can be 1,2,4. Specified: %u", + m_threads[T_LDM].size()); + return -1; + } + + return 0; +} + +const char * +THRConfig::getConfigString() +{ + m_cfg_string.clear(); + const char * sep = ""; + for (unsigned i = 0; i < NDB_ARRAY_SIZE(m_threads); i++) + { + if (m_threads[i].size()) + { + const char * name = getEntryName(i); + if (i != T_MAINT) + { + for (unsigned j = 0; j < m_threads[i].size(); j++) + { + m_cfg_string.append(sep); + sep=","; + m_cfg_string.append(name); + if (m_threads[i][j].m_bind_type != T_Thread::B_UNBOUND) + { + m_cfg_string.append("={"); + if (m_threads[i][j].m_bind_type == T_Thread::B_CPU_BOUND) + { + m_cfg_string.appfmt("cpubind=%u", m_threads[i][j].m_bind_no); + } + else if (m_threads[i][j].m_bind_type == T_Thread::B_CPUSET_BOUND) + { + m_cfg_string.appfmt("cpuset=%s", + m_cpu_sets[m_threads[i][j].m_bind_no].str().c_str()); + } + m_cfg_string.append("}"); + } + } + } + else + { + for (unsigned j = 0; j < m_threads[i].size(); j++) + { + if (m_threads[i][j].m_bind_type != T_Thread::B_UNBOUND) + { + m_cfg_string.append(sep); + sep=","; + m_cfg_string.append(name); + m_cfg_string.append("={"); + if (m_threads[i][j].m_bind_type == T_Thread::B_CPU_BOUND) + { + m_cfg_string.appfmt("cpubind=%u", m_threads[i][j].m_bind_no); + } + else if (m_threads[i][j].m_bind_type == T_Thread::B_CPUSET_BOUND) + { + m_cfg_string.appfmt("cpuset=%s", + m_cpu_sets[m_threads[i][j].m_bind_no].str().c_str()); + } + m_cfg_string.append("}"); + } + } + } + } + } + return m_cfg_string.c_str(); +} + +const char * +THRConfig::getErrorMessage() const +{ + return m_err_msg.c_str(); +} + +const char * +THRConfig::getInfoMessage() const +{ + return m_info_msg.c_str(); +} + +static +char * +skipblank(char * str) +{ + while (isblank(* str)) + str++; + return str; +} + +Uint32 +THRConfig::find_type(char *& str) +{ + str = skipblank(str); + + char * name = str; + if (* name == 0) + { + m_err_msg.assfmt("empty thread specification"); + return 0; + } + char * end = name; + while(isalpha(* end)) + end++; + + char save = * end; + * end = 0; + Uint32 t = getEntryType(name); + if (t == T_END) + { + m_err_msg.assfmt("unknown thread type '%s'", name); + } + * end = save; + str = end; + return t; +} + +struct ParamValue +{ + ParamValue() { found = false;} + bool found; + const char * string_val; + unsigned unsigned_val; + SparseBitmask mask_val; +}; + +static +int +parseUnsigned(char *& str, unsigned * dst) +{ + str = skipblank(str); + char * endptr = 0; + errno = 0; + long val = strtoll(str, &endptr, 0); + if (errno == ERANGE) + return -1; + if (val < 0 || Int64(val) > 0xFFFFFFFF) + return -1; + if (endptr == str) + return -1; + str = endptr; + *dst = (unsigned)val; + return 0; +} + +static +int +parseBitmask(char *& str, SparseBitmask * mask) +{ + str = skipblank(str); + size_t len = strspn(str, "0123456789-, "); + if (len == 0) + return -1; + + while (isblank(str[len-1])) + len--; + if (str[len-1] == ',') + len--; + char save = str[len]; + str[len] = 0; + int res = parse_mask(str, *mask); + str[len] = save; + str = str + len; + return res; +} + +static +int +parseParams(char * str, ParamValue values[], BaseString& err) +{ + const char * const save = str; + while (* str) + { + str = skipblank(str); + + unsigned idx = 0; + for (; idx < NDB_ARRAY_SIZE(m_params); idx++) + { + if (strncasecmp(str, m_params[idx].name, strlen(m_params[idx].name)) == 0) + { + str += strlen(m_params[idx].name); + break; + } + } + + if (idx == NDB_ARRAY_SIZE(m_params)) + { + err.assfmt("Unknown param near: '%s'", str); + return -1; + } + + if (values[idx].found == true) + { + err.assfmt("Param '%s' found twice", m_params[idx].name); + return -1; + } + + str = skipblank(str); + if (* str != '=') + { + err.assfmt("Missing '=' after %s in '%s'", m_params[idx].name, save); + return -1; + } + str++; + str = skipblank(str); + + int res = 0; + switch(m_params[idx].type){ + case THRConfig::Param::S_UNSIGNED: + res = parseUnsigned(str, &values[idx].unsigned_val); + break; + case THRConfig::Param::S_BITMASK: + res = parseBitmask(str, &values[idx].mask_val); + break; + default: + err.assfmt("Internal error, unknown type for param: '%s'", + m_params[idx].name); + return -1; + } + if (res == -1) + { + err.assfmt("Unable to parse %s=%s", m_params[idx].name, str); + return -1; + } + values[idx].found = true; + str = skipblank(str); + + if (* str == 0) + break; + + if (* str != ',') + { + err.assfmt("Unable to parse near '%s'", str); + return -1; + } + str++; + } + return 0; +} + +int +THRConfig::find_spec(char *& str, T_Type type) +{ + str = skipblank(str); + + switch(* str){ + case ',': + case 0: + add(type); + return 0; + } + + if (* str != '=') + { +err: + int len = (int)strlen(str); + m_err_msg.assfmt("Invalid format near: '%.*s'", + (len > 10) ? 10 : len, str); + return -1; + } + + str++; // skip over = + str = skipblank(str); + + if (* str != '{') + { + goto err; + } + + str++; + char * start = str; + + /** + * Find end + */ + while (* str && (* str) != '}') + str++; + + if (* str != '}') + { + goto err; + } + + char * end = str; + char save = * end; + * end = 0; + + ParamValue values[NDB_ARRAY_SIZE(m_params)]; + values[IX_COUNT].unsigned_val = 1; + int res = parseParams(start, values, m_err_msg); + * end = save; + + if (res != 0) + { + return -1; + } + + if (values[IX_CPUBOUND].found && values[IX_CPUSET].found) + { + m_err_msg.assfmt("Both cpuset and cpubind specified!"); + return -1; + } + + unsigned cnt = values[IX_COUNT].unsigned_val; + const int index = m_threads[type].size(); + for (unsigned i = 0; i < cnt; i++) + { + add(type); + } + + assert(m_threads[type].size() == index + cnt); + if (values[IX_CPUSET].found) + { + SparseBitmask & mask = values[IX_CPUSET].mask_val; + unsigned no = createCpuSet(mask); + for (unsigned i = 0; i < cnt; i++) + { + m_threads[type][index+i].m_bind_type = T_Thread::B_CPUSET_BOUND; + m_threads[type][index+i].m_bind_no = no; + } + } + else if (values[IX_CPUBOUND].found) + { + SparseBitmask & mask = values[IX_CPUBOUND].mask_val; + if (mask.count() < cnt) + { + m_err_msg.assfmt("%s: trying to bind %u threads to %u cpus [%s]", + getEntryName(type), + cnt, + mask.count(), + mask.str().c_str()); + return -1; + } + for (unsigned i = 0; i < cnt; i++) + { + m_threads[type][index+i].m_bind_type = T_Thread::B_CPU_BOUND; + m_threads[type][index+i].m_bind_no = mask.getBitNo(i % mask.count()); + } + } + + str++; // skip over } + return 0; +} + +int +THRConfig::find_next(char *& str) +{ + str = skipblank(str); + + if (* str == 0) + { + return 0; + } + else if (* str == ',') + { + str++; + return 1; + } + + int len = (int)strlen(str); + m_err_msg.assfmt("Invalid format near: '%.*s'", + (len > 10) ? 10 : len, str); + return -1; +} + +int +THRConfig::do_parse(const char * ThreadConfig) +{ + BaseString str(ThreadConfig); + char * ptr = (char*)str.c_str(); + while (* ptr) + { + Uint32 type = find_type(ptr); + if (type == T_END) + return -1; + + if (find_spec(ptr, (T_Type)type) < 0) + return -1; + + int ret = find_next(ptr); + if (ret < 0) + return ret; + + if (ret == 0) + break; + } + + for (Uint32 i = 0; i < T_END; i++) + { + while (m_threads[i].size() < m_entries[i].m_min_cnt) + add((T_Type)i); + } + + return do_bindings() || do_validate(); +} + +unsigned +THRConfig::createCpuSet(const SparseBitmask& mask) +{ + for (size_t i = 0; i < m_cpu_sets.size(); i++) + if (m_cpu_sets[i].equal(mask)) + return i; + + m_cpu_sets.push_back(mask); + return m_cpu_sets.size() - 1; +} + +template class Vector; +template class Vector; + +#ifdef TEST_MT_THR_CONFIG + +#include + +TAPTEST(mt_thr_config) +{ + { + THRConfig tmp; + OK(tmp.do_parse(8, 0, 0) == 0); + } + + /** + * BASIC test + */ + { + const char * ok[] = + { + "ldm,ldm", + "ldm={count=3},ldm", + "ldm={cpubind=1-2,5,count=3},ldm", + "ldm={ cpubind = 1- 2, 5 , count = 3 },ldm", + "ldm={count=3,cpubind=1-2,5 }, ldm", + "ldm={cpuset=1-3,count=3 },ldm", + "main,ldm={},ldm", + 0 + }; + + const char * fail [] = + { + "ldm,ldm,ldm", + "ldm={cpubind= 1 , cpuset=2 },ldm", + "ldm={count=4,cpubind=1-3},ldm", + "main,main,ldm,ldm", + "main={ keso=88, count=23},ldm,ldm", + "main={ cpuset=1-3 }, ldm={cpuset=3-4}", + "main={ cpuset=1-3 }, ldm={cpubind=2}", + 0 + }; + + for (Uint32 i = 0; ok[i]; i++) + { + THRConfig tmp; + int res = tmp.do_parse(ok[i]); + printf("do_parse(%s) => %s - %s\n", ok[i], + res == 0 ? "OK" : "FAIL", + res == 0 ? "" : tmp.getErrorMessage()); + OK(res == 0); + { + BaseString out(tmp.getConfigString()); + THRConfig check; + OK(check.do_parse(out.c_str()) == 0); + OK(strcmp(out.c_str(), check.getConfigString()) == 0); + } + } + + for (Uint32 i = 0; fail[i]; i++) + { + THRConfig tmp; + int res = tmp.do_parse(fail[i]); + printf("do_parse(%s) => %s - %s\n", fail[i], + res == 0 ? "OK" : "FAIL", + res == 0 ? "" : tmp.getErrorMessage()); + OK(res != 0); + } + } + + { + /** + * Test interaction with LockExecuteThreadToCPU + */ + const char * t[] = + { + /** threads, LockExecuteThreadToCPU, answer */ + "1-8", + "ldm={count=4}", + "main={cpubind=1},ldm={cpubind=2},ldm={cpubind=3},ldm={cpubind=4},ldm={cpubind=5},recv={cpubind=6},rep={cpubind=7}", + + "1-5", + "ldm={count=4}", + "main={cpubind=5},ldm={cpubind=1},ldm={cpubind=2},ldm={cpubind=3},ldm={cpubind=4},recv={cpubind=5},rep={cpubind=5}", + + "1-3", + "ldm={count=4}", + "main={cpubind=1},ldm={cpubind=2},ldm={cpubind=3},ldm={cpubind=2},ldm={cpubind=3},recv={cpubind=1},rep={cpubind=1}", + + "1-4", + "ldm={count=4}", + "main={cpubind=1},ldm={cpubind=2},ldm={cpubind=3},ldm={cpubind=4},ldm={cpubind=2},recv={cpubind=1},rep={cpubind=1}", + + "1-8", + "ldm={count=4},maint={cpubind=8}", + "main={cpubind=1},ldm={cpubind=2},ldm={cpubind=3},ldm={cpubind=4},ldm={cpubind=5},recv={cpubind=6},rep={cpubind=7},maint={cpubind=8}", + + "1-8", + "ldm={count=4,cpubind=1,4,5,6}", + "main={cpubind=2},ldm={cpubind=1},ldm={cpubind=4},ldm={cpubind=5},ldm={cpubind=6},recv={cpubind=3},rep={cpubind=7}", + + // END + 0 + }; + + for (unsigned i = 0; t[i]; i+= 3) + { + THRConfig tmp; + tmp.setLockExecuteThreadToCPU(t[i+0]); + int res = tmp.do_parse(t[i+1]); + int ok = strcmp(tmp.getConfigString(), t[i+2]) == 0; + printf("mask: %s conf: %s => %s(%s) - %s - %s\n", + t[i+0], + t[i+1], + res == 0 ? "OK" : "FAIL", + res == 0 ? "" : tmp.getErrorMessage(), + tmp.getConfigString(), + ok == 1 ? "CORRECT" : "INCORRECT"); + OK(res == 0); + OK(ok == 1); + } + } + + return 1; +} + +#endif === added file 'storage/ndb/src/kernel/vm/mt_thr_config.hpp' --- a/storage/ndb/src/kernel/vm/mt_thr_config.hpp 1970-01-01 00:00:00 +0000 +++ b/storage/ndb/src/kernel/vm/mt_thr_config.hpp 2011-08-26 09:57:03 +0000 @@ -0,0 +1,125 @@ +/* + Copyright (c) 2011, Oracle and/or its affiliates. All rights reserved. + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA +*/ + +#ifndef THRConfig_H +#define THRConfig_H + +struct NdbThread; +#include +#include +#include + +/** + * This class contains thread configuration + * it supports parsing the ThreadConfig parameter + * and handling LockExecuteThreadToCPU etc... + * + * This is used in ndb_mgmd when verifying configuration + * and by ndbmtd + * + * TAP-TESTS are provided in mt_thr_config.cpp + */ +class THRConfig +{ +public: + enum T_Type + { + T_MAIN = 0, /* DIH/QMGR/TC/SPJ etc */ + T_LDM = 1, /* LQH/ACC/TUP/TUX etc */ + T_RECV = 2, /* CMVMI */ + T_REP = 3, /* SUMA */ + T_MAINT = 4, /* FS, SocketServer etc */ + + T_END = 5 + }; + + THRConfig(); + ~THRConfig(); + + // NOTE: needs to be called before do_parse + int setLockExecuteThreadToCPU(const char * val); + int setLockMaintThreadsToCPU(unsigned val); + + int do_parse(const char * ThreadConfig); + int do_parse(unsigned MaxNoOfExecutionThreads, + unsigned __ndbmt_lqh_workers, + unsigned __ndbmt_classic); + + const char * getConfigString(); + + const char * getErrorMessage() const; + const char * getInfoMessage() const; + +private: + struct T_Thread + { + unsigned m_type; + unsigned m_no; // within type + enum BType { B_UNBOUND, B_CPU_BOUND, B_CPUSET_BOUND } m_bind_type; + unsigned m_bind_no; // cpu_no/cpuset_no + }; + bool m_classic; + SparseBitmask m_LockExecuteThreadToCPU; + SparseBitmask m_LockMaintThreadsToCPU; + Vector m_cpu_sets; + Vector m_threads[T_END]; + + BaseString m_err_msg; + BaseString m_info_msg; + BaseString m_cfg_string; + BaseString m_print_string; + + void add(T_Type); + Uint32 find_type(char *&); + int find_spec(char *&, T_Type); + int find_next(char *&); + + unsigned createCpuSet(const SparseBitmask&); + int do_bindings(); + int do_validate(); + + unsigned count_unbound(const Vector& vec) const; + void bind_unbound(Vector & vec, unsigned cpu); + +public: + struct Entries + { + const char * m_name; + unsigned m_type; + unsigned m_min_cnt; + unsigned m_max_cnt; + }; + + struct Param + { + const char * name; + enum { S_UNSIGNED, S_BITMASK } type; + }; +}; + +/** + * This class is used by ndbmtd + * when setting up threads (and locking) + */ +class THRConfigApplier : public THRConfig +{ +public: + int create_cpusets(); + int do_bind(unsigned t_type, unsigned no, NdbThread*); +}; + +#endif // IPCConfig_H === modified file 'storage/ndb/src/ndbapi/NdbQueryOperation.cpp' --- a/storage/ndb/src/ndbapi/NdbQueryOperation.cpp 2011-08-18 11:47:22 +0000 +++ b/storage/ndb/src/ndbapi/NdbQueryOperation.cpp 2011-08-22 12:56:56 +0000 @@ -57,6 +57,7 @@ static const int Err_FunctionNotImplemen static const int Err_UnknownColumn = 4004; static const int Err_ReceiveTimedOut = 4008; static const int Err_NodeFailCausedAbort = 4028; +static const int Err_ParameterError = 4118; static const int Err_SimpleDirtyReadFailed = 4119; static const int Err_WrongFieldLength = 4209; static const int Err_ReadTooMuch = 4257; @@ -72,6 +73,21 @@ static const Uint16 tupleNotFound = 0xff /** Set to true to trace incomming signals.*/ const bool traceSignals = false; +enum +{ + /** + * Set NdbQueryOperationImpl::m_parallelism to this value to indicate that + * scan parallelism should be adaptive. + */ + Parallelism_adaptive = 0xffff0000, + + /** + * Set NdbQueryOperationImpl::m_parallelism to this value to indicate that + * all fragments should be scanned in parallel. + */ + Parallelism_max = 0xffff0001 +}; + /** * A class for accessing the correlation data at the end of a tuple (for * scan queries). These data have the following layout: @@ -358,9 +374,6 @@ public: Uint32 getRowCount() const { return m_rowCount; } - char* getRow(Uint32 tupleNo) const - { return (m_buffer + (tupleNo*m_rowSize)); } - private: /** No copying.*/ NdbResultSet(const NdbResultSet&); @@ -372,6 +385,9 @@ private: /** Used for checking if buffer overrun occurred. */ Uint32* m_batchOverflowCheck; + /** Array of TupleCorrelations for all rows in m_buffer */ + TupleCorrelation* m_correlations; + Uint32 m_rowSize; /** The current #rows in 'm_buffer'.*/ @@ -415,8 +431,8 @@ public: const NdbReceiver& getReceiver() const { return m_receiver; } - const char* getCurrentRow() const - { return m_receiver.peek_row(); } + const char* getCurrentRow() + { return m_receiver.get_row(); } /** * Process an incomming tuple for this stream. Extract parent and own tuple @@ -653,6 +669,7 @@ void* NdbBulkAllocator::allocObjMem(Uint NdbResultSet::NdbResultSet() : m_buffer(NULL), m_batchOverflowCheck(NULL), + m_correlations(NULL), m_rowSize(0), m_rowCount(0) {} @@ -672,6 +689,12 @@ NdbResultSet::init(NdbQueryImpl& query, m_batchOverflowCheck = reinterpret_cast(bufferAlloc.allocObjMem(sizeof(Uint32))); *m_batchOverflowCheck = 0xacbd1234; + + if (query.getQueryDef().isScanQuery()) + { + m_correlations = reinterpret_cast + (bufferAlloc.allocObjMem(maxRows*sizeof(TupleCorrelation))); + } } } @@ -856,19 +879,17 @@ void NdbResultStream::execTRANSID_AI(const Uint32 *ptr, Uint32 len, TupleCorrelation correlation) { - m_receiver.execTRANSID_AI(ptr, len); - m_resultSets[m_recv].m_rowCount++; - + NdbResultSet& receiveSet = m_resultSets[m_recv]; if (isScanQuery()) { /** - * Store TupleCorrelation as hidden value imm. after received row - * (NdbQueryOperationImpl::getRowSize() has reserved space for it) + * Store TupleCorrelation. */ - Uint32* row_recv = reinterpret_cast - (m_receiver.m_record.m_row_recv); - row_recv[-1] = correlation.toUint32(); + receiveSet.m_correlations[receiveSet.m_rowCount] = correlation; } + + m_receiver.execTRANSID_AI(ptr, len); + receiveSet.m_rowCount++; } // NdbResultStream::execTRANSID_AI() /** @@ -994,8 +1015,8 @@ NdbResultStream::prepareResultSet(Uint32 /** * Fill m_tupleSet[] with correlation data between parent - * and child tuples. The 'TupleCorrelation' is stored as - * and extra Uint32 after each row in the NdbResultSet + * and child tuples. The 'TupleCorrelation' is stored + * in an array of TupleCorrelations in each ResultSet * by execTRANSID_AI(). * * NOTE: In order to reduce work done when holding the @@ -1024,12 +1045,9 @@ NdbResultStream::buildResultCorrelations /* Rebuild correlation & hashmap from 'readResult' */ for (Uint32 tupleNo=0; tupleNo 0) + if (getQueryOperation(0U).m_parallelism == Parallelism_max) { m_rootFragCount - = MIN(getRoot().getQueryOperationDef().getTable().getFragmentCount(), - getQueryOperation(0U).m_parallelism); + = getRoot().getQueryOperationDef().getTable().getFragmentCount(); } else { + assert(getQueryOperation(0U).m_parallelism != Parallelism_adaptive); m_rootFragCount - = getRoot().getQueryOperationDef().getTable().getFragmentCount(); + = MIN(getRoot().getQueryOperationDef().getTable().getFragmentCount(), + getQueryOperation(0U).m_parallelism); } Ndb* const ndb = m_transaction.getNdb(); @@ -2682,14 +2709,13 @@ NdbQueryImpl::prepareSend() for (Uint32 opNo = 0; opNo < getNoOfOperations(); opNo++) { const NdbQueryOperationImpl& op = getQueryOperation(opNo); + // Add space for m_correlations, m_buffer & m_batchOverflowCheck + totalBuffSize += (sizeof(TupleCorrelation) * op.getMaxBatchRows()); totalBuffSize += (op.getRowSize() * op.getMaxBatchRows()); + totalBuffSize += sizeof(Uint32); // Overflow check } - /** - * Add one word per ResultStream for buffer overrun check. We add a word - * rather than a byte to avoid possible alignment problems. - */ - m_rowBufferAlloc.init(m_rootFragCount * - (totalBuffSize + (sizeof(Uint32) * getNoOfOperations())) ); + + m_rowBufferAlloc.init(m_rootFragCount * totalBuffSize); if (getQueryDef().isScanQuery()) { Uint32 totalRows = 0; @@ -3455,8 +3481,6 @@ NdbQueryImpl::OrderedFragSet::getCurrent { if (m_ordering!=NdbQueryOptions::ScanOrdering_unordered) { - // Results should be ordered. - assert(verifySortOrder()); /** * Must have tuples for each (non-completed) fragment when doing ordered * scan. @@ -3667,7 +3691,8 @@ NdbQueryOperationImpl::NdbQueryOperation m_ordering(NdbQueryOptions::ScanOrdering_unordered), m_interpretedCode(NULL), m_diskInUserProjection(false), - m_parallelism(0), + m_parallelism(def.getQueryOperationIx() == 0 + ? Parallelism_max : Parallelism_adaptive), m_rowSize(0xffffffff) { if (errno == ENOMEM) @@ -4266,9 +4291,10 @@ NdbQueryOperationImpl m_ndbRecord, m_firstRecAttr, 0, // Key size. - getRoot().m_parallelism > 0 ? - getRoot().m_parallelism : - m_queryImpl.getRootFragCount(), + getRoot().m_parallelism + == Parallelism_max ? + m_queryImpl.getRootFragCount() : + getRoot().m_parallelism, maxBatchRows, batchByteSize, firstBatchRows); @@ -4454,7 +4480,12 @@ NdbQueryOperationImpl::prepareAttrInfo(U firstBatchRows); assert(batchRows==getMaxBatchRows()); assert(batchRows==firstBatchRows); - requestInfo |= QN_ScanIndexParameters::SIP_PARALLEL; // FIXME: SPJ always assume. SIP_PARALLEL + assert(m_parallelism == Parallelism_max || + m_parallelism == Parallelism_adaptive); + if (m_parallelism == Parallelism_max) + { + requestInfo |= QN_ScanIndexParameters::SIP_PARALLEL; + } param->requestInfo = requestInfo; param->batchSize = ((Uint16)batchByteSize << 16) | (Uint16)firstBatchRows; param->resultData = getIdOfReceiver(); @@ -4786,7 +4817,7 @@ NdbQueryOperationImpl::execTRANSID_AI(co if (getQueryDef().isScanQuery()) { const CorrelationData correlData(ptr, len); - const Uint32 receiverId = CorrelationData(ptr, len).getRootReceiverId(); + const Uint32 receiverId = correlData.getRootReceiverId(); /** receiverId holds the Id of the receiver of the corresponding stream * of the root operation. We can thus find the correct root fragment @@ -4958,7 +4989,7 @@ NdbQueryOperationImpl::setOrdering(NdbQu return -1; } - if (m_parallelism != 0) + if (m_parallelism != Parallelism_max) { getQuery().setErrorCode(QRY_SEQUENTIAL_SCAN_SORTED); return -1; @@ -5053,10 +5084,40 @@ int NdbQueryOperationImpl::setParallelis getQuery().setErrorCode(Err_FunctionNotImplemented); return -1; } + else if (parallelism < 1 || parallelism > MAX_NDB_PARTITIONS) + { + getQuery().setErrorCode(Err_ParameterError); + return -1; + } m_parallelism = parallelism; return 0; } +int NdbQueryOperationImpl::setMaxParallelism(){ + if (!getQueryOperationDef().isScanOperation()) + { + getQuery().setErrorCode(QRY_WRONG_OPERATION_TYPE); + return -1; + } + m_parallelism = Parallelism_max; + return 0; +} + +int NdbQueryOperationImpl::setAdaptiveParallelism(){ + if (!getQueryOperationDef().isScanOperation()) + { + getQuery().setErrorCode(QRY_WRONG_OPERATION_TYPE); + return -1; + } + else if (getQueryOperationDef().getQueryOperationIx() == 0) + { + getQuery().setErrorCode(Err_FunctionNotImplemented); + return -1; + } + m_parallelism = Parallelism_adaptive; + return 0; +} + int NdbQueryOperationImpl::setBatchSize(Uint32 batchSize){ if (!getQueryOperationDef().isScanOperation()) { @@ -5124,12 +5185,6 @@ Uint32 NdbQueryOperationImpl::getRowSize { m_rowSize = NdbReceiver::ndbrecord_rowsize(m_ndbRecord, m_firstRecAttr, 0, false); - - const bool withCorrelation = getRoot().getQueryDef().isScanQuery(); - if (withCorrelation) - { - m_rowSize += TupleCorrelation::wordCount*sizeof(Uint32); - } } return m_rowSize; } === modified file 'storage/ndb/src/ndbapi/NdbQueryOperation.hpp' --- a/storage/ndb/src/ndbapi/NdbQueryOperation.hpp 2011-06-20 13:25:48 +0000 +++ b/storage/ndb/src/ndbapi/NdbQueryOperation.hpp 2011-08-22 08:35:35 +0000 @@ -335,13 +335,29 @@ public: NdbQueryOptions::ScanOrdering getOrdering() const; /** - * Set the number of fragments to be scanned in parallel for the root - * operation of this query. This only applies to table scans and non-sorted - * scans of ordered indexes. + * Set the number of fragments to be scanned in parallel. This only applies + * to table scans and non-sorted scans of ordered indexes. This method is + * only implemented for then root scan operation. * @return 0 if ok, -1 in case of error (call getNdbError() for details.) */ int setParallelism(Uint32 parallelism); + /** + * Set the number of fragments to be scanned in parallel to the maximum + * possible value. This is the default for the root scan operation. + * @return 0 if ok, -1 in case of error (call getNdbError() for details.) + */ + int setMaxParallelism(); + + /** + * Let the system dynamically choose the number of fragments to scan in + * parallel. The system will try to choose a value that gives optimal + * performance. This is the default for all scans but the root scan. This + * method only implemented for non-root scan operations. + * @return 0 if ok, -1 in case of error (call getNdbError() for details.) + */ + int setAdaptiveParallelism(); + /** Set the batch size (max rows per batch) for this operation. This * only applies to scan operations, as lookup operations always will * have the same batch size as its parent operation, or 1 if it is the === modified file 'storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp' --- a/storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp 2011-08-17 13:16:22 +0000 +++ b/storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp 2011-08-22 08:35:35 +0000 @@ -661,14 +661,30 @@ public: NdbQueryOptions::ScanOrdering getOrdering() const { return m_ordering; } - /** - * Set the number of fragments to be scanned in parallel for the root - * operation of this query. This only applies to table scans and non-sorted - * scans of ordered indexes. + /** + * Set the number of fragments to be scanned in parallel. This only applies + * to table scans and non-sorted scans of ordered indexes. This method is + * only implemented for then root scan operation. * @return 0 if ok, -1 in case of error (call getNdbError() for details.) */ int setParallelism(Uint32 parallelism); + /** + * Set the number of fragments to be scanned in parallel to the maximum + * possible value. This is the default for the root scan operation. + * @return 0 if ok, -1 in case of error (call getNdbError() for details.) + */ + int setMaxParallelism(); + + /** + * Let the system dynamically choose the number of fragments to scan in + * parallel. The system will try to choose a value that gives optimal + * performance. This is the default for all scans but the root scan. This + * method only implemented for non-root scan operations. + * @return 0 if ok, -1 in case of error (call getNdbError() for details.) + */ + int setAdaptiveParallelism(); + /** Set the batch size (max rows per batch) for this operation. This * only applies to scan operations, as lookup operations always will * have the same batch size as its parent operation, or 1 if it is the No bundle (reason: useless for push emails).