From: Jan Wedvik Date: September 1 2011 12:23pm Subject: bzr push into mysql-5.1-telco-7.0-spj-scan-vs-scan branch (jan.wedvik:3544 to 3546) List-Archive: http://lists.mysql.com/commits/140878 Message-Id: <20110901122331.D8F9722E@fimafeng09.norway.sun.com> MIME-Version: 1.0 Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit 3546 Jan Wedvik 2011-09-01 Added regression test for mysql-5.1-telco-7.0 revno 4491 (fix for pushed scan-scan queries with large records). modified: mysql-test/suite/ndb/r/ndb_join_pushdown.result mysql-test/suite/ndb/t/ndb_join_pushdown.test 3545 Jan Wedvik 2011-09-01 [merge] Merged form mysql-5.1-telco-7.0 modified: storage/ndb/include/kernel/signaldata/QueryTree.hpp storage/ndb/include/mgmapi/mgmapi_config_parameters.h storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp storage/ndb/src/kernel/ndbd.cpp storage/ndb/src/kernel/vm/CMakeLists.txt storage/ndb/src/kernel/vm/Configuration.cpp storage/ndb/src/kernel/vm/Configuration.hpp storage/ndb/src/kernel/vm/Makefile.am storage/ndb/src/kernel/vm/mt.cpp storage/ndb/src/kernel/vm/mt_thr_config.cpp storage/ndb/src/kernel/vm/mt_thr_config.hpp storage/ndb/src/ndbapi/NdbQueryOperation.cpp 3544 Jonas Oreland 2011-08-29 [merge] ndb - merge 70 to 70-spj-scan-scan added: storage/ndb/src/kernel/vm/mt_thr_config.cpp storage/ndb/src/kernel/vm/mt_thr_config.hpp modified: mysql-test/suite/ndb/r/ndb_index_stat.result mysql-test/suite/ndb/r/ndb_statistics1.result mysql-test/suite/ndb/t/ndb_index_stat.test sql/ha_ndbcluster.cc sql/ha_ndbcluster_binlog.cc sql/ha_ndbinfo.cc storage/ndb/include/util/SparseBitmask.hpp storage/ndb/src/kernel/SimBlockList.cpp storage/ndb/src/kernel/vm/Configuration.cpp storage/ndb/src/kernel/vm/Configuration.hpp storage/ndb/src/kernel/vm/Makefile.am storage/ndb/src/kernel/vm/SimulatedBlock.cpp storage/ndb/src/kernel/vm/SimulatedBlock.hpp storage/ndb/src/kernel/vm/dummy_nonmt.cpp storage/ndb/src/kernel/vm/mt.cpp storage/ndb/src/kernel/vm/mt.hpp === modified file 'mysql-test/suite/ndb/r/ndb_join_pushdown.result' --- a/mysql-test/suite/ndb/r/ndb_join_pushdown.result 2011-08-22 08:50:01 +0000 +++ b/mysql-test/suite/ndb/r/ndb_join_pushdown.result 2011-09-01 12:22:14 +0000 @@ -5178,6 +5178,55 @@ pw 4 4 5 wh 1 1 2 drop table t1; drop table t2; +create table t1 ( +a char(10) primary key, +b char(10) not null, +c char(10) not null, +l00 char(255) not null, +l01 char(255) not null, +l02 char(255) not null, +l03 char(255) not null, +l04 char(255) not null, +l05 char(255) not null, +l06 char(255) not null, +l07 char(255) not null, +l08 char(255) not null, +l09 char(255) not null, +l10 char(255) not null, +l11 char(255) not null, +l12 char(255) not null, +l13 char(255) not null, +l14 char(255) not null, +l15 char(255) not null, +l16 char(255) not null, +l17 char(255) not null, +l18 char(255) not null, +l19 char(255) not null, +l20 char(255) not null, +l21 char(255) not null, +l22 char(255) not null, +l23 char(255) not null, +l24 char(255) not null, +l25 char(255) not null, +l26 char(255) not null, +l27 char(255) not null, +l28 char(255) not null, +l29 char(255) not null, +l30 char(255) not null, +l31 char(255) not null, +index(c, b) +) engine=ndb partition by key(a) partitions 8; +insert into t1 values ('a','a','a','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x'); +insert into t1 values ('b','b','b','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x'); +insert into t1 values ('c','c','c','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x'); +explain select count(*) from t1 as x1 join t1 as x2 on x1.b = x2.c; +id select_type table type possible_keys key key_len ref rows Extra +1 SIMPLE x1 ALL NULL NULL NULL NULL 3 Parent of 2 pushed join@1 +1 SIMPLE x2 ref c c 10 test.x1.b 1 Child of 'x1' in pushed join@1 +select count(*) from t1 as x1 join t1 as x2 on x1.b = x2.c; +count(*) +3 +drop table t1; create temporary table spj_counts_at_end select counter_name, sum(val) as val from ndbinfo.counters @@ -5193,13 +5242,13 @@ and spj_counts_at_end.counter_name <> 'L and spj_counts_at_end.counter_name <> 'SCAN_BATCHES_RETURNED'; counter_name spj_counts_at_end.val - spj_counts_at_startup.val CONST_PRUNED_RANGE_SCANS_RECEIVED 6 -LOCAL_TABLE_SCANS_SENT 236 +LOCAL_TABLE_SCANS_SENT 244 PRUNED_RANGE_SCANS_RECEIVED 25 RANGE_SCANS_RECEIVED 720 READS_NOT_FOUND 6616 READS_RECEIVED 52 -SCAN_ROWS_RETURNED 76380 -TABLE_SCANS_RECEIVED 236 +SCAN_ROWS_RETURNED 76386 +TABLE_SCANS_RECEIVED 244 select sum(spj_counts_at_end.val - spj_counts_at_startup.val) as 'LOCAL+REMOTE READS_SENT' from spj_counts_at_end, spj_counts_at_startup where spj_counts_at_end.counter_name = spj_counts_at_startup.counter_name @@ -5211,15 +5260,15 @@ drop table spj_save_counts; drop table spj_counts_at_startup; drop table spj_counts_at_end; scan_count -2520 +2524 pruned_scan_count 8 sorted_scan_count 10 pushed_queries_defined -383 +385 pushed_queries_dropped 11 pushed_queries_executed -534 +535 set ndb_join_pushdown = @save_ndb_join_pushdown; === modified file 'mysql-test/suite/ndb/t/ndb_join_pushdown.test' --- a/mysql-test/suite/ndb/t/ndb_join_pushdown.test 2011-06-28 14:27:57 +0000 +++ b/mysql-test/suite/ndb/t/ndb_join_pushdown.test 2011-09-01 12:22:14 +0000 @@ -3641,6 +3641,63 @@ drop table t1; drop table t2; ######################################## +# Test query with very long records. +connection ddl; +create table t1 ( + a char(10) primary key, + b char(10) not null, + c char(10) not null, + l00 char(255) not null, + l01 char(255) not null, + l02 char(255) not null, + l03 char(255) not null, + l04 char(255) not null, + l05 char(255) not null, + l06 char(255) not null, + l07 char(255) not null, + l08 char(255) not null, + l09 char(255) not null, + l10 char(255) not null, + l11 char(255) not null, + l12 char(255) not null, + l13 char(255) not null, + l14 char(255) not null, + l15 char(255) not null, + l16 char(255) not null, + l17 char(255) not null, + l18 char(255) not null, + l19 char(255) not null, + l20 char(255) not null, + l21 char(255) not null, + l22 char(255) not null, + l23 char(255) not null, + l24 char(255) not null, + l25 char(255) not null, + l26 char(255) not null, + l27 char(255) not null, + l28 char(255) not null, + l29 char(255) not null, + l30 char(255) not null, + l31 char(255) not null, + index(c, b) +) engine=ndb partition by key(a) partitions 8; + +connection spj; + +insert into t1 values ('a','a','a','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x'); + +insert into t1 values ('b','b','b','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x'); + +insert into t1 values ('c','c','c','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x'); + +explain select count(*) from t1 as x1 join t1 as x2 on x1.b = x2.c; + +select count(*) from t1 as x1 join t1 as x2 on x1.b = x2.c; + +connection ddl; +drop table t1; + +######################################## # Verify DBSPJ counters for entire test: # Note: These tables are 'temporary' withing 'connection spj' === modified file 'storage/ndb/include/kernel/signaldata/QueryTree.hpp' --- a/storage/ndb/include/kernel/signaldata/QueryTree.hpp 2011-05-04 11:45:33 +0000 +++ b/storage/ndb/include/kernel/signaldata/QueryTree.hpp 2011-09-01 11:46:45 +0000 @@ -271,9 +271,11 @@ struct QN_ScanIndexParameters { Uint32 len; Uint32 requestInfo; - Uint32 batchSize; // (bytes << 16) | (rows) + Uint32 batchSize; // (bytes << 11) | (rows) Uint32 resultData; // Api connect ptr STATIC_CONST ( NodeSize = 4 ); + // Number of bits for representing row count in 'batchSize'. + STATIC_CONST ( BatchRowBits = 11 ); enum ScanIndexParamBits { === modified file 'storage/ndb/include/mgmapi/mgmapi_config_parameters.h' --- a/storage/ndb/include/mgmapi/mgmapi_config_parameters.h 2011-07-04 13:37:56 +0000 +++ b/storage/ndb/include/mgmapi/mgmapi_config_parameters.h 2011-08-30 09:40:52 +0000 @@ -195,6 +195,7 @@ #define CFG_DB_INDEX_STAT_UPDATE_DELAY 626 #define CFG_DB_MAX_DML_OPERATIONS_PER_TRANSACTION 627 +#define CFG_DB_MT_THREAD_CONFIG 628 #define CFG_NODE_ARBIT_RANK 200 #define CFG_NODE_ARBIT_DELAY 201 === modified file 'storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp' --- a/storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp 2011-08-22 13:04:16 +0000 +++ b/storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp 2011-09-01 12:17:37 +0000 @@ -4326,15 +4326,17 @@ Dbspj::scanIndex_build(Build_context& ct treeNodePtr.p->m_info = &g_ScanIndexOpInfo; treeNodePtr.p->m_bits |= TreeNode::T_ATTR_INTERPRETED; treeNodePtr.p->m_bits |= TreeNode::T_NEED_REPORT_BATCH_COMPLETED; - treeNodePtr.p->m_batch_size = batchSize & 0xFFFF; + treeNodePtr.p->m_batch_size = + batchSize & ~(0xFFFFFFFF << QN_ScanIndexParameters::BatchRowBits); ScanFragReq*dst=(ScanFragReq*)treeNodePtr.p->m_scanindex_data.m_scanFragReq; dst->senderData = treeNodePtr.i; dst->resultRef = reference(); dst->resultData = treeNodePtr.i; dst->savePointId = ctx.m_savepointId; - dst->batch_size_rows = batchSize & 0xFFFF; - dst->batch_size_bytes = batchSize >> 16; + dst->batch_size_rows = + batchSize & ~(0xFFFFFFFF << QN_ScanIndexParameters::BatchRowBits); + dst->batch_size_bytes = batchSize >> QN_ScanIndexParameters::BatchRowBits; Uint32 transId1 = requestPtr.p->m_transId[0]; Uint32 transId2 = requestPtr.p->m_transId[1]; @@ -5020,12 +5022,13 @@ Dbspj::scanIndex_parent_batch_complete(S * When parent's batch is complete, we send our batch */ const ScanFragReq * org = (const ScanFragReq*)data.m_scanFragReq; - ndbassert(org->batch_size_rows >= data.m_fragCount - data.m_frags_complete); + ndbrequire(org->batch_size_rows > 0); if (treeNodePtr.p->m_bits & TreeNode::T_SCAN_PARALLEL) { jam(); - data.m_parallelism = data.m_fragCount - data.m_frags_complete; + data.m_parallelism = MIN(data.m_fragCount - data.m_frags_complete, + org->batch_size_rows); } else if (data.m_firstExecution) { @@ -5051,8 +5054,9 @@ Dbspj::scanIndex_parent_batch_complete(S * in the other direction is more costly). */ Int32 parallelism = - static_cast(data.m_parallelismStat.getMean() - - 2 * data.m_parallelismStat.getStdDev()); + static_cast(MIN(data.m_parallelismStat.getMean() + - 2 * data.m_parallelismStat.getStdDev(), + org->batch_size_rows)); if (parallelism < 1) { @@ -5117,17 +5121,9 @@ Dbspj::scanIndex_parent_batch_complete(S data.m_firstExecution = false; - if (treeNodePtr.p->m_bits & TreeNode::T_SCAN_PARALLEL) - { - ndbrequire((data.m_frags_outstanding + data.m_frags_complete) == - data.m_fragCount); - } - else - { - ndbrequire(static_cast(data.m_frags_outstanding + - data.m_frags_complete) <= - data.m_fragCount); - } + ndbrequire(static_cast(data.m_frags_outstanding + + data.m_frags_complete) <= + data.m_fragCount); data.m_batch_chunks = 1; requestPtr.p->m_cnt_active++; @@ -5575,7 +5571,8 @@ Dbspj::scanIndex_execSCAN_NEXTREQ(Signal data.m_largestBatchBytes < org->batch_size_bytes/data.m_parallelism) { jam(); - data.m_parallelism = data.m_fragCount - data.m_frags_complete; + data.m_parallelism = MIN(data.m_fragCount - data.m_frags_complete, + org->batch_size_rows); if (data.m_largestBatchRows > 0) { jam(); @@ -5624,7 +5621,8 @@ Dbspj::scanIndex_execSCAN_NEXTREQ(Signal else { jam(); - data.m_parallelism = data.m_fragCount - data.m_frags_complete; + data.m_parallelism = MIN(data.m_fragCount - data.m_frags_complete, + org->batch_size_rows); } const Uint32 bs_rows = org->batch_size_rows/data.m_parallelism; === modified file 'storage/ndb/src/kernel/ndbd.cpp' --- a/storage/ndb/src/kernel/ndbd.cpp 2011-01-30 23:13:49 +0000 +++ b/storage/ndb/src/kernel/ndbd.cpp 2011-08-30 12:00:48 +0000 @@ -297,69 +297,45 @@ get_multithreaded_config(EmulatorData& e { // multithreaded is compiled in ndbd/ndbmtd for now globalData.isNdbMt = SimulatedBlock::isMultiThreaded(); - if (!globalData.isNdbMt) { + if (!globalData.isNdbMt) + { ndbout << "NDBMT: non-mt" << endl; return 0; } - ndb_mgm_configuration * conf = ed.theConfiguration->getClusterConfig(); - if (conf == 0) - { - abort(); - } + THRConfig & conf = ed.theConfiguration->m_thr_config; - ndb_mgm_configuration_iterator * p = - ndb_mgm_create_configuration_iterator(conf, CFG_SECTION_NODE); - if (ndb_mgm_find(p, CFG_NODE_ID, globalData.ownId)) - { - abort(); - } - - Uint32 mtthreads = 0; - ndb_mgm_get_int_parameter(p, CFG_DB_MT_THREADS, &mtthreads); - ndbout << "NDBMT: MaxNoOfExecutionThreads=" << mtthreads << endl; + Uint32 threadcount = conf.getThreadCount(); + ndbout << "NDBMT: MaxNoOfExecutionThreads=" << threadcount << endl; globalData.isNdbMtLqh = true; { - Uint32 classic = 0; - ndb_mgm_get_int_parameter(p, CFG_NDBMT_CLASSIC, &classic); - if (classic) - globalData.isNdbMtLqh = false; - - const char* p = NdbEnv_GetEnv("NDB_MT_LQH", (char*)0, 0); - if (p != 0) + if (conf.getMtClassic()) { - if (strstr(p, "NOPLEASE") != 0) - globalData.isNdbMtLqh = false; - else - globalData.isNdbMtLqh = true; + globalData.isNdbMtLqh = false; } } if (!globalData.isNdbMtLqh) return 0; - Uint32 threads = 0; - switch(mtthreads){ - case 0: - case 1: - case 2: - case 3: - threads = 1; // TC + receiver + SUMA + LQH - break; - case 4: - case 5: - case 6: - threads = 2; // TC + receiver + SUMA + 2 * LQH - break; - default: - threads = 4; // TC + receiver + SUMA + 4 * LQH - } - - ndb_mgm_get_int_parameter(p, CFG_NDBMT_LQH_THREADS, &threads); + Uint32 threads = conf.getThreadCount(THRConfig::T_LDM); Uint32 workers = threads; - ndb_mgm_get_int_parameter(p, CFG_NDBMT_LQH_WORKERS, &workers); + { + ndb_mgm_configuration * conf = ed.theConfiguration->getClusterConfig(); + if (conf == 0) + { + abort(); + } + ndb_mgm_configuration_iterator * p = + ndb_mgm_create_configuration_iterator(conf, CFG_SECTION_NODE); + if (ndb_mgm_find(p, CFG_NODE_ID, globalData.ownId)) + { + abort(); + } + ndb_mgm_get_int_parameter(p, CFG_NDBMT_LQH_WORKERS, &workers); + } #ifdef VM_TRACE // testing @@ -368,9 +344,6 @@ get_multithreaded_config(EmulatorData& e p = NdbEnv_GetEnv("NDBMT_LQH_WORKERS", (char*)0, 0); if (p != 0) workers = atoi(p); - p = NdbEnv_GetEnv("NDBMT_LQH_THREADS", (char*)0, 0); - if (p != 0) - threads = atoi(p); } #endif @@ -654,10 +627,11 @@ ndbd_run(bool foreground, int report_fd, // Ignore error } + theConfig->setupConfiguration(); + if (get_multithreaded_config(globalEmulatorData)) ndbd_exit(-1); - theConfig->setupConfiguration(); systemInfo(* theConfig, * theConfig->m_logLevel); NdbThread* pWatchdog = globalEmulatorData.theWatchDog->doStart(); === modified file 'storage/ndb/src/kernel/vm/CMakeLists.txt' --- a/storage/ndb/src/kernel/vm/CMakeLists.txt 2011-02-04 11:45:24 +0000 +++ b/storage/ndb/src/kernel/vm/CMakeLists.txt 2011-09-01 12:17:37 +0000 @@ -38,6 +38,7 @@ ADD_LIBRARY(ndbkernel STATIC Ndbinfo.cpp NdbinfoTables.cpp ArenaPool.cpp + mt_thr_config.cpp ) ADD_LIBRARY(ndbsched STATIC === modified file 'storage/ndb/src/kernel/vm/Configuration.cpp' --- a/storage/ndb/src/kernel/vm/Configuration.cpp 2011-08-25 09:40:27 +0000 +++ b/storage/ndb/src/kernel/vm/Configuration.cpp 2011-08-30 12:00:48 +0000 @@ -33,6 +33,7 @@ #include #include +#include #include @@ -392,7 +393,77 @@ Configuration::setupConfiguration(){ t = globalEmulatorData.theWatchDog ->setCheckInterval(t); _timeBetweenWatchDogCheckInitial = t; } - + + const char * thrconfigstring = NdbEnv_GetEnv("NDB_MT_THREAD_CONFIG", + (char*)0, 0); + if (thrconfigstring || + iter.get(CFG_DB_MT_THREAD_CONFIG, &thrconfigstring) == 0) + { + int res = m_thr_config.do_parse(thrconfigstring); + if (res != 0) + { + ERROR_SET(fatal, NDBD_EXIT_INVALID_CONFIG, + "Invalid configuration fetched, invalid ThreadConfig", + m_thr_config.getErrorMessage()); + } + } + else + { + const char * mask; + if (iter.get(CFG_DB_EXECUTE_LOCK_CPU, &mask) == 0) + { + int res = m_thr_config.setLockExecuteThreadToCPU(mask); + if (res < 0) + { + // Could not parse LockExecuteThreadToCPU mask + g_eventLogger->warning("Failed to parse 'LockExecuteThreadToCPU=%s' " + "(error: %d), ignoring it!", + mask, res); + } + } + + Uint32 maintCPU = NO_LOCK_CPU; + iter.get(CFG_DB_MAINT_LOCK_CPU, &maintCPU); + if (maintCPU == 65535) + maintCPU = NO_LOCK_CPU; // Ignore old default(may come from old mgmd) + if (maintCPU != NO_LOCK_CPU) + m_thr_config.setLockMaintThreadsToCPU(maintCPU); + + Uint32 mtthreads = 0; + iter.get(CFG_DB_MT_THREADS, &mtthreads); + + Uint32 classic = 0; + iter.get(CFG_NDBMT_CLASSIC, &classic); + const char* p = NdbEnv_GetEnv("NDB_MT_LQH", (char*)0, 0); + if (p != 0) + { + if (strstr(p, "NOPLEASE") != 0) + classic = 1; + } + + Uint32 lqhthreads = 0; + iter.get(CFG_NDBMT_LQH_THREADS, &lqhthreads); + + int res = m_thr_config.do_parse(mtthreads, lqhthreads, classic); + if (res != 0) + { + ERROR_SET(fatal, NDBD_EXIT_INVALID_CONFIG, + "Invalid configuration fetched, invalid thread configuration", + m_thr_config.getErrorMessage()); + } + } + if (thrconfigstring) + { + ndbout_c("ThreadConfig: input: %s parsed: %s", + thrconfigstring, + m_thr_config.getConfigString()); + } + else + { + ndbout_c("ThreadConfig (old ndb_mgmd): parsed: %s", + m_thr_config.getConfigString()); + } + ConfigValues* cf = ConfigValuesFactory::extractCurrentSection(iter.m_config); if(m_clusterConfigIter) === modified file 'storage/ndb/src/kernel/vm/Configuration.hpp' --- a/storage/ndb/src/kernel/vm/Configuration.hpp 2011-08-25 09:40:27 +0000 +++ b/storage/ndb/src/kernel/vm/Configuration.hpp 2011-08-30 12:00:48 +0000 @@ -27,6 +27,7 @@ #include #include #include +#include "mt_thr_config.hpp" enum ThreadTypes { @@ -124,6 +125,7 @@ public: ndb_mgm_configuration* getClusterConfig() const { return m_clusterConfig; } Uint32 get_config_generation() const; + THRConfigApplier m_thr_config; private: friend class Cmvmi; friend class Qmgr; === modified file 'storage/ndb/src/kernel/vm/Makefile.am' --- a/storage/ndb/src/kernel/vm/Makefile.am 2011-08-29 10:15:59 +0000 +++ b/storage/ndb/src/kernel/vm/Makefile.am 2011-09-01 12:17:37 +0000 @@ -39,7 +39,8 @@ libkernel_a_SOURCES = VMSignal.cpp \ SafeMutex.cpp \ Ndbinfo.cpp \ NdbinfoTables.cpp \ - ArenaPool.cpp + ArenaPool.cpp \ + mt_thr_config.cpp libsched_a_SOURCES = TimeQueue.cpp \ ThreadConfig.cpp \ === modified file 'storage/ndb/src/kernel/vm/mt.cpp' --- a/storage/ndb/src/kernel/vm/mt.cpp 2011-08-29 10:15:59 +0000 +++ b/storage/ndb/src/kernel/vm/mt.cpp 2011-09-01 12:17:37 +0000 @@ -3251,28 +3251,11 @@ sendprioa_STOP_FOR_CRASH(const struct th */ static thr_job_buffer dummy_buffer; - /* - * Before we had three main threads with fixed block assignment. - * Now there is also worker instances (we send to LQH instance). + /** + * Pick any instance running in this thread */ - Uint32 main = 0; - Uint32 instance = 0; - if (dst == 0) - main = NDBCNTR; - else if (dst == 1) - main = DBLQH; - else if (dst >= NUM_MAIN_THREADS && dst < NUM_MAIN_THREADS + num_lqh_threads) - { - main = DBLQH; - instance = dst - NUM_MAIN_THREADS + 1; - } - else if (dst == receiver_thread_no) - main = CMVMI; - else - require(false); - Uint32 bno = numberToBlock(main, instance); - require(block2ThreadId(main, instance) == dst); struct thr_data * dstptr = rep->m_thread + dst; + Uint32 bno = dstptr->m_instance_list[0]; memset(&signalT.header, 0, sizeof(SignalHeader)); signalT.header.theVerId_signalNumber = GSN_STOP_FOR_CRASH; === modified file 'storage/ndb/src/kernel/vm/mt_thr_config.cpp' --- a/storage/ndb/src/kernel/vm/mt_thr_config.cpp 2011-08-26 09:57:03 +0000 +++ b/storage/ndb/src/kernel/vm/mt_thr_config.cpp 2011-08-30 14:13:15 +0000 @@ -500,6 +500,34 @@ THRConfig::getConfigString() return m_cfg_string.c_str(); } +Uint32 +THRConfig::getThreadCount() const +{ + // Note! not counting T_MAINT + Uint32 cnt = 0; + for (Uint32 i = 0; i < NDB_ARRAY_SIZE(m_threads); i++) + { + if (i != T_MAINT) + { + cnt += m_threads[i].size(); + } + } + return cnt; +} + +Uint32 +THRConfig::getThreadCount(T_Type type) const +{ + for (Uint32 i = 0; i < NDB_ARRAY_SIZE(m_threads); i++) + { + if (i == (Uint32)type) + { + return m_threads[i].size(); + } + } + return 0; +} + const char * THRConfig::getErrorMessage() const { @@ -516,7 +544,7 @@ static char * skipblank(char * str) { - while (isblank(* str)) + while (isspace(* str)) str++; return str; } @@ -564,7 +592,7 @@ parseUnsigned(char *& str, unsigned * ds str = skipblank(str); char * endptr = 0; errno = 0; - long val = strtoll(str, &endptr, 0); + long val = strtol(str, &endptr, 0); if (errno == ERANGE) return -1; if (val < 0 || Int64(val) > 0xFFFFFFFF) @@ -585,7 +613,7 @@ parseBitmask(char *& str, SparseBitmask if (len == 0) return -1; - while (isblank(str[len-1])) + while (isspace(str[len-1])) len--; if (str[len-1] == ',') len--; @@ -830,7 +858,7 @@ THRConfig::do_parse(const char * ThreadC unsigned THRConfig::createCpuSet(const SparseBitmask& mask) { - for (size_t i = 0; i < m_cpu_sets.size(); i++) + for (unsigned i = 0; i < m_cpu_sets.size(); i++) if (m_cpu_sets[i].equal(mask)) return i; === modified file 'storage/ndb/src/kernel/vm/mt_thr_config.hpp' --- a/storage/ndb/src/kernel/vm/mt_thr_config.hpp 2011-08-26 09:57:03 +0000 +++ b/storage/ndb/src/kernel/vm/mt_thr_config.hpp 2011-08-30 12:00:48 +0000 @@ -64,6 +64,9 @@ public: const char * getErrorMessage() const; const char * getInfoMessage() const; + Uint32 getThreadCount() const; // Don't count FS/MAINT thread + Uint32 getThreadCount(T_Type) const; + Uint32 getMtClassic() const { return m_classic; } private: struct T_Thread { === modified file 'storage/ndb/src/ndbapi/NdbQueryOperation.cpp' --- a/storage/ndb/src/ndbapi/NdbQueryOperation.cpp 2011-08-22 13:04:16 +0000 +++ b/storage/ndb/src/ndbapi/NdbQueryOperation.cpp 2011-09-01 12:17:37 +0000 @@ -47,7 +47,8 @@ */ #define UNUSED(x) ((void)(x)) -//#define TEST_NEXTREQ +// To force usage of SCAN_NEXTREQ even for small scans resultsets +static const bool testNextReq = false; /* Various error codes that are not specific to NdbQuery. */ static const int Err_TupleNotFound = 626; @@ -4255,18 +4256,11 @@ NdbQueryOperationImpl if (myClosestScan != NULL) { -#ifdef TEST_NEXTREQ // To force usage of SCAN_NEXTREQ even for small scans resultsets - if (this == &getRoot()) + if (testNextReq) { m_maxBatchRows = 1; } - else - { - m_maxBatchRows = - myClosestScan->getQueryOperationDef().getTable().getFragmentCount(); - } -#endif const Ndb& ndb = *getQuery().getNdbTransaction().getNdb(); @@ -4312,14 +4306,6 @@ NdbQueryOperationImpl if (m_operationDef.isScanOperation()) { - if (myClosestScan != &getRoot()) - { - /** Each SPJ block instance will scan each fragment, so the batch size - * cannot be smaller than the number of fragments.*/ - maxBatchRows = - MAX(maxBatchRows, myClosestScan->getQueryOperationDef(). - getTable().getFragmentCount()); - } // Use this value for current op and all lookup descendants. m_maxBatchRows = maxBatchRows; // Return max(Unit32) to avoid interfering with batch size calculation @@ -4478,16 +4464,20 @@ NdbQueryOperationImpl::prepareAttrInfo(U batchRows, batchByteSize, firstBatchRows); - assert(batchRows==getMaxBatchRows()); - assert(batchRows==firstBatchRows); + assert(batchRows == firstBatchRows); + assert(batchRows == getMaxBatchRows()); assert(m_parallelism == Parallelism_max || m_parallelism == Parallelism_adaptive); if (m_parallelism == Parallelism_max) { requestInfo |= QN_ScanIndexParameters::SIP_PARALLEL; } - param->requestInfo = requestInfo; - param->batchSize = ((Uint16)batchByteSize << 16) | (Uint16)firstBatchRows; + param->requestInfo = requestInfo; + // Check that both values fit in param->batchSize. + assert(getMaxBatchRows() < (1<batchSize * 8 + - QN_ScanIndexParameters::BatchRowBits))); + param->batchSize = (batchByteSize << 11) | getMaxBatchRows(); param->resultData = getIdOfReceiver(); QueryNodeParameters::setOpLen(param->len, paramType, length); } No bundle (reason: useless for push emails).