#At file:///export/space/pekka/ndb/version/my51-wl4391/
2677 Pekka Nousiainen 2008-07-23
wl#4391 01.diff
Block instances, mapping to threads, log part id, MT LQH switch.
added:
storage/ndb/src/kernel/vm/GlobalData.cpp
storage/ndb/src/kernel/vm/dummy_nonmt.cpp
modified:
storage/ndb/include/debugger/SignalLoggerManager.hpp
storage/ndb/include/kernel/RefConvert.hpp
storage/ndb/include/kernel/kernel_config_parameters.h
storage/ndb/include/kernel/kernel_types.h
storage/ndb/include/kernel/ndb_limits.h
storage/ndb/src/common/debugger/SignalLoggerManager.cpp
storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp
storage/ndb/src/kernel/blocks/dbdict/Dbdict.hpp
storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp
storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp
storage/ndb/src/kernel/blocks/ndbfs/PosixAsyncFile.cpp
storage/ndb/src/kernel/main.cpp
storage/ndb/src/kernel/vm/Configuration.cpp
storage/ndb/src/kernel/vm/GlobalData.hpp
storage/ndb/src/kernel/vm/Makefile.am
storage/ndb/src/kernel/vm/SimulatedBlock.cpp
storage/ndb/src/kernel/vm/SimulatedBlock.hpp
storage/ndb/src/kernel/vm/TransporterCallback.cpp
storage/ndb/src/kernel/vm/mt.cpp
storage/ndb/src/kernel/vm/mt.hpp
=== modified file 'storage/ndb/include/debugger/SignalLoggerManager.hpp'
--- a/storage/ndb/include/debugger/SignalLoggerManager.hpp 2008-06-17 20:28:45 +0000
+++ b/storage/ndb/include/debugger/SignalLoggerManager.hpp 2008-07-23 16:00:25 +0000
@@ -26,6 +26,7 @@
#include <kernel_types.h>
#include <BlockNumbers.h>
#include <TransporterDefinitions.hpp>
+#include <RefConvert.hpp>
class SignalLoggerManager
{
@@ -187,11 +188,17 @@ private:
Uint32 traceId;
Uint8 logModes[NO_OF_BLOCKS];
+
+ NdbMutex* m_mutex;
+ void lock() { if (m_mutex != 0) NdbMutex_Lock(m_mutex); }
+ void unlock() { if (m_mutex != 0) NdbMutex_Unlock(m_mutex); }
public:
inline bool
logMatch(BlockNumber bno, LogMode mask)
{
+ // extract main block number
+ bno = blockToMain(bno);
// avoid addressing outside logModes
return
bno < MIN_BLOCK_NO || bno > MAX_BLOCK_NO ||
=== modified file 'storage/ndb/include/kernel/RefConvert.hpp'
--- a/storage/ndb/include/kernel/RefConvert.hpp 2006-12-23 19:20:40 +0000
+++ b/storage/ndb/include/kernel/RefConvert.hpp 2008-07-23 16:00:25 +0000
@@ -16,30 +16,82 @@
#ifndef REFCONVERT_H
#define REFCONVERT_H
+#include <assert.h>
#include "kernel_types.h"
+/*
+ * In multithreaded kernel, BlockNumber includes the main block
+ * number in lower 9 bits and the instance in upper 7 bits.
+ */
+
+inline
+BlockNumber blockToMain(Uint32 block){
+ assert(block < (1 << 16));
+ return (BlockNumber)(block & ((1 << 9) - 1));
+}
+
+inline
+BlockInstance blockToInstance(Uint32 block){
+ assert(block < (1 << 16));
+ return (BlockNumber)(block >> 9);
+}
+
+inline
+BlockNumber numberToBlock(Uint32 main, Uint32 instance)
+{
+ assert(main < (1 << 9) && instance < (1 << (16 - 9)));
+ return (BlockNumber)(main | (instance << 9));
+}
+
+/**
+ * Convert BlockReference to NodeId
+ */
+inline
+NodeId refToNode(Uint32 ref){
+ return (NodeId)(ref & ((1 << 16) - 1));
+}
+
/**
- * Convert BlockReference to BlockNumber
+ * Convert BlockReference to full 16-bit BlockNumber.
*/
inline
-BlockNumber refToBlock(BlockReference ref){
+BlockNumber refToBlock(Uint32 ref){
return (BlockNumber)(ref >> 16);
}
/**
- * Convert BlockReference to NodeId
+ * Convert BlockReference to main BlockNumber.
+ * Used in tests such as: refToMain(senderRef) == DBTC.
+ */
+inline
+BlockNumber refToMain(Uint32 ref){
+ return (BlockNumber)((ref >> 16) & ((1 << 9) - 1));
+}
+
+/**
+ * Convert BlockReference to BlockInstance.
*/
inline
-NodeId refToNode(BlockReference ref){
- return (NodeId)(ref & 0xFFFF);
+BlockInstance refToInstance(Uint32 ref){
+ return (BlockInstance)(ref >> (16 + 9));
}
/**
* Convert NodeId and BlockNumber to BlockReference
*/
inline
-BlockReference numberToRef(BlockNumber bnr, NodeId proc){
- return (((Uint32)bnr) << 16) + proc;
+BlockReference numberToRef(Uint32 block, Uint32 node){
+ assert(node < (1 << 16) && block < (1 << 16));
+ return (BlockReference)(node | (block << 16));
+}
+
+/**
+ * Convert NodeId and block main and instance to BlockReference
+ */
+inline
+BlockReference numberToRef(Uint32 main, Uint32 instance, Uint32 node){
+ assert(node < (1 << 16) && main < (1 << 9) && instance < (1 << (16 - 9)));
+ return (BlockReference)(node | (main << 16) | (instance << (16 + 9)));
}
#endif
=== modified file 'storage/ndb/include/kernel/kernel_config_parameters.h'
--- a/storage/ndb/include/kernel/kernel_config_parameters.h 2006-12-31 00:32:21 +0000
+++ b/storage/ndb/include/kernel/kernel_config_parameters.h 2008-07-23 16:00:25 +0000
@@ -63,4 +63,7 @@
#define CFG_TUX_ATTRIBUTE (PRIVATE_BASE + 42)
#define CFG_TUX_SCAN_OP (PRIVATE_BASE + 43)
+#define CFG_NDBMT_WORKERS (PRIVATE_BASE + 44)
+#define CFG_NDBMT_THREADS (PRIVATE_BASE + 45)
+
#endif
=== modified file 'storage/ndb/include/kernel/kernel_types.h'
--- a/storage/ndb/include/kernel/kernel_types.h 2006-12-23 19:20:40 +0000
+++ b/storage/ndb/include/kernel/kernel_types.h 2008-07-23 16:00:25 +0000
@@ -22,6 +22,7 @@
typedef Uint16 NodeId;
typedef Uint16 BlockNumber;
+typedef Uint16 BlockInstance;
typedef Uint32 BlockReference;
typedef Uint16 GlobalSignalNumber;
=== modified file 'storage/ndb/include/kernel/ndb_limits.h'
--- a/storage/ndb/include/kernel/ndb_limits.h 2008-06-18 09:01:43 +0000
+++ b/storage/ndb/include/kernel/ndb_limits.h 2008-07-23 16:00:25 +0000
@@ -178,7 +178,10 @@
*/
#define NDBMT_BLOCK_BITS 9
#define NDBMT_BLOCK_MASK 0x001FF
-#define NDBMT_BLOCK_INSTANCES_BITS 7
+#define NDBMT_BLOCK_INSTANCE_BITS 7
#define NDBMT_BLOCK_INSTANCE_MASK 0xFE00
+#define MAX_NDBMT_WORKERS 4
+#define MAX_NDBMT_THREADS 4
+
#endif
=== modified file 'storage/ndb/src/common/debugger/SignalLoggerManager.cpp'
--- a/storage/ndb/src/common/debugger/SignalLoggerManager.cpp 2008-06-17 20:28:45 +0000
+++ b/storage/ndb/src/common/debugger/SignalLoggerManager.cpp 2008-07-23 16:00:25 +0000
@@ -17,8 +17,22 @@
#include "SignalLoggerManager.hpp"
#include <LongSignal.hpp>
-
+#include <GlobalSignalNumbers.h>
#include <DebuggerNames.hpp>
+#include <NdbTick.h>
+
+// avoids MT log mixups but has some serializing effect
+static const bool g_use_mutex = true;
+
+static char* mytime()
+{
+ NDB_TICKS t = NdbTick_CurrentMillisecond();
+ uint s = (t / 1000) % 3600;
+ uint ms = t % 1000;
+ static char buf[100];
+ sprintf(buf, "%u.%03u", s, ms);
+ return buf;
+}
SignalLoggerManager::SignalLoggerManager()
{
@@ -28,6 +42,9 @@ SignalLoggerManager::SignalLoggerManager
outputStream = 0;
m_ownNodeId = 0;
m_logDistributed = false;
+ m_mutex = 0;
+ if (g_use_mutex)
+ m_mutex = NdbMutex_Create();
}
SignalLoggerManager::~SignalLoggerManager()
@@ -37,6 +54,10 @@ SignalLoggerManager::~SignalLoggerManage
fclose(outputStream);
outputStream = 0;
}
+ if (m_mutex != 0) {
+ NdbMutex_Destroy(m_mutex);
+ m_mutex = 0;
+ }
}
FILE *
@@ -133,7 +154,7 @@ SignalLoggerManager::log(LogMode logMode
count == 0){
for (int number = 0; number < NO_OF_BLOCKS; ++number){
- cnt += log(SLM_ON, number, logMode);
+ cnt += log(SLM_ON, MIN_BLOCK_NO + number, logMode);
}
} else {
for (int i = 0; i < count; ++i){
@@ -220,15 +241,17 @@ SignalLoggerManager::executeDirect(const
if(outputStream != 0 &&
(traceId == 0 || traceId == trace) &&
(logMatch(senderBlockNo, LogOut) || logMatch(receiverBlockNo, LogIn))){
+ lock();
const char* inOutStr = prio == 0 ? "In" : "Out";
#ifdef VM_TRACE_TIME
- fprintf(outputStream, "---- Direct --- Signal --- %s - %d ----\n", inOutStr, time(0));
+ fprintf(outputStream, "---- Direct --- Signal --- %s - %s ----\n", inOutStr, mytime());
#else
fprintf(outputStream, "---- Direct --- Signal --- %s ----------------\n", inOutStr);
#endif
// XXX pass in/out to print* function somehow
printSignalHeader(outputStream, sh, 0, node, true);
printSignalData(outputStream, sh, theData);
+ unlock();
}
}
@@ -249,8 +272,9 @@ SignalLoggerManager::executeSignal(const
(traceId == 0 || traceId == trace) &&
(logMatch(receiverBlockNo, LogOut) ||
(m_logDistributed && m_ownNodeId != senderNode))){
+ lock();
#ifdef VM_TRACE_TIME
- fprintf(outputStream, "---- Received - Signal - %d ----\n", time(0));
+ fprintf(outputStream, "---- Received - Signal - %s ----\n", mytime());
#else
fprintf(outputStream, "---- Received - Signal ----------------\n");
#endif
@@ -259,6 +283,7 @@ SignalLoggerManager::executeSignal(const
printSignalData(outputStream, sh, theData);
for (unsigned i = 0; i < secs; i++)
printSegmentedSection(outputStream, sh, ptr, i);
+ unlock();
}
}
@@ -276,8 +301,9 @@ SignalLoggerManager::executeSignal(const
(traceId == 0 || traceId == trace) &&
(logMatch(receiverBlockNo, LogOut) ||
(m_logDistributed && m_ownNodeId != senderNode))){
+ lock();
#ifdef VM_TRACE_TIME
- fprintf(outputStream, "---- Received - Signal - %d ----\n", time(0));
+ fprintf(outputStream, "---- Received - Signal - %s ----\n", mytime());
#else
fprintf(outputStream, "---- Received - Signal ----------------\n");
#endif
@@ -286,6 +312,7 @@ SignalLoggerManager::executeSignal(const
printSignalData(outputStream, sh, theData);
for (unsigned i = 0; i < secs; i++)
printLinearSection(outputStream, sh, ptr, i);
+ unlock();
}
}
@@ -306,8 +333,9 @@ SignalLoggerManager::sendSignal(const Si
(traceId == 0 || traceId == trace) &&
(logMatch(senderBlockNo, LogOut) ||
(m_logDistributed && m_ownNodeId != node))){
+ lock();
#ifdef VM_TRACE_TIME
- fprintf(outputStream, "---- Send ----- Signal - %d ----\n", time(0));
+ fprintf(outputStream, "---- Send ----- Signal - %s ----\n", mytime());
#else
fprintf(outputStream, "---- Send ----- Signal ----------------\n");
#endif
@@ -316,6 +344,7 @@ SignalLoggerManager::sendSignal(const Si
printSignalData(outputStream, sh, theData);
for (unsigned i = 0; i < secs; i++)
printLinearSection(outputStream, sh, ptr, i);
+ unlock();
}
}
@@ -335,8 +364,9 @@ SignalLoggerManager::sendSignal(const Si
(traceId == 0 || traceId == trace) &&
(logMatch(senderBlockNo, LogOut) ||
(m_logDistributed && m_ownNodeId != node))){
+ lock();
#ifdef VM_TRACE_TIME
- fprintf(outputStream, "---- Send ----- Signal - %d ----\n", time(0));
+ fprintf(outputStream, "---- Send ----- Signal - %s ----\n", mytime());
#else
fprintf(outputStream, "---- Send ----- Signal ----------------\n");
#endif
@@ -345,6 +375,7 @@ SignalLoggerManager::sendSignal(const Si
printSignalData(outputStream, sh, theData);
for (unsigned i = 0; i < secs; i++)
printSegmentedSection(outputStream, sh, ptr, i);
+ unlock();
}
}
@@ -362,8 +393,9 @@ SignalLoggerManager::sendSignal(const Si
(traceId == 0 || traceId == trace) &&
(logMatch(senderBlockNo, LogOut) ||
(m_logDistributed && m_ownNodeId != node))){
+ lock();
#ifdef VM_TRACE_TIME
- fprintf(outputStream, "---- Send ----- Signal - %d ----\n", time(0));
+ fprintf(outputStream, "---- Send ----- Signal - %s ----\n", mytime());
#else
fprintf(outputStream, "---- Send ----- Signal ----------------\n");
#endif
@@ -372,6 +404,7 @@ SignalLoggerManager::sendSignal(const Si
printSignalData(outputStream, sh, theData);
for (unsigned i = 0; i < secs; i++)
printGenericSection(outputStream, sh, ptr, i);
+ unlock();
}
}
@@ -388,11 +421,12 @@ SignalLoggerManager::sendSignalWithDelay
if(outputStream != 0 &&
(traceId == 0 || traceId == trace) &&
logMatch(senderBlockNo, LogOut)){
+ lock();
#ifdef VM_TRACE_TIME
fprintf(outputStream,
- "---- Send ----- Signal (%d ms) %d\n",
+ "---- Send ----- Signal (%d ms) %s\n",
delayInMilliSeconds,
- time(0));
+ mytime());
#else
fprintf(outputStream, "---- Send delay Signal (%d ms) ----------\n",
delayInMilliSeconds);
@@ -402,6 +436,7 @@ SignalLoggerManager::sendSignalWithDelay
printSignalData(outputStream, sh, theData);
for (unsigned i = 0; i < secs; i++)
printSegmentedSection(outputStream, sh, ptr, i);
+ unlock();
}
}
@@ -417,15 +452,40 @@ SignalLoggerManager::log(BlockNumber bno
if(outputStream != 0 &&
logModes[bno2] != LogOff){
+ lock();
va_list ap;
va_start(ap, msg);
fprintf(outputStream, "%s: ", getBlockName(bno, "API"));
vfprintf(outputStream, msg, ap);
fprintf(outputStream, "\n");
va_end(ap);
+ unlock();
}
}
+static inline bool
+isSysBlock(BlockNumber block, Uint32 gsn)
+{
+ if (block != 0)
+ return false;
+ switch (gsn) {
+ case GSN_START_ORD:
+ return true; // first sig
+ case GSN_CONNECT_REP:
+ case GSN_DISCONNECT_REP:
+ case GSN_EVENT_REP:
+ return true; // transporter
+ case GSN_STOP_FOR_CRASH:
+ return true; // mt scheduler
+ }
+ return false;
+}
+
+static inline bool
+isApiBlock(BlockNumber block)
+{
+ return block >= 0x8000 || block == 4002 || block == 2047;
+}
void
SignalLoggerManager::printSignalHeader(FILE * output,
@@ -434,36 +494,79 @@ SignalLoggerManager::printSignalHeader(F
Uint32 node,
bool printReceiversSignalId)
{
- Uint32 receiverBlockNo = sh.theReceiversBlockNumber;
+ const char* const dummy_block_name = "UUNET";
+
+ bool receiverIsApi = isApiBlock(sh.theReceiversBlockNumber);
+ Uint32 receiverBlockNo;
+ Uint32 receiverInstanceNo;
+ if (!receiverIsApi) {
+ receiverBlockNo = blockToMain(sh.theReceiversBlockNumber);
+ receiverInstanceNo = blockToInstance(sh.theReceiversBlockNumber);
+ } else {
+ receiverBlockNo = sh.theReceiversBlockNumber;
+ receiverInstanceNo = 0;
+ }
Uint32 receiverProcessor = node;
+
Uint32 gsn = sh.theVerId_signalNumber;
- Uint32 senderBlockNo = refToBlock(sh.theSendersBlockRef);
- Uint32 senderProcessor = refToNode(sh.theSendersBlockRef);
+
+ Uint32 sbref = sh.theSendersBlockRef;
+ bool senderIsSys = isSysBlock(refToBlock(sbref), gsn);
+ bool senderIsApi = isApiBlock(refToBlock(sbref));
+ Uint32 senderBlockNo;
+ Uint32 senderInstanceNo;
+ if (!senderIsSys && !senderIsApi) {
+ senderBlockNo = refToMain(sbref);
+ senderInstanceNo = refToInstance(sbref);
+ } else {
+ senderBlockNo = refToBlock(sbref);
+ senderInstanceNo = 0;
+ }
+ Uint32 senderProcessor = refToNode(sbref);
+
Uint32 length = sh.theLength;
Uint32 trace = sh.theTrace;
Uint32 rSigId = sh.theSignalId;
Uint32 sSigId = sh.theSendersSignalId;
const char * signalName = getSignalName(gsn);
- const char * rBlockName = getBlockName(receiverBlockNo, "API");
- const char * sBlockName = getBlockName(senderBlockNo, "API");
-
- if(printReceiversSignalId)
+ const char * rBlockName =
+ receiverIsApi ? "API" :
+ getBlockName(receiverBlockNo, dummy_block_name);
+ const char * sBlockName =
+ senderIsSys ? "SYS" :
+ senderIsApi ? "API" :
+ getBlockName(senderBlockNo, dummy_block_name);
+
+ char rInstanceText[20];
+ char sInstanceText[20];
+ rInstanceText[0] = 0;
+ sInstanceText[0] = 0;
+ if (receiverInstanceNo != 0)
+ sprintf(rInstanceText, "/%u", (uint)receiverInstanceNo);
+ if (senderInstanceNo != 0)
+ sprintf(sInstanceText, "/%u", (uint)senderInstanceNo);
+// wl4391_todo senders instance missing
+// assert(gsn != GSN_LQHKEYREQ || receiverProcessor == senderProcessor || senderInstanceNo != 0);
+ if (printReceiversSignalId)
fprintf(output,
- "r.bn: %d \"%s\", r.proc: %d, r.sigId: %d gsn: %d \"%s\" prio: %d\n"
- ,receiverBlockNo, rBlockName, receiverProcessor, rSigId,
- gsn, signalName, prio);
+ "r.bn: %d%s \"%s\", r.proc: %d, r.sigId: %d gsn: %d \"%s\" prio: %d\n"
+ ,receiverBlockNo, rInstanceText, rBlockName, receiverProcessor,
+ rSigId, gsn, signalName, prio);
else
fprintf(output,
- "r.bn: %d \"%s\", r.proc: %d, gsn: %d \"%s\" prio: %d\n",
- receiverBlockNo, rBlockName, receiverProcessor, gsn,
- signalName, prio);
+ "r.bn: %d%s \"%s\", r.proc: %d, gsn: %d \"%s\" prio: %d\n",
+ receiverBlockNo, rInstanceText, rBlockName, receiverProcessor,
+ gsn, signalName, prio);
fprintf(output,
- "s.bn: %d \"%s\", s.proc: %d, s.sigId: %d length: %d trace: %d "
+ "s.bn: %d%s \"%s\", s.proc: %d, s.sigId: %d length: %d trace: %d "
"#sec: %d fragInf: %d\n",
- senderBlockNo, sBlockName, senderProcessor, sSigId, length, trace,
- sh.m_noOfSections, sh.m_fragmentInfo);
+ senderBlockNo, sInstanceText, sBlockName, senderProcessor,
+ sSigId, length, trace, sh.m_noOfSections, sh.m_fragmentInfo);
+
+ //assert(strcmp(rBlockName, dummy_block_name) != 0);
+ //assert(strcmp(sBlockName, dummy_block_name) != 0);
}
void
=== modified file 'storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp'
--- a/storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp 2008-07-01 12:35:34 +0000
+++ b/storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp 2008-07-23 16:00:25 +0000
@@ -1642,9 +1642,6 @@ Dbdict::Dbdict(Block_context& ctx):
c_opDropEvent(c_opRecordPool),
c_opSignalUtil(c_opRecordPool),
c_opRecordSequence(0)
-#ifdef VM_TRACE
- ,debugOut(*new NullOutputStream())
-#endif
{
BLOCK_CONSTRUCTOR(Dbdict);
@@ -2394,25 +2391,8 @@ void Dbdict::execREAD_CONFIG_REQ(Signal*
new (ptr.p) DictObject();
objs.release();
}
-
-#ifdef VM_TRACE
- {
- FILE* debugFile = globalSignalLoggers.getOutputStream();
- if (debugFile != NULL)
- debugOut = *new NdbOut(*new FileOutputStream(debugFile));
- }
-#endif
}//execSIZEALT_REP()
-#ifdef VM_TRACE
-bool
-Dbdict::debugOutOn() const
-{
- SignalLoggerManager::LogMode mask = SignalLoggerManager::LogInOut;
- return globalSignalLoggers.logMatch(DBDICT, mask);
-}
-#endif
-
/* ---------------------------------------------------------------- */
// Start phase signals sent by CNTR. We reply with NDB_STTORRY when
// we completed this phase.
=== modified file 'storage/ndb/src/kernel/blocks/dbdict/Dbdict.hpp'
--- a/storage/ndb/src/kernel/blocks/dbdict/Dbdict.hpp 2008-06-05 20:34:20 +0000
+++ b/storage/ndb/src/kernel/blocks/dbdict/Dbdict.hpp 2008-07-23 16:00:25 +0000
@@ -74,20 +74,6 @@
#include <LockQueue.hpp>
#include <signaldata/CopyData.hpp>
-// Debug Macros
-
-#ifdef VM_TRACE
-#define D(x) \
- do { \
- if (!debugOutOn()) break; \
- debugOut << "DBDICT:" << __LINE__ << " " << x << endl; \
- } while (0)
-#define V(x) " " << #x << ":" << (x)
-#else
-#define D(x)
-#undef V
-#endif
-
#ifdef DBDICT_C
/*--------------------------------------------------------------*/
@@ -3422,8 +3408,6 @@ public:
int checkSingleUserMode(Uint32 senderRef);
#ifdef VM_TRACE
- NdbOut debugOut;
- bool debugOutOn() const;
friend NdbOut& operator<<(NdbOut& out, const DictObject&);
friend NdbOut& operator<<(NdbOut& out, const ErrorInfo&);
friend NdbOut& operator<<(NdbOut& out, const SchemaOp&);
=== modified file 'storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp'
--- a/storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp 2008-06-05 20:31:21 +0000
+++ b/storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp 2008-07-23 16:00:25 +0000
@@ -1773,6 +1773,16 @@ private:
bool c_sr_wait_to;
NdbNodeBitmask m_sr_nodes;
NdbNodeBitmask m_to_nodes;
+
+ // block instances
+public:
+ Uint32 dihGetInstanceKey(FragmentstorePtr tFragPtr) {
+ ndbrequire(!tFragPtr.isNull());
+ Uint32 log_part_id = tFragPtr.p->m_log_part_id;
+ Uint32 instanceKey = 1 + log_part_id % MAX_NDBMT_WORKERS;
+ return instanceKey;
+ }
+ Uint32 dihGetInstanceKey(Uint32 tabId, Uint32 fragId);
};
#if (DIH_CDATA_SIZE < _SYSFILE_SIZE32)
=== modified file 'storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp 2008-06-09 14:32:01 +0000
+++ b/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp 2008-07-23 16:00:25 +0000
@@ -16870,4 +16870,17 @@ do_send:
infoEvent("%s %s", msg, buf);
}
+// block instances
+Uint32
+Dbdih::dihGetInstanceKey(Uint32 tabId, Uint32 fragId)
+{
+ TabRecordPtr tTabPtr;
+ tTabPtr.i = tabId;
+ ptrCheckGuard(tTabPtr, ctabFileSize, tabRecord);
+ FragmentstorePtr tFragPtr;
+ getFragstore(tTabPtr.p, fragId, tFragPtr);
+ Uint32 instanceKey = dihGetInstanceKey(tFragPtr);
+ return instanceKey;
+}
+
#endif
=== modified file 'storage/ndb/src/kernel/blocks/ndbfs/PosixAsyncFile.cpp'
--- a/storage/ndb/src/kernel/blocks/ndbfs/PosixAsyncFile.cpp 2008-04-09 10:41:03 +0000
+++ b/storage/ndb/src/kernel/blocks/ndbfs/PosixAsyncFile.cpp 2008-07-23 16:00:25 +0000
@@ -335,10 +335,8 @@ no_odirect:
req->data.pageData[0] = m_page_ptr.i;
m_fs.EXECUTE_DIRECT(block, GSN_FSWRITEREQ, signal,
- FsReadWriteReq::FixedLength + 1
-#ifdef VM_TRACE
- , true // This EXECUTE_DIRECT is thread safe
-#endif
+ FsReadWriteReq::FixedLength + 1,
+ 0 // wl4391_todo This EXECUTE_DIRECT is thread safe
);
retry:
Uint32 size = request->par.open.page_size;
=== modified file 'storage/ndb/src/kernel/main.cpp'
--- a/storage/ndb/src/kernel/main.cpp 2008-04-23 13:52:37 +0000
+++ b/storage/ndb/src/kernel/main.cpp 2008-07-23 16:00:25 +0000
@@ -1,4 +1,4 @@
-/* Copyright (C) 2003 MySQL AB
+ /* Copyright (C) 2003 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
@@ -33,6 +33,7 @@
#include <LogLevel.hpp>
#include <EventLogger.hpp>
+#include <NdbEnv.h>
#include <NdbAutoPtr.hpp>
@@ -285,6 +286,56 @@ init_global_memory_manager(EmulatorData
return 0; // Success
}
+bool g_ndbMt = false;
+bool g_ndbMtLqh = false;
+
+static int
+get_multithreaded_config(EmulatorData& ed)
+{
+ // multithreaded is compiled in ndbd/ndbmtd
+ g_ndbMt = SimulatedBlock::isMultiThreaded();
+ if (!g_ndbMt)
+ return 0;
+
+ // mt lqh via environment during development
+ {
+ const char* p = NdbEnv_GetEnv("NDB_MT_LQH", (char*)0, 0);
+ if (p != 0 && strchr("1Y", p[0]) != 0)
+ g_ndbMtLqh = true;
+ if (!g_ndbMtLqh)
+ return 0;
+ }
+
+ const ndb_mgm_configuration_iterator * p =
+ ed.theConfiguration->getOwnConfigIterator();
+ if (p == 0)
+ {
+ abort();
+ }
+
+ Uint32 workers = 0;
+ Uint32 threads = 0;
+ if (ndb_mgm_get_int_parameter(p, CFG_NDBMT_WORKERS, &workers) ||
+ ndb_mgm_get_int_parameter(p, CFG_NDBMT_THREADS, &threads))
+ {
+ g_eventLogger->alert("Failed to get CFG_NDBMT parameters from "
+ "config, exiting.");
+ return -1;
+ }
+
+ ndbout << "NDBMT: workers=" << workers
+ << " threads=" << threads << endl;
+
+ assert(workers != 0 && workers <= MAX_NDBMT_WORKERS);
+ assert(threads != 0 && threads <= MAX_NDBMT_THREADS);
+ assert(workers % threads == 0);
+
+ globalData.ndbmtWorkers = workers;
+ globalData.ndbmtThreads = threads;
+ return 0;
+}
+
+
int main(int argc, char** argv)
{
NDB_INIT(argv[0]);
@@ -480,28 +531,41 @@ int main(int argc, char** argv)
globalEmulatorData.theWatchDog->unregisterWatchedThread(0);
}
- globalEmulatorData.theThreadConfig->init(&globalEmulatorData);
+ if (get_multithreaded_config(globalEmulatorData))
+ return -1;
- // Load blocks
- globalEmulatorData.theSimBlockList->load(globalEmulatorData);
-
- // Set thread concurrency for Solaris' light weight processes
- int status;
- status = NdbThread_SetConcurrencyLevel(30);
- assert(status == 0);
+ globalEmulatorData.theThreadConfig->init(&globalEmulatorData);
#ifdef VM_TRACE
- // Create a signal logger
+ // Create a signal logger before block constructors
char *buf= NdbConfig_SignalLogFileName(globalData.ownId);
NdbAutoPtr<char> tmp_aptr(buf);
FILE * signalLog = fopen(buf, "a");
globalSignalLoggers.setOwnNodeId(globalData.ownId);
globalSignalLoggers.setOutputStream(signalLog);
-#if 0 // to log startup
- globalSignalLoggers.log(SignalLoggerManager::LogInOut, "BLOCK=DBDICT,DBDIH");
- globalData.testOn = 1;
+#if 1 // to log startup
+ { const char* p = NdbEnv_GetEnv("NDB_SIGNAL_LOG", (char*)0, 0);
+ if (p != 0) {
+ char buf[200];
+ snprintf(buf, sizeof(buf), "BLOCK=%s", p);
+ for (char* q = buf; *q != 0; q++) *q = toupper(toascii(*q));
+ globalSignalLoggers.log(SignalLoggerManager::LogInOut, buf);
+ globalData.testOn = 1;
+ assert(signalLog != 0);
+ fprintf(signalLog, "START\n");
+ fflush(signalLog);
+ }
+ }
#endif
#endif
+
+ // Load blocks
+ globalEmulatorData.theSimBlockList->load(globalEmulatorData);
+
+ // Set thread concurrency for Solaris' light weight processes
+ int status;
+ status = NdbThread_SetConcurrencyLevel(30);
+ assert(status == 0);
catchsigs(false);
=== modified file 'storage/ndb/src/kernel/vm/Configuration.cpp'
--- a/storage/ndb/src/kernel/vm/Configuration.cpp 2008-05-29 15:06:11 +0000
+++ b/storage/ndb/src/kernel/vm/Configuration.cpp 2008-07-23 16:00:25 +0000
@@ -999,6 +999,17 @@ Configuration::calcSizeAlt(ConfigValues
cfg.put(CFG_TUX_SCAN_OP, noOfLocalScanRecords);
}
+ // NDBMT
+ {
+ uint workers = 4;
+ uint threads = 2;
+ assert(workers <= MAX_NDBMT_WORKERS);
+ assert(threads <= MAX_NDBMT_THREADS);
+ assert(workers % threads == 0);
+ cfg.put(CFG_NDBMT_WORKERS, workers);
+ cfg.put(CFG_NDBMT_THREADS, threads);
+ }
+
m_ownConfig = (ndb_mgm_configuration*)cfg.getConfigValues();
m_ownConfigIterator = ndb_mgm_create_configuration_iterator
(m_ownConfig, 0);
=== added file 'storage/ndb/src/kernel/vm/GlobalData.cpp'
--- a/storage/ndb/src/kernel/vm/GlobalData.cpp 1970-01-01 00:00:00 +0000
+++ b/storage/ndb/src/kernel/vm/GlobalData.cpp 2008-07-23 16:00:25 +0000
@@ -0,0 +1,28 @@
+/* Copyright (C) 2003 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+#include <ndb_global.h>
+#include <kernel_types.h>
+#include <SimulatedBlock.hpp>
+#include "GlobalData.hpp"
+
+SimulatedBlock*
+GlobalData::getBlock(BlockNumber blockNo, Uint32 instanceNo)
+{
+ SimulatedBlock* b = getBlock(blockNo);
+ if (b != 0 && instanceNo != 0)
+ b = b->getInstance(instanceNo);
+ return b;
+}
=== modified file 'storage/ndb/src/kernel/vm/GlobalData.hpp'
--- a/storage/ndb/src/kernel/vm/GlobalData.hpp 2008-01-01 12:45:11 +0000
+++ b/storage/ndb/src/kernel/vm/GlobalData.hpp 2008-07-23 16:00:25 +0000
@@ -67,11 +67,16 @@ struct GlobalData {
Uint32 sendPackedActivated;
Uint32 activateSendPacked;
+
+ Uint32 ndbmtWorkers;
+ Uint32 ndbmtThreads;
GlobalData(){
theSignalId = 0;
theStartLevel = NodeState::SL_NOTHING;
theRestartFlag = perform_start;
+ ndbmtWorkers = 0;
+ ndbmtThreads = 0;
#ifdef GCP_TIMER_HACK
gcp_timer_limit = 0;
#endif
@@ -80,6 +85,7 @@ struct GlobalData {
void setBlock(BlockNumber blockNo, SimulatedBlock * block);
SimulatedBlock * getBlock(BlockNumber blockNo);
+ SimulatedBlock * getBlock(BlockNumber blockNo, Uint32 instanceNo);
void incrementWatchDogCounter(Uint32 place);
Uint32 * getWatchDogPtr();
=== modified file 'storage/ndb/src/kernel/vm/Makefile.am'
--- a/storage/ndb/src/kernel/vm/Makefile.am 2008-04-25 16:53:41 +0000
+++ b/storage/ndb/src/kernel/vm/Makefile.am 2008-07-23 16:00:25 +0000
@@ -36,13 +36,15 @@ libkernel_a_SOURCES = \
Rope.cpp \
ndbd_malloc.cpp \
ndbd_malloc_impl.cpp \
- Pool.cpp WOPool.cpp RWPool.cpp DynArr256.cpp LockQueue.cpp
+ Pool.cpp WOPool.cpp RWPool.cpp DynArr256.cpp LockQueue.cpp \
+ GlobalData.cpp
libsched_a_SOURCES = TimeQueue.cpp \
ThreadConfig.cpp \
FastScheduler.cpp \
TransporterCallback_nonmt.cpp \
- SimulatedBlock_nonmt.cpp
+ SimulatedBlock_nonmt.cpp \
+ dummy_nonmt.cpp
libsched_mt_a_SOURCES = SimulatedBlock_mt.cpp \
TransporterCallback_mt.cpp \
=== modified file 'storage/ndb/src/kernel/vm/SimulatedBlock.cpp'
--- a/storage/ndb/src/kernel/vm/SimulatedBlock.cpp 2008-07-01 12:35:34 +0000
+++ b/storage/ndb/src/kernel/vm/SimulatedBlock.cpp 2008-07-23 16:00:25 +0000
@@ -24,6 +24,7 @@
#include "SimulatedBlock.hpp"
#include <NdbOut.hpp>
+#include <OutputStream.hpp>
#include <GlobalData.hpp>
#include <Emulator.hpp>
#include <WatchDog.hpp>
@@ -46,6 +47,8 @@
#include <AttributeDescriptor.hpp>
#include <NdbSqlUtil.hpp>
+#include "../blocks/dbdih/Dbdih.hpp"
+
#include <EventLogger.hpp>
extern EventLogger * g_eventLogger;
@@ -56,10 +59,15 @@ extern EventLogger * g_eventLogger;
// Constructor, Destructor
//
SimulatedBlock::SimulatedBlock(BlockNumber blockNumber,
- struct Block_context & ctx)
+ struct Block_context & ctx,
+ Uint32 instanceNumber)
: theNodeId(globalData.ownId),
theNumber(blockNumber),
- theReference(numberToRef(blockNumber, globalData.ownId)),
+ theInstance(instanceNumber),
+ theReference(numberToRef(blockNumber, instanceNumber, globalData.ownId)),
+ theInstanceCount(0),
+ theInstanceList(0),
+ theMainInstance(0),
m_ctx(ctx),
m_global_page_pool(globalData.m_global_page_pool),
m_shared_page_pool(globalData.m_shared_page_pool),
@@ -68,13 +76,41 @@ SimulatedBlock::SimulatedBlock(BlockNumb
c_segmentedFragmentSendList(c_fragmentSendPool),
c_mutexMgr(* this),
c_counterMgr(* this)
+#ifdef VM_TRACE
+ ,debugOut(*new NdbOut(*new FileOutputStream(globalSignalLoggers.getOutputStream())))
+#endif
{
m_threadId = 0;
m_watchDogCounter = NULL;
m_jamBuffer = (EmulatedJamBuffer *)NdbThread_GetTlsKey(NDB_THREAD_TLS_JAM);
NewVarRef = 0;
- globalData.setBlock(blockNumber, this);
+ SimulatedBlock* main = globalData.getBlock(blockNumber);
+
+ if (theInstance == 0) {
+ ndbrequire(main == 0);
+ main = this;
+ globalData.setBlock(blockNumber, main);
+ main->theInstanceCount = 1;
+ } else {
+ ndbrequire(main != 0);
+ ndbrequire(theInstance == main->theInstanceCount);
+ ndbrequire(theInstance < MaxInstances);
+ if (theInstance == 1) {
+ ndbrequire(main->theInstanceList == 0);
+ main->theInstanceList = new SimulatedBlock* [MaxInstances];
+ Uint32 i;
+ for (i = 0; i < MaxInstances; i++)
+ main->theInstanceList[i] = 0;
+ } else {
+ ndbrequire(main->theInstanceList != 0);
+ }
+ ndbrequire(main->theInstanceList[theInstance] == 0);
+ main->theInstanceList[theInstance] = this;
+ main->theInstanceCount = theInstance + 1;
+ }
+ theMainInstance = main;
+
c_fragmentIdCounter = 1;
c_fragSenderRunning = false;
@@ -130,6 +166,14 @@ SimulatedBlock::~SimulatedBlock()
#ifdef VM_TRACE
delete [] m_global_variables;
#endif
+
+ if (theInstanceList != 0) {
+ Uint32 i;
+ for (i = 0; i < theInstanceCount; i++)
+ delete theInstanceList[i];
+ delete [] theInstanceList;
+ }
+ theInstanceList = 0;
}
void
@@ -172,6 +216,15 @@ SimulatedBlock::addRecSignalImpl(GlobalS
theExecArray[gsn] = f;
}
+Uint32
+SimulatedBlock::getInstanceKey(Uint32 tabId, Uint32 fragId)
+{
+ Dbdih* dbdih = (Dbdih*)globalData.getBlock(DBDIH);
+ ndbrequire(dbdih != 0);
+ Uint32 instanceKey = dbdih->dihGetInstanceKey(tabId, fragId);
+ return instanceKey;
+}
+
void
SimulatedBlock::assignToThread(Uint32 threadId, EmulatedJamBuffer *jamBuffer,
Uint32 *watchDogCounter)
@@ -509,9 +562,9 @@ SimulatedBlock::sendSignal(BlockReferenc
signal->header.theSendersBlockRef = sendBRef;
#ifdef NDBD_MULTITHREADED
if (jobBuffer == JBB)
- sendlocal(m_threadId, recBlock, &signal->header, signal->theData, NULL);
+ sendlocal(m_threadId, &signal->header, signal->theData, NULL);
else
- sendprioa(m_threadId, recBlock, &signal->header, signal->theData, NULL);
+ sendprioa(m_threadId, &signal->header, signal->theData, NULL);
#else
globalScheduler.execute(signal, jobBuffer, recBlock,
gsn);
@@ -607,9 +660,9 @@ SimulatedBlock::sendSignal(NodeReceiverG
#ifdef NDBD_MULTITHREADED
if (jobBuffer == JBB)
- sendlocal(m_threadId, recBlock, &signal->header, signal->theData, NULL);
+ sendlocal(m_threadId, &signal->header, signal->theData, NULL);
else
- sendprioa(m_threadId, recBlock, &signal->header, signal->theData, NULL);
+ sendprioa(m_threadId, &signal->header, signal->theData, NULL);
#else
globalScheduler.execute(signal, jobBuffer, recBlock, gsn);
#endif
@@ -716,10 +769,10 @@ SimulatedBlock::sendSignal(BlockReferenc
#ifdef NDBD_MULTITHREADED
if (jobBuffer == JBB)
- sendlocal(m_threadId, recBlock, &signal->header, signal->theData,
+ sendlocal(m_threadId, &signal->header, signal->theData,
signal->theData+length);
else
- sendprioa(m_threadId, recBlock, &signal->header, signal->theData,
+ sendprioa(m_threadId, &signal->header, signal->theData,
signal->theData+length);
#else
globalScheduler.execute(signal, jobBuffer, recBlock,
@@ -831,10 +884,10 @@ SimulatedBlock::sendSignal(NodeReceiverG
#ifdef NDBD_MULTITHREADED
if (jobBuffer == JBB)
- sendlocal(m_threadId, recBlock, &signal->header, signal->theData,
+ sendlocal(m_threadId, &signal->header, signal->theData,
signal->theData + length);
else
- sendprioa(m_threadId, recBlock, &signal->header, signal->theData,
+ sendprioa(m_threadId, &signal->header, signal->theData,
signal->theData + length);
#else
globalScheduler.execute(signal, jobBuffer, recBlock, gsn);
@@ -944,10 +997,10 @@ SimulatedBlock::sendSignal(BlockReferenc
#ifdef NDBD_MULTITHREADED
if (jobBuffer == JBB)
- sendlocal(m_threadId, recBlock, &signal->header, signal->theData,
+ sendlocal(m_threadId, &signal->header, signal->theData,
signal->theData + length);
else
- sendprioa(m_threadId, recBlock, &signal->header, signal->theData,
+ sendprioa(m_threadId, &signal->header, signal->theData,
signal->theData + length);
#else
globalScheduler.execute(signal, jobBuffer, recBlock, gsn);
@@ -1059,10 +1112,10 @@ SimulatedBlock::sendSignal(NodeReceiverG
* dst ++ = sections->m_ptr[2].i;
#ifdef NDBD_MULTITHREADED
if (jobBuffer == JBB)
- sendlocal(m_threadId, recBlock, &signal->header, signal->theData,
+ sendlocal(m_threadId, &signal->header, signal->theData,
signal->theData + length);
else
- sendprioa(m_threadId, recBlock, &signal->header, signal->theData,
+ sendprioa(m_threadId, &signal->header, signal->theData,
signal->theData + length);
#else
globalScheduler.execute(signal, jobBuffer, recBlock, gsn);
@@ -1246,16 +1299,20 @@ SimulatedBlock::freeBat(){
}
const NewVARIABLE *
-SimulatedBlock::getBat(Uint16 blockNo){
+SimulatedBlock::getBat(Uint16 blockNo, Uint32 instanceNo){
SimulatedBlock * sb = globalData.getBlock(blockNo);
+ if (sb != 0 && instanceNo != 0)
+ sb = sb->getInstance(instanceNo);
if(sb == 0)
return 0;
return sb->NewVarRef;
}
Uint16
-SimulatedBlock::getBatSize(Uint16 blockNo){
+SimulatedBlock::getBatSize(Uint16 blockNo, Uint32 instanceNo){
SimulatedBlock * sb = globalData.getBlock(blockNo);
+ if (sb != 0 && instanceNo != 0)
+ sb = sb->getInstance(instanceNo);
if(sb == 0)
return 0;
return sb->theBATSize;
@@ -1418,7 +1475,7 @@ SimulatedBlock::infoEvent(const char * m
signalT.header.theLength = ((len+3)/4)+1;
#ifdef NDBD_MULTITHREADED
- sendlocal(m_threadId, signalT.header.theReceiversBlockNumber,
+ sendlocal(m_threadId,
&signalT.header, signalT.theData, signalT.m_sectionPtrI);
#else
globalScheduler.execute(&signalT.header, JBB, signalT.theData,
@@ -1463,7 +1520,7 @@ SimulatedBlock::warningEvent(const char
signalT.header.theLength = ((len+3)/4)+1;
#ifdef NDBD_MULTITHREADED
- sendlocal(m_threadId, signalT.header.theReceiversBlockNumber,
+ sendlocal(m_threadId,
&signalT.header, signalT.theData, signalT.m_sectionPtrI);
#else
globalScheduler.execute(&signalT.header, JBB, signalT.theData,
@@ -1621,7 +1678,7 @@ SimulatedBlock::printTimes(FILE * output
}
}
sum = (sum + 500)/ 1000;
- fprintf(output, "-- %s : %d --\n", getBlockName(number()), sum);
+ fprintf(output, "-- %s : %u --\n", getBlockName(number()), (Uint32)sum);
fprintf(output, "\n");
fflush(output);
}
@@ -2387,7 +2444,7 @@ SimulatedBlock::setNodeInfo(NodeId nodeI
}
bool
-SimulatedBlock::isMultiThreaded() const
+SimulatedBlock::isMultiThreaded()
{
#ifdef NDBD_MULTITHREADED
return true;
@@ -2720,3 +2777,37 @@ SimulatedBlock::create_distr_key(Uint32
}
CArray<KeyDescriptor> g_key_descriptor_pool;
+
+#ifdef VM_TRACE
+bool
+SimulatedBlock::debugOutOn() const
+{
+ SignalLoggerManager::LogMode mask = SignalLoggerManager::LogInOut;
+ return globalSignalLoggers.logMatch(number(), mask);
+}
+
+const char*
+SimulatedBlock::debugOutTag(char *buf, int line)
+{
+ char blockbuf[40];
+ char instancebuf[40];
+ char linebuf[40];
+ char timebuf[40];
+ sprintf(blockbuf, "%s", getBlockName(number(), "UNKNOWN"));
+ instancebuf[0] = 0;
+ if (instance() != 0)
+ sprintf(instancebuf, "/%u", instance());
+ sprintf(linebuf, " %d", line);
+ timebuf[0] = 0;
+#ifdef VM_TRACE_TIME
+ {
+ NDB_TICKS t = NdbTick_CurrentMillisecond();
+ uint s = (t / 1000) % 3600;
+ uint ms = t % 1000;
+ sprintf(timebuf, " - %u.%03u -", s, ms);
+ }
+#endif
+ sprintf(buf, "%s%s%s%s ", blockbuf, instancebuf, linebuf, timebuf);
+ return buf;
+}
+#endif
=== modified file 'storage/ndb/src/kernel/vm/SimulatedBlock.hpp'
--- a/storage/ndb/src/kernel/vm/SimulatedBlock.hpp 2008-06-17 20:28:45 +0000
+++ b/storage/ndb/src/kernel/vm/SimulatedBlock.hpp 2008-07-23 16:00:25 +0000
@@ -56,6 +56,19 @@
#include "ndbd_malloc_impl.hpp"
#include <blocks/record_types.hpp>
+#ifdef VM_TRACE
+#define D(x) \
+ do { \
+ char buf[200]; \
+ if (!debugOutOn()) break; \
+ debugOut << debugOutTag(buf, __LINE__) << x << endl; \
+ } while (0)
+#define V(x) " " << #x << ":" << (x)
+#else
+#define D(x)
+#undef V
+#endif
+
/**
* Something for filesystem access
*/
@@ -108,7 +121,8 @@ protected:
* Constructor
*/
SimulatedBlock(BlockNumber blockNumber,
- struct Block_context & ctx);
+ struct Block_context & ctx,
+ Uint32 instanceNumber = 0);
/**********************************************************
* Handling of execFunctions
@@ -125,12 +139,34 @@ public:
*/
inline void executeFunction(GlobalSignalNumber gsn, Signal* signal);
+ /* Multiple block instances */
+ Uint32 instance() const {
+ return theInstance;
+ }
+ Uint32 getWorkerCount() const {
+ ndbrequire(theInstance == 0); // valid only on main instance
+ ndbrequire(theInstanceCount >= 1);
+ return theInstanceCount - 1;
+ }
+ SimulatedBlock* getInstance(Uint32 instanceNumber) {
+ ndbrequire(theInstance == 0);
+ if (instanceNumber == 0)
+ return this;
+ if (instanceNumber < theInstanceCount) {
+ ndbrequire(theInstanceList != 0);
+ return theInstanceList[instanceNumber];
+ }
+ return 0;
+ }
+ Uint32 getInstanceKey(Uint32 tabId, Uint32 fragId);
+
/* Setup state of a block object for executing in a particular thread. */
void assignToThread(Uint32 threadId, EmulatedJamBuffer *jamBuffer,
Uint32 *watchDogCounter);
/* For multithreaded ndbd, get the id of owning thread. */
uint32 getThreadId() const { return m_threadId; }
- bool isMultiThreaded() const;
+ static bool isMultiThreaded();
+
public:
typedef void (SimulatedBlock::* CallbackFunction)(class Signal*,
Uint32 callbackData,
@@ -210,14 +246,15 @@ protected:
Uint32 length,
SectionHandle* sections) const;
+ /*
+ * Instance defaults to instance of sender. Using explicit
+ * instance argument asserts that the call is thread-safe.
+ */
void EXECUTE_DIRECT(Uint32 block,
Uint32 gsn,
Signal* signal,
- Uint32 len
-#ifdef VM_TRACE
- , bool is_thread_safe = false
-#endif
-);
+ Uint32 len,
+ Uint32 givenInstanceNo = ZNIL);
class SectionSegmentPool& getSectionSegmentPool();
void releaseSections(struct SectionHandle&);
@@ -398,8 +435,18 @@ private:
void signal_error(Uint32, Uint32, Uint32, const char*, int) const ;
const NodeId theNodeId;
const BlockNumber theNumber;
+ const Uint32 theInstance;
const BlockReference theReference;
/*
+ * Instance 0 is the main instance. It creates/owns other instances.
+ * In MT LQH main instance is the LQH proxy and the others ("workers")
+ * are real LQHs run by multiple threads.
+ */
+ enum { MaxInstances = 1 + MAX_NDBMT_WORKERS };
+ Uint32 theInstanceCount; // set in main
+ SimulatedBlock** theInstanceList; // set in main, indexed by instance
+ SimulatedBlock* theMainInstance; // set in all
+ /*
Thread id currently executing this block.
Not used in singlethreaded ndbd.
*/
@@ -416,8 +463,10 @@ protected:
Block_context m_ctx;
NewVARIABLE* allocateBat(int batSize);
void freeBat();
- static const NewVARIABLE* getBat (BlockNumber blockNo);
- static Uint16 getBatSize(BlockNumber blockNo);
+ static const NewVARIABLE* getBat (BlockNumber blockNo,
+ Uint32 instanceNo = 0);
+ static Uint16 getBatSize(BlockNumber blockNo,
+ Uint32 instanceNo = 0);
static BlockReference calcTcBlockRef (NodeId aNode);
static BlockReference calcLqhBlockRef (NodeId aNode);
@@ -629,6 +678,12 @@ public:
void clear_global_variables();
void init_globals_list(void ** tmp, size_t cnt);
#endif
+
+#ifdef VM_TRACE
+ NdbOut debugOut;
+ bool debugOutOn() const;
+ const char* debugOutTag(char* buf, int line);
+#endif
};
inline
@@ -806,17 +861,32 @@ void
SimulatedBlock::EXECUTE_DIRECT(Uint32 block,
Uint32 gsn,
Signal* signal,
- Uint32 len
-#ifdef VM_TRACE
- , bool is_thread_safe
-#endif
-)
+ Uint32 len,
+ Uint32 givenInstanceNo)
{
signal->setLength(len);
+ SimulatedBlock* b = globalData.getBlock(block);
+ ndbassert(b != 0);
+ /**
+ * In multithreaded NDB, blocks run in different threads, and EXECUTE_DIRECT
+ * (unlike sendSignal) is generally not thread-safe.
+ * So only allow EXECUTE_DIRECT between blocks that run in the same thread,
+ * unless caller explicitly marks it as being thread safe (eg NDBFS),
+ * by using an explicit instance argument.
+ * By default instance of sender is used. This is automatically thread-safe
+ * for worker instances (instance != 0).
+ */
+ Uint32 instanceNo = givenInstanceNo;
+ if (instanceNo == ZNIL)
+ instanceNo = instance();
+ if (instanceNo != 0)
+ b = b->getInstance(instanceNo);
+ ndbassert(b != 0);
+ ndbassert(givenInstanceNo != ZNIL || b->getThreadId() == getThreadId());
#ifdef VM_TRACE
if(globalData.testOn){
signal->header.theVerId_signalNumber = gsn;
- signal->header.theReceiversBlockNumber = block;
+ signal->header.theReceiversBlockNumber = numberToBlock(block, instanceNo);
signal->header.theSendersBlockRef = reference();
globalSignalLoggers.executeDirect(signal->header,
0, // in
@@ -824,14 +894,6 @@ SimulatedBlock::EXECUTE_DIRECT(Uint32 bl
globalData.ownId);
}
#endif
- SimulatedBlock* b = globalData.getBlock(block);
- /**
- * In multithreaded NDB, blocks run in different threads, and EXECUTE_DIRECT
- * (unlike sendSignal() is generally not thread-safe.
- * So only allow EXECUTE_DIRECT between blocks that run in the same thread,
- * unless caller explicitly marks it as being thread safe (eg NDBFS).
- */
- ndbassert(is_thread_safe || b->getThreadId() == getThreadId());
#ifdef VM_TRACE_TIME
Uint32 us1, us2;
Uint64 ms1, ms2;
@@ -854,7 +916,7 @@ SimulatedBlock::EXECUTE_DIRECT(Uint32 bl
#ifdef VM_TRACE
if(globalData.testOn){
signal->header.theVerId_signalNumber = gsn;
- signal->header.theReceiversBlockNumber = block;
+ signal->header.theReceiversBlockNumber = numberToBlock(block, instanceNo);
signal->header.theSendersBlockRef = reference();
globalSignalLoggers.executeDirect(signal->header,
1, // out
@@ -875,6 +937,8 @@ SimulatedBlock::addTime(Uint32 gsn, Uint
inline
void
SimulatedBlock::subTime(Uint32 gsn, Uint64 time){
+ // wl4391_todo got 0xf3f3f3f3 here on GSN_UPGRADE_PROTOCOL_ORD...
+ if (gsn < 0xffff)
m_timeTrace[gsn].sub += time;
}
#endif
=== modified file 'storage/ndb/src/kernel/vm/TransporterCallback.cpp'
--- a/storage/ndb/src/kernel/vm/TransporterCallback.cpp 2008-07-01 12:35:34 +0000
+++ b/storage/ndb/src/kernel/vm/TransporterCallback.cpp 2008-07-23 16:00:25 +0000
@@ -191,7 +191,14 @@ TransporterCallbackKernel::deliver_signa
const Uint32 secCount = header->m_noOfSections;
const Uint32 length = header->theLength;
+
+ /*
+ * Strip instance bits if not multithreaded. This is also
+ * done in versions prior to MT LQH, to simplify online upgrade.
+ */
+#ifndef NDBD_MULTITHREADED
header->theReceiversBlockNumber &= NDBMT_BLOCK_MASK;
+#endif
#ifdef TRACE_DISTRIBUTED
ndbout_c("recv: %s(%d) from (%s, %d)",
@@ -232,10 +239,10 @@ TransporterCallbackKernel::deliver_signa
globalScheduler.execute(header, prio, theData, secPtrI);
#else
if (prio == JBB)
- sendlocal(receiverThreadId, header->theReceiversBlockNumber,
+ sendlocal(receiverThreadId,
header, theData, secPtrI);
else
- sendprioa(receiverThreadId, header->theReceiversBlockNumber,
+ sendprioa(receiverThreadId,
header, theData, secPtrI);
#endif
@@ -271,10 +278,10 @@ TransporterCallbackKernel::deliver_signa
globalScheduler.execute(header, prio, theData, secPtrI);
#else
if (prio == JBB)
- sendlocal(receiverThreadId, header->theReceiversBlockNumber,
+ sendlocal(receiverThreadId,
header, theData, NULL);
else
- sendprioa(receiverThreadId, header->theReceiversBlockNumber,
+ sendprioa(receiverThreadId,
header, theData, NULL);
#endif
@@ -383,7 +390,7 @@ TransporterCallbackKernel::reportError(N
#else
signal.header.theVerId_signalNumber = GSN_EVENT_REP;
signal.header.theReceiversBlockNumber = CMVMI;
- sendprioa(receiverThreadId, signal.header.theReceiversBlockNumber,
+ sendprioa(receiverThreadId,
&signalT.header, signalT.theData, NULL);
#endif
@@ -436,7 +443,7 @@ TransporterCallbackKernel::reportReceive
#else
signal.header.theVerId_signalNumber = GSN_EVENT_REP;
signal.header.theReceiversBlockNumber = CMVMI;
- sendprioa(receiverThreadId, signal.header.theReceiversBlockNumber,
+ sendprioa(receiverThreadId,
&signalT.header, signalT.theData, NULL);
#endif
}
@@ -463,7 +470,7 @@ TransporterCallbackKernel::reportConnect
#else
signal.header.theVerId_signalNumber = GSN_CONNECT_REP;
signal.header.theReceiversBlockNumber = CMVMI;
- sendprioa(receiverThreadId, signal.header.theReceiversBlockNumber,
+ sendprioa(receiverThreadId,
&signalT.header, signalT.theData, NULL);
#endif
}
@@ -495,7 +502,7 @@ TransporterCallbackKernel::reportDisconn
#else
signal.header.theVerId_signalNumber = GSN_DISCONNECT_REP;
signal.header.theReceiversBlockNumber = CMVMI;
- sendprioa(receiverThreadId, signal.header.theReceiversBlockNumber,
+ sendprioa(receiverThreadId,
&signalT.header, signalT.theData, NULL);
#endif
=== added file 'storage/ndb/src/kernel/vm/dummy_nonmt.cpp'
--- a/storage/ndb/src/kernel/vm/dummy_nonmt.cpp 1970-01-01 00:00:00 +0000
+++ b/storage/ndb/src/kernel/vm/dummy_nonmt.cpp 2008-07-23 16:00:25 +0000
@@ -0,0 +1,14 @@
+#include <assert.h>
+#include <ndb_types.h>
+
+void
+add_thr_map(Uint32, Uint32, Uint32)
+{
+ assert(false);
+}
+
+void
+add_worker_thr_map(Uint32, Uint32)
+{
+ assert(false);
+}
=== modified file 'storage/ndb/src/kernel/vm/mt.cpp'
--- a/storage/ndb/src/kernel/vm/mt.cpp 2008-05-30 06:33:02 +0000
+++ b/storage/ndb/src/kernel/vm/mt.cpp 2008-07-23 16:00:25 +0000
@@ -43,9 +43,14 @@ static const Uint32 MAX_SIGNALS_PER_JB =
//#define NDB_MT_LOCK_TO_CPU
-static const Uint32 NUM_THREADS = 3;
-static const Uint32 RECEIVER_THREAD_NO = 2;
-#define MAX_THREADS 4
+#define MAX_BLOCK_INSTANCES (1 + MAX_NDBMT_WORKERS)
+#define NUM_MAIN_THREADS 2 // except receiver
+#define MAX_THREADS (NUM_MAIN_THREADS + MAX_NDBMT_THREADS + 1)
+
+static Uint32 ndbmt_workers = 0;
+static Uint32 ndbmt_threads = 0;
+static Uint32 num_threads = 0;
+static Uint32 receiver_thread_no = 0;
#ifdef NDBMTD_X86
static inline
@@ -444,8 +449,7 @@ struct thr_data
thr_wait m_waiter;
unsigned m_thr_no;
- struct SimulatedBlock* m_blocks[NO_OF_BLOCKS];
-
+
Uint64 m_time;
struct thr_tq m_tq;
@@ -487,9 +491,9 @@ struct thr_data
struct thr_jb_read_state m_read_states[MAX_THREADS];
/* Jam buffers for making trace files at crashes. */
- EmulatedJamBuffer *m_jam;
+ EmulatedJamBuffer m_jam;
/* Watchdog counter for this thread. */
- Uint32 *m_watchdog_counter;
+ Uint32 m_watchdog_counter;
/* Signal delivery statistics. */
Uint32 m_prioa_count;
Uint32 m_prioa_size;
@@ -699,7 +703,7 @@ struct trp_callback : public Transporter
Uint32 total_bytes(NodeId node) const
{
Uint32 total = 0;
- for (Uint32 i = 0; i < NUM_THREADS; i++)
+ for (Uint32 i = 0; i < num_threads; i++)
total += m_thr_buffers[i]->m_buffers[node].used_bytes();
return total;
}
@@ -907,7 +911,7 @@ scan_queue(struct thr_data* selfptr, Uin
* If they are frequent, we may want to optimize, as sending one prio A
* signal is somewhat expensive compared to sending one prio B.
*/
- sendprioa(thr_no, s->theReceiversBlockNumber, s, data,
+ sendprioa(thr_no, s, data,
data + s->theLength);
* (page + pos) = free;
free = idx;
@@ -1190,7 +1194,8 @@ thr_send_buf::updateWritePtr(NodeId node
trp_callback::trp_callback()
{
- for (Uint32 i = 0; i < NUM_THREADS; i++)
+ // number of threads not yet set so use MAX_THREADS
+ for (Uint32 i = 0; i < MAX_THREADS; i++)
m_thr_buffers[i] = new thr_send_buf(this, i, &g_thr_repository.m_free_list);
for (int i = 0; i < MAX_NTRANSPORTERS; i++)
m_send_thr[i] = ~(Uint32)0;
@@ -1211,7 +1216,7 @@ trp_callback::reportSendLen(NodeId nodeI
signal.theData[2] = (bytes/count);
signal.header.theVerId_signalNumber = GSN_EVENT_REP;
signal.header.theReceiversBlockNumber = CMVMI;
- sendprioa(m_send_thr[nodeId], signal.header.theReceiversBlockNumber,
+ sendprioa(m_send_thr[nodeId],
&signalT.header, signalT.theData, NULL);
}
@@ -1260,7 +1265,7 @@ trp_callback::get_bytes_to_send_iovec(No
if (max_iovecs == 0)
return 0;
- for (Uint32 thr = 0; thr < NUM_THREADS && iovecs < max_iovecs; thr++)
+ for (Uint32 thr = 0; thr < num_threads && iovecs < max_iovecs; thr++)
{
thr_send_buf::send_buffer *b = m_thr_buffers[thr]->m_buffers + node;
thr_send_buf::page *pg = b->m_current_page;
@@ -1345,7 +1350,7 @@ trp_callback::bytes_sent(NodeId node, co
break; // Found it
}
- curr_thr = (curr_thr + 1) % NUM_THREADS;
+ curr_thr = (curr_thr + 1) % num_threads;
#ifdef VM_TRACE
assert(curr_thr != start_thr); // Sent data was not in buffer
#endif
@@ -1378,7 +1383,7 @@ trp_callback::bytes_sent(NodeId node, co
bool
trp_callback::has_data_to_send(NodeId node)
{
- for (Uint32 thr = 0; thr < NUM_THREADS; thr++)
+ for (Uint32 thr = 0; thr < num_threads; thr++)
{
thr_send_buf::send_buffer *b = m_thr_buffers[thr]->m_buffers + node;
thr_send_buf::page *pg = b->m_current_page;
@@ -1565,15 +1570,14 @@ inline
void
sendpacked(struct thr_data* selfptr, Signal* signal, Uint32 thr_no)
{
- SimulatedBlock** blockptr = selfptr->m_blocks - MIN_BLOCK_NO;
- SimulatedBlock* b_lqh = * (blockptr + DBLQH);
- SimulatedBlock* b_tc = * (blockptr + DBTC);
- SimulatedBlock* b_tup = * (blockptr + DBTUP);
- if (b_lqh)
+ SimulatedBlock* b_lqh = globalData.getBlock(DBLQH);
+ SimulatedBlock* b_tc = globalData.getBlock(DBTC);
+ SimulatedBlock* b_tup = globalData.getBlock(DBTUP);
+ if (thr_no == 1)
b_lqh->executeFunction(GSN_SEND_PACKED, signal);
- if (b_tc)
+ if (thr_no == 0)
b_tc->executeFunction(GSN_SEND_PACKED, signal);
- if (b_tup)
+ if (thr_no == 1)
b_tup->executeFunction(GSN_SEND_PACKED, signal);
}
@@ -1724,7 +1728,6 @@ execute_signals(thr_data *selfptr, thr_j
r->m_write_pos :
q->m_buffers[read_index]->m_len);
thr_job_buffer *read_buffer = r->m_read_buffer;
- SimulatedBlock** blockptr = selfptr->m_blocks - MIN_BLOCK_NO;
while (num_signals < max_signals)
{
@@ -1767,9 +1770,11 @@ execute_signals(thr_data *selfptr, thr_j
Uint32 siglen = (sizeof(*s)>>2) + s->theLength;
if(siglen>16)
__builtin_prefetch (read_buffer->m_data + read_pos + 32, 0, 3);
- Uint32 bno = s->theReceiversBlockNumber;
+ Uint32 bno = blockToMain(s->theReceiversBlockNumber);
+ Uint32 ino = blockToInstance(s->theReceiversBlockNumber);
Uint32 gsn = s->theVerId_signalNumber;
- SimulatedBlock * block = blockptr[bno];
+ SimulatedBlock * main = globalData.getBlock(bno);
+ SimulatedBlock * block = main->getInstance(ino);
*watchDogCounter = 1;
/* Must update original buffer so signal dump will see it. */
s->theSignalId = (*signalIdCounter)++;
@@ -1787,6 +1792,16 @@ execute_signals(thr_data *selfptr, thr_j
/* Update just before execute so signal dump can know how far we are. */
r->m_read_pos = read_pos;
+#ifdef VM_TRACE
+ if (globalData.testOn) { //wl4391_todo segments
+ globalSignalLoggers.executeSignal(*s,
+ 0,
+ &sig->theData[0],
+ globalData.ownId);
+
+ }
+#endif
+
block->executeFunction(gsn, sig);
num_signals++;
@@ -1831,43 +1846,91 @@ run_job_buffers(thr_data *selfptr, Signa
return signal_count;
}
-static inline Uint32
-block2ThreadId(Uint32 block)
+struct thr_map_entry {
+ enum { NULL_THR_NO = 0xFFFF };
+ Uint32 thr_no;
+ SimulatedBlock* block;
+ thr_map_entry() : thr_no(NULL_THR_NO), block(0) {}
+};
+
+static struct thr_map_entry thr_map[NO_OF_BLOCKS][MAX_BLOCK_INSTANCES];
+
+void
+add_thr_map(Uint32 block, Uint32 instance, Uint32 thr_no)
{
- /**
- * CMVMIis special, it runs in the receive thread due to calling directly
- * into TransporterRegistry.
- */
- if (block == CMVMI)
- return 2;
+ Uint32 index = block - MIN_BLOCK_NO;
+ assert(index < NO_OF_BLOCKS);
+ assert(instance < MAX_BLOCK_INSTANCES);
- /*
- * Block assignment:
- * 0 BACKUP LOCAL
- * 1 DBTC GLOBAL
- * 2 DBDIH GLOBAL
- * 3 DBLQH LOCAL
- * 4 DBACC LOCAL
- * 5 DBTUP LOCAL
- * 6 DBDICT GLOBAL
- * 7 NDBCNTR GLOBAL
- * 8 QMGR GLOBAL
- * 9 NDBFS GLOBAL
- * 10 CMVMI [Receive thread]
- * 11 TRIX GLOBAL
- * 12 DBUTIL GLOBAL
- * 13 SUMA LOCAL
- * 14 DBTUX LOCAL
- * 15 TSMAN LOCAL
- * 16 LGMAN LOCAL
- * 17 PGMAN LOCAL
- * 18 RESTORE LOCAL
- *
- * Thread 0 is for global, thread 1 for locals.
- */
- static const Uint32 locals = 0x7e039;
+ SimulatedBlock* main = globalData.getBlock(block);
+ assert(main != 0);
+ SimulatedBlock* b = main->getInstance(instance);
+ assert(b != 0);
+
+ assert(thr_no < num_threads);
+ struct thr_repository* rep = &g_thr_repository;
+ thr_data* thr_ptr = rep->m_thread + thr_no;
+
+ b->assignToThread(thr_no, &thr_ptr->m_jam, &thr_ptr->m_watchdog_counter);
+
+ thr_map_entry& entry = thr_map[index][instance];
+ assert(entry.thr_no == thr_map_entry::NULL_THR_NO);
+ entry.thr_no = thr_no;
+ entry.block = b;
+}
+
+// static assignment of main instances before first signal
+static void
+add_main_thr_map()
+{
+ static int done = 0;
+ if (done++)
+ return;
+
+ static const Uint32 thr_GLOBAL = 0;
+ static const Uint32 thr_LOCAL = 1;
+ static const Uint32 thr_RECEIVER = receiver_thread_no;
+
+ add_thr_map(BACKUP, 0, thr_LOCAL);
+ add_thr_map(DBTC, 0, thr_GLOBAL);
+ add_thr_map(DBDIH, 0, thr_GLOBAL);
+ add_thr_map(DBLQH, 0, thr_LOCAL);
+ add_thr_map(DBACC, 0, thr_LOCAL);
+ add_thr_map(DBTUP, 0, thr_LOCAL);
+ add_thr_map(DBDICT, 0, thr_GLOBAL);
+ add_thr_map(NDBCNTR, 0, thr_GLOBAL);
+ add_thr_map(QMGR, 0, thr_GLOBAL);
+ add_thr_map(NDBFS, 0, thr_GLOBAL);
+ add_thr_map(CMVMI, 0, thr_RECEIVER);
+ add_thr_map(TRIX, 0, thr_GLOBAL);
+ add_thr_map(DBUTIL, 0, thr_GLOBAL);
+ add_thr_map(SUMA, 0, thr_LOCAL);
+ add_thr_map(DBTUX, 0, thr_LOCAL);
+ add_thr_map(TSMAN, 0, thr_LOCAL);
+ add_thr_map(LGMAN, 0, thr_LOCAL);
+ add_thr_map(PGMAN, 0, thr_LOCAL);
+ add_thr_map(RESTORE, 0, thr_LOCAL);
+}
+
+// workers added by LocalProxy
+void
+add_worker_thr_map(Uint32 block, Uint32 instance)
+{
+ assert(instance != 0);
+ Uint32 i = instance - 1;
+ Uint32 thr_no = NUM_MAIN_THREADS + i % ndbmt_threads;
+ add_thr_map(block, instance, thr_no);
+}
+
+static inline Uint32
+block2ThreadId(Uint32 block, Uint32 instance)
+{
assert(block >= MIN_BLOCK_NO && block <= MAX_BLOCK_NO);
- return (locals >> (block - MIN_BLOCK_NO)) & 1;
+ Uint32 index = block - MIN_BLOCK_NO;
+ assert(instance < MAX_BLOCK_INSTANCES);
+ const thr_map_entry& entry = thr_map[index][instance];
+ assert(entry.thr_no < num_threads);
+ return entry.thr_no;
}
static void reportSignalStats(Uint32 self, Uint32 a_count, Uint32 a_size,
@@ -1889,7 +1952,7 @@ static void reportSignalStats(Uint32 sel
s->theData[4] = b_count;
s->theData[5] = b_size;
/* ToDo: need this really be prio A like in old code? */
- sendlocal(self, s->header.theReceiversBlockNumber, &s->header, s->theData,
+ sendlocal(self, &s->header, s->theData,
NULL);
}
@@ -1911,17 +1974,15 @@ update_sched_stats(thr_data *selfptr)
}
static void
-init_thread(thr_data *selfptr, EmulatedJamBuffer *jam, Uint32 *watchDogCounter)
+init_thread(thr_data *selfptr)
{
- jam->theEmulatedJamIndex = 0;
- jam->theEmulatedJamBlockNumber = 0;
- memset(jam->theEmulatedJam, 0, sizeof(jam->theEmulatedJam));
- NdbThread_SetTlsKey(NDB_THREAD_TLS_JAM, jam);
- selfptr->m_jam = jam;
+ selfptr->m_jam.theEmulatedJamIndex = 0;
+ selfptr->m_jam.theEmulatedJamBlockNumber = 0;
+ memset(selfptr->m_jam.theEmulatedJam, 0, sizeof(selfptr->m_jam.theEmulatedJam));
+ NdbThread_SetTlsKey(NDB_THREAD_TLS_JAM, &selfptr->m_jam);
- selfptr->m_watchdog_counter = watchDogCounter;
unsigned thr_no = selfptr->m_thr_no;
- globalEmulatorData.theWatchDog->registerWatchedThread(watchDogCounter,
+ globalEmulatorData.theWatchDog->registerWatchedThread(&selfptr->m_watchdog_counter,
thr_no);
NdbThread_SetTlsKey(NDB_THREAD_TLS_THREAD, selfptr);
@@ -1941,22 +2002,6 @@ init_thread(thr_data *selfptr, EmulatedJ
#endif
}
-/* Assign to this thread all blocks that will run in this thread. */
-static void
-grab_threads(thr_data *selfptr, EmulatedJamBuffer *thread_jam,
- Uint32 *watchDogCounter)
-{
- for (Uint32 i = 0; i < NO_OF_BLOCKS; i++)
- {
- if (block2ThreadId(i + MIN_BLOCK_NO) == selfptr->m_thr_no)
- {
- require(selfptr->m_blocks[i] != 0);
- selfptr->m_blocks[i]->assignToThread(selfptr->m_thr_no, thread_jam,
- watchDogCounter);
- }
- }
-}
-
/**
* Align signal buffer for better cache performance.
* Also skew it a litte for each thread to avoid cache pollution.
@@ -1998,13 +2043,12 @@ mt_receiver_thread_main(void *thr_arg)
struct thr_data* selfptr = (struct thr_data *)thr_arg;
unsigned thr_no = selfptr->m_thr_no;
EmulatedJamBuffer thread_jam;
- Uint32 watchDogCounter;
+ Uint32& watchDogCounter = selfptr->m_watchdog_counter;
Uint32 thrSignalId = 0;
- init_thread(selfptr, &thread_jam, &watchDogCounter);
+ init_thread(selfptr);
receiverThreadId = thr_no;
signal = aligned_signal(signal_buf, thr_no);
- grab_threads(selfptr, &thread_jam, &watchDogCounter);
while (globalData.theRestartFlag != perform_stop)
{
@@ -2062,19 +2106,17 @@ mt_job_thread_main(void *thr_arg)
struct timespec nowait;
nowait.tv_sec = 0;
nowait.tv_nsec = 10 * 1000000;
- EmulatedJamBuffer thread_jam;
- Uint32 watchDogCounter;
Uint32 thrSignalId = 0;
struct thr_repository* rep = &g_thr_repository;
struct thr_data* selfptr = (struct thr_data *)thr_arg;
- init_thread(selfptr, &thread_jam, &watchDogCounter);
+ init_thread(selfptr);
+ EmulatedJamBuffer& thread_jam = selfptr->m_jam;
+ Uint32& watchDogCounter = selfptr->m_watchdog_counter;
unsigned thr_no = selfptr->m_thr_no;
signal = aligned_signal(signal_buf, thr_no);
- grab_threads(selfptr, &thread_jam, &watchDogCounter);
-
/* Avoid false watchdog alarms caused by race condition. */
watchDogCounter = 1;
@@ -2090,6 +2132,7 @@ mt_job_thread_main(void *thr_arg)
&watchDogCounter, &thrSignalId);
watchDogCounter = 1;
+ signal->header.m_noOfSections = 0; /* valgrind */
sendpacked(selfptr, signal, thr_no);
if (sum)
@@ -2127,16 +2170,23 @@ mt_job_thread_main(void *thr_arg)
}
void
-sendlocal(Uint32 self, Uint32 block, const SignalHeader *s, const Uint32 *data,
+sendlocal(Uint32 self, const SignalHeader *s, const Uint32 *data,
const Uint32 secPtr[3])
{
+ Uint32 block = blockToMain(s->theReceiversBlockNumber);
+ Uint32 instance = blockToInstance(s->theReceiversBlockNumber);
+
+ // map on receiver side
+ if (instance != 0)
+ instance = 1 + (instance - 1) % ndbmt_workers;
+
/*
* Max number of signals to put into job buffer before flushing the buffer
* to the other thread.
* This parameter found to be reasonable by benchmarking.
*/
- Uint32 MAX_SIGNALS_BEFORE_FLUSH = (self == RECEIVER_THREAD_NO ? 2 : 20);
- Uint32 dst = block2ThreadId(block);
+ Uint32 MAX_SIGNALS_BEFORE_FLUSH = (self == receiver_thread_no ? 2 : 20);
+ Uint32 dst = block2ThreadId(block, instance);
struct thr_repository* rep = &g_thr_repository;
struct thr_data * selfptr = rep->m_thread + self;
@@ -2151,15 +2201,23 @@ sendlocal(Uint32 self, Uint32 block, con
selfptr->m_next_buffer = seize_buffer(rep, self, false);
}
- if (w->m_pending_signals >= MAX_SIGNALS_BEFORE_FLUSH)
+ // wl4391_todo
+ if (true || w->m_pending_signals >= MAX_SIGNALS_BEFORE_FLUSH)
flush_write_state(dst, q, w);
}
void
-sendprioa(Uint32 self, Uint32 block, const SignalHeader *s, const uint32 *data,
+sendprioa(Uint32 self, const SignalHeader *s, const uint32 *data,
const Uint32 secPtr[3])
{
- Uint32 dst = block2ThreadId(block);
+ Uint32 block = blockToMain(s->theReceiversBlockNumber);
+ Uint32 instance = blockToInstance(s->theReceiversBlockNumber);
+
+ // map on receiver side
+ if (instance != 0)
+ instance = 1 + (instance - 1) % ndbmt_workers;
+
+ Uint32 dst = block2ThreadId(block, instance);
struct thr_repository* rep = &g_thr_repository;
struct thr_data * selfptr = rep->m_thread + self;
struct thr_data *dstptr = rep->m_thread + dst;
@@ -2247,17 +2305,26 @@ sendprioa_STOP_FOR_CRASH(Uint32 dst)
static thr_job_buffer dummy_buffer;
/*
- * Currently we have three threads with fixed block assignment.
+ * Before we had three main threads with fixed block assignment.
+ * Now there is also worker instances (we send to LQH instance).
*/
- assert(dst == 0 || dst == 1 || dst == 2); // ToDo when/if more threads.
- Uint32 bno = 0;
+ Uint32 main = 0;
+ Uint32 instance = 0;
if (dst == 0)
- bno = NDBCNTR;
+ main = NDBCNTR;
else if (dst == 1)
- bno = DBLQH;
- else if (dst == 2)
- bno = CMVMI;
- assert(block2ThreadId(bno) == dst);
+ main = DBLQH;
+ else if (dst >= NUM_MAIN_THREADS && dst < NUM_MAIN_THREADS + ndbmt_threads)
+ {
+ main = DBLQH;
+ instance = dst - NUM_MAIN_THREADS + 1;
+ }
+ else if (dst == receiver_thread_no)
+ main = CMVMI;
+ else
+ assert(false);
+ Uint32 bno = numberToBlock(main, instance);
+ assert(block2ThreadId(main, instance) == dst);
struct thr_data * dstptr = rep->m_thread + dst;
memset(&signalT.header, 0, sizeof(SignalHeader));
@@ -2459,8 +2526,6 @@ thr_init(struct thr_repository* rep, str
queue_init(&selfptr->m_tq);
- selfptr->m_jam = NULL;
-
selfptr->m_prioa_count = 0;
selfptr->m_prioa_size = 0;
selfptr->m_priob_count = 0;
@@ -2538,7 +2603,13 @@ ThreadConfig::~ThreadConfig()
void
ThreadConfig::init(EmulatorData *emulatorData)
{
- ::rep_init(&g_thr_repository, NUM_THREADS, emulatorData->m_mem_manager);
+ ndbmt_workers = globalData.ndbmtWorkers;
+ ndbmt_threads = globalData.ndbmtThreads;
+ num_threads = NUM_MAIN_THREADS + ndbmt_threads + 1;
+ assert(num_threads <= MAX_THREADS);
+ receiver_thread_no = num_threads - 1;
+
+ ::rep_init(&g_thr_repository, num_threads, emulatorData->m_mem_manager);
}
void ThreadConfig::ipControlLoop(Uint32 thread_index)
@@ -2546,25 +2617,19 @@ void ThreadConfig::ipControlLoop(Uint32
unsigned int i;
unsigned int thr_no;
struct thr_repository* rep = &g_thr_repository;
- NdbThread *threads[NUM_THREADS];
+ NdbThread *threads[MAX_THREADS];
+
+ add_main_thr_map();
/*
* Start threads for all execution threads, except for the receiver
* thread, which runs in the main thread.
*/
- for (thr_no = 0; thr_no < NUM_THREADS; thr_no++)
+ for (thr_no = 0; thr_no < num_threads; thr_no++)
{
- for (i = 0; i<NO_OF_BLOCKS; i++)
- {
- if (block2ThreadId(i + MIN_BLOCK_NO) == thr_no)
- {
- rep->m_thread[thr_no].m_blocks[i] =
- globalData.getBlock(i + MIN_BLOCK_NO);
- }
- }
rep->m_thread[thr_no].m_time = NdbTick_CurrentMillisecond();
- if (thr_no == RECEIVER_THREAD_NO)
+ if (thr_no == receiver_thread_no)
continue; // Will run in the main thread.
/*
* The NdbThread_Create() takes void **, but that is cast to void * when
@@ -2579,12 +2644,12 @@ void ThreadConfig::ipControlLoop(Uint32
}
/* Now run the main loop for thread 0 directly. */
- mt_receiver_thread_main(&(rep->m_thread[RECEIVER_THREAD_NO]));
+ mt_receiver_thread_main(&(rep->m_thread[receiver_thread_no]));
/* Wait for all threads to shutdown. */
- for (thr_no = 0; thr_no < NUM_THREADS; thr_no++)
+ for (thr_no = 0; thr_no < num_threads; thr_no++)
{
- if (thr_no == RECEIVER_THREAD_NO)
+ if (thr_no == receiver_thread_no)
continue;
void *dummy_return_status;
NdbThread_WaitFor(threads[thr_no], &dummy_return_status);
@@ -2595,6 +2660,8 @@ void ThreadConfig::ipControlLoop(Uint32
int
ThreadConfig::doStart(NodeState::StartLevel startLevel)
{
+ add_main_thr_map();
+
SignalT<3> signalT;
memset(&signalT.header, 0, sizeof(SignalHeader));
@@ -2608,7 +2675,7 @@ ThreadConfig::doStart(NodeState::StartLe
StartOrd * const startOrd = (StartOrd *)&signalT.theData[0];
startOrd->restartInfo = 0;
- senddelay(block2ThreadId(CMVMI), &signalT.header, 1);
+ senddelay(block2ThreadId(CMVMI, 0), &signalT.header, 1);
return 0;
}
@@ -2643,7 +2710,7 @@ Uint32
FastScheduler::traceDumpGetNumThreads()
{
/* The last thread is only for receiver -> no trace file. */
- return NUM_THREADS;
+ return num_threads;
}
bool
@@ -2651,7 +2718,7 @@ FastScheduler::traceDumpGetJam(Uint32 th
const Uint32 * & thrdTheEmulatedJam,
Uint32 & thrdTheEmulatedJamIndex)
{
- if (thr_no >= NUM_THREADS)
+ if (thr_no >= num_threads)
return false;
#ifdef NO_EMULATED_JAM
@@ -2659,7 +2726,7 @@ FastScheduler::traceDumpGetJam(Uint32 th
thrdTheEmulatedJam = NULL;
thrdTheEmulatedJamIndex = 0;
#else
- const EmulatedJamBuffer *jamBuffer = g_thr_repository.m_thread[thr_no].m_jam;
+ const EmulatedJamBuffer *jamBuffer = &g_thr_repository.m_thread[thr_no].m_jam;
thrdTheEmulatedJam = jamBuffer->theEmulatedJam;
thrdTheEmulatedJamIndex = jamBuffer->theEmulatedJamIndex;
jamBlockNumber = jamBuffer->theEmulatedJamBlockNumber;
@@ -2695,7 +2762,7 @@ FastScheduler::traceDumpPrepare()
pthread_mutex_lock(&g_thr_repository.stop_for_crash_mutex);
g_thr_repository.stopped_threads = 0;
- for (Uint32 thr_no = 0; thr_no < NUM_THREADS; thr_no++)
+ for (Uint32 thr_no = 0; thr_no < num_threads; thr_no++)
{
if (selfptr != NULL && selfptr->m_thr_no == thr_no)
{
@@ -2753,7 +2820,7 @@ void
FastScheduler::dumpSignalMemory(Uint32 thr_no, FILE* out)
{
void *value= NdbThread_GetTlsKey(NDB_THREAD_TLS_THREAD);
- const thr_data *selfptr = reinterpret_cast<const thr_data *>(value);
+ thr_data *selfptr = reinterpret_cast<thr_data *>(value);
const thr_repository *rep = &g_thr_repository;
/*
* The selfptr might be NULL, or pointer to thread that is doing the crash
@@ -2762,7 +2829,7 @@ FastScheduler::dumpSignalMemory(Uint32 t
*/
Uint32 *watchDogCounter;
if (selfptr)
- watchDogCounter = selfptr->m_watchdog_counter;
+ watchDogCounter = &selfptr->m_watchdog_counter;
else
watchDogCounter = NULL;
=== modified file 'storage/ndb/src/kernel/vm/mt.hpp'
--- a/storage/ndb/src/kernel/vm/mt.hpp 2008-05-29 15:06:11 +0000
+++ b/storage/ndb/src/kernel/vm/mt.hpp 2008-07-23 16:00:25 +0000
@@ -14,9 +14,13 @@
*/
extern Uint32 receiverThreadId;
-void sendlocal(Uint32 self, Uint32 block, const struct SignalHeader *s,
+/* Assign block instance to thread */
+void add_thr_map(Uint32 block, Uint32 instance, Uint32 thr_no);
+void add_worker_thr_map(Uint32 block, Uint32 instance);
+
+void sendlocal(Uint32 self, const struct SignalHeader *s,
const Uint32 *data, const Uint32 secPtr[3]);
-void sendprioa(Uint32 self, Uint32 block, const struct SignalHeader *s,
+void sendprioa(Uint32 self, const struct SignalHeader *s,
const Uint32 *data, const Uint32 secPtr[3]);
void senddelay(Uint32 thr_no, const struct SignalHeader*, Uint32 delay);
void mt_execSTOP_FOR_CRASH();
| Thread |
|---|
| • bzr commit into mysql-5.1-telco-6.4 branch (pekka:2677) WL#4391 | Pekka Nousiainen | 23 Jul |