4575 jonas oreland 2011-10-07
ndb - add thread statistics
added:
storage/ndb/include/portlib/NdbGetRUsage.h
storage/ndb/src/common/portlib/NdbGetRUsage.cpp
storage/ndb/src/kernel/blocks/thrman.cpp
storage/ndb/src/kernel/blocks/thrman.hpp
modified:
storage/ndb/include/kernel/BlockNumbers.h
storage/ndb/src/common/debugger/BlockNames.cpp
storage/ndb/src/common/portlib/CMakeLists.txt
storage/ndb/src/common/portlib/Makefile.am
storage/ndb/src/kernel/SimBlockList.cpp
storage/ndb/src/kernel/blocks/CMakeLists.txt
storage/ndb/src/kernel/blocks/LocalProxy.cpp
storage/ndb/src/kernel/blocks/LocalProxy.hpp
storage/ndb/src/kernel/blocks/Makefile.am
storage/ndb/src/kernel/blocks/PgmanProxy.cpp
storage/ndb/src/kernel/blocks/PgmanProxy.hpp
storage/ndb/src/kernel/blocks/dbinfo/Dbinfo.cpp
storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.cpp
storage/ndb/src/kernel/blocks/dbspj/DbspjProxy.hpp
storage/ndb/src/kernel/blocks/dbtc/DbtcProxy.hpp
storage/ndb/src/kernel/vm/Ndbinfo.hpp
storage/ndb/src/kernel/vm/NdbinfoTables.cpp
storage/ndb/src/kernel/vm/SimulatedBlock.hpp
storage/ndb/src/kernel/vm/dummy_nonmt.cpp
storage/ndb/src/kernel/vm/mt.cpp
storage/ndb/src/kernel/vm/mt.hpp
storage/ndb/src/kernel/vm/mt_thr_config.cpp
storage/ndb/src/kernel/vm/mt_thr_config.hpp
4574 jonas oreland 2011-10-07
ndb - fix bug introduced by deadlock detector, causing recv-thread in ndbmtd to get incorrect tid
modified:
storage/ndb/src/common/portlib/NdbThread.c
4573 Frazer Clement 2011-10-06
Bug#13067813 mysqlbinlog --database option broken
modified:
client/mysqlbinlog.cc
mysql-test/suite/binlog/t/binlog_row_mysqlbinlog_db_filter.test
=== modified file 'storage/ndb/include/kernel/BlockNumbers.h'
--- a/storage/ndb/include/kernel/BlockNumbers.h 2011-06-30 15:59:25 +0000
+++ b/storage/ndb/include/kernel/BlockNumbers.h 2011-10-07 08:07:21 +0000
@@ -60,6 +60,7 @@
#define RESTORE 0x106
#define DBINFO 0x107
#define DBSPJ 0x108
+#define THRMAN 0x109
const BlockReference BACKUP_REF = numberToRef(BACKUP, 0);
const BlockReference DBTC_REF = numberToRef(DBTC, 0);
@@ -82,6 +83,7 @@ const BlockReference PGMAN_REF = numbe
const BlockReference RESTORE_REF = numberToRef(RESTORE, 0);
const BlockReference DBINFO_REF = numberToRef(DBINFO, 0);
const BlockReference DBSPJ_REF = numberToRef(DBSPJ, 0);
+const BlockReference THRMAN_REF = numberToRef(THRMAN, 0);
static inline void __hide_warnings_unused_ref_vars(void) {
// Hide annoying warnings about unused variables
@@ -92,10 +94,11 @@ static inline void __hide_warnings_unuse
(void)DBUTIL_REF; (void)SUMA_REF; (void)DBTUX_REF;
(void)TSMAN_REF; (void)LGMAN_REF; (void)PGMAN_REF;
(void)RESTORE_REF; (void)DBINFO_REF; (void)DBSPJ_REF;
+ (void)THRMAN_REF;
}
const BlockNumber MIN_BLOCK_NO = BACKUP;
-const BlockNumber MAX_BLOCK_NO = DBSPJ;
+const BlockNumber MAX_BLOCK_NO = THRMAN;
const BlockNumber NO_OF_BLOCKS = (MAX_BLOCK_NO - MIN_BLOCK_NO + 1);
/**
=== added file 'storage/ndb/include/portlib/NdbGetRUsage.h'
--- a/storage/ndb/include/portlib/NdbGetRUsage.h 1970-01-01 00:00:00 +0000
+++ b/storage/ndb/include/portlib/NdbGetRUsage.h 2011-10-07 08:07:21 +0000
@@ -0,0 +1,46 @@
+/*
+ Copyright (c) 2011, Oracle and/or its affiliates. All rights reserved.
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+*/
+
+#ifndef NDB_GET_RUSAGE_H
+#define NDB_GET_RUSAGE_H
+
+#include <ndb_global.h>
+
+struct ndb_rusage
+{
+ Uint64 ru_utime;
+ Uint64 ru_stime;
+ Uint64 ru_minflt;
+ Uint64 ru_majflt;
+ Uint64 ru_nvcsw;
+ Uint64 ru_nivcsw;
+};
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+ /**
+ * Get resource usage for calling thread
+ */
+ int Ndb_GetRUSage(ndb_rusage * dst);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
=== modified file 'storage/ndb/src/common/debugger/BlockNames.cpp'
--- a/storage/ndb/src/common/debugger/BlockNames.cpp 2011-06-30 15:59:25 +0000
+++ b/storage/ndb/src/common/debugger/BlockNames.cpp 2011-10-07 08:07:21 +0000
@@ -40,6 +40,7 @@ const BlockName BlockNames[] = {
,{ "RESTORE", RESTORE }
,{ "DBINFO", DBINFO }
,{ "DBSPJ", DBSPJ }
+ ,{ "THRMAN", THRMAN }
};
const BlockNumber NO_OF_BLOCK_NAMES = sizeof(BlockNames) / sizeof(BlockName);
=== modified file 'storage/ndb/src/common/portlib/CMakeLists.txt'
--- a/storage/ndb/src/common/portlib/CMakeLists.txt 2011-09-27 17:28:13 +0000
+++ b/storage/ndb/src/common/portlib/CMakeLists.txt 2011-10-07 08:07:21 +0000
@@ -27,7 +27,8 @@ ADD_CONVENIENCE_LIBRARY(ndbportlib
NdbEnv.c NdbThread.c NdbHost.c NdbTCP.cpp
NdbMem.c NdbConfig.c NdbTick.c NdbDir.cpp
ndb_daemon.cc ${EXTRA_SRC}
- NdbNuma.cpp NdbMutex_DeadlockDetector.cpp)
+ NdbNuma.cpp NdbMutex_DeadlockDetector.cpp
+ NdbGetRUsage.cpp)
TARGET_LINK_LIBRARIES(ndbportlib mysys ${LIBSOCKET})
ADD_EXECUTABLE(NdbDir-t
=== modified file 'storage/ndb/src/common/portlib/Makefile.am'
--- a/storage/ndb/src/common/portlib/Makefile.am 2011-09-27 17:28:13 +0000
+++ b/storage/ndb/src/common/portlib/Makefile.am 2011-10-07 08:07:21 +0000
@@ -22,7 +22,8 @@ libportlib_la_SOURCES = \
NdbEnv.c NdbThread.c NdbHost.c NdbTCP.cpp \
ndb_daemon.cc NdbMem.c \
NdbConfig.c NdbDir.cpp ndb_socket.cpp \
- NdbMutex_DeadlockDetector.cpp
+ NdbMutex_DeadlockDetector.cpp \
+ NdbGetRUsage.cpp
include $(top_srcdir)/storage/ndb/config/common.mk.am
include $(top_srcdir)/storage/ndb/config/type_util.mk.am
=== added file 'storage/ndb/src/common/portlib/NdbGetRUsage.cpp'
--- a/storage/ndb/src/common/portlib/NdbGetRUsage.cpp 1970-01-01 00:00:00 +0000
+++ b/storage/ndb/src/common/portlib/NdbGetRUsage.cpp 2011-10-07 08:07:21 +0000
@@ -0,0 +1,65 @@
+/* Copyright (c) 2011, Oracle and/or its affiliates. All rights reserved.
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+#include <NdbGetRUsage.h>
+
+#ifdef HAVE_SYS_TIME_H
+#include <sys/time.h>
+#endif
+
+#ifdef HAVE_SYS_RESOURCE_H
+#include <sys/resource.h>
+#endif
+
+#ifndef _WIN32
+static
+Uint64
+micros(struct timeval val)
+{
+ return
+ (Uint64)val.tv_sec * (Uint64)1000000 + val.tv_usec;
+}
+#endif
+
+extern "C"
+int
+Ndb_GetRUSage(ndb_rusage* dst)
+{
+ int res = -1;
+#ifdef HAVE_GETRUSAGE
+ struct rusage tmp;
+#ifdef RUSAGE_THREAD
+ res = getrusage(RUSAGE_THREAD, &tmp);
+#elif defined RUSAGE_LWP
+ res = getrusage(RUSAGE_LWP, &tmp);
+#endif
+
+ if (res == 0)
+ {
+ dst->ru_utime = micros(tmp.ru_utime);
+ dst->ru_stime = micros(tmp.ru_stime);
+ dst->ru_minflt = tmp.ru_minflt;
+ dst->ru_majflt = tmp.ru_majflt;
+ dst->ru_nvcsw = tmp.ru_nvcsw;
+ dst->ru_nivcsw = tmp.ru_nivcsw;
+ }
+#endif
+
+ if (res != 0)
+ {
+ bzero(dst, sizeof(* dst));
+ }
+ return res;
+}
=== modified file 'storage/ndb/src/common/portlib/NdbThread.c'
--- a/storage/ndb/src/common/portlib/NdbThread.c 2011-09-27 17:28:13 +0000
+++ b/storage/ndb/src/common/portlib/NdbThread.c 2011-10-07 07:37:47 +0000
@@ -176,6 +176,7 @@ NdbThread_CreateObject(const char * name
if (g_main_thread != 0)
{
+ settid(g_main_thread);
if (name)
{
strnmov(g_main_thread->thread_name, name, sizeof(tmpThread->thread_name));
=== modified file 'storage/ndb/src/kernel/SimBlockList.cpp'
--- a/storage/ndb/src/kernel/SimBlockList.cpp 2011-09-23 09:13:22 +0000
+++ b/storage/ndb/src/kernel/SimBlockList.cpp 2011-10-07 08:07:21 +0000
@@ -51,6 +51,7 @@
#include <PgmanProxy.hpp>
#include <DbtcProxy.hpp>
#include <DbspjProxy.hpp>
+#include <thrman.hpp>
#include <mt.hpp>
#ifndef VM_TRACE
@@ -89,6 +90,10 @@ void * operator new (size_t sz, SIMBLOCK
void
SimBlockList::load(EmulatorData& data){
noOfBlocks = NO_OF_BLOCKS;
+#define THR 1
+#ifndef THR
+ noOfBlocks--;
+#endif
theList = new SimulatedBlock * [noOfBlocks];
if (!theList)
{
@@ -160,7 +165,14 @@ SimBlockList::load(EmulatorData& data){
theList[20] = NEW_BLOCK(Dbspj)(ctx);
else
theList[20] = NEW_BLOCK(DbspjProxy)(ctx);
- assert(NO_OF_BLOCKS == 21);
+#ifdef THR
+ if (NdbIsMultiThreaded() == false)
+ theList[21] = NEW_BLOCK(Thrman)(ctx);
+ else
+ theList[21] = NEW_BLOCK(ThrmanProxy)(ctx);
+
+ assert(NO_OF_BLOCKS == 22);
+#endif
// Check that all blocks could be created
for (int i = 0; i < noOfBlocks; i++)
@@ -174,10 +186,10 @@ SimBlockList::load(EmulatorData& data){
if (globalData.isNdbMt)
{
- add_main_thr_map();
+ mt_init_thr_map();
for (int i = 0; i < noOfBlocks; i++)
theList[i]->loadWorkers();
- finalize_thr_map();
+ mt_finalize_thr_map();
}
}
=== modified file 'storage/ndb/src/kernel/blocks/CMakeLists.txt'
--- a/storage/ndb/src/kernel/blocks/CMakeLists.txt 2011-06-30 15:59:25 +0000
+++ b/storage/ndb/src/kernel/blocks/CMakeLists.txt 2011-10-07 08:07:21 +0000
@@ -72,7 +72,8 @@ ADD_LIBRARY(ndbblocks STATIC
dblqh/DblqhCommon.cpp
PgmanProxy.cpp
dbtup/DbtupClient.cpp
- ${EXTRA_SRC})
+ ${EXTRA_SRC}
+ thrman.cpp)
MYSQL_ADD_EXECUTABLE(ndb_print_file
print_file.cpp
=== modified file 'storage/ndb/src/kernel/blocks/LocalProxy.cpp'
--- a/storage/ndb/src/kernel/blocks/LocalProxy.cpp 2011-09-15 20:21:59 +0000
+++ b/storage/ndb/src/kernel/blocks/LocalProxy.cpp 2011-10-07 08:07:21 +0000
@@ -22,8 +22,6 @@ LocalProxy::LocalProxy(BlockNumber block
BLOCK_CONSTRUCTOR(LocalProxy);
ndbrequire(instance() == 0); // this is main block
- c_lqhWorkers = 0;
- c_extraWorkers = 0; // sub-class constructor can set
c_workers = 0;
Uint32 i;
for (i = 0; i < MaxWorkers; i++)
@@ -187,13 +185,13 @@ LocalProxy::lastReply(const SsSequential
}
void
-LocalProxy::sendREQ(Signal* signal, SsParallel& ss)
+LocalProxy::sendREQ(Signal* signal, SsParallel& ss, bool skipLast)
{
ndbrequire(ss.m_sendREQ != 0);
ss.m_workerMask.clear();
ss.m_worker = 0;
- const Uint32 count = ss.m_extraLast ? c_lqhWorkers : c_workers;
+ const Uint32 count = skipLast ? c_workers - 1 : c_workers;
SectionHandle handle(this);
restoreHandle(handle, ss);
while (ss.m_worker < count) {
@@ -266,21 +264,6 @@ LocalProxy::lastReply(const SsParallel&
return ss.m_workerMask.isclear();
}
-bool
-LocalProxy::lastExtra(Signal* signal, SsParallel& ss)
-{
- SectionHandle handle(this);
- if (c_lqhWorkers + ss.m_extraSent < c_workers) {
- jam();
- ss.m_worker = c_lqhWorkers + ss.m_extraSent;
- ss.m_workerMask.set(ss.m_worker);
- (this->*ss.m_sendREQ)(signal, ss.m_ssId, &handle);
- ss.m_extraSent++;
- return false;
- }
- return true;
-}
-
// used in "reverse" proxying (start with worker REQs)
void
LocalProxy::setMask(SsParallel& ss)
@@ -301,11 +284,9 @@ LocalProxy::setMask(SsParallel& ss, cons
void
LocalProxy::loadWorkers()
{
- c_lqhWorkers = getLqhWorkers();
- c_workers = c_lqhWorkers + c_extraWorkers;
-
- Uint32 i;
- for (i = 0; i < c_workers; i++) {
+ c_workers = mt_get_instance_count(number());
+ for (Uint32 i = 0; i < c_workers; i++)
+ {
jam();
Uint32 instanceNo = workerInstance(i);
@@ -314,31 +295,7 @@ LocalProxy::loadWorkers()
ndbrequire(this->getInstance(instanceNo) == worker);
c_worker[i] = worker;
- if (i < c_lqhWorkers) {
- add_lqh_worker_thr_map(number(), instanceNo);
- } else {
- add_extra_worker_thr_map(number(), instanceNo);
- }
- }
-}
-
-void
-LocalProxy::tc_loadWorkers()
-{
- c_workers = globalData.ndbMtTcThreads;
- c_lqhWorkers = globalData.ndbMtTcThreads;
- c_extraWorkers = 0;
-
- Uint32 i;
- for (i = 0; i < c_workers; i++) {
- jam();
- Uint32 instanceNo = workerInstance(i);
-
- SimulatedBlock* worker = newWorker(instanceNo);
- ndbrequire(worker->instance() == instanceNo);
- ndbrequire(this->getInstance(instanceNo) == worker);
- c_worker[i] = worker;
- add_tc_worker_thr_map(number(), instanceNo);
+ mt_add_thr_map(number(), instanceNo);
}
}
=== modified file 'storage/ndb/src/kernel/blocks/LocalProxy.hpp'
--- a/storage/ndb/src/kernel/blocks/LocalProxy.hpp 2011-09-15 20:21:59 +0000
+++ b/storage/ndb/src/kernel/blocks/LocalProxy.hpp 2011-10-07 08:07:21 +0000
@@ -56,19 +56,14 @@ public:
BLOCK_DEFINES(LocalProxy);
protected:
- enum { MaxLqhWorkers = MAX_NDBMT_LQH_WORKERS };
- enum { MaxExtraWorkers = 1 };
- enum { MaxWorkers = MaxLqhWorkers + MaxExtraWorkers };
+ enum { MaxWorkers = SimulatedBlock::MaxInstances };
typedef Bitmask<(MaxWorkers+31)/32> WorkerMask;
- Uint32 c_lqhWorkers;
- Uint32 c_extraWorkers;
Uint32 c_workers;
// no gaps - extra worker has index c_lqhWorkers (not MaxLqhWorkers)
SimulatedBlock* c_worker[MaxWorkers];
virtual SimulatedBlock* newWorker(Uint32 instanceNo) = 0;
virtual void loadWorkers();
- virtual void tc_loadWorkers();
// get worker block by index (not by instance)
@@ -78,43 +73,22 @@ protected:
return c_worker[i];
}
- SimulatedBlock* extraWorkerBlock() {
- return workerBlock(c_lqhWorkers);
- }
-
// get worker block reference by index (not by instance)
BlockReference workerRef(Uint32 i) {
return numberToRef(number(), workerInstance(i), getOwnNodeId());
}
- BlockReference extraWorkerRef() {
- ndbrequire(c_workers == c_lqhWorkers + 1);
- Uint32 i = c_lqhWorkers;
- return workerRef(i);
- }
-
// convert between worker index and worker instance
Uint32 workerInstance(Uint32 i) const {
ndbrequire(i < c_workers);
- Uint32 ino;
- if (i < c_lqhWorkers)
- ino = 1 + i;
- else
- ino = 1 + MaxLqhWorkers;
- return ino;
+ return i + 1;
}
Uint32 workerIndex(Uint32 ino) const {
ndbrequire(ino != 0);
- Uint32 i;
- if (ino != 1 + MaxLqhWorkers)
- i = ino - 1;
- else
- i = c_lqhWorkers;
- ndbrequire(i < c_workers);
- return i;
+ return ino - 1;
}
// support routines and classes ("Ss" = signal state)
@@ -161,14 +135,10 @@ protected:
// run workers in parallel
struct SsParallel : SsCommon {
WorkerMask m_workerMask;
- bool m_extraLast; // run extra after LQH workers
- Uint32 m_extraSent;
SsParallel() {
- m_extraLast = false;
- m_extraSent = 0;
}
};
- void sendREQ(Signal*, SsParallel& ss);
+ void sendREQ(Signal*, SsParallel& ss, bool skipLast = false);
void recvCONF(Signal*, SsParallel& ss);
void recvREF(Signal*, SsParallel& ss, Uint32 error);
// for use in sendREQ
=== modified file 'storage/ndb/src/kernel/blocks/Makefile.am'
--- a/storage/ndb/src/kernel/blocks/Makefile.am 2011-06-30 15:59:25 +0000
+++ b/storage/ndb/src/kernel/blocks/Makefile.am 2011-10-07 08:07:21 +0000
@@ -68,7 +68,8 @@ libblocks_a_SOURCES = tsman.cpp lgman.cp
dblqh/DblqhCommon.cpp \
PgmanProxy.cpp \
dbtup/DbtupClient.cpp \
- dbtc/DbtcProxy.cpp
+ dbtc/DbtcProxy.cpp \
+ thrman.cpp
ndbtools_PROGRAMS = ndb_print_file
ndb_print_file_SOURCES = print_file.cpp diskpage.cpp dbtup/tuppage.cpp
=== modified file 'storage/ndb/src/kernel/blocks/PgmanProxy.cpp'
--- a/storage/ndb/src/kernel/blocks/PgmanProxy.cpp 2011-02-02 00:40:07 +0000
+++ b/storage/ndb/src/kernel/blocks/PgmanProxy.cpp 2011-10-07 08:07:21 +0000
@@ -20,8 +20,6 @@
PgmanProxy::PgmanProxy(Block_context& ctx) :
LocalProxy(PGMAN, ctx)
{
- c_extraWorkers = 1;
-
// GSN_LCP_FRAG_ORD
addRecSignal(GSN_LCP_FRAG_ORD, &PgmanProxy::execLCP_FRAG_ORD);
@@ -88,11 +86,15 @@ PgmanProxy::execEND_LCP_REQ(Signal* sign
req->senderRef = reference();
req->requestType = ReleasePagesReq::RT_RELEASE_UNLOCKED;
req->requestData = 0;
- sendSignal(extraWorkerRef(), GSN_RELEASE_PAGES_REQ,
+ // Extra worker
+ sendSignal(workerRef(c_workers - 1), GSN_RELEASE_PAGES_REQ,
signal, ReleasePagesReq::SignalLength, JBB);
return;
}
- sendREQ(signal, ss);
+ /**
+ * Send to extra PGMAN *after* all other PGMAN has completed
+ */
+ sendREQ(signal, ss, /* skip last */ true);
}
void
@@ -137,8 +139,14 @@ PgmanProxy::sendEND_LCP_CONF(Signal* sig
return;
}
- if (!lastExtra(signal, ss)) {
+ if (!ss.m_extraLast)
+ {
jam();
+ ss.m_extraLast = true;
+ ss.m_worker = c_workers - 1; // send to last PGMAN
+ ss.m_workerMask.set(ss.m_worker);
+ SectionHandle handle(this);
+ (this->*ss.m_sendREQ)(signal, ss.m_ssId, &handle);
return;
}
@@ -170,7 +178,7 @@ PgmanProxy::get_page(Page_cache_client&
{
ndbrequire(blockToInstance(caller.m_block) == 0);
SimulatedBlock* block = globalData.getBlock(caller.m_block);
- Pgman* worker = (Pgman*)extraWorkerBlock();
+ Pgman* worker = (Pgman*)workerBlock(c_workers - 1); // extraWorkerBlock();
Page_cache_client pgman(block, worker);
int ret = pgman.get_page(signal, req, flags);
caller.m_ptr = pgman.m_ptr;
@@ -183,7 +191,7 @@ PgmanProxy::update_lsn(Page_cache_client
{
ndbrequire(blockToInstance(caller.m_block) == 0);
SimulatedBlock* block = globalData.getBlock(caller.m_block);
- Pgman* worker = (Pgman*)extraWorkerBlock();
+ Pgman* worker = (Pgman*)workerBlock(c_workers - 1); // extraWorkerBlock();
Page_cache_client pgman(block, worker);
pgman.update_lsn(key, lsn);
}
@@ -194,7 +202,7 @@ PgmanProxy::drop_page(Page_cache_client&
{
ndbrequire(blockToInstance(caller.m_block) == 0);
SimulatedBlock* block = globalData.getBlock(caller.m_block);
- Pgman* worker = (Pgman*)extraWorkerBlock();
+ Pgman* worker = (Pgman*)workerBlock(c_workers - 1); // extraWorkerBlock();
Page_cache_client pgman(block, worker);
int ret = pgman.drop_page(key, page_id);
return ret;
@@ -209,10 +217,10 @@ PgmanProxy::drop_page(Page_cache_client&
Uint32
PgmanProxy::create_data_file(Signal* signal)
{
- Pgman* worker = (Pgman*)extraWorkerBlock();
+ Pgman* worker = (Pgman*)workerBlock(c_workers - 1); // extraWorkerBlock();
Uint32 ret = worker->create_data_file();
Uint32 i;
- for (i = 0; i < c_lqhWorkers; i++) {
+ for (i = 0; i < c_workers - 1; i++) {
jam();
send_data_file_ord(signal, i, ret,
DataFileOrd::CreateDataFile);
@@ -223,10 +231,10 @@ PgmanProxy::create_data_file(Signal* sig
Uint32
PgmanProxy::alloc_data_file(Signal* signal, Uint32 file_no)
{
- Pgman* worker = (Pgman*)extraWorkerBlock();
+ Pgman* worker = (Pgman*)workerBlock(c_workers - 1); // extraWorkerBlock();
Uint32 ret = worker->alloc_data_file(file_no);
Uint32 i;
- for (i = 0; i < c_lqhWorkers; i++) {
+ for (i = 0; i < c_workers - 1; i++) {
jam();
send_data_file_ord(signal, i, ret,
DataFileOrd::AllocDataFile, file_no);
@@ -237,10 +245,10 @@ PgmanProxy::alloc_data_file(Signal* sign
void
PgmanProxy::map_file_no(Signal* signal, Uint32 file_no, Uint32 fd)
{
- Pgman* worker = (Pgman*)extraWorkerBlock();
+ Pgman* worker = (Pgman*)workerBlock(c_workers - 1); // extraWorkerBlock();
worker->map_file_no(file_no, fd);
Uint32 i;
- for (i = 0; i < c_lqhWorkers; i++) {
+ for (i = 0; i < c_workers - 1; i++) {
jam();
send_data_file_ord(signal, i, ~(Uint32)0,
DataFileOrd::MapFileNo, file_no, fd);
@@ -250,10 +258,10 @@ PgmanProxy::map_file_no(Signal* signal,
void
PgmanProxy::free_data_file(Signal* signal, Uint32 file_no, Uint32 fd)
{
- Pgman* worker = (Pgman*)extraWorkerBlock();
+ Pgman* worker = (Pgman*)workerBlock(c_workers - 1); // extraWorkerBlock();
worker->free_data_file(file_no, fd);
Uint32 i;
- for (i = 0; i < c_lqhWorkers; i++) {
+ for (i = 0; i < c_workers - 1; i++) {
jam();
send_data_file_ord(signal, i, ~(Uint32)0,
DataFileOrd::FreeDataFile, file_no, fd);
=== modified file 'storage/ndb/src/kernel/blocks/PgmanProxy.hpp'
--- a/storage/ndb/src/kernel/blocks/PgmanProxy.hpp 2011-02-02 00:40:07 +0000
+++ b/storage/ndb/src/kernel/blocks/PgmanProxy.hpp 2011-10-07 08:07:21 +0000
@@ -62,11 +62,12 @@ protected:
*/
static const char* name() { return "END_LCP_REQ"; }
EndLcpReq m_req;
+ bool m_extraLast;
Ss_END_LCP_REQ() {
m_sendREQ = (SsFUNCREQ)&PgmanProxy::sendEND_LCP_REQ;
m_sendCONF = (SsFUNCREP)&PgmanProxy::sendEND_LCP_CONF;
// extra worker (for extent pages) must run after others
- m_extraLast = true;
+ m_extraLast = false;
}
enum { poolSize = 1 };
static SsPool<Ss_END_LCP_REQ>& pool(LocalProxy* proxy) {
=== modified file 'storage/ndb/src/kernel/blocks/dbinfo/Dbinfo.cpp'
--- a/storage/ndb/src/kernel/blocks/dbinfo/Dbinfo.cpp 2011-06-30 15:59:25 +0000
+++ b/storage/ndb/src/kernel/blocks/dbinfo/Dbinfo.cpp 2011-10-07 08:07:21 +0000
@@ -29,7 +29,7 @@
Uint32 dbinfo_blocks[] = { DBACC, DBTUP, BACKUP, DBTC, SUMA, DBUTIL,
TRIX, DBTUX, DBDICT, CMVMI, DBLQH, LGMAN,
- PGMAN, DBSPJ, 0};
+ PGMAN, DBSPJ, THRMAN, 0};
Dbinfo::Dbinfo(Block_context& ctx) :
SimulatedBlock(DBINFO, ctx)
=== modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.cpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.cpp 2011-06-30 15:59:25 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.cpp 2011-10-07 08:07:21 +0000
@@ -690,7 +690,7 @@ DblqhProxy::completeLCP_2(Signal* signal
* that will checkpoint extent-pages
*/
// NOTE: ugly to use MaxLqhWorkers directly
- Uint32 instance = MaxLqhWorkers + 1;
+ Uint32 instance = c_workers + 1;
sendSignal(numberToRef(PGMAN, instance, getOwnNodeId()),
GSN_END_LCP_REQ, signal, EndLcpReq::SignalLength, JBB);
}
=== modified file 'storage/ndb/src/kernel/blocks/dbspj/DbspjProxy.hpp'
--- a/storage/ndb/src/kernel/blocks/dbspj/DbspjProxy.hpp 2011-09-15 20:21:59 +0000
+++ b/storage/ndb/src/kernel/blocks/dbspj/DbspjProxy.hpp 2011-10-07 08:07:21 +0000
@@ -24,7 +24,6 @@ public:
virtual ~DbspjProxy();
BLOCK_DEFINES(DbspjProxy);
- virtual void loadWorkers() { tc_loadWorkers(); }
protected:
virtual SimulatedBlock* newWorker(Uint32 instanceNo);
=== modified file 'storage/ndb/src/kernel/blocks/dbtc/DbtcProxy.hpp'
--- a/storage/ndb/src/kernel/blocks/dbtc/DbtcProxy.hpp 2011-09-15 20:21:59 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtc/DbtcProxy.hpp 2011-10-07 08:07:21 +0000
@@ -37,7 +37,6 @@ public:
virtual ~DbtcProxy();
BLOCK_DEFINES(DbtcProxy);
- virtual void loadWorkers() { tc_loadWorkers(); }
protected:
virtual SimulatedBlock* newWorker(Uint32 instanceNo);
=== added file 'storage/ndb/src/kernel/blocks/thrman.cpp'
--- a/storage/ndb/src/kernel/blocks/thrman.cpp 1970-01-01 00:00:00 +0000
+++ b/storage/ndb/src/kernel/blocks/thrman.cpp 2011-10-07 08:07:21 +0000
@@ -0,0 +1,122 @@
+/*
+ Copyright (c) 2011, Oracle and/or its affiliates. All rights reserved.
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+*/
+
+#include "thrman.hpp"
+#include <mt.hpp>
+#include <signaldata/DbinfoScan.hpp>
+#include <NdbGetRUsage.h>
+
+#include <EventLogger.hpp>
+extern EventLogger * g_eventLogger;
+
+Thrman::Thrman(Block_context & ctx, Uint32 instanceno) :
+ SimulatedBlock(THRMAN, ctx, instanceno)
+{
+ BLOCK_CONSTRUCTOR(Thrman);
+
+ addRecSignal(GSN_DBINFO_SCANREQ, &Thrman::execDBINFO_SCANREQ);
+}
+
+Thrman::~Thrman()
+{
+}
+
+BLOCK_FUNCTIONS(Thrman)
+
+void
+Thrman::execDBINFO_SCANREQ(Signal* signal)
+{
+ jamEntry();
+
+ DbinfoScanReq req= *(DbinfoScanReq*)signal->theData;
+ const Ndbinfo::ScanCursor* cursor =
+ CAST_CONSTPTR(Ndbinfo::ScanCursor, DbinfoScan::getCursorPtr(&req));
+ Ndbinfo::Ratelimit rl;
+
+ switch(req.tableId) {
+ case Ndbinfo::THREADBLOCKS_TABLEID: {
+ Uint32 arr[NO_OF_BLOCKS];
+ Uint32 len = mt_get_blocklist(this, arr, NDB_ARRAY_SIZE(arr));
+ Uint32 pos = cursor->data[0];
+ for (; pos < len; pos++) {
+ Ndbinfo::Row row(signal, req);
+ row.write_uint32(getOwnNodeId());
+ row.write_uint32(getThreadId()); // thr_no
+ row.write_uint32(blockToMain(arr[pos])); // block_number
+ row.write_uint32(blockToInstance(arr[pos])); // block_instance
+ ndbinfo_send_row(signal, req, row, rl);
+
+ if (rl.need_break(req))
+ {
+ jam();
+ ndbinfo_send_scan_break(signal, req, rl, pos);
+ return;
+ }
+ }
+ break;
+ }
+ case Ndbinfo::THREADSTAT_TABLEID:{
+ ndb_thr_stat stat;
+ mt_get_thr_stat(this, &stat);
+ Ndbinfo::Row row(signal, req);
+ row.write_uint32(getOwnNodeId());
+ row.write_uint32(getThreadId()); // thr_no
+ row.write_string(stat.name);
+ row.write_uint64(stat.loop_cnt);
+ row.write_uint64(stat.exec_cnt);
+ row.write_uint64(stat.wait_cnt);
+ row.write_uint64(stat.local_sent_prioa);
+ row.write_uint64(stat.local_sent_priob);
+ row.write_uint64(stat.remote_sent_prioa);
+ row.write_uint64(stat.remote_sent_priob);
+
+ row.write_uint64(stat.os_tid);
+ row.write_uint64(NdbTick_CurrentMillisecond());
+
+ struct ndb_rusage os_rusage;
+ Ndb_GetRUSage(&os_rusage);
+ row.write_uint64(os_rusage.ru_utime);
+ row.write_uint64(os_rusage.ru_stime);
+ row.write_uint64(os_rusage.ru_minflt);
+ row.write_uint64(os_rusage.ru_majflt);
+ row.write_uint64(os_rusage.ru_nvcsw);
+ row.write_uint64(os_rusage.ru_nivcsw);
+ ndbinfo_send_row(signal, req, row, rl);
+ break;
+ }
+ default:
+ break;
+ }
+
+ ndbinfo_send_scan_conf(signal, req, rl);
+}
+
+ThrmanProxy::ThrmanProxy(Block_context & ctx) :
+ LocalProxy(THRMAN, ctx)
+{
+}
+
+ThrmanProxy::~ThrmanProxy()
+{
+}
+
+SimulatedBlock*
+ThrmanProxy::newWorker(Uint32 instanceNo)
+{
+ return new Thrman(m_ctx, instanceNo);
+}
+
=== added file 'storage/ndb/src/kernel/blocks/thrman.hpp'
--- a/storage/ndb/src/kernel/blocks/thrman.hpp 1970-01-01 00:00:00 +0000
+++ b/storage/ndb/src/kernel/blocks/thrman.hpp 2011-10-07 08:07:21 +0000
@@ -0,0 +1,48 @@
+/*
+ Copyright (c) 2011, Oracle and/or its affiliates. All rights reserved.
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+*/
+
+#ifndef THRMAN_H
+#define THRMAN_H
+
+#include <SimulatedBlock.hpp>
+#include <LocalProxy.hpp>
+
+class Thrman : public SimulatedBlock
+{
+public:
+ Thrman(Block_context& ctx, Uint32 instanceNumber = 0);
+ virtual ~Thrman();
+ BLOCK_DEFINES(Thrman);
+
+ void execDBINFO_SCANREQ(Signal*);
+protected:
+
+};
+
+class ThrmanProxy : public LocalProxy
+{
+public:
+ ThrmanProxy(Block_context& ctx);
+ virtual ~ThrmanProxy();
+ BLOCK_DEFINES(ThrmanProxy);
+
+protected:
+ virtual SimulatedBlock* newWorker(Uint32 instanceNo);
+
+};
+
+#endif
=== modified file 'storage/ndb/src/kernel/vm/Ndbinfo.hpp'
--- a/storage/ndb/src/kernel/vm/Ndbinfo.hpp 2011-06-30 15:59:25 +0000
+++ b/storage/ndb/src/kernel/vm/Ndbinfo.hpp 2011-10-07 08:07:21 +0000
@@ -46,7 +46,9 @@ public:
RESOURCES_TABLEID = 7,
COUNTERS_TABLEID = 8,
NODES_TABLEID = 9,
- DISKPAGEBUFFER_TABLEID = 10
+ DISKPAGEBUFFER_TABLEID = 10,
+ THREADBLOCKS_TABLEID = 11,
+ THREADSTAT_TABLEID = 12
};
struct Table {
=== modified file 'storage/ndb/src/kernel/vm/NdbinfoTables.cpp'
--- a/storage/ndb/src/kernel/vm/NdbinfoTables.cpp 2011-06-30 15:59:25 +0000
+++ b/storage/ndb/src/kernel/vm/NdbinfoTables.cpp 2011-10-07 08:07:21 +0000
@@ -169,6 +169,41 @@ DECLARE_NDBINFO_TABLE(DISKPAGEBUFFER, 9)
}
};
+DECLARE_NDBINFO_TABLE(THREADBLOCKS, 4) =
+{ { "threadblocks", 4, 0, "which blocks are run in which threads" },
+ {
+ {"node_id", Ndbinfo::Number, ""},
+ {"thr_no", Ndbinfo::Number, ""},
+ {"block_number", Ndbinfo::Number, ""},
+ {"block_instance", Ndbinfo::Number, ""},
+ }
+};
+
+DECLARE_NDBINFO_TABLE(THREADSTAT, 18) =
+{ { "threadstat", 18, 0, "threadstat" },
+ {
+ //{"0123456701234567"}
+ {"node_id", Ndbinfo::Number, ""},
+ {"thr_no", Ndbinfo::Number, ""},
+ {"thr_nm", Ndbinfo::String, ""},
+ {"c_loop", Ndbinfo::Number64,""},
+ {"c_exec", Ndbinfo::Number64,""},
+ {"c_wait", Ndbinfo::Number64,""},
+ {"c_l_sent_prioa", Ndbinfo::Number64,""},
+ {"c_l_sent_priob", Ndbinfo::Number64,""},
+ {"c_r_sent_prioa", Ndbinfo::Number64,""},
+ {"c_r_sent_priob", Ndbinfo::Number64,""},
+ {"os_tid", Ndbinfo::Number64,""},
+ {"os_now", Ndbinfo::Number64,""},
+ {"os_ru_utime", Ndbinfo::Number64,""},
+ {"os_ru_stime", Ndbinfo::Number64,""},
+ {"os_ru_minflt", Ndbinfo::Number64,""},
+ {"os_ru_majflt", Ndbinfo::Number64,""},
+ {"os_ru_nvcsw", Ndbinfo::Number64,""},
+ {"os_ru_nivcsw", Ndbinfo::Number64,""}
+ }
+};
+
#define DBINFOTBL(x) { Ndbinfo::x##_TABLEID, (Ndbinfo::Table*)&ndbinfo_##x }
static
@@ -188,7 +223,9 @@ struct ndbinfo_table_list_entry {
DBINFOTBL(RESOURCES),
DBINFOTBL(COUNTERS),
DBINFOTBL(NODES),
- DBINFOTBL(DISKPAGEBUFFER)
+ DBINFOTBL(DISKPAGEBUFFER),
+ DBINFOTBL(THREADBLOCKS),
+ DBINFOTBL(THREADSTAT)
};
static int no_ndbinfo_tables =
=== modified file 'storage/ndb/src/kernel/vm/SimulatedBlock.hpp'
--- a/storage/ndb/src/kernel/vm/SimulatedBlock.hpp 2011-08-27 06:06:02 +0000
+++ b/storage/ndb/src/kernel/vm/SimulatedBlock.hpp 2011-10-07 08:07:21 +0000
@@ -634,7 +634,9 @@ private:
* In MT LQH main instance is the LQH proxy and the others ("workers")
* are real LQHs run by multiple threads.
*/
- enum { MaxInstances = 1 + MAX_NDBMT_LQH_WORKERS + 1 }; // main+lqh+extra
+protected:
+ enum { MaxInstances = 3 + MAX_NDBMT_TC_THREADS + MAX_NDBMT_LQH_WORKERS + 1 };
+private:
SimulatedBlock** theInstanceList; // set in main, indexed by instance
SimulatedBlock* theMainInstance; // set in all
/*
=== modified file 'storage/ndb/src/kernel/vm/dummy_nonmt.cpp'
--- a/storage/ndb/src/kernel/vm/dummy_nonmt.cpp 2011-09-15 20:21:59 +0000
+++ b/storage/ndb/src/kernel/vm/dummy_nonmt.cpp 2011-10-07 08:07:21 +0000
@@ -20,49 +20,62 @@
#include <ndb_types.h>
void
-add_thr_map(Uint32, Uint32, Uint32)
+mt_init_thr_map()
{
assert(false);
}
void
-add_main_thr_map()
+mt_add_thr_map(Uint32, Uint32)
{
assert(false);
}
void
-add_lqh_worker_thr_map(Uint32, Uint32)
+mt_finalize_thr_map()
{
assert(false);
}
-void
-add_extra_worker_thr_map(Uint32, Uint32)
+Uint32
+mt_get_instance_count(Uint32 block)
{
assert(false);
+ return 0;
}
-void
-add_tc_worker_thr_map(Uint32, Uint32)
+Uint32
+compute_jb_pages(struct EmulatorData*)
{
- assert(false);
+ return 0;
}
-void
-finalize_thr_map()
+
+bool
+NdbIsMultiThreaded()
{
- assert(false);
+ return false;
}
+#include <BlockNumbers.h>
+
Uint32
-compute_jb_pages(struct EmulatorData*)
+mt_get_blocklist(class SimulatedBlock * block, Uint32 arr[], Uint32 len)
{
- return 0;
+ (void)block;
+ for (Uint32 i = 0; i<NO_OF_BLOCKS; i++)
+ {
+ arr[i] = numberToBlock(MIN_BLOCK_NO + i, 0);
+ }
+ return NO_OF_BLOCKS;
}
-bool
-NdbIsMultiThreaded()
+#include "mt.hpp"
+
+void
+mt_get_thr_stat(class SimulatedBlock *, ndb_thr_stat* dst)
{
- return false;
+ bzero(dst, sizeof(* dst));
+ dst->name = "main";
}
+
=== modified file 'storage/ndb/src/kernel/vm/mt.cpp'
--- a/storage/ndb/src/kernel/vm/mt.cpp 2011-09-15 20:21:59 +0000
+++ b/storage/ndb/src/kernel/vm/mt.cpp 2011-10-07 08:07:21 +0000
@@ -72,11 +72,11 @@ static const Uint32 MAX_SIGNALS_BEFORE_W
//#define NDB_MT_LOCK_TO_CPU
-#define MAX_BLOCK_INSTANCES (1 + MAX_NDBMT_LQH_WORKERS + 1) //main+lqh+extra
#define NUM_MAIN_THREADS 2 // except receiver
#define MAX_THREADS (NUM_MAIN_THREADS + \
MAX_NDBMT_LQH_THREADS + \
MAX_NDBMT_TC_THREADS + 1)
+#define MAX_BLOCK_INSTANCES (MAX_THREADS)
/* If this is too small it crashes before first signal. */
#define MAX_INSTANCES_PER_THREAD (16 + 8 * MAX_NDBMT_LQH_THREADS)
@@ -876,10 +876,16 @@ struct thr_data
/* Watchdog counter for this thread. */
Uint32 m_watchdog_counter;
/* Signal delivery statistics. */
- Uint32 m_prioa_count;
- Uint32 m_prioa_size;
- Uint32 m_priob_count;
- Uint32 m_priob_size;
+ struct
+ {
+ Uint64 m_loop_cnt;
+ Uint64 m_exec_cnt;
+ Uint64 m_wait_cnt;
+ Uint64 m_prioa_count;
+ Uint64 m_prioa_size;
+ Uint64 m_priob_count;
+ Uint64 m_priob_size;
+ } m_stat;
/* Array of node ids with pending remote send data. */
Uint8 m_pending_send_nodes[MAX_NTRANSPORTERS];
@@ -2592,7 +2598,7 @@ add_thr_map(Uint32 main, Uint32 instance
/* Static assignment of main instances (before first signal). */
void
-add_main_thr_map()
+mt_init_thr_map()
{
/* Keep mt-classic assignments in MT LQH. */
const Uint32 thr_GLOBAL = 0;
@@ -2620,33 +2626,72 @@ add_main_thr_map()
add_thr_map(RESTORE, 0, thr_LOCAL);
add_thr_map(DBINFO, 0, thr_LOCAL);
add_thr_map(DBSPJ, 0, thr_GLOBAL);
+ add_thr_map(THRMAN, 0, thr_GLOBAL);
}
-/* Workers added by LocalProxy (before first signal). */
-void
-add_lqh_worker_thr_map(Uint32 block, Uint32 instance)
+Uint32
+mt_get_instance_count(Uint32 block)
{
- require(instance != 0);
- Uint32 i = instance - 1;
- Uint32 thr_no = NUM_MAIN_THREADS + i % num_lqh_threads;
- add_thr_map(block, instance, thr_no);
+ switch(block){
+ case DBLQH:
+ case DBACC:
+ case DBTUP:
+ case DBTUX:
+ case BACKUP:
+ case RESTORE:
+ return globalData.ndbMtLqhWorkers;
+ break;
+ case PGMAN:
+ return globalData.ndbMtLqhWorkers + 1;
+ break;
+ case DBTC:
+ case DBSPJ:
+ return globalData.ndbMtTcThreads;
+ break;
+ case THRMAN:
+ return num_threads;
+ default:
+ require(false);
+ }
+ return 0;
}
void
-add_tc_worker_thr_map(Uint32 block, Uint32 instance)
+mt_add_thr_map(Uint32 block, Uint32 instance)
{
require(instance != 0);
- Uint32 i = instance - 1;
- Uint32 thr_no = NUM_MAIN_THREADS + num_lqh_threads + i;
- add_thr_map(block, instance, thr_no);
-}
+ Uint32 thr_no = NUM_MAIN_THREADS;
+ switch(block){
+ case DBLQH:
+ case DBACC:
+ case DBTUP:
+ case DBTUX:
+ case BACKUP:
+ case RESTORE:
+ thr_no += (instance - 1) % num_lqh_threads;
+ break;
+ case PGMAN:
+ if (instance == num_lqh_threads + 1)
+ {
+ // Put extra PGMAN together with it's Proxy
+ thr_no = block2ThreadId(block, 0);
+ }
+ else
+ {
+ thr_no += (instance - 1) % num_lqh_threads;
+ }
+ break;
+ case DBTC:
+ case DBSPJ:
+ thr_no += num_lqh_threads + (instance - 1);
+ break;
+ case THRMAN:
+ thr_no = instance - 1;
+ break;
+ default:
+ require(false);
+ }
-/* Extra workers run`in proxy thread. */
-void
-add_extra_worker_thr_map(Uint32 block, Uint32 instance)
-{
- require(instance != 0);
- Uint32 thr_no = block2ThreadId(block, 0);
add_thr_map(block, instance, thr_no);
}
@@ -2661,7 +2706,7 @@ add_extra_worker_thr_map(Uint32 block, U
* NOTE: extra pgman worker is instance 5
*/
void
-finalize_thr_map()
+mt_finalize_thr_map()
{
for (Uint32 b = 0; b < NO_OF_BLOCKS; b++)
{
@@ -2694,60 +2739,6 @@ finalize_thr_map()
}
}
-static void reportSignalStats(Uint32 self, Uint32 a_count, Uint32 a_size,
- Uint32 b_count, Uint32 b_size)
-{
- SignalT<6> sT;
- Signal *s= new (&sT) Signal(0);
-
- memset(&s->header, 0, sizeof(s->header));
- s->header.theLength = 6;
- s->header.theSendersSignalId = 0;
- s->header.theSendersBlockRef = numberToRef(0, 0);
- s->header.theVerId_signalNumber = GSN_EVENT_REP;
- s->header.theReceiversBlockNumber = CMVMI;
- s->theData[0] = NDB_LE_MTSignalStatistics;
- s->theData[1] = self;
- s->theData[2] = a_count;
- s->theData[3] = a_size;
- s->theData[4] = b_count;
- s->theData[5] = b_size;
- /* ToDo: need this really be prio A like in old code? */
- sendlocal(self, &s->header, s->theData,
- NULL);
-}
-
-static inline void
-update_sched_stats(thr_data *selfptr)
-{
- if(selfptr->m_prioa_count + selfptr->m_priob_count >= 2000000)
- {
- reportSignalStats(selfptr->m_thr_no,
- selfptr->m_prioa_count,
- selfptr->m_prioa_size,
- selfptr->m_priob_count,
- selfptr->m_priob_size);
- selfptr->m_prioa_count = 0;
- selfptr->m_prioa_size = 0;
- selfptr->m_priob_count = 0;
- selfptr->m_priob_size = 0;
-
-#if 0
- Uint32 thr_no = selfptr->m_thr_no;
- ndbout_c("--- %u fifo: %u jba: %u global: %u",
- thr_no,
- fifo_used_pages(selfptr),
- selfptr->m_jba_head.used(),
- g_thr_repository.m_free_list.m_cnt);
- for (Uint32 i = 0; i<num_threads; i++)
- {
- ndbout_c(" %u-%u : %u",
- thr_no, i, selfptr->m_in_queue_head[i].used());
- }
-#endif
- }
-}
-
static void
init_thread(thr_data *selfptr)
{
@@ -2854,8 +2845,6 @@ mt_receiver_thread_main(void *thr_arg)
{
static int cnt = 0;
- update_sched_stats(selfptr);
-
if (cnt == 0)
{
watchDogCounter = 5;
@@ -2892,6 +2881,8 @@ mt_receiver_thread_main(void *thr_arg)
has_received = true;
}
}
+ selfptr->m_stat.m_loop_cnt++;
+ selfptr->m_stat.m_exec_cnt += sum;
}
globalEmulatorData.theWatchDog->unregisterWatchedThread(thr_no);
@@ -3028,14 +3019,14 @@ mt_job_thread_main(void *thr_arg)
Uint32 pending_send = 0;
Uint32 send_sum = 0;
- int loops = 0;
- int maxloops = 10;/* Loops before reading clock, fuzzy adapted to 1ms freq. */
+ Uint32 loops = 0;
+ Uint32 maxloops = 10;/* Loops before reading clock, fuzzy adapted to 1ms freq. */
+ Uint32 waits = 0;
NDB_TICKS now = selfptr->m_time;
while (globalData.theRestartFlag != perform_stop)
{
loops++;
- update_sched_stats(selfptr);
watchDogCounter = 2;
scan_time_queues(selfptr, now);
@@ -3080,9 +3071,12 @@ mt_job_thread_main(void *thr_arg)
selfptr);
if (waited)
{
+ waits++;
/* Update current time after sleeping */
now = NdbTick_CurrentMillisecond();
- loops = 0;
+ selfptr->m_stat.m_wait_cnt += waits;
+ selfptr->m_stat.m_loop_cnt += loops;
+ waits = loops = 0;
}
}
}
@@ -3097,7 +3091,9 @@ mt_job_thread_main(void *thr_arg)
{
/* Update current time after sleeping */
now = NdbTick_CurrentMillisecond();
- loops = 0;
+ selfptr->m_stat.m_wait_cnt += waits;
+ selfptr->m_stat.m_loop_cnt += loops;
+ waits = loops = 0;
}
}
else
@@ -3120,8 +3116,11 @@ mt_job_thread_main(void *thr_arg)
else if (diff > 1 && maxloops > 1)
maxloops -= ((maxloops/10) + 1); /* Overslept: Need more frequent read*/
- loops = 0;
+ selfptr->m_stat.m_wait_cnt += waits;
+ selfptr->m_stat.m_loop_cnt += loops;
+ waits = loops = 0;
}
+ selfptr->m_stat.m_exec_cnt += sum;
}
globalEmulatorData.theWatchDog->unregisterWatchedThread(thr_no);
@@ -3150,9 +3149,9 @@ sendlocal(Uint32 self, const SignalHeade
assert(pthread_equal(selfptr->m_thr_id, pthread_self()));
struct thr_data * dstptr = rep->m_thread + dst;
- selfptr->m_priob_count++;
+ selfptr->m_stat.m_priob_count++;
Uint32 siglen = (sizeof(*s) >> 2) + s->theLength + s->m_noOfSections;
- selfptr->m_priob_size += siglen;
+ selfptr->m_stat.m_priob_size += siglen;
thr_job_queue *q = dstptr->m_in_queue + self;
thr_jb_write_state *w = selfptr->m_write_states + dst;
@@ -3178,9 +3177,9 @@ sendprioa(Uint32 self, const SignalHeade
pthread_equal(selfptr->m_thr_id, pthread_self()));
struct thr_data *dstptr = rep->m_thread + dst;
- selfptr->m_prioa_count++;
+ selfptr->m_stat.m_prioa_count++;
Uint32 siglen = (sizeof(*s) >> 2) + s->theLength + s->m_noOfSections;
- selfptr->m_prioa_size += siglen;
+ selfptr->m_stat.m_prioa_size += siglen;
thr_job_queue *q = &(dstptr->m_jba);
thr_jb_write_state w;
@@ -3359,10 +3358,7 @@ thr_init(struct thr_repository* rep, str
}
queue_init(&selfptr->m_tq);
- selfptr->m_prioa_count = 0;
- selfptr->m_prioa_size = 0;
- selfptr->m_priob_count = 0;
- selfptr->m_priob_size = 0;
+ bzero(&selfptr->m_stat, sizeof(selfptr->m_stat));
selfptr->m_pending_send_count = 0;
selfptr->m_pending_send_mask.clear();
@@ -4086,6 +4082,40 @@ mt_assert_own_thread(SimulatedBlock* blo
}
#endif
+
+Uint32
+mt_get_blocklist(SimulatedBlock * block, Uint32 arr[], Uint32 len)
+{
+ Uint32 thr_no = block->getThreadId();
+ thr_data *thr_ptr = g_thr_repository.m_thread + thr_no;
+
+ for (Uint32 i = 0; i < thr_ptr->m_instance_count; i++)
+ {
+ arr[i] = thr_ptr->m_instance_list[i];
+ }
+
+ return thr_ptr->m_instance_count;
+}
+
+void
+mt_get_thr_stat(class SimulatedBlock * block, ndb_thr_stat* dst)
+{
+ bzero(dst, sizeof(* dst));
+ Uint32 thr_no = block->getThreadId();
+ thr_data *selfptr = g_thr_repository.m_thread + thr_no;
+
+ THRConfigApplier & conf = globalEmulatorData.theConfiguration->m_thr_config;
+ dst->thr_no = thr_no;
+ dst->name = conf.getName(selfptr->m_instance_list, selfptr->m_instance_count);
+ dst->os_tid = NdbThread_GetTid(selfptr->m_thread);
+ dst->loop_cnt = selfptr->m_stat.m_loop_cnt;
+ dst->exec_cnt = selfptr->m_stat.m_exec_cnt;
+ dst->wait_cnt = selfptr->m_stat.m_wait_cnt;
+ dst->local_sent_prioa = selfptr->m_stat.m_prioa_count;
+ dst->local_sent_priob = selfptr->m_stat.m_priob_count;
+}
+
+
/**
* Global data
*/
=== modified file 'storage/ndb/src/kernel/vm/mt.hpp'
--- a/storage/ndb/src/kernel/vm/mt.hpp 2011-09-15 20:21:59 +0000
+++ b/storage/ndb/src/kernel/vm/mt.hpp 2011-10-07 08:07:21 +0000
@@ -29,13 +29,12 @@
*/
extern Uint32 receiverThreadId;
+Uint32 mt_get_instance_count(Uint32 block);
+
/* Assign block instances to thread */
-void add_thr_map(Uint32 block, Uint32 instance, Uint32 thr_no);
-void add_main_thr_map();
-void add_lqh_worker_thr_map(Uint32 block, Uint32 instance);
-void add_tc_worker_thr_map(Uint32 block, Uint32 instance);
-void add_extra_worker_thr_map(Uint32 block, Uint32 instance);
-void finalize_thr_map();
+void mt_init_thr_map();
+void mt_add_thr_map(Uint32 block, Uint32 instance);
+void mt_finalize_thr_map();
void sendlocal(Uint32 self, const struct SignalHeader *s,
const Uint32 *data, const Uint32 secPtr[3]);
@@ -87,4 +86,28 @@ void mt_wakeup(class SimulatedBlock*);
void mt_assert_own_thread(class SimulatedBlock*);
#endif
+/**
+ * return list of references running in this thread
+ */
+Uint32
+mt_get_blocklist(class SimulatedBlock*, Uint32 dst[], Uint32 len);
+
+
+struct ndb_thr_stat
+{
+ Uint32 thr_no;
+ Uint64 os_tid;
+ const char * name;
+ Uint64 loop_cnt;
+ Uint64 exec_cnt;
+ Uint64 wait_cnt;
+ Uint64 local_sent_prioa;
+ Uint64 local_sent_priob;
+ Uint64 remote_sent_prioa;
+ Uint64 remote_sent_priob;
+};
+
+void
+mt_get_thr_stat(class SimulatedBlock *, ndb_thr_stat* dst);
+
#endif
=== modified file 'storage/ndb/src/kernel/vm/mt_thr_config.cpp'
--- a/storage/ndb/src/kernel/vm/mt_thr_config.cpp 2011-09-23 09:13:22 +0000
+++ b/storage/ndb/src/kernel/vm/mt_thr_config.cpp 2011-10-07 08:07:21 +0000
@@ -964,6 +964,14 @@ THRConfigApplier::appendInfo(BaseString&
}
}
+const char *
+THRConfigApplier::getName(const unsigned short list[], unsigned cnt) const
+{
+ const T_Thread* thr = find_thread(list, cnt);
+ assert(thr != 0);
+ return getEntryName(thr->m_type);
+}
+
int
THRConfigApplier::create_cpusets()
{
=== modified file 'storage/ndb/src/kernel/vm/mt_thr_config.hpp'
--- a/storage/ndb/src/kernel/vm/mt_thr_config.hpp 2011-09-23 09:13:22 +0000
+++ b/storage/ndb/src/kernel/vm/mt_thr_config.hpp 2011-10-07 08:07:21 +0000
@@ -124,6 +124,7 @@ class THRConfigApplier : public THRConfi
public:
int create_cpusets();
+ const char * getName(const unsigned short list[], unsigned cnt) const;
void appendInfo(BaseString&, const unsigned short list[], unsigned cnt) const;
int do_bind(NdbThread*, const unsigned short list[], unsigned cnt);
int do_bind_io(NdbThread*);
No bundle (reason: useless for push emails).
| Thread |
|---|
| • bzr push into mysql-5.1-telco-7.0 branch (jonas.oreland:4573 to 4575) | jonas oreland | 7 Oct |