#At file:///export/space/pekka/ndb/version/my51-wl4391/
2704 Pekka Nousiainen 2008-08-11
wl#4391 03.diff
Instance key mapping on receiver + misc.
modified:
storage/ndb/include/kernel/kernel_config_parameters.h
storage/ndb/include/kernel/ndb_limits.h
storage/ndb/src/kernel/SimBlockList.cpp
storage/ndb/src/kernel/blocks/LocalProxy.cpp
storage/ndb/src/kernel/blocks/LocalProxy.hpp
storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp
storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp
storage/ndb/src/kernel/blocks/ndbfs/Ndbfs.cpp
storage/ndb/src/kernel/main.cpp
storage/ndb/src/kernel/vm/Configuration.cpp
storage/ndb/src/kernel/vm/FastScheduler.cpp
storage/ndb/src/kernel/vm/GlobalData.hpp
storage/ndb/src/kernel/vm/SimulatedBlock.cpp
storage/ndb/src/kernel/vm/SimulatedBlock.hpp
storage/ndb/src/kernel/vm/TransporterCallback.cpp
storage/ndb/src/kernel/vm/dummy_nonmt.cpp
storage/ndb/src/kernel/vm/mt.cpp
storage/ndb/src/kernel/vm/mt.hpp
=== modified file 'storage/ndb/include/kernel/kernel_config_parameters.h'
--- a/storage/ndb/include/kernel/kernel_config_parameters.h 2008-07-25 05:48:32 +0000
+++ b/storage/ndb/include/kernel/kernel_config_parameters.h 2008-08-11 11:21:36 +0000
@@ -63,7 +63,7 @@
#define CFG_TUX_ATTRIBUTE (PRIVATE_BASE + 42)
#define CFG_TUX_SCAN_OP (PRIVATE_BASE + 43)
-#define CFG_NDBMT_WORKERS (PRIVATE_BASE + 44)
-#define CFG_NDBMT_THREADS (PRIVATE_BASE + 45)
+#define CFG_NDBMT_LQH_WORKERS (PRIVATE_BASE + 44)
+#define CFG_NDBMT_LQH_THREADS (PRIVATE_BASE + 45)
#endif
=== modified file 'storage/ndb/include/kernel/ndb_limits.h'
--- a/storage/ndb/include/kernel/ndb_limits.h 2008-07-25 05:48:32 +0000
+++ b/storage/ndb/include/kernel/ndb_limits.h 2008-08-11 11:21:36 +0000
@@ -181,7 +181,7 @@
#define NDBMT_BLOCK_INSTANCE_BITS 7
#define NDBMT_BLOCK_INSTANCE_MASK 0xFE00
-#define MAX_NDBMT_WORKERS 4
-#define MAX_NDBMT_THREADS 4
+#define MAX_NDBMT_LQH_WORKERS 4
+#define MAX_NDBMT_LQH_THREADS 4
#endif
=== modified file 'storage/ndb/src/kernel/SimBlockList.cpp'
--- a/storage/ndb/src/kernel/SimBlockList.cpp 2008-07-26 05:13:40 +0000
+++ b/storage/ndb/src/kernel/SimBlockList.cpp 2008-08-11 11:21:36 +0000
@@ -75,9 +75,6 @@ void * operator new (size_t sz, SIMBLOCK
#define NEW_BLOCK(B) new(A_VALUE) B
#endif
-extern bool g_ndbMt;
-extern bool g_ndbMtLqh;
-
void
SimBlockList::load(EmulatorData& data){
noOfBlocks = NO_OF_BLOCKS;
@@ -102,10 +99,12 @@ SimBlockList::load(EmulatorData& data){
}
}
+ const bool mtLqh = globalData.isNdbMtLqh;
+
theList[0] = pg = NEW_BLOCK(Pgman)(ctx);
theList[1] = lg = NEW_BLOCK(Lgman)(ctx);
theList[2] = ts = NEW_BLOCK(Tsman)(ctx, pg, lg);
- if (!g_ndbMtLqh)
+ if (!mtLqh)
theList[3] = NEW_BLOCK(Dbacc)(ctx);
else
theList[3] = NEW_BLOCK(DbaccProxy)(ctx);
@@ -113,29 +112,29 @@ SimBlockList::load(EmulatorData& data){
theList[5] = fs;
theList[6] = dbdict = NEW_BLOCK(Dbdict)(ctx);
theList[7] = dbdih = NEW_BLOCK(Dbdih)(ctx);
- if (!g_ndbMtLqh)
+ if (!mtLqh)
theList[8] = NEW_BLOCK(Dblqh)(ctx);
else
theList[8] = NEW_BLOCK(DblqhProxy)(ctx);
theList[9] = NEW_BLOCK(Dbtc)(ctx);
- if (!g_ndbMtLqh)
+ if (!mtLqh)
theList[10] = NEW_BLOCK(Dbtup)(ctx, pg);
else
theList[10] = NEW_BLOCK(DbtupProxy)(ctx);//wl4391_todo pg
theList[11] = NEW_BLOCK(Ndbcntr)(ctx);
theList[12] = NEW_BLOCK(Qmgr)(ctx);
theList[13] = NEW_BLOCK(Trix)(ctx);
- if (!g_ndbMtLqh)
+ if (!mtLqh)
theList[14] = NEW_BLOCK(Backup)(ctx);
else
theList[14] = NEW_BLOCK(BackupProxy)(ctx);
theList[15] = NEW_BLOCK(DbUtil)(ctx);
theList[16] = NEW_BLOCK(Suma)(ctx);
- if (!g_ndbMtLqh)
+ if (!mtLqh)
theList[17] = NEW_BLOCK(Dbtux)(ctx);
else
theList[17] = NEW_BLOCK(DbtuxProxy)(ctx);
- if (!g_ndbMtLqh)
+ if (!mtLqh)
theList[18] = NEW_BLOCK(Restore)(ctx);
else
theList[18] = NEW_BLOCK(RestoreProxy)(ctx);
=== modified file 'storage/ndb/src/kernel/blocks/LocalProxy.cpp'
--- a/storage/ndb/src/kernel/blocks/LocalProxy.cpp 2008-07-26 05:13:40 +0000
+++ b/storage/ndb/src/kernel/blocks/LocalProxy.cpp 2008-08-11 11:21:36 +0000
@@ -51,8 +51,8 @@ LocalProxy::execREAD_CONFIG_REQ(Signal*
ss.m_readConfigReq = *req;
ndbrequire(ss.m_readConfigReq.noOfParameters == 0);
- const Uint32 workers = globalData.ndbmtWorkers;
- const Uint32 threads = globalData.ndbmtThreads;
+ const Uint32 workers = globalData.ndbMtLqhWorkers;
+ const Uint32 threads = globalData.ndbMtLqhThreads;
Uint32 i;
for (i = 0; i < workers; i++) {
@@ -62,7 +62,7 @@ LocalProxy::execREAD_CONFIG_REQ(Signal*
ndbrequire(this->getInstance(instanceNo) == worker);
c_worker[i] = worker;
- add_worker_thr_map(number(), instanceNo);
+ add_lqh_worker_thr_map(number(), instanceNo);
}
// set after instances are created (sendpacked)
=== modified file 'storage/ndb/src/kernel/blocks/LocalProxy.hpp'
--- a/storage/ndb/src/kernel/blocks/LocalProxy.hpp 2008-07-26 05:13:40 +0000
+++ b/storage/ndb/src/kernel/blocks/LocalProxy.hpp 2008-08-11 11:21:36 +0000
@@ -38,7 +38,7 @@ public:
BLOCK_DEFINES(LocalProxy);
protected:
- enum { MaxWorkers = MAX_NDBMT_WORKERS };
+ enum { MaxWorkers = MAX_NDBMT_LQH_WORKERS };
Uint32 c_workers;
Uint32 c_threads;
SimulatedBlock* c_worker[MaxWorkers];
=== modified file 'storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp'
--- a/storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp 2008-07-25 05:48:32 +0000
+++ b/storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp 2008-08-11 11:21:36 +0000
@@ -1779,10 +1779,11 @@ public:
Uint32 dihGetInstanceKey(FragmentstorePtr tFragPtr) {
ndbrequire(!tFragPtr.isNull());
Uint32 log_part_id = tFragPtr.p->m_log_part_id;
- Uint32 instanceKey = 1 + log_part_id % MAX_NDBMT_WORKERS;
+ Uint32 instanceKey = 1 + log_part_id % MAX_NDBMT_LQH_WORKERS;
return instanceKey;
}
Uint32 dihGetInstanceKey(Uint32 tabId, Uint32 fragId);
+ Uint32 dihGetLogPartId(Uint32 tabId, Uint32 fragId);
};
#if (DIH_CDATA_SIZE < _SYSFILE_SIZE32)
=== modified file 'storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp 2008-07-25 05:48:32 +0000
+++ b/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp 2008-08-11 11:21:36 +0000
@@ -16884,3 +16884,15 @@ Dbdih::dihGetInstanceKey(Uint32 tabId, U
Uint32 instanceKey = dihGetInstanceKey(tFragPtr);
return instanceKey;
}
+
+Uint32
+Dbdih::dihGetLogPartId(Uint32 tabId, Uint32 fragId)
+{
+ TabRecordPtr tTabPtr;
+ tTabPtr.i = tabId;
+ ptrCheckGuard(tTabPtr, ctabFileSize, tabRecord);
+ FragmentstorePtr tFragPtr;
+ getFragstore(tTabPtr.p, fragId, tFragPtr);
+ Uint32 logPartId = tFragPtr.p->m_log_part_id;
+ return logPartId;
+}
=== modified file 'storage/ndb/src/kernel/blocks/ndbfs/Ndbfs.cpp'
--- a/storage/ndb/src/kernel/blocks/ndbfs/Ndbfs.cpp 2007-12-23 12:52:25 +0000
+++ b/storage/ndb/src/kernel/blocks/ndbfs/Ndbfs.cpp 2008-08-11 11:21:36 +0000
@@ -313,11 +313,13 @@ Ndbfs::readWriteRequest(int action, Sign
Uint16 filePointer = (Uint16)fsRWReq->filePointer;
const UintR userPointer = fsRWReq->userPointer;
const BlockReference userRef = fsRWReq->userReference;
- const BlockNumber blockNumber = refToBlock(userRef);
+ const BlockNumber blockNumber = refToMain(userRef);
+ const Uint32 instanceNumber = refToInstance(userRef);
AsyncFile* openFile = theOpenFiles.find(filePointer);
- const NewVARIABLE *myBaseAddrRef = &getBat(blockNumber)[fsRWReq->varIndex];
+ const NewVARIABLE *myBaseAddrRef =
+ &getBat(blockNumber, instanceNumber)[fsRWReq->varIndex];
UintPtr tPageSize;
UintPtr tClusterSize;
UintPtr tNRR;
@@ -343,7 +345,7 @@ Ndbfs::readWriteRequest(int action, Sign
if(format != FsReadWriteReq::fsFormatGlobalPage &&
format != FsReadWriteReq::fsFormatSharedPage)
{
- if (fsRWReq->varIndex >= getBatSize(blockNumber)) {
+ if (fsRWReq->varIndex >= getBatSize(blockNumber, instanceNumber)) {
jam();// Ensure that a valid variable is used
errorCode = FsRef::fsErrInvalidParameters;
goto error;
@@ -561,12 +563,14 @@ Ndbfs::execFSAPPENDREQ(Signal * signal)
const Uint16 filePointer = (Uint16)fsReq->filePointer;
const UintR userPointer = fsReq->userPointer;
const BlockReference userRef = fsReq->userReference;
- const BlockNumber blockNumber = refToBlock(userRef);
+ const BlockNumber blockNumber = refToMain(userRef);
+ const Uint32 instanceNumber = refToInstance(userRef);
FsRef::NdbfsErrorCodeType errorCode;
AsyncFile* openFile = theOpenFiles.find(filePointer);
- const NewVARIABLE *myBaseAddrRef = &getBat(blockNumber)[fsReq->varIndex];
+ const NewVARIABLE *myBaseAddrRef =
+ &getBat(blockNumber, instanceNumber)[fsReq->varIndex];
const Uint32* tWA = (const Uint32*)myBaseAddrRef->WA;
const Uint32 tSz = myBaseAddrRef->nrr;
@@ -587,7 +591,7 @@ Ndbfs::execFSAPPENDREQ(Signal * signal)
goto error;
}
- if (fsReq->varIndex >= getBatSize(blockNumber)) {
+ if (fsReq->varIndex >= getBatSize(blockNumber, instanceNumber)) {
jam();// Ensure that a valid variable is used
errorCode = FsRef::fsErrInvalidParameters;
goto error;
=== modified file 'storage/ndb/src/kernel/main.cpp'
--- a/storage/ndb/src/kernel/main.cpp 2008-07-25 05:48:32 +0000
+++ b/storage/ndb/src/kernel/main.cpp 2008-08-11 11:21:36 +0000
@@ -286,23 +286,20 @@ init_global_memory_manager(EmulatorData
return 0; // Success
}
-bool g_ndbMt = false;
-bool g_ndbMtLqh = false;
-
static int
get_multithreaded_config(EmulatorData& ed)
{
- // multithreaded is compiled in ndbd/ndbmtd
- g_ndbMt = SimulatedBlock::isMultiThreaded();
- if (!g_ndbMt)
+ // multithreaded is compiled in ndbd/ndbmtd for now
+ globalData.isNdbMt = SimulatedBlock::isMultiThreaded();
+ if (!globalData.isNdbMt)
return 0;
// mt lqh via environment during development
{
const char* p = NdbEnv_GetEnv("NDB_MT_LQH", (char*)0, 0);
if (p != 0 && strchr("1Y", p[0]) != 0)
- g_ndbMtLqh = true;
- if (!g_ndbMtLqh)
+ globalData.isNdbMtLqh = true;
+ if (!globalData.isNdbMtLqh)
return 0;
}
@@ -315,8 +312,8 @@ get_multithreaded_config(EmulatorData& e
Uint32 workers = 0;
Uint32 threads = 0;
- if (ndb_mgm_get_int_parameter(p, CFG_NDBMT_WORKERS, &workers) ||
- ndb_mgm_get_int_parameter(p, CFG_NDBMT_THREADS, &threads))
+ if (ndb_mgm_get_int_parameter(p, CFG_NDBMT_LQH_WORKERS, &workers) ||
+ ndb_mgm_get_int_parameter(p, CFG_NDBMT_LQH_THREADS, &threads))
{
g_eventLogger->alert("Failed to get CFG_NDBMT parameters from "
"config, exiting.");
@@ -326,12 +323,12 @@ get_multithreaded_config(EmulatorData& e
ndbout << "NDBMT: workers=" << workers
<< " threads=" << threads << endl;
- assert(workers != 0 && workers <= MAX_NDBMT_WORKERS);
- assert(threads != 0 && threads <= MAX_NDBMT_THREADS);
+ assert(workers != 0 && workers <= MAX_NDBMT_LQH_WORKERS);
+ assert(threads != 0 && threads <= MAX_NDBMT_LQH_THREADS);
assert(workers % threads == 0);
- globalData.ndbmtWorkers = workers;
- globalData.ndbmtThreads = threads;
+ globalData.ndbMtLqhWorkers = workers;
+ globalData.ndbMtLqhThreads = threads;
return 0;
}
=== modified file 'storage/ndb/src/kernel/vm/Configuration.cpp'
--- a/storage/ndb/src/kernel/vm/Configuration.cpp 2008-07-25 05:48:32 +0000
+++ b/storage/ndb/src/kernel/vm/Configuration.cpp 2008-08-11 11:21:36 +0000
@@ -1003,11 +1003,11 @@ Configuration::calcSizeAlt(ConfigValues
{
uint workers = 4;
uint threads = 2;
- assert(workers <= MAX_NDBMT_WORKERS);
- assert(threads <= MAX_NDBMT_THREADS);
+ assert(workers <= MAX_NDBMT_LQH_WORKERS);
+ assert(threads <= MAX_NDBMT_LQH_THREADS);
assert(workers % threads == 0);
- cfg.put(CFG_NDBMT_WORKERS, workers);
- cfg.put(CFG_NDBMT_THREADS, threads);
+ cfg.put(CFG_NDBMT_LQH_WORKERS, workers);
+ cfg.put(CFG_NDBMT_LQH_THREADS, threads);
}
m_ownConfig = (ndb_mgm_configuration*)cfg.getConfigValues();
=== modified file 'storage/ndb/src/kernel/vm/FastScheduler.cpp'
--- a/storage/ndb/src/kernel/vm/FastScheduler.cpp 2008-05-29 15:06:11 +0000
+++ b/storage/ndb/src/kernel/vm/FastScheduler.cpp 2008-08-11 11:21:36 +0000
@@ -99,7 +99,8 @@ FastScheduler::doJob()
// signal->garbage_register();
// To ensure we find bugs quickly
register Uint32 gsnbnr = theJobBuffers[tHighPrio].retrieve(signal);
- register BlockNumber reg_bnr = gsnbnr & 0xFFF;
+ // also strip any instance bits since this is non-MT code
+ register BlockNumber reg_bnr = gsnbnr & NDBMT_BLOCK_MASK;
register GlobalSignalNumber reg_gsn = gsnbnr >> 16;
globalData.incrementWatchDogCounter(1);
if (reg_bnr > 0) {
@@ -411,6 +412,8 @@ void FastScheduler::dumpSignalMemory(Uin
ReadPtr[tLevel]--;
theJobBuffers[tLevel].retrieveDump(&signal, ReadPtr[tLevel]);
+ // strip instance bits since this in non-MT code
+ signal.header.theReceiversBlockNumber &= NDBMT_BLOCK_MASK;
print_restart(output, &signal, tLevel);
if (tJob == 0)
=== modified file 'storage/ndb/src/kernel/vm/GlobalData.hpp'
--- a/storage/ndb/src/kernel/vm/GlobalData.hpp 2008-07-25 05:48:32 +0000
+++ b/storage/ndb/src/kernel/vm/GlobalData.hpp 2008-08-11 11:21:36 +0000
@@ -68,15 +68,19 @@ struct GlobalData {
Uint32 sendPackedActivated;
Uint32 activateSendPacked;
- Uint32 ndbmtWorkers;
- Uint32 ndbmtThreads;
+ bool isNdbMt; // ndbd multithreaded, no workers
+ bool isNdbMtLqh; // ndbd multithreaded, LQH workers
+ Uint32 ndbMtLqhWorkers;
+ Uint32 ndbMtLqhThreads;
GlobalData(){
theSignalId = 0;
theStartLevel = NodeState::SL_NOTHING;
theRestartFlag = perform_start;
- ndbmtWorkers = 0;
- ndbmtThreads = 0;
+ isNdbMt = false;
+ isNdbMtLqh = false;
+ ndbMtLqhWorkers = 0;
+ ndbMtLqhThreads = 0;
#ifdef GCP_TIMER_HACK
gcp_timer_limit = 0;
#endif
=== modified file 'storage/ndb/src/kernel/vm/SimulatedBlock.cpp'
--- a/storage/ndb/src/kernel/vm/SimulatedBlock.cpp 2008-07-25 05:48:32 +0000
+++ b/storage/ndb/src/kernel/vm/SimulatedBlock.cpp 2008-08-11 11:21:36 +0000
@@ -225,6 +225,25 @@ SimulatedBlock::getInstanceKey(Uint32 ta
return instanceKey;
}
+Uint32
+SimulatedBlock::getLogPartId(Uint32 tabId, Uint32 fragId)
+{
+ Dbdih* dbdih = (Dbdih*)globalData.getBlock(DBDIH);
+ ndbrequire(dbdih != 0);
+ Uint32 logPartId = dbdih->dihGetLogPartId(tabId, fragId);
+ return logPartId;
+}
+
+bool
+SimulatedBlock::isLogPartOwner(Uint32 worker, Uint32 logPartId)
+{
+ if (!globalData.isNdbMtLqh)
+ return true;
+ Uint32 workers = globalData.ndbMtLqhWorkers;
+ ndbrequire(workers != 0 && worker < workers);
+ return worker == logPartId % workers;
+}
+
void
SimulatedBlock::assignToThread(Uint32 threadId, EmulatedJamBuffer *jamBuffer,
Uint32 *watchDogCounter)
@@ -1300,6 +1319,7 @@ SimulatedBlock::freeBat(){
const NewVARIABLE *
SimulatedBlock::getBat(Uint16 blockNo, Uint32 instanceNo){
+ assert(blockNo == blockToMain(blockNo));
SimulatedBlock * sb = globalData.getBlock(blockNo);
if (sb != 0 && instanceNo != 0)
sb = sb->getInstance(instanceNo);
@@ -1310,6 +1330,7 @@ SimulatedBlock::getBat(Uint16 blockNo, U
Uint16
SimulatedBlock::getBatSize(Uint16 blockNo, Uint32 instanceNo){
+ assert(blockNo == blockToMain(blockNo));
SimulatedBlock * sb = globalData.getBlock(blockNo);
if (sb != 0 && instanceNo != 0)
sb = sb->getInstance(instanceNo);
=== modified file 'storage/ndb/src/kernel/vm/SimulatedBlock.hpp'
--- a/storage/ndb/src/kernel/vm/SimulatedBlock.hpp 2008-07-25 05:48:32 +0000
+++ b/storage/ndb/src/kernel/vm/SimulatedBlock.hpp 2008-08-11 11:21:36 +0000
@@ -158,7 +158,6 @@ public:
}
return 0;
}
- Uint32 getInstanceKey(Uint32 tabId, Uint32 fragId);
/* Setup state of a block object for executing in a particular thread. */
void assignToThread(Uint32 threadId, EmulatedJamBuffer *jamBuffer,
@@ -167,6 +166,20 @@ public:
uint32 getThreadId() const { return m_threadId; }
static bool isMultiThreaded();
+ /* Configuration based alternative. Applies only to this node */
+ static bool isNdbMt() { return globalData.isNdbMt; }
+ static bool isNdbMtLqh() { return globalData.isNdbMtLqh; }
+
+ /*
+ * Instance key (1-4, even if not MT LQH) is set in receiver block ref.
+ * The receiver maps it to a real instance (0, if not MT LQH).
+ */
+ Uint32 getInstanceKey(Uint32 tabId, Uint32 fragId);
+
+ /* MT LQH log parts info for use by this node */
+ Uint32 getLogPartId(Uint32 tabId, Uint32 fragId);
+ bool isLogPartOwner(Uint32 worker, Uint32 logPartId);
+
public:
typedef void (SimulatedBlock::* CallbackFunction)(class Signal*,
Uint32 callbackData,
@@ -442,7 +455,7 @@ private:
* In MT LQH main instance is the LQH proxy and the others ("workers")
* are real LQHs run by multiple threads.
*/
- enum { MaxInstances = 1 + MAX_NDBMT_WORKERS };
+ enum { MaxInstances = 1 + MAX_NDBMT_LQH_WORKERS };
Uint32 theInstanceCount; // set in main
SimulatedBlock** theInstanceList; // set in main, indexed by instance
SimulatedBlock* theMainInstance; // set in all
@@ -464,9 +477,9 @@ protected:
NewVARIABLE* allocateBat(int batSize);
void freeBat();
static const NewVARIABLE* getBat (BlockNumber blockNo,
- Uint32 instanceNo = 0);
+ Uint32 instanceNo);
static Uint16 getBatSize(BlockNumber blockNo,
- Uint32 instanceNo = 0);
+ Uint32 instanceNo);
static BlockReference calcTcBlockRef (NodeId aNode);
static BlockReference calcLqhBlockRef (NodeId aNode);
@@ -483,6 +496,12 @@ protected:
static BlockReference calcApiClusterMgrBlockRef (NodeId aNode);
+ // matching instance on same node e.g. LQH-ACC-TUP
+ BlockReference calcInstanceBlockRef(BlockNumber aBlock);
+
+ // matching instance on another node e.g. LQH-LQH
+ BlockReference calcInstanceBlockRef(BlockNumber aBlock, NodeId aNode);
+
/**
* allocRecord
* Allocates memory for the datastructures where ndb keeps the data
@@ -832,6 +851,18 @@ SimulatedBlock::calcApiClusterMgrBlockRe
}
inline
+BlockReference
+SimulatedBlock::calcInstanceBlockRef(BlockNumber aBlock){
+ return numberToRef(aBlock, instance(), getOwnNodeId());
+}
+
+inline
+BlockReference
+SimulatedBlock::calcInstanceBlockRef(BlockNumber aBlock, NodeId aNodeId){
+ return numberToRef(aBlock, instance(), aNodeId);
+}
+
+inline
const NodeState &
SimulatedBlock::getNodeState() const {
return theNodeState;
=== modified file 'storage/ndb/src/kernel/vm/TransporterCallback.cpp'
--- a/storage/ndb/src/kernel/vm/TransporterCallback.cpp 2008-08-07 11:52:50 +0000
+++ b/storage/ndb/src/kernel/vm/TransporterCallback.cpp 2008-08-11 11:21:36 +0000
@@ -202,13 +202,7 @@ TransporterCallbackKernel::deliver_signa
const Uint32 secCount = header->m_noOfSections;
const Uint32 length = header->theLength;
- /*
- * Strip instance bits if not multithreaded. This is also
- * done in versions prior to MT LQH, to simplify online upgrade.
- */
-#ifndef NDBD_MULTITHREADED
- header->theReceiversBlockNumber &= NDBMT_BLOCK_MASK;
-#endif
+ // if this node is not MT LQH then instance bits are stripped at execute
#ifdef TRACE_DISTRIBUTED
ndbout_c("recv: %s(%d) from (%s, %d)",
=== modified file 'storage/ndb/src/kernel/vm/dummy_nonmt.cpp'
--- a/storage/ndb/src/kernel/vm/dummy_nonmt.cpp 2008-07-25 05:48:32 +0000
+++ b/storage/ndb/src/kernel/vm/dummy_nonmt.cpp 2008-08-11 11:21:36 +0000
@@ -8,7 +8,7 @@ add_thr_map(Uint32, Uint32, Uint32)
}
void
-add_worker_thr_map(Uint32, Uint32)
+add_lqh_worker_thr_map(Uint32, Uint32)
{
assert(false);
}
=== modified file 'storage/ndb/src/kernel/vm/mt.cpp'
--- a/storage/ndb/src/kernel/vm/mt.cpp 2008-08-09 09:59:50 +0000
+++ b/storage/ndb/src/kernel/vm/mt.cpp 2008-08-11 11:21:36 +0000
@@ -43,9 +43,9 @@ static const Uint32 MAX_SIGNALS_PER_JB =
//#define NDB_MT_LOCK_TO_CPU
-#define MAX_BLOCK_INSTANCES (1 + MAX_NDBMT_WORKERS)
+#define MAX_BLOCK_INSTANCES (1 + MAX_NDBMT_LQH_WORKERS)
#define NUM_MAIN_THREADS 2 // except receiver
-#define MAX_THREADS (NUM_MAIN_THREADS + MAX_NDBMT_THREADS + 1)
+#define MAX_THREADS (NUM_MAIN_THREADS + MAX_NDBMT_LQH_THREADS + 1)
static Uint32 ndbmt_workers = 0;
static Uint32 ndbmt_threads = 0;
@@ -1783,10 +1783,19 @@ execute_signals(thr_data *selfptr, thr_j
if(siglen>16)
__builtin_prefetch (read_buffer->m_data + read_pos + 32, 0, 3);
Uint32 bno = blockToMain(s->theReceiversBlockNumber);
- Uint32 ino = blockToInstance(s->theReceiversBlockNumber);
+ SimulatedBlock* block = globalData.getBlock(bno);
+
+ if (ndbmt_workers != 0) { // MT LQH
+ Uint32 ino = blockToInstance(s->theReceiversBlockNumber);
+ if (ino != 0) {
+ // map instance key to real instance
+ ino = 1 + (ino - 1) % ndbmt_workers;
+ block = block->getInstance(ino);
+ assert(block != 0);
+ }
+ }
+
Uint32 gsn = s->theVerId_signalNumber;
- SimulatedBlock * main = globalData.getBlock(bno);
- SimulatedBlock * block = main->getInstance(ino);
*watchDogCounter = 1;
/* Must update original buffer so signal dump will see it. */
s->theSignalId = (*signalIdCounter)++;
@@ -1926,7 +1935,7 @@ add_main_thr_map()
// workers added by LocalProxy
void
-add_worker_thr_map(Uint32 block, Uint32 instance)
+add_lqh_worker_thr_map(Uint32 block, Uint32 instance)
{
assert(instance != 0);
Uint32 i = instance - 1;
@@ -2187,11 +2196,13 @@ sendlocal(Uint32 self, const SignalHeade
const Uint32 secPtr[3])
{
Uint32 block = blockToMain(s->theReceiversBlockNumber);
- Uint32 instance = blockToInstance(s->theReceiversBlockNumber);
+ Uint32 instance = 0;
- // map on receiver side
- if (instance != 0)
- instance = 1 + (instance - 1) % ndbmt_workers;
+ if (ndbmt_workers != 0) { // MT LQH
+ instance = blockToInstance(s->theReceiversBlockNumber);
+ if (instance != 0)
+ instance = 1 + (instance - 1) % ndbmt_workers;
+ }
/*
* Max number of signals to put into job buffer before flushing the buffer
@@ -2223,11 +2234,13 @@ sendprioa(Uint32 self, const SignalHeade
const Uint32 secPtr[3])
{
Uint32 block = blockToMain(s->theReceiversBlockNumber);
- Uint32 instance = blockToInstance(s->theReceiversBlockNumber);
+ Uint32 instance = 0;
- // map on receiver side
- if (instance != 0)
- instance = 1 + (instance - 1) % ndbmt_workers;
+ if (ndbmt_workers != 0) { // MT LQH
+ instance = blockToInstance(s->theReceiversBlockNumber);
+ if (instance != 0)
+ instance = 1 + (instance - 1) % ndbmt_workers;
+ }
Uint32 dst = block2ThreadId(block, instance);
struct thr_repository* rep = &g_thr_repository;
@@ -2615,8 +2628,8 @@ ThreadConfig::~ThreadConfig()
void
ThreadConfig::init(EmulatorData *emulatorData)
{
- ndbmt_workers = globalData.ndbmtWorkers;
- ndbmt_threads = globalData.ndbmtThreads;
+ ndbmt_workers = globalData.ndbMtLqhWorkers;
+ ndbmt_threads = globalData.ndbMtLqhThreads;
num_threads = NUM_MAIN_THREADS + ndbmt_threads + 1;
assert(num_threads <= MAX_THREADS);
receiver_thread_no = num_threads - 1;
@@ -3005,6 +3018,9 @@ FastScheduler::dumpSignalMemory(Uint32 t
if (siglen > 25)
siglen = 25; // Sanity check
memcpy(&signal.header, s, 4*siglen);
+ // instance number in trace file is confusing if not MT LQH
+ if (ndbmt_workers == 0)
+ signal.header.theReceiversBlockNumber &= NDBMT_BLOCK_MASK;
const Uint32 *posptr = reinterpret_cast<const Uint32 *>(s);
signal.m_sectionPtrI[0] = posptr[siglen + 0];
=== modified file 'storage/ndb/src/kernel/vm/mt.hpp'
--- a/storage/ndb/src/kernel/vm/mt.hpp 2008-07-25 05:48:32 +0000
+++ b/storage/ndb/src/kernel/vm/mt.hpp 2008-08-11 11:21:36 +0000
@@ -16,7 +16,7 @@ extern Uint32 receiverThreadId;
/* Assign block instance to thread */
void add_thr_map(Uint32 block, Uint32 instance, Uint32 thr_no);
-void add_worker_thr_map(Uint32 block, Uint32 instance);
+void add_lqh_worker_thr_map(Uint32 block, Uint32 instance);
void sendlocal(Uint32 self, const struct SignalHeader *s,
const Uint32 *data, const Uint32 secPtr[3]);
| Thread |
|---|
| • bzr commit into mysql-5.1 branch (pekka:2704) WL#4391 | Pekka Nousiainen | 11 Aug |