List:Commits« Previous MessageNext Message »
From:Pekka Nousiainen Date:July 23 2008 4:00pm
Subject:bzr commit into mysql-5.1-telco-6.4 branch (pekka:2677) WL#4391
View as plain text  
#At file:///export/space/pekka/ndb/version/my51-wl4391/

 2677 Pekka Nousiainen	2008-07-23
      wl#4391 01.diff
      Block instances, mapping to threads, log part id, MT LQH switch.
added:
  storage/ndb/src/kernel/vm/GlobalData.cpp
  storage/ndb/src/kernel/vm/dummy_nonmt.cpp
modified:
  storage/ndb/include/debugger/SignalLoggerManager.hpp
  storage/ndb/include/kernel/RefConvert.hpp
  storage/ndb/include/kernel/kernel_config_parameters.h
  storage/ndb/include/kernel/kernel_types.h
  storage/ndb/include/kernel/ndb_limits.h
  storage/ndb/src/common/debugger/SignalLoggerManager.cpp
  storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp
  storage/ndb/src/kernel/blocks/dbdict/Dbdict.hpp
  storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp
  storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp
  storage/ndb/src/kernel/blocks/ndbfs/PosixAsyncFile.cpp
  storage/ndb/src/kernel/main.cpp
  storage/ndb/src/kernel/vm/Configuration.cpp
  storage/ndb/src/kernel/vm/GlobalData.hpp
  storage/ndb/src/kernel/vm/Makefile.am
  storage/ndb/src/kernel/vm/SimulatedBlock.cpp
  storage/ndb/src/kernel/vm/SimulatedBlock.hpp
  storage/ndb/src/kernel/vm/TransporterCallback.cpp
  storage/ndb/src/kernel/vm/mt.cpp
  storage/ndb/src/kernel/vm/mt.hpp

=== modified file 'storage/ndb/include/debugger/SignalLoggerManager.hpp'
--- a/storage/ndb/include/debugger/SignalLoggerManager.hpp	2008-06-17 20:28:45 +0000
+++ b/storage/ndb/include/debugger/SignalLoggerManager.hpp	2008-07-23 16:00:25 +0000
@@ -26,6 +26,7 @@
 #include <kernel_types.h>
 #include <BlockNumbers.h>
 #include <TransporterDefinitions.hpp>
+#include <RefConvert.hpp>
 
 class SignalLoggerManager
 {
@@ -187,11 +188,17 @@ private:
   
   Uint32        traceId;
   Uint8         logModes[NO_OF_BLOCKS];
+
+  NdbMutex* m_mutex;
+  void lock() { if (m_mutex != 0) NdbMutex_Lock(m_mutex); }
+  void unlock() { if (m_mutex != 0) NdbMutex_Unlock(m_mutex); }
  
 public:
   inline bool
   logMatch(BlockNumber bno, LogMode mask)
   {
+    // extract main block number
+    bno = blockToMain(bno);
     // avoid addressing outside logModes
     return
       bno < MIN_BLOCK_NO || bno > MAX_BLOCK_NO ||

=== modified file 'storage/ndb/include/kernel/RefConvert.hpp'
--- a/storage/ndb/include/kernel/RefConvert.hpp	2006-12-23 19:20:40 +0000
+++ b/storage/ndb/include/kernel/RefConvert.hpp	2008-07-23 16:00:25 +0000
@@ -16,30 +16,82 @@
 #ifndef REFCONVERT_H
 #define REFCONVERT_H
 
+#include <assert.h>
 #include "kernel_types.h"
 
+/*
+ * In multithreaded kernel, BlockNumber includes the main block
+ * number in lower 9 bits and the instance in upper 7 bits.
+ */
+
+inline
+BlockNumber blockToMain(Uint32 block){
+  assert(block < (1 << 16));
+  return (BlockNumber)(block & ((1 << 9) - 1));
+}
+
+inline
+BlockInstance blockToInstance(Uint32 block){
+  assert(block < (1 << 16));
+  return (BlockNumber)(block >> 9);
+}
+
+inline
+BlockNumber numberToBlock(Uint32 main, Uint32 instance)
+{
+  assert(main < (1 << 9) && instance < (1 << (16 - 9)));
+  return (BlockNumber)(main | (instance << 9));
+}
+
+/**
+ * Convert BlockReference to NodeId
+ */
+inline
+NodeId refToNode(Uint32 ref){
+  return (NodeId)(ref & ((1 << 16) - 1));
+}
+
 /**
- * Convert BlockReference to BlockNumber
+ * Convert BlockReference to full 16-bit BlockNumber.
  */
 inline
-BlockNumber refToBlock(BlockReference ref){
+BlockNumber refToBlock(Uint32 ref){
   return (BlockNumber)(ref >> 16);
 }
 
 /**
- * Convert BlockReference to NodeId
+ * Convert BlockReference to main BlockNumber.
+ * Used in tests such as: refToMain(senderRef) == DBTC.
+ */
+inline
+BlockNumber refToMain(Uint32 ref){
+  return (BlockNumber)((ref >> 16) & ((1 << 9) - 1));
+}
+
+/**
+ * Convert BlockReference to BlockInstance.
  */
 inline
-NodeId refToNode(BlockReference ref){
-  return (NodeId)(ref & 0xFFFF);
+BlockInstance refToInstance(Uint32 ref){
+  return (BlockInstance)(ref >> (16 + 9));
 }
 
 /**
  * Convert NodeId and BlockNumber to BlockReference
  */
 inline 
-BlockReference numberToRef(BlockNumber bnr, NodeId proc){
-  return (((Uint32)bnr) << 16) + proc;
+BlockReference numberToRef(Uint32 block, Uint32 node){
+  assert(node < (1 << 16) && block < (1 << 16));
+  return (BlockReference)(node | (block << 16));
+}
+
+/**
+ * Convert NodeId and block main and instance to BlockReference
+ */
+inline 
+BlockReference numberToRef(Uint32 main, Uint32 instance, Uint32 node){
+  assert(node < (1 << 16) && main < (1 << 9) && instance < (1 << (16 - 9)));
+  return (BlockReference)(node | (main << 16) | (instance << (16 + 9)));
 }
 
 #endif

=== modified file 'storage/ndb/include/kernel/kernel_config_parameters.h'
--- a/storage/ndb/include/kernel/kernel_config_parameters.h	2006-12-31 00:32:21 +0000
+++ b/storage/ndb/include/kernel/kernel_config_parameters.h	2008-07-23 16:00:25 +0000
@@ -63,4 +63,7 @@
 #define CFG_TUX_ATTRIBUTE     (PRIVATE_BASE + 42)
 #define CFG_TUX_SCAN_OP       (PRIVATE_BASE + 43)
 
+#define CFG_NDBMT_WORKERS     (PRIVATE_BASE + 44)
+#define CFG_NDBMT_THREADS     (PRIVATE_BASE + 45)
+
 #endif

=== modified file 'storage/ndb/include/kernel/kernel_types.h'
--- a/storage/ndb/include/kernel/kernel_types.h	2006-12-23 19:20:40 +0000
+++ b/storage/ndb/include/kernel/kernel_types.h	2008-07-23 16:00:25 +0000
@@ -22,6 +22,7 @@
 
 typedef Uint16 NodeId; 
 typedef Uint16 BlockNumber;
+typedef Uint16 BlockInstance;
 typedef Uint32 BlockReference;
 typedef Uint16 GlobalSignalNumber;
 

=== modified file 'storage/ndb/include/kernel/ndb_limits.h'
--- a/storage/ndb/include/kernel/ndb_limits.h	2008-06-18 09:01:43 +0000
+++ b/storage/ndb/include/kernel/ndb_limits.h	2008-07-23 16:00:25 +0000
@@ -178,7 +178,10 @@
  */
 #define NDBMT_BLOCK_BITS 9
 #define NDBMT_BLOCK_MASK 0x001FF
-#define NDBMT_BLOCK_INSTANCES_BITS 7
+#define NDBMT_BLOCK_INSTANCE_BITS 7
 #define NDBMT_BLOCK_INSTANCE_MASK 0xFE00
 
+#define MAX_NDBMT_WORKERS 4
+#define MAX_NDBMT_THREADS 4
+
 #endif

=== modified file 'storage/ndb/src/common/debugger/SignalLoggerManager.cpp'
--- a/storage/ndb/src/common/debugger/SignalLoggerManager.cpp	2008-06-17 20:28:45 +0000
+++ b/storage/ndb/src/common/debugger/SignalLoggerManager.cpp	2008-07-23 16:00:25 +0000
@@ -17,8 +17,22 @@
 
 #include "SignalLoggerManager.hpp"
 #include <LongSignal.hpp>
-
+#include <GlobalSignalNumbers.h>
 #include <DebuggerNames.hpp>
+#include <NdbTick.h>
+
+// avoids MT log mixups but has some serializing effect
+static const bool g_use_mutex = true;
+
+static char* mytime()
+{
+  NDB_TICKS t = NdbTick_CurrentMillisecond();
+  uint s = (t / 1000) % 3600;
+  uint ms = t % 1000;
+  static char buf[100];
+  sprintf(buf, "%u.%03u", s, ms);
+  return buf;
+}
 
 SignalLoggerManager::SignalLoggerManager()
 {
@@ -28,6 +42,9 @@ SignalLoggerManager::SignalLoggerManager
   outputStream = 0;
   m_ownNodeId = 0;
   m_logDistributed = false;
+  m_mutex = 0;
+  if (g_use_mutex)
+    m_mutex = NdbMutex_Create();
 }
 
 SignalLoggerManager::~SignalLoggerManager()
@@ -37,6 +54,10 @@ SignalLoggerManager::~SignalLoggerManage
     fclose(outputStream);
     outputStream = 0;
   }
+  if (m_mutex != 0) {
+    NdbMutex_Destroy(m_mutex);
+    m_mutex = 0;
+  }
 }
 
 FILE *
@@ -133,7 +154,7 @@ SignalLoggerManager::log(LogMode logMode
      count == 0){
     
     for (int number = 0; number < NO_OF_BLOCKS; ++number){
-      cnt += log(SLM_ON, number, logMode);
+      cnt += log(SLM_ON, MIN_BLOCK_NO + number, logMode);
     }
   } else {
     for (int i = 0; i < count; ++i){
@@ -220,15 +241,17 @@ SignalLoggerManager::executeDirect(const
   if(outputStream != 0 && 
      (traceId == 0 || traceId == trace) &&
      (logMatch(senderBlockNo, LogOut) || logMatch(receiverBlockNo, LogIn))){
+    lock();
     const char* inOutStr = prio == 0 ? "In" : "Out";
 #ifdef VM_TRACE_TIME
-    fprintf(outputStream, "---- Direct --- Signal --- %s - %d ----\n", inOutStr, time(0));
+    fprintf(outputStream, "---- Direct --- Signal --- %s - %s ----\n", inOutStr, mytime());
 #else
     fprintf(outputStream, "---- Direct --- Signal --- %s ----------------\n", inOutStr);
 #endif
     // XXX pass in/out to print* function somehow
     printSignalHeader(outputStream, sh, 0, node, true);
     printSignalData(outputStream, sh, theData);
+    unlock();
   }
 }
 
@@ -249,8 +272,9 @@ SignalLoggerManager::executeSignal(const
      (traceId == 0 || traceId == trace) &&
      (logMatch(receiverBlockNo, LogOut) ||
       (m_logDistributed && m_ownNodeId != senderNode))){
+    lock();
 #ifdef VM_TRACE_TIME
-    fprintf(outputStream, "---- Received - Signal - %d ----\n", time(0));
+    fprintf(outputStream, "---- Received - Signal - %s ----\n", mytime());
 #else
     fprintf(outputStream, "---- Received - Signal ----------------\n");
 #endif
@@ -259,6 +283,7 @@ SignalLoggerManager::executeSignal(const
     printSignalData(outputStream, sh, theData);
     for (unsigned i = 0; i < secs; i++)
       printSegmentedSection(outputStream, sh, ptr, i);
+    unlock();
   }
 }
 
@@ -276,8 +301,9 @@ SignalLoggerManager::executeSignal(const
      (traceId == 0 || traceId == trace) &&
      (logMatch(receiverBlockNo, LogOut) ||
       (m_logDistributed && m_ownNodeId != senderNode))){
+    lock();
 #ifdef VM_TRACE_TIME
-    fprintf(outputStream, "---- Received - Signal - %d ----\n", time(0));
+    fprintf(outputStream, "---- Received - Signal - %s ----\n", mytime());
 #else
     fprintf(outputStream, "---- Received - Signal ----------------\n");
 #endif
@@ -286,6 +312,7 @@ SignalLoggerManager::executeSignal(const
     printSignalData(outputStream, sh, theData);
     for (unsigned i = 0; i < secs; i++)
       printLinearSection(outputStream, sh, ptr, i);
+    unlock();
   }
 }
 
@@ -306,8 +333,9 @@ SignalLoggerManager::sendSignal(const Si
      (traceId == 0 || traceId == trace) &&
      (logMatch(senderBlockNo, LogOut) ||
       (m_logDistributed && m_ownNodeId != node))){
+    lock();
 #ifdef VM_TRACE_TIME
-    fprintf(outputStream, "---- Send ----- Signal - %d ----\n", time(0));
+    fprintf(outputStream, "---- Send ----- Signal - %s ----\n", mytime());
 #else
     fprintf(outputStream, "---- Send ----- Signal ----------------\n");
 #endif
@@ -316,6 +344,7 @@ SignalLoggerManager::sendSignal(const Si
     printSignalData(outputStream, sh, theData);
     for (unsigned i = 0; i < secs; i++)
       printLinearSection(outputStream, sh, ptr, i);
+    unlock();
   }
 }
 
@@ -335,8 +364,9 @@ SignalLoggerManager::sendSignal(const Si
      (traceId == 0 || traceId == trace) &&
      (logMatch(senderBlockNo, LogOut) ||
       (m_logDistributed && m_ownNodeId != node))){
+    lock();
 #ifdef VM_TRACE_TIME
-    fprintf(outputStream, "---- Send ----- Signal - %d ----\n", time(0));
+    fprintf(outputStream, "---- Send ----- Signal - %s ----\n", mytime());
 #else
     fprintf(outputStream, "---- Send ----- Signal ----------------\n");
 #endif
@@ -345,6 +375,7 @@ SignalLoggerManager::sendSignal(const Si
     printSignalData(outputStream, sh, theData);
     for (unsigned i = 0; i < secs; i++)
       printSegmentedSection(outputStream, sh, ptr, i);
+    unlock();
   }
 }
 
@@ -362,8 +393,9 @@ SignalLoggerManager::sendSignal(const Si
      (traceId == 0 || traceId == trace) &&
      (logMatch(senderBlockNo, LogOut) ||
       (m_logDistributed && m_ownNodeId != node))){
+    lock();
 #ifdef VM_TRACE_TIME
-    fprintf(outputStream, "---- Send ----- Signal - %d ----\n", time(0));
+    fprintf(outputStream, "---- Send ----- Signal - %s ----\n", mytime());
 #else
     fprintf(outputStream, "---- Send ----- Signal ----------------\n");
 #endif
@@ -372,6 +404,7 @@ SignalLoggerManager::sendSignal(const Si
     printSignalData(outputStream, sh, theData);
     for (unsigned i = 0; i < secs; i++)
       printGenericSection(outputStream, sh, ptr, i);
+    unlock();
   }
 }
 
@@ -388,11 +421,12 @@ SignalLoggerManager::sendSignalWithDelay
   if(outputStream != 0 && 
      (traceId == 0 || traceId == trace) &&
      logMatch(senderBlockNo, LogOut)){
+    lock();
 #ifdef VM_TRACE_TIME
     fprintf(outputStream, 
-	    "---- Send ----- Signal (%d ms) %d\n", 
+	    "---- Send ----- Signal (%d ms) %s\n", 
 	    delayInMilliSeconds, 
-	    time(0));
+	    mytime());
 #else
     fprintf(outputStream, "---- Send delay Signal (%d ms) ----------\n", 
 	    delayInMilliSeconds);
@@ -402,6 +436,7 @@ SignalLoggerManager::sendSignalWithDelay
     printSignalData(outputStream, sh, theData);
     for (unsigned i = 0; i < secs; i++)
       printSegmentedSection(outputStream, sh, ptr, i);
+    unlock();
   }
 }
 
@@ -417,15 +452,40 @@ SignalLoggerManager::log(BlockNumber bno
 
   if(outputStream != 0 &&
      logModes[bno2] != LogOff){
+    lock();
     va_list ap;
     va_start(ap, msg);
     fprintf(outputStream, "%s: ", getBlockName(bno, "API"));
     vfprintf(outputStream, msg, ap);
     fprintf(outputStream, "\n");
     va_end(ap);
+    unlock();
   }
 }
 
+static inline bool
+isSysBlock(BlockNumber block, Uint32 gsn)
+{
+  if (block != 0)
+    return false;
+  switch (gsn) {
+  case GSN_START_ORD:
+    return true; // first sig
+  case GSN_CONNECT_REP:
+  case GSN_DISCONNECT_REP:
+  case GSN_EVENT_REP:
+    return true; // transporter
+  case GSN_STOP_FOR_CRASH:
+    return true; // mt scheduler
+  }
+  return false;
+}
+
+static inline bool
+isApiBlock(BlockNumber block)
+{
+  return block >= 0x8000 || block == 4002 || block == 2047;
+}
 
 void 
 SignalLoggerManager::printSignalHeader(FILE * output, 
@@ -434,36 +494,79 @@ SignalLoggerManager::printSignalHeader(F
 				       Uint32 node,
 				       bool printReceiversSignalId)
 {
-  Uint32 receiverBlockNo = sh.theReceiversBlockNumber;
+  const char* const dummy_block_name = "UUNET";
+
+  bool receiverIsApi = isApiBlock(sh.theReceiversBlockNumber);
+  Uint32 receiverBlockNo;
+  Uint32 receiverInstanceNo;
+  if (!receiverIsApi) {
+    receiverBlockNo = blockToMain(sh.theReceiversBlockNumber);
+    receiverInstanceNo = blockToInstance(sh.theReceiversBlockNumber);
+  } else {
+    receiverBlockNo = sh.theReceiversBlockNumber;
+    receiverInstanceNo = 0;
+  }
   Uint32 receiverProcessor = node;
+
   Uint32 gsn = sh.theVerId_signalNumber;
-  Uint32 senderBlockNo = refToBlock(sh.theSendersBlockRef);
-  Uint32 senderProcessor = refToNode(sh.theSendersBlockRef);
+
+  Uint32 sbref = sh.theSendersBlockRef;
+  bool senderIsSys = isSysBlock(refToBlock(sbref), gsn);
+  bool senderIsApi = isApiBlock(refToBlock(sbref));
+  Uint32 senderBlockNo;
+  Uint32 senderInstanceNo;
+  if (!senderIsSys && !senderIsApi) {
+    senderBlockNo = refToMain(sbref);
+    senderInstanceNo = refToInstance(sbref);
+  } else {
+    senderBlockNo = refToBlock(sbref);
+    senderInstanceNo = 0;
+  }
+  Uint32 senderProcessor = refToNode(sbref);
+
   Uint32 length = sh.theLength;
   Uint32 trace = sh.theTrace;
   Uint32 rSigId = sh.theSignalId;
   Uint32 sSigId = sh.theSendersSignalId;
 
   const char * signalName = getSignalName(gsn);
-  const char * rBlockName = getBlockName(receiverBlockNo, "API");
-  const char * sBlockName = getBlockName(senderBlockNo, "API");
-  
-  if(printReceiversSignalId)
+  const char * rBlockName =
+    receiverIsApi ? "API" :
+    getBlockName(receiverBlockNo, dummy_block_name);
+  const char * sBlockName =
+    senderIsSys ? "SYS" :
+    senderIsApi ? "API" :
+    getBlockName(senderBlockNo, dummy_block_name);
+
+  char rInstanceText[20];
+  char sInstanceText[20];
+  rInstanceText[0] = 0;
+  sInstanceText[0] = 0;
+  if (receiverInstanceNo != 0)
+    sprintf(rInstanceText, "/%u", (uint)receiverInstanceNo);
+  if (senderInstanceNo != 0)
+    sprintf(sInstanceText, "/%u", (uint)senderInstanceNo);
+// wl4391_todo senders instance missing
+// assert(gsn != GSN_LQHKEYREQ || receiverProcessor == senderProcessor || senderInstanceNo != 0);
+  if (printReceiversSignalId)
     fprintf(output, 
-	    "r.bn: %d \"%s\", r.proc: %d, r.sigId: %d gsn: %d \"%s\" prio: %d\n"
-	    ,receiverBlockNo, rBlockName, receiverProcessor, rSigId, 
-	    gsn, signalName, prio);
+	    "r.bn: %d%s \"%s\", r.proc: %d, r.sigId: %d gsn: %d \"%s\" prio: %d\n"
+	    ,receiverBlockNo, rInstanceText, rBlockName, receiverProcessor,
+            rSigId, gsn, signalName, prio);
   else 
     fprintf(output,
-	    "r.bn: %d \"%s\", r.proc: %d, gsn: %d \"%s\" prio: %d\n",
-	    receiverBlockNo, rBlockName, receiverProcessor, gsn, 
-	    signalName, prio);
+	    "r.bn: %d%s \"%s\", r.proc: %d, gsn: %d \"%s\" prio: %d\n",
+	    receiverBlockNo, rInstanceText, rBlockName, receiverProcessor,
+            gsn, signalName, prio);
   
   fprintf(output, 
-	  "s.bn: %d \"%s\", s.proc: %d, s.sigId: %d length: %d trace: %d "
+	  "s.bn: %d%s \"%s\", s.proc: %d, s.sigId: %d length: %d trace: %d "
 	  "#sec: %d fragInf: %d\n",
-	  senderBlockNo, sBlockName, senderProcessor, sSigId, length, trace,
-	  sh.m_noOfSections, sh.m_fragmentInfo);
+	  senderBlockNo, sInstanceText, sBlockName, senderProcessor,
+          sSigId, length, trace, sh.m_noOfSections, sh.m_fragmentInfo);
+
+  //assert(strcmp(rBlockName, dummy_block_name) != 0);
+  //assert(strcmp(sBlockName, dummy_block_name) != 0);
 }
 
 void

=== modified file 'storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp'
--- a/storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp	2008-07-01 12:35:34 +0000
+++ b/storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp	2008-07-23 16:00:25 +0000
@@ -1642,9 +1642,6 @@ Dbdict::Dbdict(Block_context& ctx):
   c_opDropEvent(c_opRecordPool),
   c_opSignalUtil(c_opRecordPool),
   c_opRecordSequence(0)
-#ifdef VM_TRACE
-  ,debugOut(*new NullOutputStream())
-#endif
 {
   BLOCK_CONSTRUCTOR(Dbdict);
   
@@ -2394,25 +2391,8 @@ void Dbdict::execREAD_CONFIG_REQ(Signal*
       new (ptr.p) DictObject();
     objs.release();
   }
-
-#ifdef VM_TRACE
-  {
-    FILE* debugFile = globalSignalLoggers.getOutputStream();
-    if (debugFile != NULL)
-      debugOut = *new NdbOut(*new FileOutputStream(debugFile));
-  }
-#endif
 }//execSIZEALT_REP()
 
-#ifdef VM_TRACE
-bool
-Dbdict::debugOutOn() const
-{
-  SignalLoggerManager::LogMode mask = SignalLoggerManager::LogInOut;
-  return globalSignalLoggers.logMatch(DBDICT, mask);
-}
-#endif
-
 /* ---------------------------------------------------------------- */
 // Start phase signals sent by CNTR. We reply with NDB_STTORRY when
 // we completed this phase.

=== modified file 'storage/ndb/src/kernel/blocks/dbdict/Dbdict.hpp'
--- a/storage/ndb/src/kernel/blocks/dbdict/Dbdict.hpp	2008-06-05 20:34:20 +0000
+++ b/storage/ndb/src/kernel/blocks/dbdict/Dbdict.hpp	2008-07-23 16:00:25 +0000
@@ -74,20 +74,6 @@
 #include <LockQueue.hpp>
 #include <signaldata/CopyData.hpp>
 
-// Debug Macros
-
-#ifdef VM_TRACE
-#define D(x) \
-  do { \
-    if (!debugOutOn()) break; \
-    debugOut << "DBDICT:" << __LINE__ << " " << x << endl; \
-  } while (0)
-#define V(x) " " << #x << ":" << (x)
-#else
-#define D(x)
-#undef V
-#endif
-
 #ifdef DBDICT_C
 
 /*--------------------------------------------------------------*/
@@ -3422,8 +3408,6 @@ public:
   int checkSingleUserMode(Uint32 senderRef);
 
 #ifdef VM_TRACE
-  NdbOut debugOut;
-  bool debugOutOn() const;
   friend NdbOut& operator<<(NdbOut& out, const DictObject&);
   friend NdbOut& operator<<(NdbOut& out, const ErrorInfo&);
   friend NdbOut& operator<<(NdbOut& out, const SchemaOp&);

=== modified file 'storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp'
--- a/storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp	2008-06-05 20:31:21 +0000
+++ b/storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp	2008-07-23 16:00:25 +0000
@@ -1773,6 +1773,16 @@ private:
   bool c_sr_wait_to;
   NdbNodeBitmask m_sr_nodes;
   NdbNodeBitmask m_to_nodes;
+
+  // block instances
+public:
+  Uint32 dihGetInstanceKey(FragmentstorePtr tFragPtr) {
+    ndbrequire(!tFragPtr.isNull());
+    Uint32 log_part_id = tFragPtr.p->m_log_part_id;
+    Uint32 instanceKey = 1 + log_part_id % MAX_NDBMT_WORKERS;
+    return instanceKey;
+  }
+  Uint32 dihGetInstanceKey(Uint32 tabId, Uint32 fragId);
 };
 
 #if (DIH_CDATA_SIZE < _SYSFILE_SIZE32)

=== modified file 'storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp	2008-06-09 14:32:01 +0000
+++ b/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp	2008-07-23 16:00:25 +0000
@@ -16870,4 +16870,17 @@ do_send:
   infoEvent("%s %s", msg, buf);
 }
 
+// block instances
+Uint32
+Dbdih::dihGetInstanceKey(Uint32 tabId, Uint32 fragId)
+{
+  TabRecordPtr tTabPtr;
+  tTabPtr.i = tabId;
+  ptrCheckGuard(tTabPtr, ctabFileSize, tabRecord);
+  FragmentstorePtr tFragPtr;
+  getFragstore(tTabPtr.p, fragId, tFragPtr);
+  Uint32 instanceKey = dihGetInstanceKey(tFragPtr);
+  return instanceKey;
+}
+
 #endif

=== modified file 'storage/ndb/src/kernel/blocks/ndbfs/PosixAsyncFile.cpp'
--- a/storage/ndb/src/kernel/blocks/ndbfs/PosixAsyncFile.cpp	2008-04-09 10:41:03 +0000
+++ b/storage/ndb/src/kernel/blocks/ndbfs/PosixAsyncFile.cpp	2008-07-23 16:00:25 +0000
@@ -335,10 +335,8 @@ no_odirect:
       req->data.pageData[0] = m_page_ptr.i;
 
       m_fs.EXECUTE_DIRECT(block, GSN_FSWRITEREQ, signal,
-			  FsReadWriteReq::FixedLength + 1
-#ifdef VM_TRACE
-                          , true     // This EXECUTE_DIRECT is thread safe
-#endif
+			  FsReadWriteReq::FixedLength + 1,
+                          0 // wl4391_todo This EXECUTE_DIRECT is thread safe
                           );
   retry:
       Uint32 size = request->par.open.page_size;

=== modified file 'storage/ndb/src/kernel/main.cpp'
--- a/storage/ndb/src/kernel/main.cpp	2008-04-23 13:52:37 +0000
+++ b/storage/ndb/src/kernel/main.cpp	2008-07-23 16:00:25 +0000
@@ -1,4 +1,4 @@
-/* Copyright (C) 2003 MySQL AB
+ /* Copyright (C) 2003 MySQL AB
 
    This program is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
@@ -33,6 +33,7 @@
 
 #include <LogLevel.hpp>
 #include <EventLogger.hpp>
+#include <NdbEnv.h>
 
 #include <NdbAutoPtr.hpp>
 
@@ -285,6 +286,56 @@ init_global_memory_manager(EmulatorData 
   return 0;                     // Success
 }
 
+bool g_ndbMt = false;
+bool g_ndbMtLqh = false;
+
+static int
+get_multithreaded_config(EmulatorData& ed)
+{
+  // multithreaded is compiled in ndbd/ndbmtd
+  g_ndbMt = SimulatedBlock::isMultiThreaded();
+  if (!g_ndbMt)
+    return 0;
+
+  // mt lqh via environment during development
+  {
+    const char* p = NdbEnv_GetEnv("NDB_MT_LQH", (char*)0, 0);
+    if (p != 0 && strchr("1Y", p[0]) != 0)
+      g_ndbMtLqh = true;
+    if (!g_ndbMtLqh)
+      return 0;
+  }
+
+  const ndb_mgm_configuration_iterator * p =
+    ed.theConfiguration->getOwnConfigIterator();
+  if (p == 0)
+  {
+    abort();
+  }
+
+  Uint32 workers = 0;
+  Uint32 threads = 0;
+  if (ndb_mgm_get_int_parameter(p, CFG_NDBMT_WORKERS, &workers) ||
+      ndb_mgm_get_int_parameter(p, CFG_NDBMT_THREADS, &threads))
+  {
+    g_eventLogger->alert("Failed to get CFG_NDBMT parameters from "
+                        "config, exiting.");
+    return -1;
+  }
+
+  ndbout << "NDBMT: workers=" << workers
+         << " threads=" << threads << endl;
+
+  assert(workers != 0 && workers <= MAX_NDBMT_WORKERS);
+  assert(threads != 0 && threads <= MAX_NDBMT_THREADS);
+  assert(workers % threads == 0);
+
+  globalData.ndbmtWorkers = workers;
+  globalData.ndbmtThreads = threads;
+  return 0;
+}
+
+
 int main(int argc, char** argv)
 {
   NDB_INIT(argv[0]);
@@ -480,28 +531,41 @@ int main(int argc, char** argv)
     globalEmulatorData.theWatchDog->unregisterWatchedThread(0);
   }
 
-  globalEmulatorData.theThreadConfig->init(&globalEmulatorData);
+  if (get_multithreaded_config(globalEmulatorData))
+    return -1;
 
-    // Load blocks
-  globalEmulatorData.theSimBlockList->load(globalEmulatorData);
-    
-  // Set thread concurrency for Solaris' light weight processes
-  int status;
-  status = NdbThread_SetConcurrencyLevel(30);
-  assert(status == 0);
+  globalEmulatorData.theThreadConfig->init(&globalEmulatorData);
   
 #ifdef VM_TRACE
-  // Create a signal logger
+  // Create a signal logger before block constructors
   char *buf= NdbConfig_SignalLogFileName(globalData.ownId);
   NdbAutoPtr<char> tmp_aptr(buf);
   FILE * signalLog = fopen(buf, "a");
   globalSignalLoggers.setOwnNodeId(globalData.ownId);
   globalSignalLoggers.setOutputStream(signalLog);
-#if 0 // to log startup
-  globalSignalLoggers.log(SignalLoggerManager::LogInOut, "BLOCK=DBDICT,DBDIH");
-  globalData.testOn = 1;
+#if 1 // to log startup
+  { const char* p = NdbEnv_GetEnv("NDB_SIGNAL_LOG", (char*)0, 0);
+    if (p != 0) {
+      char buf[200];
+      snprintf(buf, sizeof(buf), "BLOCK=%s", p);
+      for (char* q = buf; *q != 0; q++) *q = toupper(toascii(*q));
+      globalSignalLoggers.log(SignalLoggerManager::LogInOut, buf);
+      globalData.testOn = 1;
+      assert(signalLog != 0);
+      fprintf(signalLog, "START\n");
+      fflush(signalLog);
+    }
+  }
 #endif
 #endif
+
+    // Load blocks
+  globalEmulatorData.theSimBlockList->load(globalEmulatorData);
+    
+  // Set thread concurrency for Solaris' light weight processes
+  int status;
+  status = NdbThread_SetConcurrencyLevel(30);
+  assert(status == 0);
   
   catchsigs(false);
    

=== modified file 'storage/ndb/src/kernel/vm/Configuration.cpp'
--- a/storage/ndb/src/kernel/vm/Configuration.cpp	2008-05-29 15:06:11 +0000
+++ b/storage/ndb/src/kernel/vm/Configuration.cpp	2008-07-23 16:00:25 +0000
@@ -999,6 +999,17 @@ Configuration::calcSizeAlt(ConfigValues 
     cfg.put(CFG_TUX_SCAN_OP, noOfLocalScanRecords); 
   }
 
+  // NDBMT
+  {
+    uint workers = 4;
+    uint threads = 2;
+    assert(workers <= MAX_NDBMT_WORKERS);
+    assert(threads <= MAX_NDBMT_THREADS);
+    assert(workers % threads == 0);
+    cfg.put(CFG_NDBMT_WORKERS, workers);
+    cfg.put(CFG_NDBMT_THREADS, threads);
+  }
+
   m_ownConfig = (ndb_mgm_configuration*)cfg.getConfigValues();
   m_ownConfigIterator = ndb_mgm_create_configuration_iterator
     (m_ownConfig, 0);

=== added file 'storage/ndb/src/kernel/vm/GlobalData.cpp'
--- a/storage/ndb/src/kernel/vm/GlobalData.cpp	1970-01-01 00:00:00 +0000
+++ b/storage/ndb/src/kernel/vm/GlobalData.cpp	2008-07-23 16:00:25 +0000
@@ -0,0 +1,28 @@
+/* Copyright (C) 2003 MySQL AB
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
+
+#include <ndb_global.h>
+#include <kernel_types.h>
+#include <SimulatedBlock.hpp>
+#include "GlobalData.hpp"
+
+SimulatedBlock*
+GlobalData::getBlock(BlockNumber blockNo, Uint32 instanceNo)
+{
+  SimulatedBlock* b = getBlock(blockNo);
+  if (b != 0 && instanceNo != 0)
+    b = b->getInstance(instanceNo);
+  return b;
+}

=== modified file 'storage/ndb/src/kernel/vm/GlobalData.hpp'
--- a/storage/ndb/src/kernel/vm/GlobalData.hpp	2008-01-01 12:45:11 +0000
+++ b/storage/ndb/src/kernel/vm/GlobalData.hpp	2008-07-23 16:00:25 +0000
@@ -67,11 +67,16 @@ struct GlobalData {
   
   Uint32     sendPackedActivated;
   Uint32     activateSendPacked;
+
+  Uint32     ndbmtWorkers;
+  Uint32     ndbmtThreads;
   
   GlobalData(){ 
     theSignalId = 0; 
     theStartLevel = NodeState::SL_NOTHING;
     theRestartFlag = perform_start;
+    ndbmtWorkers = 0;
+    ndbmtThreads = 0;
 #ifdef GCP_TIMER_HACK
     gcp_timer_limit = 0;
 #endif
@@ -80,6 +85,7 @@ struct GlobalData {
   
   void             setBlock(BlockNumber blockNo, SimulatedBlock * block);
   SimulatedBlock * getBlock(BlockNumber blockNo);
+  SimulatedBlock * getBlock(BlockNumber blockNo, Uint32 instanceNo);
   
   void           incrementWatchDogCounter(Uint32 place);
   Uint32 * getWatchDogPtr();

=== modified file 'storage/ndb/src/kernel/vm/Makefile.am'
--- a/storage/ndb/src/kernel/vm/Makefile.am	2008-04-25 16:53:41 +0000
+++ b/storage/ndb/src/kernel/vm/Makefile.am	2008-07-23 16:00:25 +0000
@@ -36,13 +36,15 @@ libkernel_a_SOURCES = \
         Rope.cpp \
         ndbd_malloc.cpp \
 	ndbd_malloc_impl.cpp \
-	Pool.cpp WOPool.cpp RWPool.cpp DynArr256.cpp LockQueue.cpp
+	Pool.cpp WOPool.cpp RWPool.cpp DynArr256.cpp LockQueue.cpp \
+	GlobalData.cpp
 
 libsched_a_SOURCES = TimeQueue.cpp		\
                      ThreadConfig.cpp \
                      FastScheduler.cpp \
                      TransporterCallback_nonmt.cpp \
-                     SimulatedBlock_nonmt.cpp
+                     SimulatedBlock_nonmt.cpp \
+		     dummy_nonmt.cpp
 
 libsched_mt_a_SOURCES = SimulatedBlock_mt.cpp \
                         TransporterCallback_mt.cpp \

=== modified file 'storage/ndb/src/kernel/vm/SimulatedBlock.cpp'
--- a/storage/ndb/src/kernel/vm/SimulatedBlock.cpp	2008-07-01 12:35:34 +0000
+++ b/storage/ndb/src/kernel/vm/SimulatedBlock.cpp	2008-07-23 16:00:25 +0000
@@ -24,6 +24,7 @@
 
 #include "SimulatedBlock.hpp"
 #include <NdbOut.hpp>
+#include <OutputStream.hpp>
 #include <GlobalData.hpp>
 #include <Emulator.hpp>
 #include <WatchDog.hpp>
@@ -46,6 +47,8 @@
 #include <AttributeDescriptor.hpp>
 #include <NdbSqlUtil.hpp>
 
+#include "../blocks/dbdih/Dbdih.hpp"
+
 #include <EventLogger.hpp>
 extern EventLogger * g_eventLogger;
 
@@ -56,10 +59,15 @@ extern EventLogger * g_eventLogger;
 // Constructor, Destructor
 //
 SimulatedBlock::SimulatedBlock(BlockNumber blockNumber,
-			       struct Block_context & ctx) 
+			       struct Block_context & ctx,
+                               Uint32 instanceNumber)
   : theNodeId(globalData.ownId),
     theNumber(blockNumber),
-    theReference(numberToRef(blockNumber, globalData.ownId)),
+    theInstance(instanceNumber),
+    theReference(numberToRef(blockNumber, instanceNumber, globalData.ownId)),
+    theInstanceCount(0),
+    theInstanceList(0),
+    theMainInstance(0),
     m_ctx(ctx),
     m_global_page_pool(globalData.m_global_page_pool),
     m_shared_page_pool(globalData.m_shared_page_pool),
@@ -68,13 +76,41 @@ SimulatedBlock::SimulatedBlock(BlockNumb
     c_segmentedFragmentSendList(c_fragmentSendPool),
     c_mutexMgr(* this),
     c_counterMgr(* this)
+#ifdef VM_TRACE
+    ,debugOut(*new NdbOut(*new FileOutputStream(globalSignalLoggers.getOutputStream())))
+#endif
 {
   m_threadId = 0;
   m_watchDogCounter = NULL;
   m_jamBuffer = (EmulatedJamBuffer *)NdbThread_GetTlsKey(NDB_THREAD_TLS_JAM);
   NewVarRef = 0;
   
-  globalData.setBlock(blockNumber, this);
+  SimulatedBlock* main = globalData.getBlock(blockNumber);
+
+  if (theInstance == 0) {
+    ndbrequire(main == 0);
+    main = this;
+    globalData.setBlock(blockNumber, main);
+    main->theInstanceCount = 1;
+  } else {
+    ndbrequire(main != 0);
+    ndbrequire(theInstance == main->theInstanceCount);
+    ndbrequire(theInstance < MaxInstances);
+    if (theInstance == 1) {
+      ndbrequire(main->theInstanceList == 0);
+      main->theInstanceList = new SimulatedBlock* [MaxInstances];
+      Uint32 i;
+      for (i = 0; i < MaxInstances; i++)
+        main->theInstanceList[i] = 0;
+    } else {
+      ndbrequire(main->theInstanceList != 0);
+    }
+    ndbrequire(main->theInstanceList[theInstance] == 0);
+    main->theInstanceList[theInstance] = this;
+    main->theInstanceCount = theInstance + 1;
+  }
+  theMainInstance = main;
+
   c_fragmentIdCounter = 1;
   c_fragSenderRunning = false;
   
@@ -130,6 +166,14 @@ SimulatedBlock::~SimulatedBlock()
 #ifdef VM_TRACE
   delete [] m_global_variables;
 #endif
+
+  if (theInstanceList != 0) {
+    Uint32 i;
+    for (i = 0; i < theInstanceCount; i++)
+      delete theInstanceList[i];
+    delete [] theInstanceList;
+  }
+  theInstanceList = 0;
 }
 
 void 
@@ -172,6 +216,15 @@ SimulatedBlock::addRecSignalImpl(GlobalS
   theExecArray[gsn] = f;
 }
 
+Uint32
+SimulatedBlock::getInstanceKey(Uint32 tabId, Uint32 fragId)
+{
+  Dbdih* dbdih = (Dbdih*)globalData.getBlock(DBDIH);
+  ndbrequire(dbdih != 0);
+  Uint32 instanceKey = dbdih->dihGetInstanceKey(tabId, fragId);
+  return instanceKey;
+}
+
 void
 SimulatedBlock::assignToThread(Uint32 threadId, EmulatedJamBuffer *jamBuffer,
                                Uint32 *watchDogCounter)
@@ -509,9 +562,9 @@ SimulatedBlock::sendSignal(BlockReferenc
     signal->header.theSendersBlockRef = sendBRef;
 #ifdef NDBD_MULTITHREADED
     if (jobBuffer == JBB)
-      sendlocal(m_threadId, recBlock, &signal->header, signal->theData, NULL);
+      sendlocal(m_threadId, &signal->header, signal->theData, NULL);
     else
-      sendprioa(m_threadId, recBlock, &signal->header, signal->theData, NULL);
+      sendprioa(m_threadId, &signal->header, signal->theData, NULL);
 #else
     globalScheduler.execute(signal, jobBuffer, recBlock,
 			    gsn);
@@ -607,9 +660,9 @@ SimulatedBlock::sendSignal(NodeReceiverG
 
 #ifdef NDBD_MULTITHREADED
     if (jobBuffer == JBB)
-      sendlocal(m_threadId, recBlock, &signal->header, signal->theData, NULL);
+      sendlocal(m_threadId, &signal->header, signal->theData, NULL);
     else
-      sendprioa(m_threadId, recBlock, &signal->header, signal->theData, NULL);
+      sendprioa(m_threadId, &signal->header, signal->theData, NULL);
 #else
     globalScheduler.execute(signal, jobBuffer, recBlock, gsn);
 #endif
@@ -716,10 +769,10 @@ SimulatedBlock::sendSignal(BlockReferenc
     
 #ifdef NDBD_MULTITHREADED
     if (jobBuffer == JBB)
-      sendlocal(m_threadId, recBlock, &signal->header, signal->theData,
+      sendlocal(m_threadId, &signal->header, signal->theData,
                 signal->theData+length);
     else
-      sendprioa(m_threadId, recBlock, &signal->header, signal->theData,
+      sendprioa(m_threadId, &signal->header, signal->theData,
                 signal->theData+length);
 #else
     globalScheduler.execute(signal, jobBuffer, recBlock,
@@ -831,10 +884,10 @@ SimulatedBlock::sendSignal(NodeReceiverG
 
 #ifdef NDBD_MULTITHREADED
     if (jobBuffer == JBB)
-      sendlocal(m_threadId, recBlock, &signal->header, signal->theData,
+      sendlocal(m_threadId, &signal->header, signal->theData,
                 signal->theData + length);
     else
-      sendprioa(m_threadId, recBlock, &signal->header, signal->theData,
+      sendprioa(m_threadId, &signal->header, signal->theData,
                 signal->theData + length);
 #else
     globalScheduler.execute(signal, jobBuffer, recBlock, gsn);
@@ -944,10 +997,10 @@ SimulatedBlock::sendSignal(BlockReferenc
 
 #ifdef NDBD_MULTITHREADED
     if (jobBuffer == JBB)
-      sendlocal(m_threadId, recBlock, &signal->header, signal->theData,
+      sendlocal(m_threadId, &signal->header, signal->theData,
                 signal->theData + length);
     else
-      sendprioa(m_threadId, recBlock, &signal->header, signal->theData,
+      sendprioa(m_threadId, &signal->header, signal->theData,
                 signal->theData + length);
 #else
     globalScheduler.execute(signal, jobBuffer, recBlock, gsn);
@@ -1059,10 +1112,10 @@ SimulatedBlock::sendSignal(NodeReceiverG
     * dst ++ = sections->m_ptr[2].i;
 #ifdef NDBD_MULTITHREADED
     if (jobBuffer == JBB)
-      sendlocal(m_threadId, recBlock, &signal->header, signal->theData,
+      sendlocal(m_threadId, &signal->header, signal->theData,
                 signal->theData + length);
     else
-      sendprioa(m_threadId, recBlock, &signal->header, signal->theData,
+      sendprioa(m_threadId, &signal->header, signal->theData,
                 signal->theData + length);
 #else
     globalScheduler.execute(signal, jobBuffer, recBlock, gsn);
@@ -1246,16 +1299,20 @@ SimulatedBlock::freeBat(){
 }
 
 const NewVARIABLE *
-SimulatedBlock::getBat(Uint16 blockNo){
+SimulatedBlock::getBat(Uint16 blockNo, Uint32 instanceNo){
   SimulatedBlock * sb = globalData.getBlock(blockNo);
+  if (sb != 0 && instanceNo != 0)
+    sb = sb->getInstance(instanceNo);
   if(sb == 0)
     return 0;
   return sb->NewVarRef;
 }
 
 Uint16
-SimulatedBlock::getBatSize(Uint16 blockNo){
+SimulatedBlock::getBatSize(Uint16 blockNo, Uint32 instanceNo){
   SimulatedBlock * sb = globalData.getBlock(blockNo);
+  if (sb != 0 && instanceNo != 0)
+    sb = sb->getInstance(instanceNo);
   if(sb == 0)
     return 0;
   return sb->theBATSize;
@@ -1418,7 +1475,7 @@ SimulatedBlock::infoEvent(const char * m
   signalT.header.theLength               = ((len+3)/4)+1;
   
 #ifdef NDBD_MULTITHREADED
-  sendlocal(m_threadId, signalT.header.theReceiversBlockNumber,
+  sendlocal(m_threadId,
             &signalT.header, signalT.theData, signalT.m_sectionPtrI);
 #else
   globalScheduler.execute(&signalT.header, JBB, signalT.theData,
@@ -1463,7 +1520,7 @@ SimulatedBlock::warningEvent(const char 
   signalT.header.theLength               = ((len+3)/4)+1;
 
 #ifdef NDBD_MULTITHREADED
-  sendlocal(m_threadId, signalT.header.theReceiversBlockNumber,
+  sendlocal(m_threadId,
             &signalT.header, signalT.theData, signalT.m_sectionPtrI);
 #else
   globalScheduler.execute(&signalT.header, JBB, signalT.theData,
@@ -1621,7 +1678,7 @@ SimulatedBlock::printTimes(FILE * output
     }
   }
   sum = (sum + 500)/ 1000;
-  fprintf(output, "-- %s : %d --\n", getBlockName(number()), sum);
+  fprintf(output, "-- %s : %u --\n", getBlockName(number()), (Uint32)sum);
   fprintf(output, "\n");
   fflush(output);
 }
@@ -2387,7 +2444,7 @@ SimulatedBlock::setNodeInfo(NodeId nodeI
 }
 
 bool
-SimulatedBlock::isMultiThreaded() const
+SimulatedBlock::isMultiThreaded()
 {
 #ifdef NDBD_MULTITHREADED
   return true;
@@ -2720,3 +2777,37 @@ SimulatedBlock::create_distr_key(Uint32 
 }
 
 CArray<KeyDescriptor> g_key_descriptor_pool;
+
+#ifdef VM_TRACE
+bool
+SimulatedBlock::debugOutOn() const
+{
+  SignalLoggerManager::LogMode mask = SignalLoggerManager::LogInOut;
+  return globalSignalLoggers.logMatch(number(), mask);
+}
+
+const char*
+SimulatedBlock::debugOutTag(char *buf, int line)
+{
+  char blockbuf[40];
+  char instancebuf[40];
+  char linebuf[40];
+  char timebuf[40];
+  sprintf(blockbuf, "%s", getBlockName(number(), "UNKNOWN"));
+  instancebuf[0] = 0;
+  if (instance() != 0)
+    sprintf(instancebuf, "/%u", instance());
+  sprintf(linebuf, " %d", line);
+  timebuf[0] = 0;
+#ifdef VM_TRACE_TIME
+  {
+    NDB_TICKS t = NdbTick_CurrentMillisecond();
+    uint s = (t / 1000) % 3600;
+    uint ms = t % 1000;
+    sprintf(timebuf, " - %u.%03u -", s, ms);
+  }
+#endif
+  sprintf(buf, "%s%s%s%s ", blockbuf, instancebuf, linebuf, timebuf);
+  return buf;
+}
+#endif

=== modified file 'storage/ndb/src/kernel/vm/SimulatedBlock.hpp'
--- a/storage/ndb/src/kernel/vm/SimulatedBlock.hpp	2008-06-17 20:28:45 +0000
+++ b/storage/ndb/src/kernel/vm/SimulatedBlock.hpp	2008-07-23 16:00:25 +0000
@@ -56,6 +56,19 @@
 #include "ndbd_malloc_impl.hpp"
 #include <blocks/record_types.hpp>
 
+#ifdef VM_TRACE
+#define D(x) \
+  do { \
+    char buf[200]; \
+    if (!debugOutOn()) break; \
+    debugOut << debugOutTag(buf, __LINE__) << x << endl; \
+  } while (0)
+#define V(x) " " << #x << ":" << (x)
+#else
+#define D(x)
+#undef V
+#endif
+
 /**
  * Something for filesystem access
  */
@@ -108,7 +121,8 @@ protected:
    * Constructor
    */
   SimulatedBlock(BlockNumber blockNumber,
-		 struct Block_context & ctx); 
+		 struct Block_context & ctx,
+                 Uint32 instanceNumber = 0);
   
   /**********************************************************
    * Handling of execFunctions
@@ -125,12 +139,34 @@ public:
    */
   inline void executeFunction(GlobalSignalNumber gsn, Signal* signal);
 
+  /* Multiple block instances */
+  Uint32 instance() const {
+    return theInstance;
+  }
+  Uint32 getWorkerCount() const {
+    ndbrequire(theInstance == 0); // valid only on main instance
+    ndbrequire(theInstanceCount >= 1);
+    return theInstanceCount - 1;
+  }
+  SimulatedBlock* getInstance(Uint32 instanceNumber) {
+    ndbrequire(theInstance == 0);
+    if (instanceNumber == 0)
+      return this;
+    if (instanceNumber < theInstanceCount) {
+      ndbrequire(theInstanceList != 0);
+      return theInstanceList[instanceNumber];
+    }
+    return 0;
+  }
+  Uint32 getInstanceKey(Uint32 tabId, Uint32 fragId);
+
   /* Setup state of a block object for executing in a particular thread. */
   void assignToThread(Uint32 threadId, EmulatedJamBuffer *jamBuffer,
                       Uint32 *watchDogCounter);
   /* For multithreaded ndbd, get the id of owning thread. */
   uint32 getThreadId() const { return m_threadId; }
-  bool isMultiThreaded() const;
+  static bool isMultiThreaded();
+
 public:
   typedef void (SimulatedBlock::* CallbackFunction)(class Signal*, 
 						    Uint32 callbackData,
@@ -210,14 +246,15 @@ protected:
 			   Uint32 length,
 			   SectionHandle* sections) const;
 
+  /*
+   * Instance defaults to instance of sender.  Using explicit
+   * instance argument asserts that the call is thread-safe.
+   */
   void EXECUTE_DIRECT(Uint32 block, 
 		      Uint32 gsn, 
 		      Signal* signal, 
-		      Uint32 len
-#ifdef VM_TRACE
-                      , bool is_thread_safe = false
-#endif
-);
+		      Uint32 len,
+                      Uint32 givenInstanceNo = ZNIL);
   
   class SectionSegmentPool& getSectionSegmentPool();
   void releaseSections(struct SectionHandle&);
@@ -398,8 +435,18 @@ private:
   void  signal_error(Uint32, Uint32, Uint32, const char*, int) const ;
   const NodeId         theNodeId;
   const BlockNumber    theNumber;
+  const Uint32 theInstance;
   const BlockReference theReference;
   /*
+   * Instance 0 is the main instance.  It creates/owns other instances.
+   * In MT LQH main instance is the LQH proxy and the others ("workers")
+   * are real LQHs run by multiple threads.
+   */
+  enum { MaxInstances = 1 + MAX_NDBMT_WORKERS };
+  Uint32 theInstanceCount;          // set in main
+  SimulatedBlock** theInstanceList; // set in main, indexed by instance
+  SimulatedBlock* theMainInstance;  // set in all
+  /*
     Thread id currently executing this block.
     Not used in singlethreaded ndbd.
   */
@@ -416,8 +463,10 @@ protected:
   Block_context m_ctx;
   NewVARIABLE* allocateBat(int batSize);
   void freeBat();
-  static const NewVARIABLE* getBat    (BlockNumber blockNo);
-  static Uint16             getBatSize(BlockNumber blockNo);
+  static const NewVARIABLE* getBat    (BlockNumber blockNo,
+                                       Uint32 instanceNo = 0);
+  static Uint16             getBatSize(BlockNumber blockNo,
+                                       Uint32 instanceNo = 0);
   
   static BlockReference calcTcBlockRef   (NodeId aNode);
   static BlockReference calcLqhBlockRef  (NodeId aNode);
@@ -629,6 +678,12 @@ public:
   void clear_global_variables();
   void init_globals_list(void ** tmp, size_t cnt);
 #endif
+
+#ifdef VM_TRACE
+  NdbOut debugOut;
+  bool debugOutOn() const;
+  const char* debugOutTag(char* buf, int line);
+#endif
 };
 
 inline 
@@ -806,17 +861,32 @@ void
 SimulatedBlock::EXECUTE_DIRECT(Uint32 block, 
 			       Uint32 gsn, 
 			       Signal* signal, 
-			       Uint32 len
-#ifdef VM_TRACE
-                               , bool is_thread_safe
-#endif
-)
+			       Uint32 len,
+                               Uint32 givenInstanceNo)
 {
   signal->setLength(len);
+  SimulatedBlock* b = globalData.getBlock(block);
+  ndbassert(b != 0);
+  /**
+   * In multithreaded NDB, blocks run in different threads, and EXECUTE_DIRECT
+   * (unlike sendSignal) is generally not thread-safe.
+   * So only allow EXECUTE_DIRECT between blocks that run in the same thread,
+   * unless caller explicitly marks it as being thread safe (eg NDBFS),
+   * by using an explicit instance argument.
+   * By default instance of sender is used.  This is automatically thread-safe
+   * for worker instances (instance != 0).
+   */
+  Uint32 instanceNo = givenInstanceNo;
+  if (instanceNo == ZNIL)
+    instanceNo = instance();
+  if (instanceNo != 0)
+    b = b->getInstance(instanceNo);
+  ndbassert(b != 0);
+  ndbassert(givenInstanceNo != ZNIL || b->getThreadId() == getThreadId());
 #ifdef VM_TRACE
   if(globalData.testOn){
     signal->header.theVerId_signalNumber = gsn;
-    signal->header.theReceiversBlockNumber = block;
+    signal->header.theReceiversBlockNumber = numberToBlock(block, instanceNo);
     signal->header.theSendersBlockRef = reference();
     globalSignalLoggers.executeDirect(signal->header,
 				      0,        // in
@@ -824,14 +894,6 @@ SimulatedBlock::EXECUTE_DIRECT(Uint32 bl
                                       globalData.ownId);
   }
 #endif
-  SimulatedBlock* b = globalData.getBlock(block);
-  /**
-   * In multithreaded NDB, blocks run in different threads, and EXECUTE_DIRECT
-   * (unlike sendSignal() is generally not thread-safe.
-   * So only allow EXECUTE_DIRECT between blocks that run in the same thread,
-   * unless caller explicitly marks it as being thread safe (eg NDBFS).
-   */
-  ndbassert(is_thread_safe || b->getThreadId() == getThreadId());
 #ifdef VM_TRACE_TIME
   Uint32 us1, us2;
   Uint64 ms1, ms2;
@@ -854,7 +916,7 @@ SimulatedBlock::EXECUTE_DIRECT(Uint32 bl
 #ifdef VM_TRACE
   if(globalData.testOn){
     signal->header.theVerId_signalNumber = gsn;
-    signal->header.theReceiversBlockNumber = block;
+    signal->header.theReceiversBlockNumber = numberToBlock(block, instanceNo);
     signal->header.theSendersBlockRef = reference();
     globalSignalLoggers.executeDirect(signal->header,
 				      1,        // out
@@ -875,6 +937,8 @@ SimulatedBlock::addTime(Uint32 gsn, Uint
 inline
 void
 SimulatedBlock::subTime(Uint32 gsn, Uint64 time){
+  // wl4391_todo got 0xf3f3f3f3 here on GSN_UPGRADE_PROTOCOL_ORD...
+  if (gsn < 0xffff)
   m_timeTrace[gsn].sub += time;
 }
 #endif

=== modified file 'storage/ndb/src/kernel/vm/TransporterCallback.cpp'
--- a/storage/ndb/src/kernel/vm/TransporterCallback.cpp	2008-07-01 12:35:34 +0000
+++ b/storage/ndb/src/kernel/vm/TransporterCallback.cpp	2008-07-23 16:00:25 +0000
@@ -191,7 +191,14 @@ TransporterCallbackKernel::deliver_signa
 
   const Uint32 secCount = header->m_noOfSections;
   const Uint32 length = header->theLength;
+
+  /*
+   * Strip instance bits if not multithreaded.  This is also
+   * done in versions prior to MT LQH, to simplify online upgrade.
+   */
+#ifndef NDBD_MULTITHREADED
   header->theReceiversBlockNumber &= NDBMT_BLOCK_MASK;
+#endif
 
 #ifdef TRACE_DISTRIBUTED
   ndbout_c("recv: %s(%d) from (%s, %d)",
@@ -232,10 +239,10 @@ TransporterCallbackKernel::deliver_signa
     globalScheduler.execute(header, prio, theData, secPtrI);  
 #else
     if (prio == JBB)
-      sendlocal(receiverThreadId, header->theReceiversBlockNumber,
+      sendlocal(receiverThreadId,
                 header, theData, secPtrI);
     else
-      sendprioa(receiverThreadId, header->theReceiversBlockNumber,
+      sendprioa(receiverThreadId,
                 header, theData, secPtrI);
 
 #endif
@@ -271,10 +278,10 @@ TransporterCallbackKernel::deliver_signa
   globalScheduler.execute(header, prio, theData, secPtrI);    
 #else
   if (prio == JBB)
-    sendlocal(receiverThreadId, header->theReceiversBlockNumber,
+    sendlocal(receiverThreadId,
               header, theData, NULL);
   else
-    sendprioa(receiverThreadId, header->theReceiversBlockNumber,
+    sendprioa(receiverThreadId,
               header, theData, NULL);
     
 #endif
@@ -383,7 +390,7 @@ TransporterCallbackKernel::reportError(N
 #else
   signal.header.theVerId_signalNumber = GSN_EVENT_REP;
   signal.header.theReceiversBlockNumber = CMVMI;
-  sendprioa(receiverThreadId, signal.header.theReceiversBlockNumber,
+  sendprioa(receiverThreadId,
             &signalT.header, signalT.theData, NULL);
 #endif
 
@@ -436,7 +443,7 @@ TransporterCallbackKernel::reportReceive
 #else
   signal.header.theVerId_signalNumber = GSN_EVENT_REP;
   signal.header.theReceiversBlockNumber = CMVMI;
-  sendprioa(receiverThreadId, signal.header.theReceiversBlockNumber,
+  sendprioa(receiverThreadId,
             &signalT.header, signalT.theData, NULL);
 #endif
 }
@@ -463,7 +470,7 @@ TransporterCallbackKernel::reportConnect
 #else
   signal.header.theVerId_signalNumber = GSN_CONNECT_REP;
   signal.header.theReceiversBlockNumber = CMVMI;
-  sendprioa(receiverThreadId, signal.header.theReceiversBlockNumber,
+  sendprioa(receiverThreadId,
             &signalT.header, signalT.theData, NULL);
 #endif
 }
@@ -495,7 +502,7 @@ TransporterCallbackKernel::reportDisconn
 #else
   signal.header.theVerId_signalNumber = GSN_DISCONNECT_REP;
   signal.header.theReceiversBlockNumber = CMVMI;
-  sendprioa(receiverThreadId, signal.header.theReceiversBlockNumber,
+  sendprioa(receiverThreadId,
             &signalT.header, signalT.theData, NULL);
 #endif
 

=== added file 'storage/ndb/src/kernel/vm/dummy_nonmt.cpp'
--- a/storage/ndb/src/kernel/vm/dummy_nonmt.cpp	1970-01-01 00:00:00 +0000
+++ b/storage/ndb/src/kernel/vm/dummy_nonmt.cpp	2008-07-23 16:00:25 +0000
@@ -0,0 +1,14 @@
+#include <assert.h>
+#include <ndb_types.h>
+
+void
+add_thr_map(Uint32, Uint32, Uint32)
+{
+  assert(false);
+}
+
+void
+add_worker_thr_map(Uint32, Uint32)
+{
+  assert(false);
+}

=== modified file 'storage/ndb/src/kernel/vm/mt.cpp'
--- a/storage/ndb/src/kernel/vm/mt.cpp	2008-05-30 06:33:02 +0000
+++ b/storage/ndb/src/kernel/vm/mt.cpp	2008-07-23 16:00:25 +0000
@@ -43,9 +43,14 @@ static const Uint32 MAX_SIGNALS_PER_JB =
 
 //#define NDB_MT_LOCK_TO_CPU
 
-static const Uint32 NUM_THREADS = 3;
-static const Uint32 RECEIVER_THREAD_NO = 2;
-#define MAX_THREADS 4
+#define MAX_BLOCK_INSTANCES (1 + MAX_NDBMT_WORKERS)
+#define NUM_MAIN_THREADS 2 // except receiver
+#define MAX_THREADS (NUM_MAIN_THREADS + MAX_NDBMT_THREADS + 1)
+
+static Uint32 ndbmt_workers = 0;
+static Uint32 ndbmt_threads = 0;
+static Uint32 num_threads = 0;
+static Uint32 receiver_thread_no = 0;
 
 #ifdef NDBMTD_X86
 static inline
@@ -444,8 +449,7 @@ struct thr_data
 
   thr_wait m_waiter;
   unsigned m_thr_no;
-  struct SimulatedBlock* m_blocks[NO_OF_BLOCKS];
-  
+
   Uint64 m_time;
   struct thr_tq m_tq;
 
@@ -487,9 +491,9 @@ struct thr_data
   struct thr_jb_read_state m_read_states[MAX_THREADS];
 
   /* Jam buffers for making trace files at crashes. */
-  EmulatedJamBuffer *m_jam;
+  EmulatedJamBuffer m_jam;
   /* Watchdog counter for this thread. */
-  Uint32 *m_watchdog_counter;
+  Uint32 m_watchdog_counter;
   /* Signal delivery statistics. */
   Uint32 m_prioa_count;
   Uint32 m_prioa_size;
@@ -699,7 +703,7 @@ struct trp_callback : public Transporter
   Uint32 total_bytes(NodeId node) const
   {
     Uint32 total = 0;
-    for (Uint32 i = 0; i < NUM_THREADS; i++)
+    for (Uint32 i = 0; i < num_threads; i++)
       total += m_thr_buffers[i]->m_buffers[node].used_bytes();
     return total;
   }
@@ -907,7 +911,7 @@ scan_queue(struct thr_data* selfptr, Uin
        * If they are frequent, we may want to optimize, as sending one prio A
        * signal is somewhat expensive compared to sending one prio B.
        */
-      sendprioa(thr_no, s->theReceiversBlockNumber, s, data,
+      sendprioa(thr_no, s, data,
                 data + s->theLength);
       * (page + pos) = free;
       free = idx;
@@ -1190,7 +1194,8 @@ thr_send_buf::updateWritePtr(NodeId node
 
 trp_callback::trp_callback()
 {
-  for (Uint32 i = 0; i < NUM_THREADS; i++)
+  // number of threads not yet set so use MAX_THREADS
+  for (Uint32 i = 0; i < MAX_THREADS; i++)
     m_thr_buffers[i] = new thr_send_buf(this, i, &g_thr_repository.m_free_list);
   for (int i = 0; i < MAX_NTRANSPORTERS; i++)
     m_send_thr[i] = ~(Uint32)0;
@@ -1211,7 +1216,7 @@ trp_callback::reportSendLen(NodeId nodeI
   signal.theData[2] = (bytes/count);
   signal.header.theVerId_signalNumber = GSN_EVENT_REP;
   signal.header.theReceiversBlockNumber = CMVMI;
-  sendprioa(m_send_thr[nodeId], signal.header.theReceiversBlockNumber,
+  sendprioa(m_send_thr[nodeId],
             &signalT.header, signalT.theData, NULL);
 }
 
@@ -1260,7 +1265,7 @@ trp_callback::get_bytes_to_send_iovec(No
   if (max_iovecs == 0)
     return 0;
 
-  for (Uint32 thr = 0; thr < NUM_THREADS && iovecs < max_iovecs; thr++)
+  for (Uint32 thr = 0; thr < num_threads && iovecs < max_iovecs; thr++)
   {
     thr_send_buf::send_buffer *b = m_thr_buffers[thr]->m_buffers + node;
     thr_send_buf::page *pg = b->m_current_page;
@@ -1345,7 +1350,7 @@ trp_callback::bytes_sent(NodeId node, co
           break;                                  // Found it
       }
 
-      curr_thr = (curr_thr + 1) % NUM_THREADS;
+      curr_thr = (curr_thr + 1) % num_threads;
 #ifdef VM_TRACE
       assert(curr_thr != start_thr);            // Sent data was not in buffer
 #endif
@@ -1378,7 +1383,7 @@ trp_callback::bytes_sent(NodeId node, co
 bool
 trp_callback::has_data_to_send(NodeId node)
 {
-  for (Uint32 thr = 0; thr < NUM_THREADS; thr++)
+  for (Uint32 thr = 0; thr < num_threads; thr++)
   {
     thr_send_buf::send_buffer *b = m_thr_buffers[thr]->m_buffers + node;
     thr_send_buf::page *pg = b->m_current_page;
@@ -1565,15 +1570,14 @@ inline
 void
 sendpacked(struct thr_data* selfptr, Signal* signal, Uint32 thr_no)
 {
-  SimulatedBlock** blockptr = selfptr->m_blocks - MIN_BLOCK_NO;
-  SimulatedBlock* b_lqh = * (blockptr + DBLQH);
-  SimulatedBlock* b_tc = * (blockptr + DBTC);
-  SimulatedBlock* b_tup = * (blockptr + DBTUP);
-  if (b_lqh)
+  SimulatedBlock* b_lqh = globalData.getBlock(DBLQH);
+  SimulatedBlock* b_tc = globalData.getBlock(DBTC);
+  SimulatedBlock* b_tup = globalData.getBlock(DBTUP);
+  if (thr_no == 1)
     b_lqh->executeFunction(GSN_SEND_PACKED, signal);
-  if (b_tc)
+  if (thr_no == 0)
     b_tc->executeFunction(GSN_SEND_PACKED, signal);
-  if (b_tup)
+  if (thr_no == 1)
     b_tup->executeFunction(GSN_SEND_PACKED, signal);
 }
 
@@ -1724,7 +1728,6 @@ execute_signals(thr_data *selfptr, thr_j
                       r->m_write_pos :
                       q->m_buffers[read_index]->m_len);
   thr_job_buffer *read_buffer = r->m_read_buffer;
-  SimulatedBlock** blockptr = selfptr->m_blocks - MIN_BLOCK_NO;
 
   while (num_signals < max_signals)
   {
@@ -1767,9 +1770,11 @@ execute_signals(thr_data *selfptr, thr_j
     Uint32 siglen = (sizeof(*s)>>2) + s->theLength;
     if(siglen>16)
       __builtin_prefetch (read_buffer->m_data + read_pos + 32, 0, 3);
-    Uint32 bno = s->theReceiversBlockNumber;
+    Uint32 bno = blockToMain(s->theReceiversBlockNumber);
+    Uint32 ino = blockToInstance(s->theReceiversBlockNumber);
     Uint32 gsn = s->theVerId_signalNumber;
-    SimulatedBlock * block = blockptr[bno];
+    SimulatedBlock * main = globalData.getBlock(bno);
+    SimulatedBlock * block = main->getInstance(ino);
     *watchDogCounter = 1;
     /* Must update original buffer so signal dump will see it. */
     s->theSignalId = (*signalIdCounter)++;
@@ -1787,6 +1792,16 @@ execute_signals(thr_data *selfptr, thr_j
     /* Update just before execute so signal dump can know how far we are. */
     r->m_read_pos = read_pos;
 
+#ifdef VM_TRACE
+    if (globalData.testOn) { //wl4391_todo segments
+      globalSignalLoggers.executeSignal(*s,
+                                        0,
+                                        &sig->theData[0], 
+                                        globalData.ownId);
+
+    }
+#endif
+
     block->executeFunction(gsn, sig);
 
     num_signals++;
@@ -1831,43 +1846,91 @@ run_job_buffers(thr_data *selfptr, Signa
   return signal_count;
 }
 
-static inline Uint32
-block2ThreadId(Uint32 block)
+struct thr_map_entry {
+  enum { NULL_THR_NO = 0xFFFF };
+  Uint32 thr_no;
+  SimulatedBlock* block;
+  thr_map_entry() : thr_no(NULL_THR_NO), block(0) {}
+};
+
+static struct thr_map_entry thr_map[NO_OF_BLOCKS][MAX_BLOCK_INSTANCES];
+
+void
+add_thr_map(Uint32 block, Uint32 instance, Uint32 thr_no)
 {
-  /**
-   * CMVMIis special, it runs in the receive thread due to calling directly
-   * into TransporterRegistry.
-   */
-  if (block == CMVMI)
-    return 2;
+  Uint32 index = block - MIN_BLOCK_NO;
+  assert(index < NO_OF_BLOCKS);
+  assert(instance < MAX_BLOCK_INSTANCES);
 
-  /*
-   * Block assignment:
-   *    0 BACKUP            LOCAL
-   *    1 DBTC      GLOBAL
-   *    2 DBDIH     GLOBAL
-   *    3 DBLQH             LOCAL
-   *    4 DBACC             LOCAL
-   *    5 DBTUP             LOCAL
-   *    6 DBDICT    GLOBAL
-   *    7 NDBCNTR   GLOBAL
-   *    8 QMGR      GLOBAL
-   *    9 NDBFS     GLOBAL
-   *   10 CMVMI                    [Receive thread]
-   *   11 TRIX      GLOBAL
-   *   12 DBUTIL    GLOBAL
-   *   13 SUMA              LOCAL
-   *   14 DBTUX             LOCAL
-   *   15 TSMAN             LOCAL
-   *   16 LGMAN             LOCAL
-   *   17 PGMAN             LOCAL
-   *   18 RESTORE           LOCAL
-   *
-   * Thread 0 is for global, thread 1 for locals.
-   */
-  static const Uint32 locals = 0x7e039;
+  SimulatedBlock* main = globalData.getBlock(block);
+  assert(main != 0);
+  SimulatedBlock* b = main->getInstance(instance);
+  assert(b != 0);
+
+  assert(thr_no < num_threads);
+  struct thr_repository* rep = &g_thr_repository;
+  thr_data* thr_ptr = rep->m_thread + thr_no;
+
+  b->assignToThread(thr_no, &thr_ptr->m_jam, &thr_ptr->m_watchdog_counter);
+
+  thr_map_entry& entry = thr_map[index][instance];
+  assert(entry.thr_no == thr_map_entry::NULL_THR_NO);
+  entry.thr_no = thr_no;
+  entry.block = b;
+}
+
+// static assignment of main instances before first signal
+static void
+add_main_thr_map()
+{
+  static int done = 0;
+  if (done++)
+    return;
+
+  static const Uint32 thr_GLOBAL = 0;
+  static const Uint32 thr_LOCAL = 1;
+  static const Uint32 thr_RECEIVER = receiver_thread_no;
+
+  add_thr_map(BACKUP, 0, thr_LOCAL);
+  add_thr_map(DBTC, 0, thr_GLOBAL);
+  add_thr_map(DBDIH, 0, thr_GLOBAL);
+  add_thr_map(DBLQH, 0, thr_LOCAL);
+  add_thr_map(DBACC, 0, thr_LOCAL);
+  add_thr_map(DBTUP, 0, thr_LOCAL);
+  add_thr_map(DBDICT, 0, thr_GLOBAL);
+  add_thr_map(NDBCNTR, 0, thr_GLOBAL);
+  add_thr_map(QMGR, 0, thr_GLOBAL);
+  add_thr_map(NDBFS, 0, thr_GLOBAL);
+  add_thr_map(CMVMI, 0, thr_RECEIVER);
+  add_thr_map(TRIX, 0, thr_GLOBAL);
+  add_thr_map(DBUTIL, 0, thr_GLOBAL);
+  add_thr_map(SUMA, 0, thr_LOCAL);
+  add_thr_map(DBTUX, 0, thr_LOCAL);
+  add_thr_map(TSMAN, 0, thr_LOCAL);
+  add_thr_map(LGMAN, 0, thr_LOCAL);
+  add_thr_map(PGMAN, 0, thr_LOCAL);
+  add_thr_map(RESTORE, 0, thr_LOCAL);
+}
+
+// workers added by LocalProxy
+void
+add_worker_thr_map(Uint32 block, Uint32 instance)
+{
+  assert(instance != 0);
+  Uint32 i = instance - 1;
+  Uint32 thr_no = NUM_MAIN_THREADS + i % ndbmt_threads;
+  add_thr_map(block, instance, thr_no);
+}
+
+static inline Uint32
+block2ThreadId(Uint32 block, Uint32 instance)
+{
   assert(block >= MIN_BLOCK_NO && block <= MAX_BLOCK_NO);
-  return (locals >> (block - MIN_BLOCK_NO)) & 1;
+  Uint32 index = block - MIN_BLOCK_NO;
+  assert(instance < MAX_BLOCK_INSTANCES);
+  const thr_map_entry& entry = thr_map[index][instance];
+  assert(entry.thr_no < num_threads);
+  return entry.thr_no;
 }
 
 static void reportSignalStats(Uint32 self, Uint32 a_count, Uint32 a_size,
@@ -1889,7 +1952,7 @@ static void reportSignalStats(Uint32 sel
   s->theData[4] = b_count;
   s->theData[5] = b_size;
   /* ToDo: need this really be prio A like in old code? */
-  sendlocal(self, s->header.theReceiversBlockNumber, &s->header, s->theData,
+  sendlocal(self, &s->header, s->theData,
             NULL);
 }
 
@@ -1911,17 +1974,15 @@ update_sched_stats(thr_data *selfptr)
 }
 
 static void
-init_thread(thr_data *selfptr, EmulatedJamBuffer *jam, Uint32 *watchDogCounter)
+init_thread(thr_data *selfptr)
 {
-  jam->theEmulatedJamIndex = 0;
-  jam->theEmulatedJamBlockNumber = 0;
-  memset(jam->theEmulatedJam, 0, sizeof(jam->theEmulatedJam));
-  NdbThread_SetTlsKey(NDB_THREAD_TLS_JAM, jam);
-  selfptr->m_jam = jam;
+  selfptr->m_jam.theEmulatedJamIndex = 0;
+  selfptr->m_jam.theEmulatedJamBlockNumber = 0;
+  memset(selfptr->m_jam.theEmulatedJam, 0, sizeof(selfptr->m_jam.theEmulatedJam));
+  NdbThread_SetTlsKey(NDB_THREAD_TLS_JAM, &selfptr->m_jam);
 
-  selfptr->m_watchdog_counter = watchDogCounter;
   unsigned thr_no = selfptr->m_thr_no;
-  globalEmulatorData.theWatchDog->registerWatchedThread(watchDogCounter,
+  globalEmulatorData.theWatchDog->registerWatchedThread(&selfptr->m_watchdog_counter,
                                                         thr_no);
 
   NdbThread_SetTlsKey(NDB_THREAD_TLS_THREAD, selfptr);
@@ -1941,22 +2002,6 @@ init_thread(thr_data *selfptr, EmulatedJ
 #endif
 }
 
-/* Assign to this thread all blocks that will run in this thread. */
-static void
-grab_threads(thr_data *selfptr, EmulatedJamBuffer *thread_jam,
-             Uint32 *watchDogCounter)
-{
-  for (Uint32 i = 0; i < NO_OF_BLOCKS; i++)
-  {
-    if (block2ThreadId(i + MIN_BLOCK_NO) == selfptr->m_thr_no)
-    {
-      require(selfptr->m_blocks[i] != 0);
-      selfptr->m_blocks[i]->assignToThread(selfptr->m_thr_no, thread_jam,
-                                           watchDogCounter);
-    }
-  }
-}
-
 /**
  * Align signal buffer for better cache performance.
  * Also skew it a litte for each thread to avoid cache pollution.
@@ -1998,13 +2043,12 @@ mt_receiver_thread_main(void *thr_arg)
   struct thr_data* selfptr = (struct thr_data *)thr_arg;
   unsigned thr_no = selfptr->m_thr_no;
   EmulatedJamBuffer thread_jam;
-  Uint32 watchDogCounter;
+  Uint32& watchDogCounter = selfptr->m_watchdog_counter;
   Uint32 thrSignalId = 0;
 
-  init_thread(selfptr, &thread_jam, &watchDogCounter);
+  init_thread(selfptr);
   receiverThreadId = thr_no;
   signal = aligned_signal(signal_buf, thr_no);
-  grab_threads(selfptr, &thread_jam, &watchDogCounter);
 
   while (globalData.theRestartFlag != perform_stop)
   { 
@@ -2062,19 +2106,17 @@ mt_job_thread_main(void *thr_arg)
   struct timespec nowait;
   nowait.tv_sec = 0;
   nowait.tv_nsec = 10 * 1000000;
-  EmulatedJamBuffer thread_jam;
-  Uint32 watchDogCounter;
   Uint32 thrSignalId = 0;
 
   struct thr_repository* rep = &g_thr_repository;
   struct thr_data* selfptr = (struct thr_data *)thr_arg;
-  init_thread(selfptr, &thread_jam, &watchDogCounter);
+  init_thread(selfptr);
+  EmulatedJamBuffer& thread_jam = selfptr->m_jam;
+  Uint32& watchDogCounter = selfptr->m_watchdog_counter;
 
   unsigned thr_no = selfptr->m_thr_no;
   signal = aligned_signal(signal_buf, thr_no);
 
-  grab_threads(selfptr, &thread_jam, &watchDogCounter);
-
   /* Avoid false watchdog alarms caused by race condition. */
   watchDogCounter = 1;
 
@@ -2090,6 +2132,7 @@ mt_job_thread_main(void *thr_arg)
                                  &watchDogCounter, &thrSignalId);
     
     watchDogCounter = 1;
+    signal->header.m_noOfSections = 0; /* valgrind */
     sendpacked(selfptr, signal, thr_no);
     
     if (sum)
@@ -2127,16 +2170,23 @@ mt_job_thread_main(void *thr_arg)
 }
 
 void
-sendlocal(Uint32 self, Uint32 block, const SignalHeader *s, const Uint32 *data,
+sendlocal(Uint32 self, const SignalHeader *s, const Uint32 *data,
           const Uint32 secPtr[3])
 {
+  Uint32 block = blockToMain(s->theReceiversBlockNumber);
+  Uint32 instance = blockToInstance(s->theReceiversBlockNumber);
+
+  // map on receiver side
+  if (instance != 0)
+    instance = 1 + (instance - 1) % ndbmt_workers;
+
   /*
    * Max number of signals to put into job buffer before flushing the buffer
    * to the other thread.
    * This parameter found to be reasonable by benchmarking.
    */
-  Uint32 MAX_SIGNALS_BEFORE_FLUSH = (self == RECEIVER_THREAD_NO ? 2 : 20);
-  Uint32 dst = block2ThreadId(block);
+  Uint32 MAX_SIGNALS_BEFORE_FLUSH = (self == receiver_thread_no ? 2 : 20);
+  Uint32 dst = block2ThreadId(block, instance);
   struct thr_repository* rep = &g_thr_repository;
   struct thr_data * selfptr = rep->m_thread + self;
 
@@ -2151,15 +2201,23 @@ sendlocal(Uint32 self, Uint32 block, con
     selfptr->m_next_buffer = seize_buffer(rep, self, false);
   }
 
-  if (w->m_pending_signals >= MAX_SIGNALS_BEFORE_FLUSH)
+  // wl4391_todo
+  if (true || w->m_pending_signals >= MAX_SIGNALS_BEFORE_FLUSH)
     flush_write_state(dst, q, w);
 }
 
 void
-sendprioa(Uint32 self, Uint32 block, const SignalHeader *s, const uint32 *data,
+sendprioa(Uint32 self, const SignalHeader *s, const uint32 *data,
           const Uint32 secPtr[3])
 {
-  Uint32 dst = block2ThreadId(block);
+  Uint32 block = blockToMain(s->theReceiversBlockNumber);
+  Uint32 instance = blockToInstance(s->theReceiversBlockNumber);
+
+  // map on receiver side
+  if (instance != 0)
+    instance = 1 + (instance - 1) % ndbmt_workers;
+
+  Uint32 dst = block2ThreadId(block, instance);
   struct thr_repository* rep = &g_thr_repository;
   struct thr_data * selfptr = rep->m_thread + self;
   struct thr_data *dstptr = rep->m_thread + dst;
@@ -2247,17 +2305,26 @@ sendprioa_STOP_FOR_CRASH(Uint32 dst)
   static thr_job_buffer dummy_buffer;
 
   /*
-   * Currently we have three threads with fixed block assignment.
+   * Before we had three main threads with fixed block assignment.
+   * Now there is also worker instances (we send to LQH instance).
    */
-  assert(dst == 0 || dst == 1 || dst == 2); // ToDo when/if more threads.
-  Uint32 bno = 0;
+  Uint32 main = 0;
+  Uint32 instance = 0;
   if (dst == 0)
-    bno = NDBCNTR;
+    main = NDBCNTR;
   else if (dst == 1)
-    bno = DBLQH;
-  else if (dst == 2)
-    bno = CMVMI;
-  assert(block2ThreadId(bno) == dst);
+    main = DBLQH;
+  else if (dst >= NUM_MAIN_THREADS && dst < NUM_MAIN_THREADS + ndbmt_threads)
+  {
+    main = DBLQH;
+    instance = dst - NUM_MAIN_THREADS + 1;
+  }
+  else if (dst == receiver_thread_no)
+    main = CMVMI;
+  else
+    assert(false);
+  Uint32 bno = numberToBlock(main, instance);
+  assert(block2ThreadId(main, instance) == dst);
   struct thr_data * dstptr = rep->m_thread + dst;
 
   memset(&signalT.header, 0, sizeof(SignalHeader));
@@ -2459,8 +2526,6 @@ thr_init(struct thr_repository* rep, str
 
   queue_init(&selfptr->m_tq);
 
-  selfptr->m_jam = NULL;
-
   selfptr->m_prioa_count = 0;
   selfptr->m_prioa_size = 0;
   selfptr->m_priob_count = 0;
@@ -2538,7 +2603,13 @@ ThreadConfig::~ThreadConfig()
 void
 ThreadConfig::init(EmulatorData *emulatorData)
 {
-  ::rep_init(&g_thr_repository, NUM_THREADS, emulatorData->m_mem_manager);
+  ndbmt_workers = globalData.ndbmtWorkers;
+  ndbmt_threads = globalData.ndbmtThreads;
+  num_threads = NUM_MAIN_THREADS + ndbmt_threads + 1;
+  assert(num_threads <= MAX_THREADS);
+  receiver_thread_no = num_threads - 1;
+
+  ::rep_init(&g_thr_repository, num_threads, emulatorData->m_mem_manager);
 }
 
 void ThreadConfig::ipControlLoop(Uint32 thread_index)
@@ -2546,25 +2617,19 @@ void ThreadConfig::ipControlLoop(Uint32 
   unsigned int i;
   unsigned int thr_no;
   struct thr_repository* rep = &g_thr_repository;
-  NdbThread *threads[NUM_THREADS];
+  NdbThread *threads[MAX_THREADS];
+
+  add_main_thr_map();
 
   /*
    * Start threads for all execution threads, except for the receiver
    * thread, which runs in the main thread.
    */
-  for (thr_no = 0; thr_no < NUM_THREADS; thr_no++)
+  for (thr_no = 0; thr_no < num_threads; thr_no++)
   {
-    for (i = 0; i<NO_OF_BLOCKS; i++)
-    {
-      if (block2ThreadId(i + MIN_BLOCK_NO) == thr_no)
-      {
-        rep->m_thread[thr_no].m_blocks[i] =
-          globalData.getBlock(i + MIN_BLOCK_NO);
-      }
-    }
     rep->m_thread[thr_no].m_time = NdbTick_CurrentMillisecond();
 
-    if (thr_no == RECEIVER_THREAD_NO)
+    if (thr_no == receiver_thread_no)
       continue;                 // Will run in the main thread.
     /*
      * The NdbThread_Create() takes void **, but that is cast to void * when
@@ -2579,12 +2644,12 @@ void ThreadConfig::ipControlLoop(Uint32 
   }
 
   /* Now run the main loop for thread 0 directly. */
-  mt_receiver_thread_main(&(rep->m_thread[RECEIVER_THREAD_NO]));
+  mt_receiver_thread_main(&(rep->m_thread[receiver_thread_no]));
 
   /* Wait for all threads to shutdown. */
-  for (thr_no = 0; thr_no < NUM_THREADS; thr_no++)
+  for (thr_no = 0; thr_no < num_threads; thr_no++)
   {
-    if (thr_no == RECEIVER_THREAD_NO)
+    if (thr_no == receiver_thread_no)
       continue;
     void *dummy_return_status;
     NdbThread_WaitFor(threads[thr_no], &dummy_return_status);
@@ -2595,6 +2660,8 @@ void ThreadConfig::ipControlLoop(Uint32 
 int
 ThreadConfig::doStart(NodeState::StartLevel startLevel)
 {
+  add_main_thr_map();
+
   SignalT<3> signalT;
   memset(&signalT.header, 0, sizeof(SignalHeader));
   
@@ -2608,7 +2675,7 @@ ThreadConfig::doStart(NodeState::StartLe
   StartOrd * const  startOrd = (StartOrd *)&signalT.theData[0];
   startOrd->restartInfo = 0;
   
-  senddelay(block2ThreadId(CMVMI), &signalT.header, 1);
+  senddelay(block2ThreadId(CMVMI, 0), &signalT.header, 1);
   return 0;
 }
 
@@ -2643,7 +2710,7 @@ Uint32
 FastScheduler::traceDumpGetNumThreads()
 {
   /* The last thread is only for receiver -> no trace file. */
-  return NUM_THREADS;
+  return num_threads;
 }
 
 bool
@@ -2651,7 +2718,7 @@ FastScheduler::traceDumpGetJam(Uint32 th
                                const Uint32 * & thrdTheEmulatedJam,
                                Uint32 & thrdTheEmulatedJamIndex)
 {
-  if (thr_no >= NUM_THREADS)
+  if (thr_no >= num_threads)
     return false;
 
 #ifdef NO_EMULATED_JAM
@@ -2659,7 +2726,7 @@ FastScheduler::traceDumpGetJam(Uint32 th
   thrdTheEmulatedJam = NULL;
   thrdTheEmulatedJamIndex = 0;
 #else
-  const EmulatedJamBuffer *jamBuffer = g_thr_repository.m_thread[thr_no].m_jam;
+  const EmulatedJamBuffer *jamBuffer = &g_thr_repository.m_thread[thr_no].m_jam;
   thrdTheEmulatedJam = jamBuffer->theEmulatedJam;
   thrdTheEmulatedJamIndex = jamBuffer->theEmulatedJamIndex;
   jamBlockNumber = jamBuffer->theEmulatedJamBlockNumber;
@@ -2695,7 +2762,7 @@ FastScheduler::traceDumpPrepare()
   pthread_mutex_lock(&g_thr_repository.stop_for_crash_mutex);
   g_thr_repository.stopped_threads = 0;
 
-  for (Uint32 thr_no = 0; thr_no < NUM_THREADS; thr_no++)
+  for (Uint32 thr_no = 0; thr_no < num_threads; thr_no++)
   {
     if (selfptr != NULL && selfptr->m_thr_no == thr_no)
     {
@@ -2753,7 +2820,7 @@ void
 FastScheduler::dumpSignalMemory(Uint32 thr_no, FILE* out)
 {
   void *value= NdbThread_GetTlsKey(NDB_THREAD_TLS_THREAD);
-  const thr_data *selfptr = reinterpret_cast<const thr_data *>(value);
+  thr_data *selfptr = reinterpret_cast<thr_data *>(value);
   const thr_repository *rep = &g_thr_repository;
   /*
    * The selfptr might be NULL, or pointer to thread that is doing the crash
@@ -2762,7 +2829,7 @@ FastScheduler::dumpSignalMemory(Uint32 t
    */
   Uint32 *watchDogCounter;
   if (selfptr)
-    watchDogCounter = selfptr->m_watchdog_counter;
+    watchDogCounter = &selfptr->m_watchdog_counter;
   else
     watchDogCounter = NULL;
 

=== modified file 'storage/ndb/src/kernel/vm/mt.hpp'
--- a/storage/ndb/src/kernel/vm/mt.hpp	2008-05-29 15:06:11 +0000
+++ b/storage/ndb/src/kernel/vm/mt.hpp	2008-07-23 16:00:25 +0000
@@ -14,9 +14,13 @@
 */
 extern Uint32 receiverThreadId;
 
-void sendlocal(Uint32 self, Uint32 block, const struct SignalHeader *s,
+/* Assign block instance to thread */
+void add_thr_map(Uint32 block, Uint32 instance, Uint32 thr_no);
+void add_worker_thr_map(Uint32 block, Uint32 instance);
+
+void sendlocal(Uint32 self, const struct SignalHeader *s,
                const Uint32 *data, const Uint32 secPtr[3]);
-void sendprioa(Uint32 self, Uint32 block, const struct SignalHeader *s,
+void sendprioa(Uint32 self, const struct SignalHeader *s,
                const Uint32 *data, const Uint32 secPtr[3]);
 void senddelay(Uint32 thr_no, const struct SignalHeader*, Uint32 delay);
 void mt_execSTOP_FOR_CRASH();

Thread
bzr commit into mysql-5.1-telco-6.4 branch (pekka:2677) WL#4391Pekka Nousiainen23 Jul