From: Jonas Oreland Date: August 29 2011 10:17am Subject: bzr push into mysql-5.1-telco-7.0-spj-scan-vs-scan branch (jonas.oreland:3543 to 3544) List-Archive: http://lists.mysql.com/commits/140832 Message-Id: <20110829101718.1094A892BA0@perch.localdomain> MIME-Version: 1.0 Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit 3544 Jonas Oreland 2011-08-29 [merge] ndb - merge 70 to 70-spj-scan-scan added: storage/ndb/src/kernel/vm/mt_thr_config.cpp storage/ndb/src/kernel/vm/mt_thr_config.hpp modified: mysql-test/suite/ndb/r/ndb_index_stat.result mysql-test/suite/ndb/r/ndb_statistics1.result mysql-test/suite/ndb/t/ndb_index_stat.test sql/ha_ndbcluster.cc sql/ha_ndbcluster_binlog.cc sql/ha_ndbinfo.cc storage/ndb/include/util/SparseBitmask.hpp storage/ndb/src/kernel/SimBlockList.cpp storage/ndb/src/kernel/vm/Configuration.cpp storage/ndb/src/kernel/vm/Configuration.hpp storage/ndb/src/kernel/vm/Makefile.am storage/ndb/src/kernel/vm/SimulatedBlock.cpp storage/ndb/src/kernel/vm/SimulatedBlock.hpp storage/ndb/src/kernel/vm/dummy_nonmt.cpp storage/ndb/src/kernel/vm/mt.cpp storage/ndb/src/kernel/vm/mt.hpp 3543 Ole John Aske 2011-08-22 [merge] Merge telco-7.0 -> telco-7.0-spj-scan-scan modified: storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp storage/ndb/src/ndbapi/NdbQueryOperation.cpp === modified file 'mysql-test/suite/ndb/r/ndb_index_stat.result' --- a/mysql-test/suite/ndb/r/ndb_index_stat.result 2011-08-17 12:51:57 +0000 +++ b/mysql-test/suite/ndb/r/ndb_index_stat.result 2011-08-29 10:15:59 +0000 @@ -475,6 +475,18 @@ select count(*) from t1 where f > '222'; count(*) 1 drop table t1; +create table t1 (a1 int, b1 int, primary key(b1), key(a1)) engine=ndbcluster partition by key() partitions 1; +create table t2 (b2 int, c2 int, primary key(b2,c2)) engine=ndbcluster partition by key() partitions 1; +# table t1 is only for forcing record by key count for table t2 that should be near 50 (not 1) +analyze table t1, t2; +Table Op Msg_type Msg_text +test.t1 analyze status OK +test.t2 analyze status OK +explain select * from t1, t2 where b2 = b1 and a1 = 1; +id select_type table type possible_keys key key_len ref rows Extra +1 SIMPLE t1 ref PRIMARY,a1 a1 5 const 2 # +1 SIMPLE t2 ref PRIMARY PRIMARY 4 test.t1.b1 50 # +drop table t1, t2; set @is_enable = @is_enable_default; set @is_enable = NULL; # is_enable_on=0 is_enable_off=1 === modified file 'mysql-test/suite/ndb/r/ndb_statistics1.result' --- a/mysql-test/suite/ndb/r/ndb_statistics1.result 2011-08-17 12:51:57 +0000 +++ b/mysql-test/suite/ndb/r/ndb_statistics1.result 2011-08-29 10:15:59 +0000 @@ -77,7 +77,7 @@ SELECT * FROM t10000 AS X JOIN t10000 AS ON Y.I=X.I AND Y.J = X.I; id select_type table type possible_keys key key_len ref rows Extra 1 SIMPLE X ALL I NULL NULL NULL 10000 Parent of 2 pushed join@1 -1 SIMPLE Y ref J,I I 10 test.X.I,test.X.I 1 Child of 'X' in pushed join@1; Using where +1 SIMPLE Y ref J,I J 5 test.X.I 1 Child of 'X' in pushed join@1; Using where EXPLAIN SELECT * FROM t100 WHERE k < 42; id select_type table type possible_keys key key_len ref rows Extra @@ -145,11 +145,11 @@ id select_type table type possible_keys EXPLAIN SELECT * FROM t10000 WHERE J = 0 AND K < 1; id select_type table type possible_keys key key_len ref rows Extra -1 SIMPLE t10000 range PRIMARY,J PRIMARY 4 NULL 2 Using where with pushed condition +1 SIMPLE t10000 ref PRIMARY,J J 5 const 2 Using where with pushed condition EXPLAIN SELECT * FROM t10000 WHERE J = 0 AND K BETWEEN 1 AND 10; id select_type table type possible_keys key key_len ref rows Extra -1 SIMPLE t10000 range PRIMARY,J PRIMARY 4 NULL 2 Using where with pushed condition +1 SIMPLE t10000 ref PRIMARY,J J 5 const 2 Using where with pushed condition EXPLAIN SELECT * FROM t10000 WHERE J = 0 AND K = 1; id select_type table type possible_keys key key_len ref rows Extra === modified file 'mysql-test/suite/ndb/t/ndb_index_stat.test' --- a/mysql-test/suite/ndb/t/ndb_index_stat.test 2011-07-23 14:35:37 +0000 +++ b/mysql-test/suite/ndb/t/ndb_index_stat.test 2011-08-28 13:29:22 +0000 @@ -309,5 +309,30 @@ while ($i) } drop table t1; +# +# Check estimates of records per key for partial keys using unique/primary ordered index +# + +create table t1 (a1 int, b1 int, primary key(b1), key(a1)) engine=ndbcluster partition by key() partitions 1; +create table t2 (b2 int, c2 int, primary key(b2,c2)) engine=ndbcluster partition by key() partitions 1; + +--disable_query_log +let $i = 100; +while ($i) +{ + eval insert into t1 (a1,b1) values ($i,$i); + eval insert into t2 (b2,c2) values ($i mod 2, $i div 2); + dec $i; +} +--enable_query_log + +--echo # table t1 is only for forcing record by key count for table t2 that should be near 50 (not 1) +analyze table t1, t2; +# Hide Extra column +--replace_column 10 # +explain select * from t1, t2 where b2 = b1 and a1 = 1; + +drop table t1, t2; + set @is_enable = @is_enable_default; source ndb_index_stat_enable.inc; === modified file 'sql/ha_ndbcluster.cc' --- a/sql/ha_ndbcluster.cc 2011-08-17 12:51:57 +0000 +++ b/sql/ha_ndbcluster.cc 2011-08-29 10:15:59 +0000 @@ -168,6 +168,11 @@ static MYSQL_THDVAR_ULONG( 0 /* block */ ); +#if NDB_VERSION_D < NDB_MAKE_VERSION(7,2,0) +#define DEFAULT_NDB_INDEX_STAT_ENABLE FALSE +#else +#define DEFAULT_NDB_INDEX_STAT_ENABLE TRUE +#endif static MYSQL_THDVAR_BOOL( index_stat_enable, /* name */ @@ -175,7 +180,7 @@ static MYSQL_THDVAR_BOOL( "Use ndb index statistics in query optimization.", NULL, /* check func. */ NULL, /* update func. */ - FALSE /* default */ + DEFAULT_NDB_INDEX_STAT_ENABLE /* default */ ); @@ -1396,19 +1401,22 @@ void ha_ndbcluster::set_rec_per_key() */ for (uint i=0 ; i < table_share->keys ; i++) { + bool is_unique_index= false; KEY* key_info= table->key_info + i; switch (get_index_type(i)) { - case UNIQUE_ORDERED_INDEX: - case PRIMARY_KEY_ORDERED_INDEX: case UNIQUE_INDEX: case PRIMARY_KEY_INDEX: { // Index is unique when all 'key_parts' are specified, // else distribution is unknown and not specified here. - key_info->rec_per_key[key_info->key_parts-1]= 1; + is_unique_index= true; break; } + case UNIQUE_ORDERED_INDEX: + case PRIMARY_KEY_ORDERED_INDEX: + is_unique_index= true; + // intentional fall thru to logic for ordered index case ORDERED_INDEX: // 'Records pr. key' are unknown for non-unique indexes. // (May change when we get better index statistics.) @@ -1438,6 +1446,11 @@ void ha_ndbcluster::set_rec_per_key() default: DBUG_ASSERT(false); } + // set rows per key to 1 for complete key given for unique/primary index + if (is_unique_index) + { + key_info->rec_per_key[key_info->key_parts-1]= 1; + } } DBUG_VOID_RETURN; } === modified file 'sql/ha_ndbcluster_binlog.cc' --- a/sql/ha_ndbcluster_binlog.cc 2011-07-08 12:28:37 +0000 +++ b/sql/ha_ndbcluster_binlog.cc 2011-08-27 12:12:27 +0000 @@ -6299,8 +6299,8 @@ ndb_binlog_thread_handle_data_event(Ndb MY_BITMAP b; /* Potential buffer for the bitmap */ uint32 bitbuf[128 / (sizeof(uint32) * 8)]; - bitmap_init(&b, n_fields <= sizeof(bitbuf) * 8 ? bitbuf : NULL, - n_fields, FALSE); + const bool own_buffer = n_fields <= sizeof(bitbuf) * 8; + bitmap_init(&b, own_buffer ? bitbuf : NULL, n_fields, FALSE); bitmap_set_all(&b); /* @@ -6463,6 +6463,11 @@ ndb_binlog_thread_handle_data_event(Ndb my_free(blobs_buffer[1], MYF(MY_ALLOW_ZERO_PTR)); } + if (!own_buffer) + { + bitmap_free(&b); + } + return 0; } === modified file 'sql/ha_ndbinfo.cc' --- a/sql/ha_ndbinfo.cc 2011-06-30 15:59:25 +0000 +++ b/sql/ha_ndbinfo.cc 2011-08-27 09:54:26 +0000 @@ -367,6 +367,7 @@ int ha_ndbinfo::open(const char *name, i int err = g_ndbinfo->openTable(name, &m_impl.m_table); if (err) { + assert(m_impl.m_table == 0); if (err == NdbInfo::ERR_NoSuchTable) DBUG_RETURN(HA_ERR_NO_SUCH_TABLE); DBUG_RETURN(err2mysql(err)); @@ -390,6 +391,7 @@ int ha_ndbinfo::open(const char *name, i warn_incompatible(ndb_tab, true, "column '%s' is NOT NULL", field->field_name); + delete m_impl.m_table; m_impl.m_table= 0; DBUG_RETURN(ERR_INCOMPAT_TABLE_DEF); } @@ -427,6 +429,7 @@ int ha_ndbinfo::open(const char *name, i warn_incompatible(ndb_tab, true, "column '%s' is not compatible", field->field_name); + delete m_impl.m_table; m_impl.m_table= 0; DBUG_RETURN(ERR_INCOMPAT_TABLE_DEF); } } === modified file 'storage/ndb/include/util/SparseBitmask.hpp' --- a/storage/ndb/include/util/SparseBitmask.hpp 2010-08-28 09:37:09 +0000 +++ b/storage/ndb/include/util/SparseBitmask.hpp 2011-08-26 09:20:08 +0000 @@ -20,6 +20,7 @@ #include #include +#include class SparseBitmask { unsigned m_max_size; @@ -102,6 +103,11 @@ public: bool isclear() const { return count() == 0; } + unsigned getBitNo(unsigned n) const { + assert(n < m_vec.size()); + return m_vec[n]; + } + void print(void) const { for (unsigned i = 0; i < m_vec.size(); i++) { @@ -110,6 +116,38 @@ public: } } + bool equal(const SparseBitmask& obj) const { + if (obj.count() != count()) + return false; + + for (unsigned i = 0; iloadWorkers(); } + finalize_thr_map(); } // Check that all blocks could be created === modified file 'storage/ndb/src/kernel/vm/Configuration.cpp' --- a/storage/ndb/src/kernel/vm/Configuration.cpp 2011-07-04 13:37:56 +0000 +++ b/storage/ndb/src/kernel/vm/Configuration.cpp 2011-08-25 09:40:27 +0000 @@ -930,13 +930,16 @@ Configuration::setAllLockCPU(bool exec_t Uint32 i; for (i = 0; i < threadInfo.size(); i++) { - if (threadInfo[i].type != NotInUse) + if (threadInfo[i].type == NotInUse) + continue; + + bool run = + (exec_thread && threadInfo[i].type == MainThread) || + (!exec_thread && threadInfo[i].type != MainThread); + + if (run) { - if (setLockCPU(threadInfo[i].pThread, - threadInfo[i].type, - exec_thread, - FALSE)) - return; + setLockCPU(threadInfo[i].pThread, threadInfo[i].type); } } } @@ -966,11 +969,8 @@ Configuration::setRealtimeScheduler(NdbT int Configuration::setLockCPU(NdbThread * pThread, - enum ThreadTypes type, - bool exec_thread, - bool init) + enum ThreadTypes type) { - (void)init; Uint32 cpu_id; int tid = NdbThread_GetTid(pThread); if (tid == -1) @@ -981,9 +981,6 @@ Configuration::setLockCPU(NdbThread * pT We only set new lock CPU characteristics for the threads for which it has changed */ - if ((exec_thread && type != MainThread) || - (!exec_thread && type == MainThread)) - return 0; if (type == MainThread) cpu_id = executeLockCPU(); else @@ -1029,7 +1026,7 @@ Configuration::addThread(struct NdbThrea * main threads are set in ThreadConfig::ipControlLoop * as it's handled differently with mt */ - setLockCPU(pThread, type, (type == MainThread), TRUE); + setLockCPU(pThread, type); } return i; } === modified file 'storage/ndb/src/kernel/vm/Configuration.hpp' --- a/storage/ndb/src/kernel/vm/Configuration.hpp 2011-07-04 13:37:56 +0000 +++ b/storage/ndb/src/kernel/vm/Configuration.hpp 2011-08-25 09:40:27 +0000 @@ -86,10 +86,7 @@ public: void setAllRealtimeScheduler(); void setAllLockCPU(bool exec_thread); - int setLockCPU(NdbThread*, - enum ThreadTypes type, - bool exec_thread, - bool init); + int setLockCPU(NdbThread*, enum ThreadTypes type); int setRealtimeScheduler(NdbThread*, enum ThreadTypes type, bool real_time, === modified file 'storage/ndb/src/kernel/vm/Makefile.am' --- a/storage/ndb/src/kernel/vm/Makefile.am 2011-07-09 11:16:31 +0000 +++ b/storage/ndb/src/kernel/vm/Makefile.am 2011-08-29 10:15:59 +0000 @@ -121,4 +121,10 @@ testSafeMutex_LDFLAGS = @ndb_bin_am_ldfl $(top_builddir)/strings/libmystringslt.la \ @readline_link@ @TERMCAP_LIB@ +mt_thr_config_t_SOURCES = mt_thr_config.cpp +mt_thr_config_t_CXXFLAGS = -DTEST_MT_THR_CONFIG +mt_thr_config_t_LDADD = \ + $(top_builddir)/storage/ndb/src/common/util/libgeneral.la \ + $(top_builddir)/mysys/libmysyslt.la +noinst_PROGRAMS = mt_thr_config-t === modified file 'storage/ndb/src/kernel/vm/SimulatedBlock.cpp' --- a/storage/ndb/src/kernel/vm/SimulatedBlock.cpp 2011-07-09 11:16:31 +0000 +++ b/storage/ndb/src/kernel/vm/SimulatedBlock.cpp 2011-08-29 10:15:59 +0000 @@ -102,16 +102,7 @@ SimulatedBlock::SimulatedBlock(BlockNumb globalData.setBlock(blockNumber, mainBlock); } else { ndbrequire(mainBlock != 0); - if (mainBlock->theInstanceList == 0) { - mainBlock->theInstanceList = new SimulatedBlock* [MaxInstances]; - ndbrequire(mainBlock->theInstanceList != 0); - Uint32 i; - for (i = 0; i < MaxInstances; i++) - mainBlock->theInstanceList[i] = 0; - } - ndbrequire(theInstance < MaxInstances); - ndbrequire(mainBlock->theInstanceList[theInstance] == 0); - mainBlock->theInstanceList[theInstance] = this; + mainBlock->addInstance(this, theInstance); } theMainInstance = mainBlock; @@ -139,6 +130,23 @@ SimulatedBlock::SimulatedBlock(BlockNumb } void +SimulatedBlock::addInstance(SimulatedBlock* b, Uint32 theInstance) +{ + ndbrequire(theMainInstance == this); + ndbrequire(number() == b->number()); + if (theInstanceList == 0) + { + theInstanceList = new SimulatedBlock* [MaxInstances]; + ndbrequire(theInstanceList != 0); + for (Uint32 i = 0; i < MaxInstances; i++) + theInstanceList[i] = 0; + } + ndbrequire(theInstance < MaxInstances); + ndbrequire(theInstanceList[theInstance] == 0); + theInstanceList[theInstance] = b; +} + +void SimulatedBlock::initCommon() { Uint32 count = 10; === modified file 'storage/ndb/src/kernel/vm/SimulatedBlock.hpp' --- a/storage/ndb/src/kernel/vm/SimulatedBlock.hpp 2011-05-20 05:54:20 +0000 +++ b/storage/ndb/src/kernel/vm/SimulatedBlock.hpp 2011-08-29 10:15:59 +0000 @@ -168,6 +168,7 @@ public: return theInstanceList[instanceNumber]; return 0; } + void addInstance(SimulatedBlock* b, Uint32 theInstanceNo); virtual void loadWorkers() {} struct ThreadContext === modified file 'storage/ndb/src/kernel/vm/dummy_nonmt.cpp' --- a/storage/ndb/src/kernel/vm/dummy_nonmt.cpp 2011-06-30 15:59:25 +0000 +++ b/storage/ndb/src/kernel/vm/dummy_nonmt.cpp 2011-08-27 06:06:02 +0000 @@ -43,6 +43,12 @@ add_extra_worker_thr_map(Uint32, Uint32) assert(false); } +void +finalize_thr_map() +{ + assert(false); +} + Uint32 compute_jb_pages(struct EmulatorData*) { === modified file 'storage/ndb/src/kernel/vm/mt.cpp' --- a/storage/ndb/src/kernel/vm/mt.cpp 2011-05-20 05:54:20 +0000 +++ b/storage/ndb/src/kernel/vm/mt.cpp 2011-08-29 10:15:59 +0000 @@ -2381,49 +2381,6 @@ check_queues_empty(thr_data *selfptr) } /* - * Map instance number to real instance on this node. Used in - * sendlocal/sendprioa to find right thread and in execute_signals - * to find right block instance. SignalHeader is not modified. - */ - -static Uint8 g_map_instance[MAX_BLOCK_INSTANCES]; - -static void -map_instance_init() -{ - g_map_instance[0] = 0; - Uint32 ino; - for (ino = 1; ino < MAX_BLOCK_INSTANCES; ino++) - { - if (!globalData.isNdbMtLqh) - { - g_map_instance[ino] = 0; - } - else - { - require(num_lqh_workers != 0); - if (ino <= MAX_NDBMT_LQH_WORKERS) - { - g_map_instance[ino] = 1 + (ino - 1) % num_lqh_workers; - } - else - { - /* Extra workers are not mapped. */ - g_map_instance[ino] = ino; - } - } - } -} - -static inline Uint32 -map_instance(const SignalHeader *s) -{ - Uint32 ino = blockToInstance(s->theReceiversBlockNumber); - assert(ino < MAX_BLOCK_INSTANCES); - return g_map_instance[ino]; -} - -/* * Execute at most MAX_SIGNALS signals from one job queue, updating local read * state as appropriate. * @@ -2489,7 +2446,7 @@ execute_signals(thr_data *selfptr, thr_j NDB_PREFETCH_READ (read_buffer->m_data + read_pos + 32); } Uint32 bno = blockToMain(s->theReceiversBlockNumber); - Uint32 ino = map_instance(s); + Uint32 ino = blockToInstance(s->theReceiversBlockNumber); SimulatedBlock* block = globalData.mt_getBlock(bno, ino); assert(block != 0); @@ -2572,8 +2529,8 @@ run_job_buffers(thr_data *selfptr, Signa } struct thr_map_entry { - enum { NULL_THR_NO = 0xFFFF }; - Uint32 thr_no; + enum { NULL_THR_NO = 0xFF }; + Uint8 thr_no; thr_map_entry() : thr_no(NULL_THR_NO) {} }; @@ -2681,6 +2638,50 @@ add_extra_worker_thr_map(Uint32 block, U add_thr_map(block, instance, thr_no); } +/** + * create the duplicate entries needed so that + * sender doesnt need to know how many instances there + * actually are in this node... + * + * if only 1 instance...then duplicate that for all slots + * else assume instance 0 is proxy...and duplicate workers (modulo) + * + * NOTE: extra pgman worker is instance 5 + */ +void +finalize_thr_map() +{ + for (Uint32 b = 0; b < NO_OF_BLOCKS; b++) + { + Uint32 bno = b + MIN_BLOCK_NO; + Uint32 cnt = 0; + while (cnt < MAX_BLOCK_INSTANCES && + thr_map[b][cnt].thr_no != thr_map_entry::NULL_THR_NO) + cnt++; + + if (cnt != MAX_BLOCK_INSTANCES) + { + SimulatedBlock * main = globalData.getBlock(bno, 0); + for (Uint32 i = cnt; i < MAX_BLOCK_INSTANCES; i++) + { + Uint32 dup = (cnt == 1) ? 0 : 1 + ((i - 1) % (cnt - 1)); + if (thr_map[b][i].thr_no == thr_map_entry::NULL_THR_NO) + { + thr_map[b][i] = thr_map[b][dup]; + main->addInstance(globalData.getBlock(bno, dup), i); + } + else + { + /** + * extra pgman instance + */ + require(bno == PGMAN); + } + } + } + } +} + static void reportSignalStats(Uint32 self, Uint32 a_count, Uint32 a_size, Uint32 b_count, Uint32 b_size) { @@ -3120,7 +3121,7 @@ sendlocal(Uint32 self, const SignalHeade const Uint32 secPtr[3]) { Uint32 block = blockToMain(s->theReceiversBlockNumber); - Uint32 instance = map_instance(s); + Uint32 instance = blockToInstance(s->theReceiversBlockNumber); /* * Max number of signals to put into job buffer before flushing the buffer @@ -3156,7 +3157,7 @@ sendprioa(Uint32 self, const SignalHeade const Uint32 secPtr[3]) { Uint32 block = blockToMain(s->theReceiversBlockNumber); - Uint32 instance = map_instance(s); + Uint32 instance = blockToInstance(s->theReceiversBlockNumber); Uint32 dst = block2ThreadId(block, instance); struct thr_repository* rep = &g_thr_repository; @@ -3507,7 +3508,6 @@ ThreadConfig::init() ndbout << "NDBMT: num_threads=" << num_threads << endl; - ::map_instance_init(); ::rep_init(&g_thr_repository, num_threads, globalEmulatorData.m_mem_manager); } === modified file 'storage/ndb/src/kernel/vm/mt.hpp' --- a/storage/ndb/src/kernel/vm/mt.hpp 2011-06-30 15:59:25 +0000 +++ b/storage/ndb/src/kernel/vm/mt.hpp 2011-08-27 06:06:02 +0000 @@ -34,6 +34,7 @@ void add_thr_map(Uint32 block, Uint32 in void add_main_thr_map(); void add_lqh_worker_thr_map(Uint32 block, Uint32 instance); void add_extra_worker_thr_map(Uint32 block, Uint32 instance); +void finalize_thr_map(); void sendlocal(Uint32 self, const struct SignalHeader *s, const Uint32 *data, const Uint32 secPtr[3]); === added file 'storage/ndb/src/kernel/vm/mt_thr_config.cpp' --- a/storage/ndb/src/kernel/vm/mt_thr_config.cpp 1970-01-01 00:00:00 +0000 +++ b/storage/ndb/src/kernel/vm/mt_thr_config.cpp 2011-08-26 09:57:03 +0000 @@ -0,0 +1,966 @@ +/* + Copyright (c) 2011, Oracle and/or its affiliates. All rights reserved. + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA +*/ + +#include "mt_thr_config.hpp" +#include +#include "../../common/util/parse_mask.hpp" + + +static const struct THRConfig::Entries m_entries[] = +{ + // name type min max + { "main", THRConfig::T_MAIN, 1, 1 }, + { "ldm", THRConfig::T_LDM, 1, MAX_NDBMT_LQH_THREADS }, + { "recv", THRConfig::T_RECV, 1, 1 }, + { "rep", THRConfig::T_REP, 1, 1 }, + { "maint", THRConfig::T_MAINT, 1, 1 } +}; + +static const struct THRConfig::Param m_params[] = +{ + { "count", THRConfig::Param::S_UNSIGNED }, + { "cpubind", THRConfig::Param::S_BITMASK }, + { "cpuset", THRConfig::Param::S_BITMASK } +}; + +#define IX_COUNT 0 +#define IX_CPUBOUND 1 +#define IX_CPUSET 2 + +static +unsigned +getMaxEntries(Uint32 type) +{ + for (Uint32 i = 0; i 1) + { + unsigned no = createCpuSet(m_LockMaintThreadsToCPU); + m_threads[T_MAINT][0].m_bind_type = T_Thread::B_CPUSET_BOUND; + m_threads[T_MAINT][0].m_bind_no = no; + } + + /** + * Check that no cpu_sets overlap + */ + for (unsigned i = 0; i= num_threads) + { + m_info_msg.appfmt("Assigning each thread its own CPU\n"); + unsigned no = 0; + for (unsigned i = 0; i < NDB_ARRAY_SIZE(m_threads); i++) + { + if (i == T_MAINT) + continue; + for (unsigned j = 0; j < m_threads[i].size(); j++) + { + if (m_threads[i][j].m_bind_type == T_Thread::B_UNBOUND) + { + m_threads[i][j].m_bind_type = T_Thread::B_CPU_BOUND; + m_threads[i][j].m_bind_no = mask.getBitNo(no); + no++; + } + } + } + } + else if (cnt == 1) + { + unsigned cpu = mask.getBitNo(0); + m_info_msg.appfmt("Assigning all threads to CPU %u\n", cpu); + for (unsigned i = 0; i < NDB_ARRAY_SIZE(m_threads); i++) + { + if (i == T_MAINT) + continue; + bind_unbound(m_threads[i], cpu); + } + } + else if (isMtLqh) + { + unsigned unbound_ldm = count_unbound(m_threads[T_LDM]); + if (cnt > unbound_ldm) + { + /** + * let each LQH have it's own CPU and rest share... + */ + m_info_msg.append("Assigning LQH threads to dedicated CPU(s) and " + "other threads will share remaining\n"); + unsigned cpu = mask.find(0); + for (unsigned i = 0; i < m_threads[T_LDM].size(); i++) + { + if (m_threads[T_LDM][i].m_bind_type == T_Thread::B_UNBOUND) + { + m_threads[T_LDM][i].m_bind_type = T_Thread::B_CPU_BOUND; + m_threads[T_LDM][i].m_bind_no = cpu; + mask.clear(cpu); + cpu = mask.find(cpu + 1); + } + } + + cpu = mask.find(0); + bind_unbound(m_threads[T_MAIN], cpu); + bind_unbound(m_threads[T_REP], cpu); + if ((cpu = mask.find(cpu + 1)) == mask.NotFound) + { + cpu = mask.find(0); + } + bind_unbound(m_threads[T_RECV], cpu); + } + else + { + // put receiver, tc, backup/suma in 1 thread, + // and round robin LQH for rest + unsigned cpu = mask.find(0); + m_info_msg.appfmt("Assigning LQH threads round robin to CPU(s) and " + "other threads will share CPU %u\n", cpu); + bind_unbound(m_threads[T_MAIN], cpu); // TC + bind_unbound(m_threads[T_REP], cpu); + bind_unbound(m_threads[T_RECV], cpu); + mask.clear(cpu); + + cpu = mask.find(0); + for (unsigned i = 0; i < m_threads[T_LDM].size(); i++) + { + if (m_threads[T_LDM][i].m_bind_type == T_Thread::B_UNBOUND) + { + m_threads[T_LDM][i].m_bind_type = T_Thread::B_CPU_BOUND; + m_threads[T_LDM][i].m_bind_no = cpu; + if ((cpu = mask.find(cpu + 1)) == mask.NotFound) + { + cpu = mask.find(0); + } + } + } + } + } + else + { + unsigned cpu = mask.find(0); + m_info_msg.appfmt("Assigning LQH thread to CPU %u and " + "other threads will share\n", cpu); + bind_unbound(m_threads[T_LDM], cpu); + cpu = mask.find(cpu + 1); + bind_unbound(m_threads[T_MAIN], cpu); + bind_unbound(m_threads[T_RECV], cpu); + } + } + + return 0; +} + +unsigned +THRConfig::count_unbound(const Vector& vec) const +{ + unsigned cnt = 0; + for (unsigned i = 0; i < vec.size(); i++) + { + if (vec[i].m_bind_type == T_Thread::B_UNBOUND) + cnt ++; + } + return cnt; +} + +void +THRConfig::bind_unbound(Vector& vec, unsigned cpu) +{ + for (unsigned i = 0; i < vec.size(); i++) + { + if (vec[i].m_bind_type == T_Thread::B_UNBOUND) + { + vec[i].m_bind_type = T_Thread::B_CPU_BOUND; + vec[i].m_bind_no = cpu; + } + } +} + +int +THRConfig::do_validate() +{ + /** + * Check that there aren't too many of any thread type + */ + for (unsigned i = 0; i< NDB_ARRAY_SIZE(m_threads); i++) + { + if (m_threads[i].size() > getMaxEntries(i)) + { + m_err_msg.assfmt("Too many instances(%u) of %s max supported: %u", + m_threads[i].size(), + getEntryName(i), + getMaxEntries(i)); + return -1; + } + } + + /** + * LDM can be 1 2 4 + */ + if (m_threads[T_LDM].size() == 3) + { + m_err_msg.assfmt("No of LDM-instances can be 1,2,4. Specified: %u", + m_threads[T_LDM].size()); + return -1; + } + + return 0; +} + +const char * +THRConfig::getConfigString() +{ + m_cfg_string.clear(); + const char * sep = ""; + for (unsigned i = 0; i < NDB_ARRAY_SIZE(m_threads); i++) + { + if (m_threads[i].size()) + { + const char * name = getEntryName(i); + if (i != T_MAINT) + { + for (unsigned j = 0; j < m_threads[i].size(); j++) + { + m_cfg_string.append(sep); + sep=","; + m_cfg_string.append(name); + if (m_threads[i][j].m_bind_type != T_Thread::B_UNBOUND) + { + m_cfg_string.append("={"); + if (m_threads[i][j].m_bind_type == T_Thread::B_CPU_BOUND) + { + m_cfg_string.appfmt("cpubind=%u", m_threads[i][j].m_bind_no); + } + else if (m_threads[i][j].m_bind_type == T_Thread::B_CPUSET_BOUND) + { + m_cfg_string.appfmt("cpuset=%s", + m_cpu_sets[m_threads[i][j].m_bind_no].str().c_str()); + } + m_cfg_string.append("}"); + } + } + } + else + { + for (unsigned j = 0; j < m_threads[i].size(); j++) + { + if (m_threads[i][j].m_bind_type != T_Thread::B_UNBOUND) + { + m_cfg_string.append(sep); + sep=","; + m_cfg_string.append(name); + m_cfg_string.append("={"); + if (m_threads[i][j].m_bind_type == T_Thread::B_CPU_BOUND) + { + m_cfg_string.appfmt("cpubind=%u", m_threads[i][j].m_bind_no); + } + else if (m_threads[i][j].m_bind_type == T_Thread::B_CPUSET_BOUND) + { + m_cfg_string.appfmt("cpuset=%s", + m_cpu_sets[m_threads[i][j].m_bind_no].str().c_str()); + } + m_cfg_string.append("}"); + } + } + } + } + } + return m_cfg_string.c_str(); +} + +const char * +THRConfig::getErrorMessage() const +{ + return m_err_msg.c_str(); +} + +const char * +THRConfig::getInfoMessage() const +{ + return m_info_msg.c_str(); +} + +static +char * +skipblank(char * str) +{ + while (isblank(* str)) + str++; + return str; +} + +Uint32 +THRConfig::find_type(char *& str) +{ + str = skipblank(str); + + char * name = str; + if (* name == 0) + { + m_err_msg.assfmt("empty thread specification"); + return 0; + } + char * end = name; + while(isalpha(* end)) + end++; + + char save = * end; + * end = 0; + Uint32 t = getEntryType(name); + if (t == T_END) + { + m_err_msg.assfmt("unknown thread type '%s'", name); + } + * end = save; + str = end; + return t; +} + +struct ParamValue +{ + ParamValue() { found = false;} + bool found; + const char * string_val; + unsigned unsigned_val; + SparseBitmask mask_val; +}; + +static +int +parseUnsigned(char *& str, unsigned * dst) +{ + str = skipblank(str); + char * endptr = 0; + errno = 0; + long val = strtoll(str, &endptr, 0); + if (errno == ERANGE) + return -1; + if (val < 0 || Int64(val) > 0xFFFFFFFF) + return -1; + if (endptr == str) + return -1; + str = endptr; + *dst = (unsigned)val; + return 0; +} + +static +int +parseBitmask(char *& str, SparseBitmask * mask) +{ + str = skipblank(str); + size_t len = strspn(str, "0123456789-, "); + if (len == 0) + return -1; + + while (isblank(str[len-1])) + len--; + if (str[len-1] == ',') + len--; + char save = str[len]; + str[len] = 0; + int res = parse_mask(str, *mask); + str[len] = save; + str = str + len; + return res; +} + +static +int +parseParams(char * str, ParamValue values[], BaseString& err) +{ + const char * const save = str; + while (* str) + { + str = skipblank(str); + + unsigned idx = 0; + for (; idx < NDB_ARRAY_SIZE(m_params); idx++) + { + if (strncasecmp(str, m_params[idx].name, strlen(m_params[idx].name)) == 0) + { + str += strlen(m_params[idx].name); + break; + } + } + + if (idx == NDB_ARRAY_SIZE(m_params)) + { + err.assfmt("Unknown param near: '%s'", str); + return -1; + } + + if (values[idx].found == true) + { + err.assfmt("Param '%s' found twice", m_params[idx].name); + return -1; + } + + str = skipblank(str); + if (* str != '=') + { + err.assfmt("Missing '=' after %s in '%s'", m_params[idx].name, save); + return -1; + } + str++; + str = skipblank(str); + + int res = 0; + switch(m_params[idx].type){ + case THRConfig::Param::S_UNSIGNED: + res = parseUnsigned(str, &values[idx].unsigned_val); + break; + case THRConfig::Param::S_BITMASK: + res = parseBitmask(str, &values[idx].mask_val); + break; + default: + err.assfmt("Internal error, unknown type for param: '%s'", + m_params[idx].name); + return -1; + } + if (res == -1) + { + err.assfmt("Unable to parse %s=%s", m_params[idx].name, str); + return -1; + } + values[idx].found = true; + str = skipblank(str); + + if (* str == 0) + break; + + if (* str != ',') + { + err.assfmt("Unable to parse near '%s'", str); + return -1; + } + str++; + } + return 0; +} + +int +THRConfig::find_spec(char *& str, T_Type type) +{ + str = skipblank(str); + + switch(* str){ + case ',': + case 0: + add(type); + return 0; + } + + if (* str != '=') + { +err: + int len = (int)strlen(str); + m_err_msg.assfmt("Invalid format near: '%.*s'", + (len > 10) ? 10 : len, str); + return -1; + } + + str++; // skip over = + str = skipblank(str); + + if (* str != '{') + { + goto err; + } + + str++; + char * start = str; + + /** + * Find end + */ + while (* str && (* str) != '}') + str++; + + if (* str != '}') + { + goto err; + } + + char * end = str; + char save = * end; + * end = 0; + + ParamValue values[NDB_ARRAY_SIZE(m_params)]; + values[IX_COUNT].unsigned_val = 1; + int res = parseParams(start, values, m_err_msg); + * end = save; + + if (res != 0) + { + return -1; + } + + if (values[IX_CPUBOUND].found && values[IX_CPUSET].found) + { + m_err_msg.assfmt("Both cpuset and cpubind specified!"); + return -1; + } + + unsigned cnt = values[IX_COUNT].unsigned_val; + const int index = m_threads[type].size(); + for (unsigned i = 0; i < cnt; i++) + { + add(type); + } + + assert(m_threads[type].size() == index + cnt); + if (values[IX_CPUSET].found) + { + SparseBitmask & mask = values[IX_CPUSET].mask_val; + unsigned no = createCpuSet(mask); + for (unsigned i = 0; i < cnt; i++) + { + m_threads[type][index+i].m_bind_type = T_Thread::B_CPUSET_BOUND; + m_threads[type][index+i].m_bind_no = no; + } + } + else if (values[IX_CPUBOUND].found) + { + SparseBitmask & mask = values[IX_CPUBOUND].mask_val; + if (mask.count() < cnt) + { + m_err_msg.assfmt("%s: trying to bind %u threads to %u cpus [%s]", + getEntryName(type), + cnt, + mask.count(), + mask.str().c_str()); + return -1; + } + for (unsigned i = 0; i < cnt; i++) + { + m_threads[type][index+i].m_bind_type = T_Thread::B_CPU_BOUND; + m_threads[type][index+i].m_bind_no = mask.getBitNo(i % mask.count()); + } + } + + str++; // skip over } + return 0; +} + +int +THRConfig::find_next(char *& str) +{ + str = skipblank(str); + + if (* str == 0) + { + return 0; + } + else if (* str == ',') + { + str++; + return 1; + } + + int len = (int)strlen(str); + m_err_msg.assfmt("Invalid format near: '%.*s'", + (len > 10) ? 10 : len, str); + return -1; +} + +int +THRConfig::do_parse(const char * ThreadConfig) +{ + BaseString str(ThreadConfig); + char * ptr = (char*)str.c_str(); + while (* ptr) + { + Uint32 type = find_type(ptr); + if (type == T_END) + return -1; + + if (find_spec(ptr, (T_Type)type) < 0) + return -1; + + int ret = find_next(ptr); + if (ret < 0) + return ret; + + if (ret == 0) + break; + } + + for (Uint32 i = 0; i < T_END; i++) + { + while (m_threads[i].size() < m_entries[i].m_min_cnt) + add((T_Type)i); + } + + return do_bindings() || do_validate(); +} + +unsigned +THRConfig::createCpuSet(const SparseBitmask& mask) +{ + for (size_t i = 0; i < m_cpu_sets.size(); i++) + if (m_cpu_sets[i].equal(mask)) + return i; + + m_cpu_sets.push_back(mask); + return m_cpu_sets.size() - 1; +} + +template class Vector; +template class Vector; + +#ifdef TEST_MT_THR_CONFIG + +#include + +TAPTEST(mt_thr_config) +{ + { + THRConfig tmp; + OK(tmp.do_parse(8, 0, 0) == 0); + } + + /** + * BASIC test + */ + { + const char * ok[] = + { + "ldm,ldm", + "ldm={count=3},ldm", + "ldm={cpubind=1-2,5,count=3},ldm", + "ldm={ cpubind = 1- 2, 5 , count = 3 },ldm", + "ldm={count=3,cpubind=1-2,5 }, ldm", + "ldm={cpuset=1-3,count=3 },ldm", + "main,ldm={},ldm", + 0 + }; + + const char * fail [] = + { + "ldm,ldm,ldm", + "ldm={cpubind= 1 , cpuset=2 },ldm", + "ldm={count=4,cpubind=1-3},ldm", + "main,main,ldm,ldm", + "main={ keso=88, count=23},ldm,ldm", + "main={ cpuset=1-3 }, ldm={cpuset=3-4}", + "main={ cpuset=1-3 }, ldm={cpubind=2}", + 0 + }; + + for (Uint32 i = 0; ok[i]; i++) + { + THRConfig tmp; + int res = tmp.do_parse(ok[i]); + printf("do_parse(%s) => %s - %s\n", ok[i], + res == 0 ? "OK" : "FAIL", + res == 0 ? "" : tmp.getErrorMessage()); + OK(res == 0); + { + BaseString out(tmp.getConfigString()); + THRConfig check; + OK(check.do_parse(out.c_str()) == 0); + OK(strcmp(out.c_str(), check.getConfigString()) == 0); + } + } + + for (Uint32 i = 0; fail[i]; i++) + { + THRConfig tmp; + int res = tmp.do_parse(fail[i]); + printf("do_parse(%s) => %s - %s\n", fail[i], + res == 0 ? "OK" : "FAIL", + res == 0 ? "" : tmp.getErrorMessage()); + OK(res != 0); + } + } + + { + /** + * Test interaction with LockExecuteThreadToCPU + */ + const char * t[] = + { + /** threads, LockExecuteThreadToCPU, answer */ + "1-8", + "ldm={count=4}", + "main={cpubind=1},ldm={cpubind=2},ldm={cpubind=3},ldm={cpubind=4},ldm={cpubind=5},recv={cpubind=6},rep={cpubind=7}", + + "1-5", + "ldm={count=4}", + "main={cpubind=5},ldm={cpubind=1},ldm={cpubind=2},ldm={cpubind=3},ldm={cpubind=4},recv={cpubind=5},rep={cpubind=5}", + + "1-3", + "ldm={count=4}", + "main={cpubind=1},ldm={cpubind=2},ldm={cpubind=3},ldm={cpubind=2},ldm={cpubind=3},recv={cpubind=1},rep={cpubind=1}", + + "1-4", + "ldm={count=4}", + "main={cpubind=1},ldm={cpubind=2},ldm={cpubind=3},ldm={cpubind=4},ldm={cpubind=2},recv={cpubind=1},rep={cpubind=1}", + + "1-8", + "ldm={count=4},maint={cpubind=8}", + "main={cpubind=1},ldm={cpubind=2},ldm={cpubind=3},ldm={cpubind=4},ldm={cpubind=5},recv={cpubind=6},rep={cpubind=7},maint={cpubind=8}", + + "1-8", + "ldm={count=4,cpubind=1,4,5,6}", + "main={cpubind=2},ldm={cpubind=1},ldm={cpubind=4},ldm={cpubind=5},ldm={cpubind=6},recv={cpubind=3},rep={cpubind=7}", + + // END + 0 + }; + + for (unsigned i = 0; t[i]; i+= 3) + { + THRConfig tmp; + tmp.setLockExecuteThreadToCPU(t[i+0]); + int res = tmp.do_parse(t[i+1]); + int ok = strcmp(tmp.getConfigString(), t[i+2]) == 0; + printf("mask: %s conf: %s => %s(%s) - %s - %s\n", + t[i+0], + t[i+1], + res == 0 ? "OK" : "FAIL", + res == 0 ? "" : tmp.getErrorMessage(), + tmp.getConfigString(), + ok == 1 ? "CORRECT" : "INCORRECT"); + OK(res == 0); + OK(ok == 1); + } + } + + return 1; +} + +#endif === added file 'storage/ndb/src/kernel/vm/mt_thr_config.hpp' --- a/storage/ndb/src/kernel/vm/mt_thr_config.hpp 1970-01-01 00:00:00 +0000 +++ b/storage/ndb/src/kernel/vm/mt_thr_config.hpp 2011-08-26 09:57:03 +0000 @@ -0,0 +1,125 @@ +/* + Copyright (c) 2011, Oracle and/or its affiliates. All rights reserved. + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA +*/ + +#ifndef THRConfig_H +#define THRConfig_H + +struct NdbThread; +#include +#include +#include + +/** + * This class contains thread configuration + * it supports parsing the ThreadConfig parameter + * and handling LockExecuteThreadToCPU etc... + * + * This is used in ndb_mgmd when verifying configuration + * and by ndbmtd + * + * TAP-TESTS are provided in mt_thr_config.cpp + */ +class THRConfig +{ +public: + enum T_Type + { + T_MAIN = 0, /* DIH/QMGR/TC/SPJ etc */ + T_LDM = 1, /* LQH/ACC/TUP/TUX etc */ + T_RECV = 2, /* CMVMI */ + T_REP = 3, /* SUMA */ + T_MAINT = 4, /* FS, SocketServer etc */ + + T_END = 5 + }; + + THRConfig(); + ~THRConfig(); + + // NOTE: needs to be called before do_parse + int setLockExecuteThreadToCPU(const char * val); + int setLockMaintThreadsToCPU(unsigned val); + + int do_parse(const char * ThreadConfig); + int do_parse(unsigned MaxNoOfExecutionThreads, + unsigned __ndbmt_lqh_workers, + unsigned __ndbmt_classic); + + const char * getConfigString(); + + const char * getErrorMessage() const; + const char * getInfoMessage() const; + +private: + struct T_Thread + { + unsigned m_type; + unsigned m_no; // within type + enum BType { B_UNBOUND, B_CPU_BOUND, B_CPUSET_BOUND } m_bind_type; + unsigned m_bind_no; // cpu_no/cpuset_no + }; + bool m_classic; + SparseBitmask m_LockExecuteThreadToCPU; + SparseBitmask m_LockMaintThreadsToCPU; + Vector m_cpu_sets; + Vector m_threads[T_END]; + + BaseString m_err_msg; + BaseString m_info_msg; + BaseString m_cfg_string; + BaseString m_print_string; + + void add(T_Type); + Uint32 find_type(char *&); + int find_spec(char *&, T_Type); + int find_next(char *&); + + unsigned createCpuSet(const SparseBitmask&); + int do_bindings(); + int do_validate(); + + unsigned count_unbound(const Vector& vec) const; + void bind_unbound(Vector & vec, unsigned cpu); + +public: + struct Entries + { + const char * m_name; + unsigned m_type; + unsigned m_min_cnt; + unsigned m_max_cnt; + }; + + struct Param + { + const char * name; + enum { S_UNSIGNED, S_BITMASK } type; + }; +}; + +/** + * This class is used by ndbmtd + * when setting up threads (and locking) + */ +class THRConfigApplier : public THRConfig +{ +public: + int create_cpusets(); + int do_bind(unsigned t_type, unsigned no, NdbThread*); +}; + +#endif // IPCConfig_H No bundle (reason: useless for push emails).