3546 Jan Wedvik 2011-09-01
Added regression test for mysql-5.1-telco-7.0 revno 4491
(fix for pushed scan-scan queries with large records).
modified:
mysql-test/suite/ndb/r/ndb_join_pushdown.result
mysql-test/suite/ndb/t/ndb_join_pushdown.test
3545 Jan Wedvik 2011-09-01 [merge]
Merged form mysql-5.1-telco-7.0
modified:
storage/ndb/include/kernel/signaldata/QueryTree.hpp
storage/ndb/include/mgmapi/mgmapi_config_parameters.h
storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp
storage/ndb/src/kernel/ndbd.cpp
storage/ndb/src/kernel/vm/CMakeLists.txt
storage/ndb/src/kernel/vm/Configuration.cpp
storage/ndb/src/kernel/vm/Configuration.hpp
storage/ndb/src/kernel/vm/Makefile.am
storage/ndb/src/kernel/vm/mt.cpp
storage/ndb/src/kernel/vm/mt_thr_config.cpp
storage/ndb/src/kernel/vm/mt_thr_config.hpp
storage/ndb/src/ndbapi/NdbQueryOperation.cpp
3544 Jonas Oreland 2011-08-29 [merge]
ndb - merge 70 to 70-spj-scan-scan
added:
storage/ndb/src/kernel/vm/mt_thr_config.cpp
storage/ndb/src/kernel/vm/mt_thr_config.hpp
modified:
mysql-test/suite/ndb/r/ndb_index_stat.result
mysql-test/suite/ndb/r/ndb_statistics1.result
mysql-test/suite/ndb/t/ndb_index_stat.test
sql/ha_ndbcluster.cc
sql/ha_ndbcluster_binlog.cc
sql/ha_ndbinfo.cc
storage/ndb/include/util/SparseBitmask.hpp
storage/ndb/src/kernel/SimBlockList.cpp
storage/ndb/src/kernel/vm/Configuration.cpp
storage/ndb/src/kernel/vm/Configuration.hpp
storage/ndb/src/kernel/vm/Makefile.am
storage/ndb/src/kernel/vm/SimulatedBlock.cpp
storage/ndb/src/kernel/vm/SimulatedBlock.hpp
storage/ndb/src/kernel/vm/dummy_nonmt.cpp
storage/ndb/src/kernel/vm/mt.cpp
storage/ndb/src/kernel/vm/mt.hpp
=== modified file 'mysql-test/suite/ndb/r/ndb_join_pushdown.result'
--- a/mysql-test/suite/ndb/r/ndb_join_pushdown.result 2011-08-22 08:50:01 +0000
+++ b/mysql-test/suite/ndb/r/ndb_join_pushdown.result 2011-09-01 12:22:14 +0000
@@ -5178,6 +5178,55 @@ pw 4 4 5
wh 1 1 2
drop table t1;
drop table t2;
+create table t1 (
+a char(10) primary key,
+b char(10) not null,
+c char(10) not null,
+l00 char(255) not null,
+l01 char(255) not null,
+l02 char(255) not null,
+l03 char(255) not null,
+l04 char(255) not null,
+l05 char(255) not null,
+l06 char(255) not null,
+l07 char(255) not null,
+l08 char(255) not null,
+l09 char(255) not null,
+l10 char(255) not null,
+l11 char(255) not null,
+l12 char(255) not null,
+l13 char(255) not null,
+l14 char(255) not null,
+l15 char(255) not null,
+l16 char(255) not null,
+l17 char(255) not null,
+l18 char(255) not null,
+l19 char(255) not null,
+l20 char(255) not null,
+l21 char(255) not null,
+l22 char(255) not null,
+l23 char(255) not null,
+l24 char(255) not null,
+l25 char(255) not null,
+l26 char(255) not null,
+l27 char(255) not null,
+l28 char(255) not null,
+l29 char(255) not null,
+l30 char(255) not null,
+l31 char(255) not null,
+index(c, b)
+) engine=ndb partition by key(a) partitions 8;
+insert into t1 values ('a','a','a','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x');
+insert into t1 values ('b','b','b','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x');
+insert into t1 values ('c','c','c','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x');
+explain select count(*) from t1 as x1 join t1 as x2 on x1.b = x2.c;
+id select_type table type possible_keys key key_len ref rows Extra
+1 SIMPLE x1 ALL NULL NULL NULL NULL 3 Parent of 2 pushed join@1
+1 SIMPLE x2 ref c c 10 test.x1.b 1 Child of 'x1' in pushed join@1
+select count(*) from t1 as x1 join t1 as x2 on x1.b = x2.c;
+count(*)
+3
+drop table t1;
create temporary table spj_counts_at_end
select counter_name, sum(val) as val
from ndbinfo.counters
@@ -5193,13 +5242,13 @@ and spj_counts_at_end.counter_name <> 'L
and spj_counts_at_end.counter_name <> 'SCAN_BATCHES_RETURNED';
counter_name spj_counts_at_end.val - spj_counts_at_startup.val
CONST_PRUNED_RANGE_SCANS_RECEIVED 6
-LOCAL_TABLE_SCANS_SENT 236
+LOCAL_TABLE_SCANS_SENT 244
PRUNED_RANGE_SCANS_RECEIVED 25
RANGE_SCANS_RECEIVED 720
READS_NOT_FOUND 6616
READS_RECEIVED 52
-SCAN_ROWS_RETURNED 76380
-TABLE_SCANS_RECEIVED 236
+SCAN_ROWS_RETURNED 76386
+TABLE_SCANS_RECEIVED 244
select sum(spj_counts_at_end.val - spj_counts_at_startup.val) as 'LOCAL+REMOTE READS_SENT'
from spj_counts_at_end, spj_counts_at_startup
where spj_counts_at_end.counter_name = spj_counts_at_startup.counter_name
@@ -5211,15 +5260,15 @@ drop table spj_save_counts;
drop table spj_counts_at_startup;
drop table spj_counts_at_end;
scan_count
-2520
+2524
pruned_scan_count
8
sorted_scan_count
10
pushed_queries_defined
-383
+385
pushed_queries_dropped
11
pushed_queries_executed
-534
+535
set ndb_join_pushdown = @save_ndb_join_pushdown;
=== modified file 'mysql-test/suite/ndb/t/ndb_join_pushdown.test'
--- a/mysql-test/suite/ndb/t/ndb_join_pushdown.test 2011-06-28 14:27:57 +0000
+++ b/mysql-test/suite/ndb/t/ndb_join_pushdown.test 2011-09-01 12:22:14 +0000
@@ -3641,6 +3641,63 @@ drop table t1;
drop table t2;
########################################
+# Test query with very long records.
+connection ddl;
+create table t1 (
+ a char(10) primary key,
+ b char(10) not null,
+ c char(10) not null,
+ l00 char(255) not null,
+ l01 char(255) not null,
+ l02 char(255) not null,
+ l03 char(255) not null,
+ l04 char(255) not null,
+ l05 char(255) not null,
+ l06 char(255) not null,
+ l07 char(255) not null,
+ l08 char(255) not null,
+ l09 char(255) not null,
+ l10 char(255) not null,
+ l11 char(255) not null,
+ l12 char(255) not null,
+ l13 char(255) not null,
+ l14 char(255) not null,
+ l15 char(255) not null,
+ l16 char(255) not null,
+ l17 char(255) not null,
+ l18 char(255) not null,
+ l19 char(255) not null,
+ l20 char(255) not null,
+ l21 char(255) not null,
+ l22 char(255) not null,
+ l23 char(255) not null,
+ l24 char(255) not null,
+ l25 char(255) not null,
+ l26 char(255) not null,
+ l27 char(255) not null,
+ l28 char(255) not null,
+ l29 char(255) not null,
+ l30 char(255) not null,
+ l31 char(255) not null,
+ index(c, b)
+) engine=ndb partition by key(a) partitions 8;
+
+connection spj;
+
+insert into t1 values ('a','a','a','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x');
+
+insert into t1 values ('b','b','b','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x');
+
+insert into t1 values ('c','c','c','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x','x');
+
+explain select count(*) from t1 as x1 join t1 as x2 on x1.b = x2.c;
+
+select count(*) from t1 as x1 join t1 as x2 on x1.b = x2.c;
+
+connection ddl;
+drop table t1;
+
+########################################
# Verify DBSPJ counters for entire test:
# Note: These tables are 'temporary' withing 'connection spj'
=== modified file 'storage/ndb/include/kernel/signaldata/QueryTree.hpp'
--- a/storage/ndb/include/kernel/signaldata/QueryTree.hpp 2011-05-04 11:45:33 +0000
+++ b/storage/ndb/include/kernel/signaldata/QueryTree.hpp 2011-09-01 11:46:45 +0000
@@ -271,9 +271,11 @@ struct QN_ScanIndexParameters
{
Uint32 len;
Uint32 requestInfo;
- Uint32 batchSize; // (bytes << 16) | (rows)
+ Uint32 batchSize; // (bytes << 11) | (rows)
Uint32 resultData; // Api connect ptr
STATIC_CONST ( NodeSize = 4 );
+ // Number of bits for representing row count in 'batchSize'.
+ STATIC_CONST ( BatchRowBits = 11 );
enum ScanIndexParamBits
{
=== modified file 'storage/ndb/include/mgmapi/mgmapi_config_parameters.h'
--- a/storage/ndb/include/mgmapi/mgmapi_config_parameters.h 2011-07-04 13:37:56 +0000
+++ b/storage/ndb/include/mgmapi/mgmapi_config_parameters.h 2011-08-30 09:40:52 +0000
@@ -195,6 +195,7 @@
#define CFG_DB_INDEX_STAT_UPDATE_DELAY 626
#define CFG_DB_MAX_DML_OPERATIONS_PER_TRANSACTION 627
+#define CFG_DB_MT_THREAD_CONFIG 628
#define CFG_NODE_ARBIT_RANK 200
#define CFG_NODE_ARBIT_DELAY 201
=== modified file 'storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp 2011-08-22 13:04:16 +0000
+++ b/storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp 2011-09-01 12:17:37 +0000
@@ -4326,15 +4326,17 @@ Dbspj::scanIndex_build(Build_context& ct
treeNodePtr.p->m_info = &g_ScanIndexOpInfo;
treeNodePtr.p->m_bits |= TreeNode::T_ATTR_INTERPRETED;
treeNodePtr.p->m_bits |= TreeNode::T_NEED_REPORT_BATCH_COMPLETED;
- treeNodePtr.p->m_batch_size = batchSize & 0xFFFF;
+ treeNodePtr.p->m_batch_size =
+ batchSize & ~(0xFFFFFFFF << QN_ScanIndexParameters::BatchRowBits);
ScanFragReq*dst=(ScanFragReq*)treeNodePtr.p->m_scanindex_data.m_scanFragReq;
dst->senderData = treeNodePtr.i;
dst->resultRef = reference();
dst->resultData = treeNodePtr.i;
dst->savePointId = ctx.m_savepointId;
- dst->batch_size_rows = batchSize & 0xFFFF;
- dst->batch_size_bytes = batchSize >> 16;
+ dst->batch_size_rows =
+ batchSize & ~(0xFFFFFFFF << QN_ScanIndexParameters::BatchRowBits);
+ dst->batch_size_bytes = batchSize >> QN_ScanIndexParameters::BatchRowBits;
Uint32 transId1 = requestPtr.p->m_transId[0];
Uint32 transId2 = requestPtr.p->m_transId[1];
@@ -5020,12 +5022,13 @@ Dbspj::scanIndex_parent_batch_complete(S
* When parent's batch is complete, we send our batch
*/
const ScanFragReq * org = (const ScanFragReq*)data.m_scanFragReq;
- ndbassert(org->batch_size_rows >= data.m_fragCount - data.m_frags_complete);
+ ndbrequire(org->batch_size_rows > 0);
if (treeNodePtr.p->m_bits & TreeNode::T_SCAN_PARALLEL)
{
jam();
- data.m_parallelism = data.m_fragCount - data.m_frags_complete;
+ data.m_parallelism = MIN(data.m_fragCount - data.m_frags_complete,
+ org->batch_size_rows);
}
else if (data.m_firstExecution)
{
@@ -5051,8 +5054,9 @@ Dbspj::scanIndex_parent_batch_complete(S
* in the other direction is more costly).
*/
Int32 parallelism =
- static_cast<Int32>(data.m_parallelismStat.getMean()
- - 2 * data.m_parallelismStat.getStdDev());
+ static_cast<Int32>(MIN(data.m_parallelismStat.getMean()
+ - 2 * data.m_parallelismStat.getStdDev(),
+ org->batch_size_rows));
if (parallelism < 1)
{
@@ -5117,17 +5121,9 @@ Dbspj::scanIndex_parent_batch_complete(S
data.m_firstExecution = false;
- if (treeNodePtr.p->m_bits & TreeNode::T_SCAN_PARALLEL)
- {
- ndbrequire((data.m_frags_outstanding + data.m_frags_complete) ==
- data.m_fragCount);
- }
- else
- {
- ndbrequire(static_cast<Uint32>(data.m_frags_outstanding +
- data.m_frags_complete) <=
- data.m_fragCount);
- }
+ ndbrequire(static_cast<Uint32>(data.m_frags_outstanding +
+ data.m_frags_complete) <=
+ data.m_fragCount);
data.m_batch_chunks = 1;
requestPtr.p->m_cnt_active++;
@@ -5575,7 +5571,8 @@ Dbspj::scanIndex_execSCAN_NEXTREQ(Signal
data.m_largestBatchBytes < org->batch_size_bytes/data.m_parallelism)
{
jam();
- data.m_parallelism = data.m_fragCount - data.m_frags_complete;
+ data.m_parallelism = MIN(data.m_fragCount - data.m_frags_complete,
+ org->batch_size_rows);
if (data.m_largestBatchRows > 0)
{
jam();
@@ -5624,7 +5621,8 @@ Dbspj::scanIndex_execSCAN_NEXTREQ(Signal
else
{
jam();
- data.m_parallelism = data.m_fragCount - data.m_frags_complete;
+ data.m_parallelism = MIN(data.m_fragCount - data.m_frags_complete,
+ org->batch_size_rows);
}
const Uint32 bs_rows = org->batch_size_rows/data.m_parallelism;
=== modified file 'storage/ndb/src/kernel/ndbd.cpp'
--- a/storage/ndb/src/kernel/ndbd.cpp 2011-01-30 23:13:49 +0000
+++ b/storage/ndb/src/kernel/ndbd.cpp 2011-08-30 12:00:48 +0000
@@ -297,69 +297,45 @@ get_multithreaded_config(EmulatorData& e
{
// multithreaded is compiled in ndbd/ndbmtd for now
globalData.isNdbMt = SimulatedBlock::isMultiThreaded();
- if (!globalData.isNdbMt) {
+ if (!globalData.isNdbMt)
+ {
ndbout << "NDBMT: non-mt" << endl;
return 0;
}
- ndb_mgm_configuration * conf = ed.theConfiguration->getClusterConfig();
- if (conf == 0)
- {
- abort();
- }
+ THRConfig & conf = ed.theConfiguration->m_thr_config;
- ndb_mgm_configuration_iterator * p =
- ndb_mgm_create_configuration_iterator(conf, CFG_SECTION_NODE);
- if (ndb_mgm_find(p, CFG_NODE_ID, globalData.ownId))
- {
- abort();
- }
-
- Uint32 mtthreads = 0;
- ndb_mgm_get_int_parameter(p, CFG_DB_MT_THREADS, &mtthreads);
- ndbout << "NDBMT: MaxNoOfExecutionThreads=" << mtthreads << endl;
+ Uint32 threadcount = conf.getThreadCount();
+ ndbout << "NDBMT: MaxNoOfExecutionThreads=" << threadcount << endl;
globalData.isNdbMtLqh = true;
{
- Uint32 classic = 0;
- ndb_mgm_get_int_parameter(p, CFG_NDBMT_CLASSIC, &classic);
- if (classic)
- globalData.isNdbMtLqh = false;
-
- const char* p = NdbEnv_GetEnv("NDB_MT_LQH", (char*)0, 0);
- if (p != 0)
+ if (conf.getMtClassic())
{
- if (strstr(p, "NOPLEASE") != 0)
- globalData.isNdbMtLqh = false;
- else
- globalData.isNdbMtLqh = true;
+ globalData.isNdbMtLqh = false;
}
}
if (!globalData.isNdbMtLqh)
return 0;
- Uint32 threads = 0;
- switch(mtthreads){
- case 0:
- case 1:
- case 2:
- case 3:
- threads = 1; // TC + receiver + SUMA + LQH
- break;
- case 4:
- case 5:
- case 6:
- threads = 2; // TC + receiver + SUMA + 2 * LQH
- break;
- default:
- threads = 4; // TC + receiver + SUMA + 4 * LQH
- }
-
- ndb_mgm_get_int_parameter(p, CFG_NDBMT_LQH_THREADS, &threads);
+ Uint32 threads = conf.getThreadCount(THRConfig::T_LDM);
Uint32 workers = threads;
- ndb_mgm_get_int_parameter(p, CFG_NDBMT_LQH_WORKERS, &workers);
+ {
+ ndb_mgm_configuration * conf = ed.theConfiguration->getClusterConfig();
+ if (conf == 0)
+ {
+ abort();
+ }
+ ndb_mgm_configuration_iterator * p =
+ ndb_mgm_create_configuration_iterator(conf, CFG_SECTION_NODE);
+ if (ndb_mgm_find(p, CFG_NODE_ID, globalData.ownId))
+ {
+ abort();
+ }
+ ndb_mgm_get_int_parameter(p, CFG_NDBMT_LQH_WORKERS, &workers);
+ }
#ifdef VM_TRACE
// testing
@@ -368,9 +344,6 @@ get_multithreaded_config(EmulatorData& e
p = NdbEnv_GetEnv("NDBMT_LQH_WORKERS", (char*)0, 0);
if (p != 0)
workers = atoi(p);
- p = NdbEnv_GetEnv("NDBMT_LQH_THREADS", (char*)0, 0);
- if (p != 0)
- threads = atoi(p);
}
#endif
@@ -654,10 +627,11 @@ ndbd_run(bool foreground, int report_fd,
// Ignore error
}
+ theConfig->setupConfiguration();
+
if (get_multithreaded_config(globalEmulatorData))
ndbd_exit(-1);
- theConfig->setupConfiguration();
systemInfo(* theConfig, * theConfig->m_logLevel);
NdbThread* pWatchdog = globalEmulatorData.theWatchDog->doStart();
=== modified file 'storage/ndb/src/kernel/vm/CMakeLists.txt'
--- a/storage/ndb/src/kernel/vm/CMakeLists.txt 2011-02-04 11:45:24 +0000
+++ b/storage/ndb/src/kernel/vm/CMakeLists.txt 2011-09-01 12:17:37 +0000
@@ -38,6 +38,7 @@ ADD_LIBRARY(ndbkernel STATIC
Ndbinfo.cpp
NdbinfoTables.cpp
ArenaPool.cpp
+ mt_thr_config.cpp
)
ADD_LIBRARY(ndbsched STATIC
=== modified file 'storage/ndb/src/kernel/vm/Configuration.cpp'
--- a/storage/ndb/src/kernel/vm/Configuration.cpp 2011-08-25 09:40:27 +0000
+++ b/storage/ndb/src/kernel/vm/Configuration.cpp 2011-08-30 12:00:48 +0000
@@ -33,6 +33,7 @@
#include <kernel_config_parameters.h>
#include <util/ConfigValues.hpp>
+#include <NdbEnv.h>
#include <ndbapi_limits.h>
@@ -392,7 +393,77 @@ Configuration::setupConfiguration(){
t = globalEmulatorData.theWatchDog ->setCheckInterval(t);
_timeBetweenWatchDogCheckInitial = t;
}
-
+
+ const char * thrconfigstring = NdbEnv_GetEnv("NDB_MT_THREAD_CONFIG",
+ (char*)0, 0);
+ if (thrconfigstring ||
+ iter.get(CFG_DB_MT_THREAD_CONFIG, &thrconfigstring) == 0)
+ {
+ int res = m_thr_config.do_parse(thrconfigstring);
+ if (res != 0)
+ {
+ ERROR_SET(fatal, NDBD_EXIT_INVALID_CONFIG,
+ "Invalid configuration fetched, invalid ThreadConfig",
+ m_thr_config.getErrorMessage());
+ }
+ }
+ else
+ {
+ const char * mask;
+ if (iter.get(CFG_DB_EXECUTE_LOCK_CPU, &mask) == 0)
+ {
+ int res = m_thr_config.setLockExecuteThreadToCPU(mask);
+ if (res < 0)
+ {
+ // Could not parse LockExecuteThreadToCPU mask
+ g_eventLogger->warning("Failed to parse 'LockExecuteThreadToCPU=%s' "
+ "(error: %d), ignoring it!",
+ mask, res);
+ }
+ }
+
+ Uint32 maintCPU = NO_LOCK_CPU;
+ iter.get(CFG_DB_MAINT_LOCK_CPU, &maintCPU);
+ if (maintCPU == 65535)
+ maintCPU = NO_LOCK_CPU; // Ignore old default(may come from old mgmd)
+ if (maintCPU != NO_LOCK_CPU)
+ m_thr_config.setLockMaintThreadsToCPU(maintCPU);
+
+ Uint32 mtthreads = 0;
+ iter.get(CFG_DB_MT_THREADS, &mtthreads);
+
+ Uint32 classic = 0;
+ iter.get(CFG_NDBMT_CLASSIC, &classic);
+ const char* p = NdbEnv_GetEnv("NDB_MT_LQH", (char*)0, 0);
+ if (p != 0)
+ {
+ if (strstr(p, "NOPLEASE") != 0)
+ classic = 1;
+ }
+
+ Uint32 lqhthreads = 0;
+ iter.get(CFG_NDBMT_LQH_THREADS, &lqhthreads);
+
+ int res = m_thr_config.do_parse(mtthreads, lqhthreads, classic);
+ if (res != 0)
+ {
+ ERROR_SET(fatal, NDBD_EXIT_INVALID_CONFIG,
+ "Invalid configuration fetched, invalid thread configuration",
+ m_thr_config.getErrorMessage());
+ }
+ }
+ if (thrconfigstring)
+ {
+ ndbout_c("ThreadConfig: input: %s parsed: %s",
+ thrconfigstring,
+ m_thr_config.getConfigString());
+ }
+ else
+ {
+ ndbout_c("ThreadConfig (old ndb_mgmd): parsed: %s",
+ m_thr_config.getConfigString());
+ }
+
ConfigValues* cf = ConfigValuesFactory::extractCurrentSection(iter.m_config);
if(m_clusterConfigIter)
=== modified file 'storage/ndb/src/kernel/vm/Configuration.hpp'
--- a/storage/ndb/src/kernel/vm/Configuration.hpp 2011-08-25 09:40:27 +0000
+++ b/storage/ndb/src/kernel/vm/Configuration.hpp 2011-08-30 12:00:48 +0000
@@ -27,6 +27,7 @@
#include <NdbThread.h>
#include <util/SparseBitmask.hpp>
#include <util/UtilBuffer.hpp>
+#include "mt_thr_config.hpp"
enum ThreadTypes
{
@@ -124,6 +125,7 @@ public:
ndb_mgm_configuration* getClusterConfig() const { return m_clusterConfig; }
Uint32 get_config_generation() const;
+ THRConfigApplier m_thr_config;
private:
friend class Cmvmi;
friend class Qmgr;
=== modified file 'storage/ndb/src/kernel/vm/Makefile.am'
--- a/storage/ndb/src/kernel/vm/Makefile.am 2011-08-29 10:15:59 +0000
+++ b/storage/ndb/src/kernel/vm/Makefile.am 2011-09-01 12:17:37 +0000
@@ -39,7 +39,8 @@ libkernel_a_SOURCES = VMSignal.cpp \
SafeMutex.cpp \
Ndbinfo.cpp \
NdbinfoTables.cpp \
- ArenaPool.cpp
+ ArenaPool.cpp \
+ mt_thr_config.cpp
libsched_a_SOURCES = TimeQueue.cpp \
ThreadConfig.cpp \
=== modified file 'storage/ndb/src/kernel/vm/mt.cpp'
--- a/storage/ndb/src/kernel/vm/mt.cpp 2011-08-29 10:15:59 +0000
+++ b/storage/ndb/src/kernel/vm/mt.cpp 2011-09-01 12:17:37 +0000
@@ -3251,28 +3251,11 @@ sendprioa_STOP_FOR_CRASH(const struct th
*/
static thr_job_buffer dummy_buffer;
- /*
- * Before we had three main threads with fixed block assignment.
- * Now there is also worker instances (we send to LQH instance).
+ /**
+ * Pick any instance running in this thread
*/
- Uint32 main = 0;
- Uint32 instance = 0;
- if (dst == 0)
- main = NDBCNTR;
- else if (dst == 1)
- main = DBLQH;
- else if (dst >= NUM_MAIN_THREADS && dst < NUM_MAIN_THREADS + num_lqh_threads)
- {
- main = DBLQH;
- instance = dst - NUM_MAIN_THREADS + 1;
- }
- else if (dst == receiver_thread_no)
- main = CMVMI;
- else
- require(false);
- Uint32 bno = numberToBlock(main, instance);
- require(block2ThreadId(main, instance) == dst);
struct thr_data * dstptr = rep->m_thread + dst;
+ Uint32 bno = dstptr->m_instance_list[0];
memset(&signalT.header, 0, sizeof(SignalHeader));
signalT.header.theVerId_signalNumber = GSN_STOP_FOR_CRASH;
=== modified file 'storage/ndb/src/kernel/vm/mt_thr_config.cpp'
--- a/storage/ndb/src/kernel/vm/mt_thr_config.cpp 2011-08-26 09:57:03 +0000
+++ b/storage/ndb/src/kernel/vm/mt_thr_config.cpp 2011-08-30 14:13:15 +0000
@@ -500,6 +500,34 @@ THRConfig::getConfigString()
return m_cfg_string.c_str();
}
+Uint32
+THRConfig::getThreadCount() const
+{
+ // Note! not counting T_MAINT
+ Uint32 cnt = 0;
+ for (Uint32 i = 0; i < NDB_ARRAY_SIZE(m_threads); i++)
+ {
+ if (i != T_MAINT)
+ {
+ cnt += m_threads[i].size();
+ }
+ }
+ return cnt;
+}
+
+Uint32
+THRConfig::getThreadCount(T_Type type) const
+{
+ for (Uint32 i = 0; i < NDB_ARRAY_SIZE(m_threads); i++)
+ {
+ if (i == (Uint32)type)
+ {
+ return m_threads[i].size();
+ }
+ }
+ return 0;
+}
+
const char *
THRConfig::getErrorMessage() const
{
@@ -516,7 +544,7 @@ static
char *
skipblank(char * str)
{
- while (isblank(* str))
+ while (isspace(* str))
str++;
return str;
}
@@ -564,7 +592,7 @@ parseUnsigned(char *& str, unsigned * ds
str = skipblank(str);
char * endptr = 0;
errno = 0;
- long val = strtoll(str, &endptr, 0);
+ long val = strtol(str, &endptr, 0);
if (errno == ERANGE)
return -1;
if (val < 0 || Int64(val) > 0xFFFFFFFF)
@@ -585,7 +613,7 @@ parseBitmask(char *& str, SparseBitmask
if (len == 0)
return -1;
- while (isblank(str[len-1]))
+ while (isspace(str[len-1]))
len--;
if (str[len-1] == ',')
len--;
@@ -830,7 +858,7 @@ THRConfig::do_parse(const char * ThreadC
unsigned
THRConfig::createCpuSet(const SparseBitmask& mask)
{
- for (size_t i = 0; i < m_cpu_sets.size(); i++)
+ for (unsigned i = 0; i < m_cpu_sets.size(); i++)
if (m_cpu_sets[i].equal(mask))
return i;
=== modified file 'storage/ndb/src/kernel/vm/mt_thr_config.hpp'
--- a/storage/ndb/src/kernel/vm/mt_thr_config.hpp 2011-08-26 09:57:03 +0000
+++ b/storage/ndb/src/kernel/vm/mt_thr_config.hpp 2011-08-30 12:00:48 +0000
@@ -64,6 +64,9 @@ public:
const char * getErrorMessage() const;
const char * getInfoMessage() const;
+ Uint32 getThreadCount() const; // Don't count FS/MAINT thread
+ Uint32 getThreadCount(T_Type) const;
+ Uint32 getMtClassic() const { return m_classic; }
private:
struct T_Thread
{
=== modified file 'storage/ndb/src/ndbapi/NdbQueryOperation.cpp'
--- a/storage/ndb/src/ndbapi/NdbQueryOperation.cpp 2011-08-22 13:04:16 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryOperation.cpp 2011-09-01 12:17:37 +0000
@@ -47,7 +47,8 @@
*/
#define UNUSED(x) ((void)(x))
-//#define TEST_NEXTREQ
+// To force usage of SCAN_NEXTREQ even for small scans resultsets
+static const bool testNextReq = false;
/* Various error codes that are not specific to NdbQuery. */
static const int Err_TupleNotFound = 626;
@@ -4255,18 +4256,11 @@ NdbQueryOperationImpl
if (myClosestScan != NULL)
{
-#ifdef TEST_NEXTREQ
// To force usage of SCAN_NEXTREQ even for small scans resultsets
- if (this == &getRoot())
+ if (testNextReq)
{
m_maxBatchRows = 1;
}
- else
- {
- m_maxBatchRows =
- myClosestScan->getQueryOperationDef().getTable().getFragmentCount();
- }
-#endif
const Ndb& ndb = *getQuery().getNdbTransaction().getNdb();
@@ -4312,14 +4306,6 @@ NdbQueryOperationImpl
if (m_operationDef.isScanOperation())
{
- if (myClosestScan != &getRoot())
- {
- /** Each SPJ block instance will scan each fragment, so the batch size
- * cannot be smaller than the number of fragments.*/
- maxBatchRows =
- MAX(maxBatchRows, myClosestScan->getQueryOperationDef().
- getTable().getFragmentCount());
- }
// Use this value for current op and all lookup descendants.
m_maxBatchRows = maxBatchRows;
// Return max(Unit32) to avoid interfering with batch size calculation
@@ -4478,16 +4464,20 @@ NdbQueryOperationImpl::prepareAttrInfo(U
batchRows,
batchByteSize,
firstBatchRows);
- assert(batchRows==getMaxBatchRows());
- assert(batchRows==firstBatchRows);
+ assert(batchRows == firstBatchRows);
+ assert(batchRows == getMaxBatchRows());
assert(m_parallelism == Parallelism_max ||
m_parallelism == Parallelism_adaptive);
if (m_parallelism == Parallelism_max)
{
requestInfo |= QN_ScanIndexParameters::SIP_PARALLEL;
}
- param->requestInfo = requestInfo;
- param->batchSize = ((Uint16)batchByteSize << 16) | (Uint16)firstBatchRows;
+ param->requestInfo = requestInfo;
+ // Check that both values fit in param->batchSize.
+ assert(getMaxBatchRows() < (1<<QN_ScanIndexParameters::BatchRowBits));
+ assert(batchByteSize < (1 << (sizeof param->batchSize * 8
+ - QN_ScanIndexParameters::BatchRowBits)));
+ param->batchSize = (batchByteSize << 11) | getMaxBatchRows();
param->resultData = getIdOfReceiver();
QueryNodeParameters::setOpLen(param->len, paramType, length);
}
No bundle (reason: useless for push emails).
| Thread |
|---|
| • bzr push into mysql-5.1-telco-7.0-spj-scan-vs-scan branch (jan.wedvik:3544to 3546) | Jan Wedvik | 1 Sep |