List:Commits« Previous MessageNext Message »
From:knielsen Date:September 24 2007 3:12pm
Subject:bk commit into 5.1 tree (knielsen:1.2610)
View as plain text  
Below is the list of changes that have just been committed into a local
5.1 repository of knielsen. When knielsen does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html

ChangeSet@stripped, 2007-09-24 17:12:21+02:00, knielsen@ymer.(none) +41 -0
  ndbmtd import 1

  configure.in@stripped, 2007-09-24 17:12:17+02:00, knielsen@ymer.(none) +15 -14
    ndbmtd import 1

  storage/ndb/include/portlib/NdbThread.h@stripped, 2007-09-24 17:12:17+02:00, knielsen@ymer.(none) +13 -9
    ndbmtd import 1

  storage/ndb/src/common/portlib/NdbThread.c@stripped, 2007-09-24 17:12:17+02:00, knielsen@ymer.(none) +17 -0
    ndbmtd import 1

  storage/ndb/src/kernel/Makefile.am@stripped, 2007-09-24 17:12:17+02:00, knielsen@ymer.(none) +6 -2
    ndbmtd import 1

  storage/ndb/src/kernel/blocks/backup/Backup.cpp@stripped, 2007-09-24 17:12:17+02:00, knielsen@ymer.(none) +2 -2
    ndbmtd import 1

  storage/ndb/src/kernel/blocks/backup/Backup.hpp@stripped, 2007-09-24 17:12:17+02:00, knielsen@ymer.(none) +3 -0
    ndbmtd import 1

  storage/ndb/src/kernel/blocks/cmvmi/Cmvmi.cpp@stripped, 2007-09-24 17:12:17+02:00, knielsen@ymer.(none) +2 -5
    ndbmtd import 1

  storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp@stripped, 2007-09-24 17:12:17+02:00, knielsen@ymer.(none) +15 -13
    ndbmtd import 1

  storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp@stripped, 2007-09-24 17:12:17+02:00, knielsen@ymer.(none) +2 -2
    ndbmtd import 1

  storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp@stripped, 2007-09-24 17:12:17+02:00, knielsen@ymer.(none) +1 -0
    ndbmtd import 1

  storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp@stripped, 2007-09-24 17:12:17+02:00, knielsen@ymer.(none) +1 -1
    ndbmtd import 1

  storage/ndb/src/kernel/blocks/dbtc/DbtcInit.cpp@stripped, 2007-09-24 17:12:17+02:00, knielsen@ymer.(none) +1 -0
    ndbmtd import 1

  storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp@stripped, 2007-09-24 17:12:17+02:00, knielsen@ymer.(none) +2 -2
    ndbmtd import 1

  storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp@stripped, 2007-09-24 17:12:17+02:00, knielsen@ymer.(none) +1 -0
    ndbmtd import 1

  storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp@stripped, 2007-09-24 17:12:17+02:00, knielsen@ymer.(none) +1 -1
    ndbmtd import 1

  storage/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp@stripped, 2007-09-24 17:12:17+02:00, knielsen@ymer.(none) +1 -0
    ndbmtd import 1

  storage/ndb/src/kernel/blocks/dbutil/DbUtil.cpp@stripped, 2007-09-24 17:12:17+02:00, knielsen@ymer.(none) +1 -1
    ndbmtd import 1

  storage/ndb/src/kernel/blocks/ndbcntr/Ndbcntr.hpp@stripped, 2007-09-24 17:12:17+02:00, knielsen@ymer.(none) +2 -0
    ndbmtd import 1

  storage/ndb/src/kernel/blocks/ndbcntr/NdbcntrMain.cpp@stripped, 2007-09-24 17:12:17+02:00, knielsen@ymer.(none) +1 -1
    ndbmtd import 1

  storage/ndb/src/kernel/blocks/suma/Suma.cpp@stripped, 2007-09-24 17:12:17+02:00, knielsen@ymer.(none) +22 -23
    ndbmtd import 1

  storage/ndb/src/kernel/blocks/suma/Suma.hpp@stripped, 2007-09-24 17:12:17+02:00, knielsen@ymer.(none) +3 -0
    ndbmtd import 1

  storage/ndb/src/kernel/error/ErrorReporter.cpp@stripped, 2007-09-24 17:12:17+02:00, knielsen@ymer.(none) +41 -17
    ndbmtd import 1

  storage/ndb/src/kernel/main.cpp@stripped, 2007-09-24 17:12:17+02:00, knielsen@ymer.(none) +1 -0
    ndbmtd import 1

  storage/ndb/src/kernel/vm/Emulator.cpp@stripped, 2007-09-24 17:12:18+02:00, knielsen@ymer.(none) +12 -3
    ndbmtd import 1

  storage/ndb/src/kernel/vm/Emulator.hpp@stripped, 2007-09-24 17:12:18+02:00, knielsen@ymer.(none) +9 -8
    ndbmtd import 1

  storage/ndb/src/kernel/vm/GlobalData.hpp@stripped, 2007-09-24 17:12:18+02:00, knielsen@ymer.(none) +2 -2
    ndbmtd import 1

  storage/ndb/src/kernel/vm/Makefile.am@stripped, 2007-09-24 17:12:18+02:00, knielsen@ymer.(none) +17 -9
    ndbmtd import 1

  storage/ndb/src/kernel/vm/SimulatedBlock.cpp@stripped, 2007-09-24 17:12:18+02:00, knielsen@ymer.(none) +103 -1
    ndbmtd import 1

  storage/ndb/src/kernel/vm/SimulatedBlock.hpp@stripped, 2007-09-24 17:12:18+02:00, knielsen@ymer.(none) +29 -1
    ndbmtd import 1

  storage/ndb/src/kernel/vm/SimulatedBlock_mt.cpp@stripped, 2007-09-24 17:12:18+02:00, knielsen@ymer.(none) +19 -0
    New BitKeeper file ``storage/ndb/src/kernel/vm/SimulatedBlock_mt.cpp''

  storage/ndb/src/kernel/vm/SimulatedBlock_mt.cpp@stripped, 2007-09-24 17:12:18+02:00, knielsen@ymer.(none) +0 -0

  storage/ndb/src/kernel/vm/SimulatedBlock_nonmt.cpp@stripped, 2007-09-24 17:12:18+02:00, knielsen@ymer.(none) +18 -0
    New BitKeeper file ``storage/ndb/src/kernel/vm/SimulatedBlock_nonmt.cpp''

  storage/ndb/src/kernel/vm/SimulatedBlock_nonmt.cpp@stripped, 2007-09-24 17:12:18+02:00, knielsen@ymer.(none) +0 -0

  storage/ndb/src/kernel/vm/ThreadConfig.cpp@stripped, 2007-09-24 17:12:18+02:00, knielsen@ymer.(none) +7 -0
    ndbmtd import 1

  storage/ndb/src/kernel/vm/TransporterCallback.cpp@stripped, 2007-09-24 17:12:18+02:00, knielsen@ymer.(none) +108 -2
    ndbmtd import 1

  storage/ndb/src/kernel/vm/TransporterCallback_mt.cpp@stripped, 2007-09-24 17:12:18+02:00, knielsen@ymer.(none) +19 -0
    New BitKeeper file ``storage/ndb/src/kernel/vm/TransporterCallback_mt.cpp''

  storage/ndb/src/kernel/vm/TransporterCallback_mt.cpp@stripped, 2007-09-24 17:12:18+02:00, knielsen@ymer.(none) +0 -0

  storage/ndb/src/kernel/vm/TransporterCallback_nonmt.cpp@stripped, 2007-09-24 17:12:18+02:00, knielsen@ymer.(none) +18 -0
    New BitKeeper file ``storage/ndb/src/kernel/vm/TransporterCallback_nonmt.cpp''

  storage/ndb/src/kernel/vm/TransporterCallback_nonmt.cpp@stripped, 2007-09-24 17:12:18+02:00, knielsen@ymer.(none) +0 -0

  storage/ndb/src/kernel/vm/WatchDog.cpp@stripped, 2007-09-24 17:12:18+02:00, knielsen@ymer.(none) +111 -28
    ndbmtd import 1

  storage/ndb/src/kernel/vm/WatchDog.hpp@stripped, 2007-09-24 17:12:18+02:00, knielsen@ymer.(none) +41 -3
    ndbmtd import 1

  storage/ndb/src/kernel/vm/mt/dummy_mt.cpp@stripped, 2007-09-24 17:12:18+02:00, knielsen@ymer.(none) +83 -0
    New BitKeeper file ``storage/ndb/src/kernel/vm/mt/dummy_mt.cpp''

  storage/ndb/src/kernel/vm/mt/dummy_mt.cpp@stripped, 2007-09-24 17:12:18+02:00, knielsen@ymer.(none) +0 -0

  storage/ndb/src/kernel/vm/mt/mt.cpp@stripped, 2007-09-24 17:12:18+02:00, knielsen@ymer.(none) +1202 -0
    New BitKeeper file ``storage/ndb/src/kernel/vm/mt/mt.cpp''

  storage/ndb/src/kernel/vm/mt/mt.cpp@stripped, 2007-09-24 17:12:18+02:00, knielsen@ymer.(none) +0 -0

  storage/ndb/src/kernel/vm/mt/mt.hpp@stripped, 2007-09-24 17:12:18+02:00, knielsen@ymer.(none) +30 -0
    New BitKeeper file ``storage/ndb/src/kernel/vm/mt/mt.hpp''

  storage/ndb/src/kernel/vm/mt/mt.hpp@stripped, 2007-09-24 17:12:18+02:00, knielsen@ymer.(none) +0 -0

  storage/ndb/src/kernel/vm/pc.hpp@stripped, 2007-09-24 17:12:18+02:00, knielsen@ymer.(none) +33 -45
    ndbmtd import 1

diff -Nrup a/configure.in b/configure.in
--- a/configure.in	2007-09-14 11:58:44 +02:00
+++ b/configure.in	2007-09-24 17:12:17 +02:00
@@ -884,6 +884,21 @@ then
   AC_CHECK_FUNC(gtty, , AC_CHECK_LIB(compat, gtty))
 fi
 
+#
+#
+#
+case "$target" in
+ *-*-aix4* | *-*-sco*)
+	# (grr) aix 4.3 has a stub for clock_gettime, (returning ENOSYS)
+	# and using AC_TRY_RUN is hard when cross-compiling
+	# We also disable for SCO for the time being, the headers for the
+	# thread library we use conflicts with other headers.
+    ;;
+ *) AC_CHECK_LIB(rt, clock_gettime)
+    AC_CHECK_FUNCS(clock_gettime)
+    ;;
+esac
+
 # We make a special variable for non-threaded version of LIBS to avoid
 # including thread libs into non-threaded version of MySQL client library.
 # Later in this script LIBS will be augmented with a threads library.
@@ -1942,20 +1957,6 @@ AC_CHECK_FUNCS(alarm bcmp bfill bmove bs
   snprintf socket stpcpy strcasecmp strerror strsignal strnlen strpbrk strstr \
   strtol strtoll strtoul strtoull tell tempnam thr_setconcurrency vidattr \
   posix_fallocate)
-
-#
-#
-#
-case "$target" in
- *-*-aix4* | *-*-sco*)
-	# (grr) aix 4.3 has a stub for clock_gettime, (returning ENOSYS)
-	# and using AC_TRY_RUN is hard when cross-compiling
-	# We also disable for SCO for the time being, the headers for the
-	# thread library we use conflicts with other headers.
-    ;;
- *) AC_CHECK_FUNCS(clock_gettime)
-    ;;
-esac
 
 # Check that isinf() is available in math.h and can be used in both C and C++ 
 # code
diff -Nrup a/storage/ndb/include/portlib/NdbThread.h b/storage/ndb/include/portlib/NdbThread.h
--- a/storage/ndb/include/portlib/NdbThread.h	2006-12-23 20:20:08 +01:00
+++ b/storage/ndb/include/portlib/NdbThread.h	2007-09-24 17:12:17 +02:00
@@ -30,6 +30,11 @@ typedef enum NDB_THREAD_PRIO_ENUM {
   NDB_THREAD_PRIO_LOWEST
 } NDB_THREAD_PRIO;
 
+typedef enum NDB_THREAD_TLS_ENUM {
+  NDB_THREAD_TLS_JAM,           /* Jam buffer pointer. */
+  NDB_THREAD_TLS_MAX
+} NDB_THREAD_TLS;
+
 typedef void* (NDB_THREAD_FUNC)(void*);
 typedef void* NDB_THREAD_ARG;
 typedef size_t NDB_THREAD_STACKSIZE;
@@ -92,18 +97,17 @@ void NdbThread_Exit(void *status);
  */
 int NdbThread_SetConcurrencyLevel(int level);
 
+/**
+ * Fetch and set thread-local storage entry.
+ */
+void *NdbThread_GetTlsKey(NDB_THREAD_TLS key);
+void NdbThread_SetTlsKey(NDB_THREAD_TLS key, void *value);
+
+void NdbThread_Init();
+
 
 #ifdef	__cplusplus
 }
 #endif
 
 #endif
-
-
-
-
-
-
-
-
-
diff -Nrup a/storage/ndb/src/common/portlib/NdbThread.c b/storage/ndb/src/common/portlib/NdbThread.c
--- a/storage/ndb/src/common/portlib/NdbThread.c	2006-12-23 20:20:12 +01:00
+++ b/storage/ndb/src/common/portlib/NdbThread.c	2007-09-24 17:12:17 +02:00
@@ -190,3 +190,20 @@ int NdbThread_SetConcurrencyLevel(int le
   return 0;
 #endif
 }
+
+static pthread_key_t tls_keys[NDB_THREAD_TLS_MAX];
+
+void NdbThread_Init()
+{
+  pthread_key_create(&(tls_keys[NDB_THREAD_TLS_JAM]), NULL);
+}
+
+void *NdbThread_GetTlsKey(NDB_THREAD_TLS key)
+{
+  return pthread_getspecific(tls_keys[key]);
+}
+
+void NdbThread_SetTlsKey(NDB_THREAD_TLS key, void *value)
+{
+  pthread_setspecific(tls_keys[key], value);
+}
diff -Nrup a/storage/ndb/src/kernel/Makefile.am b/storage/ndb/src/kernel/Makefile.am
--- a/storage/ndb/src/kernel/Makefile.am	2006-12-31 01:26:56 +01:00
+++ b/storage/ndb/src/kernel/Makefile.am	2007-09-24 17:12:17 +02:00
@@ -17,9 +17,10 @@ SUBDIRS = vm error blocks
 
 include $(top_srcdir)/storage/ndb/config/common.mk.am
 
-ndbbin_PROGRAMS = ndbd
+ndbbin_PROGRAMS = ndbd ndbmtd
 
 ndbd_SOURCES = main.cpp SimBlockList.cpp
+ndbmtd_SOURCES = main.cpp SimBlockList.cpp
 
 include $(top_srcdir)/storage/ndb/config/type_kernel.mk.am
 
@@ -41,7 +42,7 @@ INCLUDES += \
 	-I$(srcdir)/blocks/dbtux \
 	-I$(srcdir)/blocks
 
-LDADD +=  \
+LIBS= \
               blocks/libblocks.a \
               vm/libkernel.a	\
               error/liberror.a \
@@ -56,6 +57,9 @@ LDADD +=  \
          $(top_builddir)/dbug/libdbug.a \
          $(top_builddir)/mysys/libmysys.a \
          $(top_builddir)/strings/libmystrings.a @NDB_SCI_LIBS@
+
+ndbd_LDADD = $(LIBS) vm/libsched.a
+ndbmtd_LDADD = $(LIBS) vm/libsched_mt.a
 
 windoze-dsp: ndbd.dsp
 
diff -Nrup a/storage/ndb/src/kernel/blocks/backup/Backup.cpp b/storage/ndb/src/kernel/blocks/backup/Backup.cpp
--- a/storage/ndb/src/kernel/blocks/backup/Backup.cpp	2007-09-24 17:09:25 +02:00
+++ b/storage/ndb/src/kernel/blocks/backup/Backup.cpp	2007-09-24 17:12:17 +02:00
@@ -576,10 +576,10 @@ static Uint32 xps(Uint64 x, Uint64 ms)
   float fs = ms;
   
   if(ms == 0 || x == 0) {
-    jam();
+    jamNoBlock();
     return 0;
   }//if
-  jam();
+  jamNoBlock();
   return ((Uint32)(1000.0f * (fx + fs/2.1f))) / ((Uint32)fs);
 }
 
diff -Nrup a/storage/ndb/src/kernel/blocks/backup/Backup.hpp b/storage/ndb/src/kernel/blocks/backup/Backup.hpp
--- a/storage/ndb/src/kernel/blocks/backup/Backup.hpp	2007-06-09 07:25:43 +02:00
+++ b/storage/ndb/src/kernel/blocks/backup/Backup.hpp	2007-09-24 17:12:17 +02:00
@@ -312,6 +312,7 @@ public:
 
     Backup & backup;
     BlockNumber number() const { return backup.number(); }
+    EmulatedJamBuffer *jamBuffer() const { return backup.jamBuffer(); }
     void progError(int line, int cause, const char * extra) { 
       backup.progError(line, cause, extra); 
     }
@@ -404,6 +405,7 @@ public:
     void forceState(State s);
     
     BlockNumber number() const { return backup.number(); }
+    EmulatedJamBuffer *jamBuffer() const { return backup.jamBuffer(); }
     void progError(int line, int cause, const char * extra) { 
       backup.progError(line, cause, extra); 
     }
@@ -532,6 +534,7 @@ public:
 
     Backup & backup;
     BlockNumber number() const { return backup.number(); }
+    EmulatedJamBuffer *jamBuffer() const { return backup.jamBuffer(); }
     void progError(int line, int cause, const char * extra) { 
       backup.progError(line, cause, extra); 
     }
diff -Nrup a/storage/ndb/src/kernel/blocks/cmvmi/Cmvmi.cpp b/storage/ndb/src/kernel/blocks/cmvmi/Cmvmi.cpp
--- a/storage/ndb/src/kernel/blocks/cmvmi/Cmvmi.cpp	2007-09-24 17:09:25 +02:00
+++ b/storage/ndb/src/kernel/blocks/cmvmi/Cmvmi.cpp	2007-09-24 17:12:17 +02:00
@@ -808,7 +808,7 @@ Cmvmi::execSTART_ORD(Signal* signal) {
   }
 
   if(globalData.theRestartFlag == system_started){
-    jam()
+    jam();
     /**
      * START_ORD received when already started(ignored)
      */
@@ -817,7 +817,7 @@ Cmvmi::execSTART_ORD(Signal* signal) {
   }
   
   if(globalData.theRestartFlag == perform_stop){
-    jam()
+    jam();
     /**
      * START_ORD received when stopping(ignored)
      */
@@ -866,9 +866,6 @@ Cmvmi::execSTART_ORD(Signal* signal) {
      *
      * Do Restart
      */
-
-    globalScheduler.clear();
-    globalTimeQueue.clear();
     
     // Disconnect all nodes as part of the system restart. 
     // We need to ensure that we are starting up
diff -Nrup a/storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp b/storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp
--- a/storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp	2007-09-24 17:09:25 +02:00
+++ b/storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp	2007-09-24 17:12:17 +02:00
@@ -5710,14 +5710,14 @@ Dbdict::createTab_dih(Signal* signal, 
 static
 void
 calcLHbits(Uint32 * lhPageBits, Uint32 * lhDistrBits, 
-	   Uint32 fid, Uint32 totalFragments) 
+	   Uint32 fid, Uint32 totalFragments, const Dbdict *dict) 
 {
   Uint32 distrBits = 0;
   Uint32 pageBits = 0;
   
   Uint32 tmp = 1;
   while (tmp < totalFragments) {
-    jam();
+    jamBlock(dict);
     tmp <<= 1;
     distrBits++;
   }//while
@@ -5772,7 +5772,7 @@ Dbdict::execADD_FRAGREQ(Signal* signal) 
    */
   Uint32 lhDistrBits = 0;
   Uint32 lhPageBits = 0;
-  ::calcLHbits(&lhPageBits, &lhDistrBits, fragId, fragCount);
+  ::calcLHbits(&lhPageBits, &lhDistrBits, fragId, fragCount, this);
 
   Uint64 maxRows = tabPtr.p->maxRowsLow +
     (((Uint64)tabPtr.p->maxRowsHigh) << 32);
@@ -9663,42 +9663,43 @@ Dbdict::createEventComplete_RT_USER_CREA
  */
 
 void interpretUtilPrepareErrorCode(UtilPrepareRef::ErrorCode errorCode,
-				   Uint32& error, Uint32& line)
+				   Uint32& error, Uint32& line,
+                                   const Dbdict *dict)
 {
   DBUG_ENTER("interpretUtilPrepareErrorCode");
   switch (errorCode) {
   case UtilPrepareRef::NO_ERROR:
-    jam();
+    jamBlock(dict);
     error = 1;
     line = __LINE__;
     DBUG_VOID_RETURN;
   case UtilPrepareRef::PREPARE_SEIZE_ERROR:
-    jam();
+    jamBlock(dict);
     error = 748;
     line = __LINE__;
     DBUG_VOID_RETURN;
   case UtilPrepareRef::PREPARE_PAGES_SEIZE_ERROR:
-    jam();
+    jamBlock(dict);
     error = 1;
     line = __LINE__;
     DBUG_VOID_RETURN;
   case UtilPrepareRef::PREPARED_OPERATION_SEIZE_ERROR:
-    jam();
+    jamBlock(dict);
     error = 1;
     line = __LINE__;
     DBUG_VOID_RETURN;
   case UtilPrepareRef::DICT_TAB_INFO_ERROR:
-    jam();
+    jamBlock(dict);
     error = 1;
     line = __LINE__;
     DBUG_VOID_RETURN;
   case UtilPrepareRef::MISSING_PROPERTIES_SECTION:
-    jam();
+    jamBlock(dict);
     error = 1;
     line = __LINE__;
     DBUG_VOID_RETURN;
   default:
-    jam();
+    jamBlock(dict);
     error = 1;
     line = __LINE__;
     DBUG_VOID_RETURN;
@@ -9774,7 +9775,7 @@ Dbdict::createEventUTIL_PREPARE(Signal* 
     ndbrequire((evntRecPtr.p = c_opCreateEvent.getPtr(evntRecPtr.i)) != NULL);
 
     interpretUtilPrepareErrorCode(errorCode, evntRecPtr.p->m_errorCode,
-				  evntRecPtr.p->m_errorLine);
+				  evntRecPtr.p->m_errorLine, this);
     evntRecPtr.p->m_errorNode = reference();
 
     createEvent_sendReply(signal, evntRecPtr);
@@ -11264,7 +11265,8 @@ Dbdict::dropEventUtilPrepareRef(Signal* 
   ndbrequire((evntRecPtr.p = c_opDropEvent.getPtr(evntRecPtr.i)) != NULL);
 
   interpretUtilPrepareErrorCode((UtilPrepareRef::ErrorCode)ref->getErrorCode(),
-				evntRecPtr.p->m_errorCode, evntRecPtr.p->m_errorLine);
+				evntRecPtr.p->m_errorCode,
+                                evntRecPtr.p->m_errorLine, this);
   evntRecPtr.p->m_errorNode = reference();
 
   dropEvent_sendReply(signal, evntRecPtr);
diff -Nrup a/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp b/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp
--- a/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp	2007-09-24 17:09:26 +02:00
+++ b/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp	2007-09-24 17:12:17 +02:00
@@ -16041,8 +16041,8 @@ void Dbdih::checkWaitGCPMaster(Signal* s
     
     c_waitGCPMasterList.next(ptr);
     if (nodeId == failedNodeId) {
-      jam()     
-	c_waitGCPMasterList.release(i);
+      jam();
+      c_waitGCPMasterList.release(i);
     }//if
   }//while
 }//Dbdih::checkWaitGCPMaster()
diff -Nrup a/storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp b/storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp	2007-06-05 18:15:19 +02:00
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp	2007-09-24 17:12:17 +02:00
@@ -60,6 +60,7 @@ void Dblqh::initData() 
   cLqhTimeOutCount = 0;
   cLqhTimeOutCheckCount = 0;
   cbookedAccOps = 0;
+  cpackedListIndex = 0;
   m_backup_ptr = RNIL;
   clogFileSize = 16;
   cmaxLogFilesInPageZero = 40;
diff -Nrup a/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp b/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp	2007-09-12 13:35:57 +02:00
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp	2007-09-24 17:12:17 +02:00
@@ -12756,7 +12756,7 @@ void Dblqh::execFSREADREF(Signal* signal
     jam();
     break;
   case LogFileOperationRecord::READ_SR_INVALIDATE_PAGES:
-    jam()
+    jam();
     break;
   default:
     jam();
diff -Nrup a/storage/ndb/src/kernel/blocks/dbtc/DbtcInit.cpp b/storage/ndb/src/kernel/blocks/dbtc/DbtcInit.cpp
--- a/storage/ndb/src/kernel/blocks/dbtc/DbtcInit.cpp	2007-02-14 18:24:37 +01:00
+++ b/storage/ndb/src/kernel/blocks/dbtc/DbtcInit.cpp	2007-09-24 17:12:17 +02:00
@@ -292,6 +292,7 @@ Dbtc::Dbtc(Block_context& ctx):
   tcFailRecord = 0;
   c_apiConTimer = 0;
   c_apiConTimer_line = 0;
+  cpackedListIndex = 0;
 
 #ifdef VM_TRACE
   {
diff -Nrup a/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp b/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp
--- a/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp	2007-09-24 17:09:26 +02:00
+++ b/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp	2007-09-24 17:12:17 +02:00
@@ -7712,7 +7712,7 @@ Dbtc::sendTCKEY_FAILCONF(Signal* signal,
   const Uint32 nodeId = refToNode(ref);
   if(ref != 0)
   {
-    jam()
+    jam();
     failConf->apiConnectPtr = regApiPtr->ndbapiConnect | (marker != RNIL);
     failConf->transId1 = regApiPtr->transid[0];
     failConf->transId2 = regApiPtr->transid[1];
@@ -9025,7 +9025,7 @@ void Dbtc::execSCAN_TABREQ(Signal* signa
 
  SCAN_error_check:
   if (aiLength == 0) {
-    jam()
+    jam();
     errCode = ZSCAN_AI_LEN_ERROR;
     goto SCAN_TAB_error;
   }//if
diff -Nrup a/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp b/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp
--- a/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp	2007-09-05 15:19:57 +02:00
+++ b/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp	2007-09-24 17:12:17 +02:00
@@ -1991,6 +1991,7 @@ private:
 
 //------------------------------------------------------------------
 //------------------------------------------------------------------
+  Uint32 brancher(Uint32, Uint32);
   int interpreterNextLab(Signal* signal,
                          KeyReqStruct *req_struct,
                          Uint32* logMemory,
diff -Nrup a/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp b/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp	2007-09-05 15:19:57 +02:00
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp	2007-09-24 17:12:17 +02:00
@@ -2000,7 +2000,7 @@ void Dbtup::sendLogAttrinfo(Signal* sign
 
 inline
 Uint32 
-brancher(Uint32 TheInstruction, Uint32 TprogramCounter)
+Dbtup::brancher(Uint32 TheInstruction, Uint32 TprogramCounter)
 {         
   Uint32 TbranchDirection= TheInstruction >> 31;
   Uint32 TbranchLength= (TheInstruction >> 16) & 0x7fff;
diff -Nrup a/storage/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp b/storage/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp	2007-06-14 19:08:12 +02:00
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp	2007-09-24 17:12:17 +02:00
@@ -46,6 +46,7 @@ void Dbtup::initData() 
 
   // Records with constant sizes
   init_list_sizes();
+  cpackedListIndex = 0;  
 }//Dbtup::initData()
 
 Dbtup::Dbtup(Block_context& ctx, Pgman* pgman)
diff -Nrup a/storage/ndb/src/kernel/blocks/dbutil/DbUtil.cpp b/storage/ndb/src/kernel/blocks/dbutil/DbUtil.cpp
--- a/storage/ndb/src/kernel/blocks/dbutil/DbUtil.cpp	2007-09-24 17:09:26 +02:00
+++ b/storage/ndb/src/kernel/blocks/dbutil/DbUtil.cpp	2007-09-24 17:12:17 +02:00
@@ -313,7 +313,7 @@ DbUtil::execDUMP_STATE_ORD(Signal* signa
    */
   const Uint32 tCase = signal->theData[0];
   if(tCase == 200){
-    jam()
+    jam();
     ndbout << "--------------------------------------------------" << endl;
     UtilSequenceReq * req = (UtilSequenceReq*)signal->getDataPtrSend();
     Uint32 seqId = 1;
diff -Nrup a/storage/ndb/src/kernel/blocks/ndbcntr/Ndbcntr.hpp b/storage/ndb/src/kernel/blocks/ndbcntr/Ndbcntr.hpp
--- a/storage/ndb/src/kernel/blocks/ndbcntr/Ndbcntr.hpp	2007-02-26 08:11:56 +01:00
+++ b/storage/ndb/src/kernel/blocks/ndbcntr/Ndbcntr.hpp	2007-09-24 17:12:17 +02:00
@@ -336,6 +336,7 @@ public:
     void checkLqhTimeout_2(Signal* signal);
     
     BlockNumber number() const { return cntr.number(); }
+    EmulatedJamBuffer *jamBuffer() const { return cntr.jamBuffer(); }
     void progError(int line, int cause, const char * extra) { 
       cntr.progError(line, cause, extra); 
     }
@@ -368,6 +369,7 @@ private:
     void sendNextREAD_CONFIG_REQ(Signal* signal);
     
     BlockNumber number() const { return cntr.number(); }
+    EmulatedJamBuffer *jamBuffer() const { return cntr.jamBuffer(); }
     void progError(int line, int cause, const char * extra) { 
       cntr.progError(line, cause, extra); 
     }
diff -Nrup a/storage/ndb/src/kernel/blocks/ndbcntr/NdbcntrMain.cpp b/storage/ndb/src/kernel/blocks/ndbcntr/NdbcntrMain.cpp
--- a/storage/ndb/src/kernel/blocks/ndbcntr/NdbcntrMain.cpp	2007-09-13 13:59:44 +02:00
+++ b/storage/ndb/src/kernel/blocks/ndbcntr/NdbcntrMain.cpp	2007-09-24 17:12:17 +02:00
@@ -2975,7 +2975,7 @@ void
 UpgradeStartup::sendCmAppChg(Ndbcntr& cntr, Signal* signal, Uint32 startLevel){
   
   if(cntr.getNodeInfo(cntr.cmasterNodeId).m_version >= MAKE_VERSION(3,5,0)){
-    jam();
+    jamNoBlock();
     return;
   }
 
diff -Nrup a/storage/ndb/src/kernel/blocks/suma/Suma.cpp b/storage/ndb/src/kernel/blocks/suma/Suma.cpp
--- a/storage/ndb/src/kernel/blocks/suma/Suma.cpp	2007-09-24 17:09:26 +02:00
+++ b/storage/ndb/src/kernel/blocks/suma/Suma.cpp	2007-09-24 17:12:17 +02:00
@@ -1791,14 +1791,14 @@ Suma::Table::parseTable(SegmentedSection
 			       DictTabInfo::TableMappingSize, 
 			       true, true);
 
-  jam();
+  jamBlock(&suma);
   suma.suma_ndbrequire(s == SimpleProperties::Break);
 
 #if 0
   //ToDo handle this
   if(table_version_major(m_schemaVersion) !=
      table_version_major(tableDesc.TableVersion)){
-    jam();
+    jamBlock(&suma);
 
     release(* this);
 
@@ -1815,25 +1815,25 @@ Suma::Table::parseTable(SegmentedSection
     c_subscriptions.first(i_subPtr);
     SubscriptionPtr subPtr;
     for(;!i_subPtr.isNull();c_subscriptions.next(i_subPtr)){
-      jam();
+      jamBlock(&suma);
       c_subscriptions.getPtr(subPtr, i_subPtr.curr.i);
       SyncRecord* tmp = c_syncPool.getPtr(subPtr.p->m_syncPtrI);
       if (tmp == syncPtr_p) {
-	jam();
+	jamBlock(&suma);
 	continue;
       }
       if (subPtr.p->m_tables.get(tableId)) {
-	jam();
+	jamBlock(&suma);
 	subPtr.p->m_tables.clear(tableId); // remove this old table reference
 	TableList::DataBufferIterator it;
 	for(tmp->m_tableList.first(it);!it.isNull();tmp->m_tableList.next(it)) {
-	  jam();
+	  jamBlock(&suma);
 	  if (*it.data == tableId){
-	    jam();
+	    jamBlock(&suma);
 	    Uint32 *pdata = it.data;
 	    tmp->m_tableList.next(it);
 	    for(;!it.isNull();tmp->m_tableList.next(it)) {
-	      jam();
+	      jamBlock(&suma);
 	      *pdata = *it.data;
 	      pdata = it.data;
 	    }
@@ -2874,7 +2874,7 @@ int
 Suma::Table::setupTrigger(Signal* signal,
 			  Suma &suma)
 {
-  jam();
+  jamBlock(&suma);
   DBUG_ENTER("Suma::Table::setupTrigger");
 
   int ret= 0;
@@ -2921,7 +2921,7 @@ void
 Suma::Table::createAttributeMask(AttributeMask& mask,
                                             Suma &suma)
 {
-  jam();
+  jamBlock(&suma);
   mask.clear();
   for(Uint32 i = 0; i<m_noOfAttributes; i++)
     mask.set(i);
@@ -2995,17 +2995,17 @@ Suma::execCREATE_TRIG_REF(Signal* signal
 void
 Suma::Table::dropTrigger(Signal* signal,Suma& suma)
 {
-  jam();
+  jamBlock(&suma);
   DBUG_ENTER("Suma::dropTrigger");
   
   m_hasOutstandingTriggerReq[0] =
     m_hasOutstandingTriggerReq[1] =
     m_hasOutstandingTriggerReq[2] = 1;
   for(Uint32 j = 0; j<3; j++){
-    jam();
+    jamBlock(&suma);
     suma.suma_ndbrequire(m_triggerIds[j] != ILLEGAL_TRIGGER_ID);
     if(m_hasTriggerDefined[j] == 1) {
-      jam();
+      jamBlock(&suma);
 
       DropTrigReq * const req = (DropTrigReq*)signal->getDataPtrSend();
       req->setConnectionPtr(m_ptrI);
@@ -3028,7 +3028,7 @@ Suma::Table::dropTrigger(Signal* signal,
       suma.sendSignal(DBTUP_REF, GSN_DROP_TRIG_REQ,
 		      signal, DropTrigReq::SignalLength, JBB);
     } else {
-      jam();
+      jamBlock(&suma);
       suma.suma_ndbrequire(m_hasTriggerDefined[j] > 1);
       runDropTrigger(signal,m_triggerIds[j],suma);
     }
@@ -3074,7 +3074,7 @@ Suma::Table::runDropTrigger(Signal* sign
 				       Uint32 triggerId,
 				       Suma &suma)
 {
-  jam();
+  jamBlock(&suma);
   Uint32 type = (triggerId >> 16) & 0x3;
 
   suma.suma_ndbrequire(type < 3);
@@ -3085,7 +3085,7 @@ Suma::Table::runDropTrigger(Signal* sign
   m_hasOutstandingTriggerReq[type] = 0;
   if (m_hasTriggerDefined[type] == 0)
   {
-    jam();
+    jamBlock(&suma);
     m_triggerIds[type] = ILLEGAL_TRIGGER_ID;
   }
   if( m_hasOutstandingTriggerReq[0] ||
@@ -3093,7 +3093,7 @@ Suma::Table::runDropTrigger(Signal* sign
       m_hasOutstandingTriggerReq[2])
   {
     // more to come
-    jam();
+    jamBlock(&suma);
     return;
   }
 
@@ -3116,11 +3116,11 @@ void Suma::suma_ndbrequire(bool v) { ndb
 void
 Suma::Table::checkRelease(Suma &suma)
 {
-  jam();
+  jamBlock(&suma);
   DBUG_ENTER("Suma::Table::checkRelease");
   if (n_subscribers == 0)
   {
-    jam();
+    jamBlock(&suma);
     suma.suma_ndbrequire(m_hasTriggerDefined[0] == 0);
     suma.suma_ndbrequire(m_hasTriggerDefined[1] == 0);
     suma.suma_ndbrequire(m_hasTriggerDefined[2] == 0);
@@ -3132,7 +3132,7 @@ Suma::Table::checkRelease(Suma &suma)
       for (subscribers.first(subbPtr);!subbPtr.isNull();
 	   subscribers.next(subbPtr))
       {
-	jam();
+	jamBlock(&suma);
 	DBUG_PRINT("info",("subscriber: %u", subbPtr.i));
       }
       suma.suma_ndbrequire(false);
@@ -3145,7 +3145,7 @@ Suma::Table::checkRelease(Suma &suma)
       for (syncRecords.first(syncPtr);!syncPtr.isNull();
 	   syncRecords.next(syncPtr))
       {
-	jam();
+	jamBlock(&suma);
 	DBUG_PRINT("info",("syncRecord: %u", syncPtr.i));
       }
       suma.suma_ndbrequire(false);
@@ -3390,7 +3390,6 @@ reformat(Signal* signal, LinearSectionPt
   ptr[1].p  = dst;
   
   while(sz_1 > 0){
-    jam();
     Uint32 tmp = * src_1 ++;
     * headers ++ = tmp;
     Uint32 len = AttributeHeader::getDataSize(tmp);
@@ -4089,7 +4088,7 @@ Suma::sendSubRemoveRef(Signal* signal, c
 
 void
 Suma::Table::release(Suma & suma){
-  jam();
+  jamBlock(&suma);
 
   LocalDataBuffer<15> fragBuf(suma.c_dataBufferPool, m_fragments);
   fragBuf.release();
diff -Nrup a/storage/ndb/src/kernel/blocks/suma/Suma.hpp b/storage/ndb/src/kernel/blocks/suma/Suma.hpp
--- a/storage/ndb/src/kernel/blocks/suma/Suma.hpp	2007-09-24 17:07:27 +02:00
+++ b/storage/ndb/src/kernel/blocks/suma/Suma.hpp	2007-09-24 17:12:17 +02:00
@@ -236,6 +236,7 @@ public:
     UintR &cerrorInsert;
 #endif
     BlockNumber number() const { return suma.number(); }
+    EmulatedJamBuffer *jamBuffer() const { return suma.jamBuffer(); }
     void progError(int line, int cause, const char * extra) { 
       suma.progError(line, cause, extra); 
     }
@@ -461,6 +462,8 @@ public:
     Restart(Suma& s);
 
     Suma & suma;
+    BlockNumber number() const { return suma.number(); }
+    EmulatedJamBuffer *jamBuffer() const { return suma.jamBuffer(); }
     Uint32 nodeId;
 
     DLHashTable<Subscription>::Iterator c_subIt;
diff -Nrup a/storage/ndb/src/kernel/error/ErrorReporter.cpp b/storage/ndb/src/kernel/error/ErrorReporter.cpp
--- a/storage/ndb/src/kernel/error/ErrorReporter.cpp	2007-02-05 16:44:42 +01:00
+++ b/storage/ndb/src/kernel/error/ErrorReporter.cpp	2007-09-24 17:12:17 +02:00
@@ -34,11 +34,13 @@ static int WriteMessage(int thrdMessageI
 			const char* thrdProblemData, 
 			const char* thrdObjRef,
 			Uint32 thrdTheEmulatedJamIndex,
-			Uint8 thrdTheEmulatedJam[]);
+			const Uint32 thrdTheEmulatedJam[],
+                        Uint32 jamBlockNumber);
 
 static void dumpJam(FILE* jamStream, 
 		    Uint32 thrdTheEmulatedJamIndex, 
-		    Uint8 thrdTheEmulatedJam[]);
+		    const Uint32 thrdTheEmulatedJam[],
+                    Uint32 aBlockNumber);
 
 extern EventLogger g_eventLogger;
 const char*
@@ -168,19 +170,28 @@ void
 ErrorReporter::handleAssert(const char* message, const char* file, int line, int ec)
 {
   char refMessage[100];
+  const Uint32 *jam;
+  Uint32 jamIndex;
+  Uint32 jamBlockNumber;
 
 #ifdef NO_EMULATED_JAM
   BaseString::snprintf(refMessage, 100, "file: %s lineNo: %d",
 	   file, line);
+  jam = NULL;
+  jamIndex = 0;
+  jamBlockNumber = 0;
 #else
-  const Uint32 blockNumber = theEmulatedJamBlockNumber;
-  const char *blockName = getBlockName(blockNumber);
+  const EmulatedJamBuffer *jamBuffer =
+    (EmulatedJamBuffer *)NdbThread_GetTlsKey(NDB_THREAD_TLS_JAM);
+  jam = jamBuffer->theEmulatedJam;
+  jamIndex = jamBuffer->theEmulatedJamIndex;
+  jamBlockNumber = jamBuffer->theEmulatedJamBlockNumber;
+  const char *blockName = getBlockName(jamBlockNumber);
 
   BaseString::snprintf(refMessage, 100, "%s line: %d (block: %s)",
 	   file, line, blockName);
 #endif
-  WriteMessage(ec, message, refMessage,
-	       theEmulatedJamIndex, theEmulatedJam);
+  WriteMessage(ec, message, refMessage, jamIndex, jam, jamBlockNumber);
 
   childReportError(ec);
 
@@ -194,8 +205,21 @@ ErrorReporter::handleError(int messageID
 			   const char* objRef,
 			   NdbShutdownType nst)
 {
-  WriteMessage(messageID, problemData,
-	       objRef, theEmulatedJamIndex, theEmulatedJam);
+  const Uint32 *jam;
+  Uint32 jamIndex;
+  Uint32 jamBlockNumber;
+  const EmulatedJamBuffer *jamBuffer =
+    (EmulatedJamBuffer *)NdbThread_GetTlsKey(NDB_THREAD_TLS_JAM);
+  if (jamBuffer) {
+    jam = jamBuffer->theEmulatedJam;
+    jamIndex = jamBuffer->theEmulatedJamIndex;
+    jamBlockNumber = jamBuffer->theEmulatedJamBlockNumber;
+  } else {
+    jam = NULL;
+    jamIndex = 0;
+    jamBlockNumber = 0;
+  }
+  WriteMessage(messageID, problemData, objRef, jamIndex, jam, jamBlockNumber);
 
   g_eventLogger.info(problemData);
   g_eventLogger.info(objRef);
@@ -215,7 +239,8 @@ int 
 WriteMessage(int thrdMessageID,
 	     const char* thrdProblemData, const char* thrdObjRef,
 	     Uint32 thrdTheEmulatedJamIndex,
-	     Uint8 thrdTheEmulatedJam[]){
+	     const Uint32 thrdTheEmulatedJam[],
+             Uint32 jamBlockNumber){
   FILE *stream;
   unsigned offset;
   unsigned long maxOffset;  // Maximum size of file.
@@ -312,7 +337,8 @@ WriteMessage(int thrdMessageID,
     //  ...and "dump the jam" there.
     //  ErrorReporter::dumpJam(jamStream);
     if(thrdTheEmulatedJam != 0){
-      dumpJam(jamStream, thrdTheEmulatedJamIndex, thrdTheEmulatedJam);
+      dumpJam(jamStream, thrdTheEmulatedJamIndex,
+              thrdTheEmulatedJam, jamBlockNumber);
     }
   
     /* Dont print the jobBuffers until a way to copy them, 
@@ -333,7 +359,8 @@ WriteMessage(int thrdMessageID,
 void 
 dumpJam(FILE *jamStream, 
 	Uint32 thrdTheEmulatedJamIndex, 
-	Uint8 thrdTheEmulatedJam[]) {
+	const Uint32 thrdTheEmulatedJam[],
+        Uint32 aBlockNumber) {
 #ifndef NO_EMULATED_JAM   
   // print header
   const int maxaddr = 8;
@@ -343,16 +370,14 @@ dumpJam(FILE *jamStream, 
     fprintf(jamStream, "%-6s ", "ADDR");
   fprintf(jamStream, "\n");
 
-  // treat as array of Uint32
-  const Uint32 *base = (Uint32 *)thrdTheEmulatedJam;
-  const int first = thrdTheEmulatedJamIndex / sizeof(Uint32);	// oldest
+  const int first = thrdTheEmulatedJamIndex;	// oldest
   int cnt, idx;
 
   // look for first block entry
   for (cnt = 0, idx = first; cnt < EMULATED_JAM_SIZE; cnt++, idx++) {
     if (idx >= EMULATED_JAM_SIZE)
       idx = 0;
-    const Uint32 aJamEntry = base[idx];
+    const Uint32 aJamEntry = thrdTheEmulatedJam[idx];
     if (aJamEntry > (1 << 20))
       break;
   }
@@ -366,7 +391,6 @@ dumpJam(FILE *jamStream, 
   else if (cnt < EMULATED_JAM_SIZE)
     fprintf(jamStream, "%-7s?", "");
   else {
-    const Uint32 aBlockNumber = theEmulatedJamBlockNumber;
     const char *aBlockName = getBlockName(aBlockNumber);
     if (aBlockName != 0)
       fprintf(jamStream, "%-7s?", aBlockName);
@@ -380,7 +404,7 @@ dumpJam(FILE *jamStream, 
     globalData.incrementWatchDogCounter(4);	// watchdog not to kill us ?
     if (idx >= EMULATED_JAM_SIZE)
       idx = 0;
-    const Uint32 aJamEntry = base[idx];
+    const Uint32 aJamEntry = thrdTheEmulatedJam[idx];
     if (aJamEntry > (1 << 20)) {
       const Uint32 aBlockNumber = aJamEntry >> 20;
       const char *aBlockName = getBlockName(aBlockNumber);
diff -Nrup a/storage/ndb/src/kernel/main.cpp b/storage/ndb/src/kernel/main.cpp
--- a/storage/ndb/src/kernel/main.cpp	2007-01-06 01:21:21 +01:00
+++ b/storage/ndb/src/kernel/main.cpp	2007-09-24 17:12:17 +02:00
@@ -239,6 +239,7 @@ int main(int argc, char** argv)
 
   g_eventLogger.m_logLevel.setLogLevel(LogLevel::llStartUp, 15);
 
+  NdbThread_Init();
   globalEmulatorData.create();
 
   // Parse command line options
diff -Nrup a/storage/ndb/src/kernel/vm/Emulator.cpp b/storage/ndb/src/kernel/vm/Emulator.cpp
--- a/storage/ndb/src/kernel/vm/Emulator.cpp	2006-12-23 20:20:19 +01:00
+++ b/storage/ndb/src/kernel/vm/Emulator.cpp	2007-09-24 17:12:18 +02:00
@@ -51,9 +51,12 @@ extern Uint32 g_currentStartPhase;
  */
 
 #ifndef NO_EMULATED_JAM
-Uint8 theEmulatedJam[EMULATED_JAM_SIZE * 4];
-Uint32 theEmulatedJamIndex = 0;
-Uint32 theEmulatedJamBlockNumber = 0;
+/*
+  This is the jam buffer used for non-threaded ndbd (but present also
+  in threaded ndbd to allow sharing of object files among the two
+  binaries).
+ */
+EmulatedJamBuffer theEmulatedJamBuffer;
 #endif
 
    GlobalData globalData;
@@ -87,6 +90,12 @@ ndb_new_handler_impl(){
 
 void
 EmulatorData::create(){
+  /*
+    Global jam() buffer, for non-multithreaded operation.
+    For multithreaded ndbd, each thread will set a local jam buffer later.
+  */
+  NdbThread_SetTlsKey(NDB_THREAD_TLS_JAM, (void *)&theEmulatedJamBuffer);
+
   NdbMem_Create();
 
   theConfiguration = new Configuration();
diff -Nrup a/storage/ndb/src/kernel/vm/Emulator.hpp b/storage/ndb/src/kernel/vm/Emulator.hpp
--- a/storage/ndb/src/kernel/vm/Emulator.hpp	2006-12-23 20:20:19 +01:00
+++ b/storage/ndb/src/kernel/vm/Emulator.hpp	2007-09-24 17:12:18 +02:00
@@ -31,22 +31,23 @@ extern class  TimeQueue           global
 extern class  FastScheduler       globalScheduler;
 extern class  TransporterRegistry globalTransporterRegistry;
 extern struct GlobalData          globalData;
+extern struct thr_repository      g_thr_repository;
 
 #ifdef VM_TRACE
 extern class SignalLoggerManager globalSignalLoggers;
 #endif
 
 #ifndef NO_EMULATED_JAM
-  #define EMULATED_JAM_SIZE 1024
-  #define JAM_MASK ((EMULATED_JAM_SIZE * 4) - 1)
+/* EMULATED_JAM_SIZE must be a power of two, so JAM_MASK will work. */
+#define EMULATED_JAM_SIZE 1024
+#define JAM_MASK (EMULATED_JAM_SIZE - 1)
 
-  extern Uint8 theEmulatedJam[];
-  extern Uint32 theEmulatedJamIndex;
+struct EmulatedJamBuffer {
+  Uint32 theEmulatedJamIndex;
   // last block entry, used in dumpJam() if jam contains no block entries
-  extern Uint32 theEmulatedJamBlockNumber;
-#else
-  const Uint8 theEmulatedJam[]=0;
-  const Uint32 theEmulatedJamIndex=0;
+  Uint32 theEmulatedJamBlockNumber;
+  Uint32 theEmulatedJam[EMULATED_JAM_SIZE];
+};
 #endif
 
 struct EmulatorData {
diff -Nrup a/storage/ndb/src/kernel/vm/GlobalData.hpp b/storage/ndb/src/kernel/vm/GlobalData.hpp
--- a/storage/ndb/src/kernel/vm/GlobalData.hpp	2007-09-05 15:19:57 +02:00
+++ b/storage/ndb/src/kernel/vm/GlobalData.hpp	2007-09-24 17:12:18 +02:00
@@ -82,7 +82,7 @@ struct GlobalData {
   SimulatedBlock * getBlock(BlockNumber blockNo);
   
   void           incrementWatchDogCounter(Uint32 place);
-  const Uint32 * getWatchDogPtr();
+  Uint32 * getWatchDogPtr();
   
 private:
   Uint32     watchDog;
@@ -136,7 +136,7 @@ GlobalData::incrementWatchDogCounter(Uin
 }
 
 inline
-const Uint32 *
+Uint32 *
 GlobalData::getWatchDogPtr(){
   return &watchDog;
 }
diff -Nrup a/storage/ndb/src/kernel/vm/Makefile.am b/storage/ndb/src/kernel/vm/Makefile.am
--- a/storage/ndb/src/kernel/vm/Makefile.am	2006-12-31 01:06:42 +01:00
+++ b/storage/ndb/src/kernel/vm/Makefile.am	2007-09-24 17:12:18 +02:00
@@ -18,15 +18,10 @@
 #DIRS += testLongSig
 #endif
 
-noinst_LIBRARIES = libkernel.a
+noinst_LIBRARIES = libkernel.a libsched.a libsched_mt.a
 
 libkernel_a_SOURCES = \
-	SimulatedBlock.cpp	\
-	FastScheduler.cpp		\
-	TimeQueue.cpp		\
 	VMSignal.cpp		\
-	ThreadConfig.cpp          \
-	TransporterCallback.cpp \
 	Emulator.cpp		\
 	Configuration.cpp		\
 	WatchDog.cpp \
@@ -34,9 +29,22 @@ libkernel_a_SOURCES = \
         SectionReader.cpp \
         Mutex.cpp SafeCounter.cpp \
         Rope.cpp \
-	ndbd_malloc.cpp ndbd_malloc_impl.cpp \
-        Pool.cpp WOPool.cpp RWPool.cpp \
-        DynArr256.cpp
+        ndbd_malloc.cpp \
+	ndbd_malloc_impl.cpp \
+	Pool.cpp WOPool.cpp RWPool.cpp DynArr256.cpp
+
+libsched_a_SOURCES = TimeQueue.cpp		\
+                     ThreadConfig.cpp \
+                     FastScheduler.cpp \
+                     TransporterCallback_nonmt.cpp \
+                     SimulatedBlock_nonmt.cpp
+
+libsched_mt_a_SOURCES = SimulatedBlock_mt.cpp \
+                        TransporterCallback_mt.cpp \
+                        mt/mt.cpp \
+                        mt/dummy_mt.cpp
+
+EXTRA_DIST=SimulatedBlock.cpp TransporterCallback.cpp
 
 INCLUDES_LOC = -I$(top_srcdir)/storage/ndb/src/mgmapi
 
diff -Nrup a/storage/ndb/src/kernel/vm/SimulatedBlock.cpp b/storage/ndb/src/kernel/vm/SimulatedBlock.cpp
--- a/storage/ndb/src/kernel/vm/SimulatedBlock.cpp	2007-09-24 17:09:26 +02:00
+++ b/storage/ndb/src/kernel/vm/SimulatedBlock.cpp	2007-09-24 17:12:18 +02:00
@@ -13,6 +13,13 @@
    along with this program; if not, write to the Free Software
    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
 
+/*
+  This file is used to build both the multithreaded and the singlethreaded
+  ndbd. It is built twice, included from either SimulatedBlock_mt.cpp (with
+  the macro NDBD_MULTITHREADED defined) or SimulatedBlock_nonmt.cpp (with the
+  macro not defined).
+*/
+
 #include <ndb_global.h>
 
 #include "SimulatedBlock.hpp"
@@ -61,8 +68,11 @@ SimulatedBlock::SimulatedBlock(BlockNumb
     c_linearFragmentSendList(c_fragmentSendPool),
     c_segmentedFragmentSendList(c_fragmentSendPool),
     c_mutexMgr(* this),
-    c_counterMgr(* this)
+    c_counterMgr(* this),
+    m_threadId(0),
+    m_watchDogCounter(NULL)
 {
+  m_jamBuffer = (EmulatedJamBuffer *)NdbThread_GetTlsKey(NDB_THREAD_TLS_JAM);
   NewVarRef = 0;
   
   globalData.setBlock(blockNumber, this);
@@ -172,6 +182,15 @@ SimulatedBlock::addRecSignalImpl(GlobalS
 }
 
 void
+SimulatedBlock::assignToThread(Uint32 threadId, EmulatedJamBuffer *jamBuffer,
+                               Uint32 *watchDogCounter)
+{
+  m_threadId = threadId;
+  m_jamBuffer = jamBuffer;
+  m_watchDogCounter = watchDogCounter;
+}
+
+void
 SimulatedBlock::signal_error(Uint32 gsn, Uint32 len, Uint32 recBlockNo, 
 			     const char* filename, int lineno) const 
 {
@@ -232,6 +251,15 @@ SimulatedBlock::handle_lingering_section
 }
 
 
+#ifdef NDBD_MULTITHREADED
+#define MT_SEND_LOCK mt_send_lock();
+#define MT_SEND_UNLOCK mt_send_unlock();
+#else
+#define MT_SEND_LOCK
+#define MT_SEND_UNLOCK
+#endif
+
+
 void 
 SimulatedBlock::sendSignal(BlockReference ref, 
 			   GlobalSignalNumber gsn, 
@@ -275,8 +303,14 @@ SimulatedBlock::sendSignal(BlockReferenc
   if(recNode == ourProcessor || recNode == 0) {
     signal->header.theSendersSignalId = tSignalId;
     signal->header.theSendersBlockRef = sendBRef;
+#ifdef NDBD_MULTITHREADED
+    jobBuffer == JBB ?
+      sendlocal(m_threadId, recBlock, &signal->header) :
+      sendprioa(m_threadId, recBlock, &signal->header);
+#else
     globalScheduler.execute(signal, jobBuffer, recBlock,
 			    gsn);
+#endif
     return;
   } else { 
     // send distributed Signal
@@ -298,10 +332,12 @@ SimulatedBlock::sendSignal(BlockReferenc
 	     getSignalName(gsn), gsn, getBlockName(recBlock),
 	     recNode);
 #endif
+    MT_SEND_LOCK
     SendStatus ss = globalTransporterRegistry.prepareSend(&sh, jobBuffer, 
 							  &signal->theData[0],
 							  recNode, 
 							  (LinearSectionPtr*)0);
+    MT_SEND_UNLOCK
     
     ndbrequire(ss == SEND_OK || ss == SEND_BLOCKED || ss == SEND_DISCONNECTED);
   }
@@ -359,7 +395,14 @@ SimulatedBlock::sendSignal(NodeReceiverG
 				     ourProcessor);
     }
 #endif
+
+#ifdef NDBD_MULTITHREADED
+    jobBuffer == JBB ?
+      sendlocal(m_threadId, recBlock, &signal->header) :
+      sendprioa(m_threadId, recBlock, &signal->header);
+#else
     globalScheduler.execute(signal, jobBuffer, recBlock, gsn);
+#endif
     
     rg.m_nodes.clear((Uint32)0);
     rg.m_nodes.clear(ourProcessor);
@@ -387,10 +430,12 @@ SimulatedBlock::sendSignal(NodeReceiverG
 	     recNode);
 #endif
 
+    MT_SEND_LOCK
     SendStatus ss = globalTransporterRegistry.prepareSend(&sh, jobBuffer, 
 							  &signal->theData[0],
 							  recNode,
 							  (LinearSectionPtr*)0);
+    MT_SEND_UNLOCK
     ndbrequire(ss == SEND_OK || ss == SEND_BLOCKED || ss == SEND_DISCONNECTED);
   }
 
@@ -455,8 +500,14 @@ SimulatedBlock::sendSignal(BlockReferenc
       signal->theData[length+i] = segptr[i].i;
     }
     
+#ifdef NDBD_MULTITHREADED
+    jobBuffer == JBB ?
+      sendlocal(m_threadId, recBlock, &signal->header) :
+      sendprioa(m_threadId, recBlock, &signal->header);
+#else
     globalScheduler.execute(signal, jobBuffer, recBlock,
 			    gsn);
+#endif
     signal->header.m_noOfSections = 0;
     return;
   } else { 
@@ -481,10 +532,12 @@ SimulatedBlock::sendSignal(BlockReferenc
 	     recNode);
 #endif
 
+    MT_SEND_LOCK
     SendStatus ss = globalTransporterRegistry.prepareSend(&sh, jobBuffer, 
 							  &signal->theData[0],
 							  recNode, 
 							  ptr);
+    MT_SEND_UNLOCK
     ndbrequire(ss == SEND_OK || ss == SEND_BLOCKED || ss == SEND_DISCONNECTED);
   }
 
@@ -554,7 +607,14 @@ SimulatedBlock::sendSignal(NodeReceiverG
       ndbrequire(import(segptr[i], ptr[i].p, ptr[i].sz));
       signal->theData[length+i] = segptr[i].i;
     }
+
+#ifdef NDBD_MULTITHREADED
+    jobBuffer == JBB ?
+      sendlocal(m_threadId, recBlock, &signal->header) :
+      sendprioa(m_threadId, recBlock, &signal->header);
+#else
     globalScheduler.execute(signal, jobBuffer, recBlock, gsn);
+#endif
     
     rg.m_nodes.clear((Uint32)0);
     rg.m_nodes.clear(ourProcessor);
@@ -584,10 +644,12 @@ SimulatedBlock::sendSignal(NodeReceiverG
 	     recNode);
 #endif
 
+    MT_SEND_LOCK
     SendStatus ss = globalTransporterRegistry.prepareSend(&sh, jobBuffer, 
 							  &signal->theData[0],
 							  recNode,
 							  ptr);
+    MT_SEND_UNLOCK
     ndbrequire(ss == SEND_OK || ss == SEND_BLOCKED || ss == SEND_DISCONNECTED);
   }
   
@@ -652,7 +714,13 @@ SimulatedBlock::sendSignal(BlockReferenc
     * dst ++ = sections->m_ptr[1].i;
     * dst ++ = sections->m_ptr[2].i;
 
+#ifdef NDBD_MULTITHREADED
+    jobBuffer == JBB ?
+      sendlocal(m_threadId, recBlock, &signal->header) :
+      sendprioa(m_threadId, recBlock, &signal->header);
+#else
     globalScheduler.execute(signal, jobBuffer, recBlock, gsn);
+#endif
   } else {
     // send distributed Signal
     SignalHeader sh;
@@ -674,11 +742,13 @@ SimulatedBlock::sendSignal(BlockReferenc
 	     recNode);
 #endif
 
+    MT_SEND_LOCK
     SendStatus ss = globalTransporterRegistry.prepareSend(&sh, jobBuffer,
 							  &signal->theData[0],
 							  recNode,
 							  g_sectionSegmentPool,
 							  sections->m_ptr);
+    MT_SEND_UNLOCK
     ndbrequire(ss == SEND_OK || ss == SEND_BLOCKED || ss == SEND_DISCONNECTED);
     ::releaseSections(noOfSections, sections->m_ptr);
   }
@@ -752,7 +822,13 @@ SimulatedBlock::sendSignal(NodeReceiverG
     * dst ++ = sections->m_ptr[0].i;
     * dst ++ = sections->m_ptr[1].i;
     * dst ++ = sections->m_ptr[2].i;
+#ifdef NDBD_MULTITHREADED
+    jobBuffer == JBB ?
+      sendlocal(m_threadId, recBlock, &signal->header) :
+      sendprioa(m_threadId, recBlock, &signal->header);
+#else
     globalScheduler.execute(signal, jobBuffer, recBlock, gsn);
+#endif
 
     rg.m_nodes.clear((Uint32)0);
     rg.m_nodes.clear(ourProcessor);
@@ -782,11 +858,13 @@ SimulatedBlock::sendSignal(NodeReceiverG
 	     recNode);
 #endif
 
+    MT_SEND_LOCK
     SendStatus ss = globalTransporterRegistry.prepareSend(&sh, jobBuffer,
 							  &signal->theData[0],
 							  recNode,
 							  g_sectionSegmentPool,
 							  sections->m_ptr);
+    MT_SEND_UNLOCK
     ndbrequire(ss == SEND_OK || ss == SEND_BLOCKED || ss == SEND_DISCONNECTED);
   }
 
@@ -830,7 +908,12 @@ SimulatedBlock::sendSignalWithDelay(Bloc
     }
   }
 #endif
+
+#ifdef NDBD_MULTITHREADED
+  senddelay(m_threadId, &signal->header, delayInMilliSeconds);
+#else
   globalTimeQueue.insert(signal, bnr, gsn, delayInMilliSeconds);
+#endif
 
   // befor 2nd parameter to globalTimeQueue.insert
   // (Priority)theSendSig[sigIndex].jobBuffer
@@ -879,7 +962,12 @@ SimulatedBlock::sendSignalWithDelay(Bloc
     }
   }
 #endif
+
+#ifdef NDBD_MULTITHREADED
+  senddelay(m_threadId, &signal->header, delayInMilliSeconds);
+#else
   globalTimeQueue.insert(signal, bnr, gsn, delayInMilliSeconds);
+#endif
 
   signal->header.m_noOfSections = 0;
   signal->header.m_fragmentInfo = 0;
@@ -1014,7 +1102,11 @@ SimulatedBlock::deallocRecord(void ** pt
 void
 SimulatedBlock::refresh_watch_dog(Uint32 place)
 {
+#ifdef NDBD_MULTITHREADED
+  (*m_watchDogCounter) = place;
+#else
   globalData.incrementWatchDogCounter(place);
+#endif
 }
 
 void
@@ -1083,8 +1175,13 @@ SimulatedBlock::infoEvent(const char * m
   signalT.header.theSignalId             = tSignalId;
   signalT.header.theLength               = ((len+3)/4)+1;
   
+#ifdef NDBD_MULTITHREADED
+  sendlocal(m_threadId, signalT.header.theReceiversBlockNumber,
+            &signalT.header);
+#else
   globalScheduler.execute(&signalT.header, JBB, signalT.theData,
                           signalT.m_sectionPtrI);
+#endif
 }
 
 void 
@@ -1123,8 +1220,13 @@ SimulatedBlock::warningEvent(const char 
   signalT.header.theSignalId             = tSignalId;
   signalT.header.theLength               = ((len+3)/4)+1;
 
+#ifdef NDBD_MULTITHREADED
+  sendlocal(m_threadId, signalT.header.theReceiversBlockNumber,
+            &signalT.header);
+#else
   globalScheduler.execute(&signalT.header, JBB, signalT.theData,
                           signalT.m_sectionPtrI);
+#endif
 }
 
 void
diff -Nrup a/storage/ndb/src/kernel/vm/SimulatedBlock.hpp b/storage/ndb/src/kernel/vm/SimulatedBlock.hpp
--- a/storage/ndb/src/kernel/vm/SimulatedBlock.hpp	2007-09-24 17:09:26 +02:00
+++ b/storage/ndb/src/kernel/vm/SimulatedBlock.hpp	2007-09-24 17:12:18 +02:00
@@ -120,6 +120,12 @@ public:
    * 
    */
   inline void executeFunction(GlobalSignalNumber gsn, Signal* signal);
+
+  /* Setup state of a block object for executing in a particular thread. */
+  void assignToThread(Uint32 threadId, EmulatedJamBuffer *jamBuffer,
+                      Uint32 *watchDogCounter);
+  /* For multithreaded ndbd, get the id of owning thread. */
+  uint32 getThreadId() const { return m_threadId; }
 public:
   typedef void (SimulatedBlock::* CallbackFunction)(class Signal*, 
 						    Uint32 callbackData,
@@ -358,6 +364,10 @@ protected:
   void sendNextLinearFragment(Signal* signal, FragmentSendInfo & info);
   
   BlockNumber    number() const;
+public:
+  /* Must be public so that we can jam() outside of block scope. */
+  EmulatedJamBuffer *jamBuffer() const;
+protected:
   BlockReference reference() const;
   NodeId         getOwnNodeId() const;
 
@@ -380,7 +390,19 @@ private:
   const NodeId         theNodeId;
   const BlockNumber    theNumber;
   const BlockReference theReference;
-  
+  /*
+    Thread id currently executing this block.
+    Not used in singlethreaded ndbd.
+  */
+  Uint32 m_threadId;
+  /*
+    Jam buffer reference.
+    In multithreaded ndbd, this is different in each thread, and must be
+    updated if migrating the block to another thread.
+  */
+  EmulatedJamBuffer *m_jamBuffer;
+  /* For multithreaded ndb, the thread-specific watchdog counter. */
+  Uint32 *m_watchDogCounter;
 protected:
   Block_context m_ctx;
   NewVARIABLE* allocateBat(int batSize);
@@ -646,6 +668,12 @@ inline 
 BlockNumber
 SimulatedBlock::number() const {
    return theNumber;
+}
+
+inline
+EmulatedJamBuffer *
+SimulatedBlock::jamBuffer() const {
+   return m_jamBuffer;
 }
 
 inline
diff -Nrup a/storage/ndb/src/kernel/vm/SimulatedBlock_mt.cpp b/storage/ndb/src/kernel/vm/SimulatedBlock_mt.cpp
--- /dev/null	Wed Dec 31 16:00:00 196900
+++ b/storage/ndb/src/kernel/vm/SimulatedBlock_mt.cpp	2007-09-24 17:12:18 +02:00
@@ -0,0 +1,19 @@
+/* Copyright (C) 2007 MySQL AB
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
+
+/* This stub file defines the SimulatedBlock class for multithreaded NDBD. */
+#define NDBD_MULTITHREADED
+#include "mt/mt.hpp"
+#include "SimulatedBlock.cpp"
diff -Nrup a/storage/ndb/src/kernel/vm/SimulatedBlock_nonmt.cpp b/storage/ndb/src/kernel/vm/SimulatedBlock_nonmt.cpp
--- /dev/null	Wed Dec 31 16:00:00 196900
+++ b/storage/ndb/src/kernel/vm/SimulatedBlock_nonmt.cpp	2007-09-24 17:12:18 +02:00
@@ -0,0 +1,18 @@
+/* Copyright (C) 2007 MySQL AB
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
+
+/* This stub file defines the SimulatedBlock class for singlethreaded NDBD. */
+#undef NDBD_MULTITHREADED
+#include "SimulatedBlock.cpp"
diff -Nrup a/storage/ndb/src/kernel/vm/ThreadConfig.cpp b/storage/ndb/src/kernel/vm/ThreadConfig.cpp
--- a/storage/ndb/src/kernel/vm/ThreadConfig.cpp	2006-12-23 20:20:20 +01:00
+++ b/storage/ndb/src/kernel/vm/ThreadConfig.cpp	2007-09-24 17:12:18 +02:00
@@ -27,6 +27,7 @@
 #include <NdbSleep.h>
 #include <NdbTick.h>
 #include <NdbOut.hpp>
+#include <WatchDog.hpp>
 
 #include <signaldata/StartOrd.hpp>
 
@@ -104,6 +105,10 @@ void ThreadConfig::ipControlLoop()
 // initialise the counter that keeps track of the current millisecond
 //--------------------------------------------------------------------
   globalData.internalMillisecCounter = NdbTick_CurrentMillisecond();
+
+  Uint32 *watchCounter = globalData.getWatchDogPtr();
+  globalEmulatorData.theWatchDog->registerWatchedThread(watchCounter, 0);
+
   Uint32 i = 0;
   while (globalData.theRestartFlag != perform_stop)  { 
 
@@ -155,6 +160,8 @@ void ThreadConfig::ipControlLoop()
 
   globalData.incrementWatchDogCounter(6);
   globalTransporterRegistry.performSend();
+
+  globalEmulatorData.theWatchDog->unregisterWatchedThread(0);
 
 }//ThreadConfig::ipControlLoop()
 
diff -Nrup a/storage/ndb/src/kernel/vm/TransporterCallback.cpp b/storage/ndb/src/kernel/vm/TransporterCallback.cpp
--- a/storage/ndb/src/kernel/vm/TransporterCallback.cpp	2006-12-23 20:20:20 +01:00
+++ b/storage/ndb/src/kernel/vm/TransporterCallback.cpp	2007-09-24 17:12:18 +02:00
@@ -69,9 +69,16 @@ import(Ptr<SectionSegment> & first, cons
   Uint32 dummyPrev[4]; 
 
   first.p = 0;
+#ifdef NDBD_MULTITHREADED
+  mt_section_lock();
+#endif
   if(g_sectionSegmentPool.seize(first)){
     ;
   } else {
+#ifdef NDBD_MULTITHREADED
+    mt_section_unlock();
+#endif
+    ndbout_c("here");
     return false;
   }
 
@@ -91,9 +98,16 @@ import(Ptr<SectionSegment> & first, cons
       ;
     } else {
       first.p->m_lastSegment = prevPtr.i;
+#ifdef NDBD_MULTITHREADED
+      mt_section_unlock();
+#endif
+      ndbout_c("hera");
       return false;
     }
   }
+#ifdef NDBD_MULTITHREADED
+  mt_section_unlock();
+#endif
 
   first.p->m_lastSegment = currPtr.i;
   currPtr.p->m_nextSegment = RNIL;
@@ -180,9 +194,15 @@ getSection(SegmentedSectionPtr & ptr, Ui
 
 void
 release(SegmentedSectionPtr & ptr){
+#ifdef NDBD_MULTITHREADED
+  mt_section_lock();
+#endif
   g_sectionSegmentPool.releaseList(relSz(ptr.sz),
 				   ptr.i, 
 				   ptr.p->m_lastSegment);
+#ifdef NDBD_MULTITHREADED
+  mt_section_unlock();
+#endif
 }
 
 void
@@ -193,6 +213,9 @@ releaseSections(Uint32 secCount, Segment
   Uint32 tSz1 = ptr[1].sz;
   Uint32 tSec2 = ptr[2].i;
   Uint32 tSz2 = ptr[2].sz;
+#ifdef NDBD_MULTITHREADED
+  mt_section_lock();
+#endif
   switch(secCount){
   case 3:
     g_sectionSegmentPool.releaseList(relSz(tSz2), tSec2, 
@@ -204,6 +227,9 @@ releaseSections(Uint32 secCount, Segment
     g_sectionSegmentPool.releaseList(relSz(tSz0), tSec0, 
 				     ptr[0].p->m_lastSegment);
   case 0:
+#ifdef NDBD_MULTITHREADED
+    mt_section_unlock();
+#endif
     return;
   }
   char msg[40];
@@ -230,7 +256,7 @@ execute(void * callbackObj, 
 	   getBlockName(refToBlock(header->theSendersBlockRef)),
 	   refToNode(header->theSendersBlockRef));
 #endif
-  
+
   bool ok = true;
   Ptr<SectionSegment> secPtr[3];
   switch(secCount){
@@ -247,7 +273,14 @@ execute(void * callbackObj, 
    */
   ok &= (length + secCount <= 25);
   
+#ifndef NDBD_MULTITHREADED
   Uint32 secPtrI[3];
+#else
+  SignalT<25> signalT;
+  Uint32 * secPtrI = signalT.theData + length;
+  signalT.header = * header;
+  memcpy(signalT.theData, theData, 4*length);
+#endif
   if(ok){
     /**
      * Normal path 
@@ -256,31 +289,66 @@ execute(void * callbackObj, 
     secPtrI[1] = secPtr[1].i;
     secPtrI[2] = secPtr[2].i;
 
+#ifndef NDBD_MULTITHREADED
     globalScheduler.execute(header, prio, theData, secPtrI);  
+#else
+    if (prio == JBB)
+      sendlocal(receiverThreadId, signalT.header.theReceiversBlockNumber,
+                &signalT.header);
+    else
+      sendprioa(receiverThreadId, signalT.header.theReceiversBlockNumber,
+                &signalT.header);
+
+#endif
     return;
   }
   
   /**
    * Out of memory
    */
+#ifdef NDBD_MULTITHREADED
+  mt_section_lock();
+#endif
   for(Uint32 i = 0; i<secCount; i++){
     if(secPtr[i].p != 0){
       g_sectionSegmentPool.releaseList(relSz(ptr[i].sz), secPtr[i].i, 
 				       secPtr[i].p->m_lastSegment);
     }
   }
+#ifdef NDBD_MULTITHREADED
+  mt_section_unlock();
+#endif
+
+
+#ifndef NDBD_MULTITHREADED
+  SignalDroppedRep * rep = (SignalDroppedRep*)theData;
+#else
+  SignalDroppedRep * rep = (SignalDroppedRep*)signalT.theData;
+#endif
   Uint32 gsn = header->theVerId_signalNumber;
   Uint32 len = header->theLength;
   Uint32 newLen= (len > 22 ? 22 : len);
-  SignalDroppedRep * rep = (SignalDroppedRep*)theData;
   memmove(rep->originalData, theData, (4 * newLen));
   rep->originalGsn = gsn;
   rep->originalLength = len;
   rep->originalSectionCount = secCount;
+#ifndef NDBD_MULTITHREADED
   header->theVerId_signalNumber = GSN_SIGNAL_DROPPED_REP;
   header->theLength = newLen + 3;
   header->m_noOfSections = 0;
   globalScheduler.execute(header, prio, theData, secPtrI);    
+#else
+  signalT.header.theVerId_signalNumber = GSN_SIGNAL_DROPPED_REP;
+  signalT.header.theLength = newLen + 3;
+  signalT.header.m_noOfSections = 0;
+  if (prio == JBB)
+    sendlocal(receiverThreadId, signalT.header.theReceiversBlockNumber,
+              &signalT.header);
+  else
+    sendprioa(receiverThreadId, signalT.header.theReceiversBlockNumber,
+              &signalT.header);
+    
+#endif
 }
 
 NdbOut & 
@@ -323,7 +391,11 @@ checkJobBuffer() {
    * Check to see if jobbbuffers are starting to get full
    * and if so call doJob
    */
+#ifndef NDBD_MULTITHREADED
   return globalScheduler.checkDoJob();
+#else
+  return 0;
+#endif
 }
 
 void
@@ -389,7 +461,14 @@ reportError(void * callbackObj, NodeId n
   signal.header.theLength = 3;  
   signal.header.theSendersSignalId = 0;
   signal.header.theSendersBlockRef = numberToRef(0, globalData.ownId);
+#ifndef NDBD_MULTITHREADED
   globalScheduler.execute(&signal, JBA, CMVMI, GSN_EVENT_REP);
+#else
+  signal.header.theVerId_signalNumber = GSN_EVENT_REP;
+  signal.header.theReceiversBlockNumber = CMVMI;
+//  sendprioa(THREAD LOCAL STORAGE, signal.header.theReceiversBlockNumber, &signalT.header);
+  assert(false);
+#endif
 
   DBUG_VOID_RETURN;
 }
@@ -411,7 +490,14 @@ reportSendLen(void * callbackObj, 
   signal.theData[0] = NDB_LE_SendBytesStatistic;
   signal.theData[1] = nodeId;
   signal.theData[2] = (bytes/count);
+#ifndef NDBD_MULTITHREADED
   globalScheduler.execute(&signal, JBA, CMVMI, GSN_EVENT_REP);
+#else
+  signal.header.theVerId_signalNumber = GSN_EVENT_REP;
+  signal.header.theReceiversBlockNumber = CMVMI;
+  sendprioa(senderThreadId, signal.header.theReceiversBlockNumber,
+            &signalT.header);
+#endif
 }
 
 /**
@@ -431,7 +517,14 @@ reportReceiveLen(void * callbackObj, 
   signal.theData[0] = NDB_LE_ReceiveBytesStatistic;
   signal.theData[1] = nodeId;
   signal.theData[2] = (bytes/count);
+#ifndef NDBD_MULTITHREADED
   globalScheduler.execute(&signal, JBA, CMVMI, GSN_EVENT_REP);
+#else
+  signal.header.theVerId_signalNumber = GSN_EVENT_REP;
+  signal.header.theReceiversBlockNumber = CMVMI;
+  sendprioa(receiverThreadId, signal.header.theReceiversBlockNumber,
+            &signalT.header);
+#endif
 }
 
 /**
@@ -450,7 +543,14 @@ reportConnect(void * callbackObj, NodeId
   signal.header.theSendersBlockRef = numberToRef(0, globalData.ownId);
   signal.theData[0] = nodeId;
   
+#ifndef NDBD_MULTITHREADED
   globalScheduler.execute(&signal, JBA, CMVMI, GSN_CONNECT_REP);
+#else
+  signal.header.theVerId_signalNumber = GSN_CONNECT_REP;
+  signal.header.theReceiversBlockNumber = CMVMI;
+  sendprioa(senderThreadId, signal.header.theReceiversBlockNumber,
+            &signalT.header);
+#endif
 }
 
 /**
@@ -474,7 +574,13 @@ reportDisconnect(void * callbackObj, Nod
   rep->nodeId = nodeId;
   rep->err = errNo;
 
+#ifndef NDBD_MULTITHREADED
   globalScheduler.execute(&signal, JBA, CMVMI, GSN_DISCONNECT_REP);
+#else
+  signal.header.theVerId_signalNumber = GSN_DISCONNECT_REP;
+  signal.header.theReceiversBlockNumber = CMVMI;
+  sendlocal(0, signal.header.theReceiversBlockNumber, &signalT.header);
+#endif
 
   DBUG_VOID_RETURN;
 }
diff -Nrup a/storage/ndb/src/kernel/vm/TransporterCallback_mt.cpp b/storage/ndb/src/kernel/vm/TransporterCallback_mt.cpp
--- /dev/null	Wed Dec 31 16:00:00 196900
+++ b/storage/ndb/src/kernel/vm/TransporterCallback_mt.cpp	2007-09-24 17:12:18 +02:00
@@ -0,0 +1,19 @@
+/* Copyright (C) 2007 MySQL AB
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
+
+/* This stub file defines the SimulatedBlock class for multithreaded NDBD. */
+#define NDBD_MULTITHREADED
+#include "mt/mt.hpp"
+#include "TransporterCallback.cpp"
diff -Nrup a/storage/ndb/src/kernel/vm/TransporterCallback_nonmt.cpp b/storage/ndb/src/kernel/vm/TransporterCallback_nonmt.cpp
--- /dev/null	Wed Dec 31 16:00:00 196900
+++ b/storage/ndb/src/kernel/vm/TransporterCallback_nonmt.cpp	2007-09-24 17:12:18 +02:00
@@ -0,0 +1,18 @@
+/* Copyright (C) 2007 MySQL AB
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
+
+/* This stub file defines the SimulatedBlock class for singlethreaded NDBD. */
+#undef NDBD_MULTITHREADED
+#include "TransporterCallback.cpp"
diff -Nrup a/storage/ndb/src/kernel/vm/WatchDog.cpp b/storage/ndb/src/kernel/vm/WatchDog.cpp
--- a/storage/ndb/src/kernel/vm/WatchDog.cpp	2007-06-09 07:25:44 +02:00
+++ b/storage/ndb/src/kernel/vm/WatchDog.cpp	2007-09-24 17:12:18 +02:00
@@ -37,15 +37,17 @@ runWatchDog(void* w){
 }
 
 WatchDog::WatchDog(Uint32 interval) : 
-  theIPValue(globalData.getWatchDogPtr())
+  m_watchedCount(0)
 {
   setCheckInterval(interval);
+  m_mutex = NdbMutex_Create();
   theStop = false;
   theThreadPtr = 0;
 }
 
 WatchDog::~WatchDog(){
   doStop();
+  NdbMutex_Destroy(m_mutex);
 }
 
 Uint32
@@ -54,6 +56,50 @@ WatchDog::setCheckInterval(Uint32 interv
   return theInterval = (interval < 70 ? 70 : interval);
 }
 
+bool
+WatchDog::registerWatchedThread(Uint32 *counter, Uint32 threadId)
+{
+  bool ret;
+
+  NdbMutex_Lock(m_mutex);
+
+  if (m_watchedCount >= MAX_WATCHED_THREADS)
+  {
+    ret = false;
+  }
+  else
+  {
+    m_watchedList[m_watchedCount].m_watchCounter = counter;
+    m_watchedList[m_watchedCount].m_threadId = threadId;
+    NdbTick_getMicroTimer(&(m_watchedList[m_watchedCount].m_startTime));
+    m_watchedList[m_watchedCount].m_slowWarnDelay = theInterval;
+    m_watchedList[m_watchedCount].m_lastCounterValue = 0;
+    ++m_watchedCount;
+    ret = true;
+  }
+
+  NdbMutex_Unlock(m_mutex);
+  return ret;
+}
+
+void
+WatchDog::unregisterWatchedThread(Uint32 threadId)
+{
+  Uint32 i;
+  NdbMutex_Lock(m_mutex);
+
+  for (i = 0; i < m_watchedCount; i++)
+  {
+    if (threadId == m_watchedList[i].m_threadId)
+      break;
+  }
+  assert(i < m_watchedCount);
+  m_watchedList[i] = m_watchedList[m_watchedCount - 1];
+  --m_watchedCount;
+
+  NdbMutex_Unlock(m_mutex);
+}
+
 void
 WatchDog::doStart(){
   theStop = false;
@@ -115,12 +161,17 @@ const char *get_action(Uint32 IPValue)
 void 
 WatchDog::run()
 {
-  unsigned int anIPValue, sleep_time;
-  unsigned int oldIPValue = 0;
-  unsigned int theIntervalCheck = theInterval;
-  struct MicroSecondTimer start_time, last_time, now;
-  NdbTick_getMicroTimer(&start_time);
-  last_time = start_time;
+  unsigned int sleep_time;
+  struct MicroSecondTimer last_time, now;
+  Uint32 numThreads;
+  Uint32 counterValue[MAX_WATCHED_THREADS];
+  Uint32 oldCounterValue[MAX_WATCHED_THREADS];
+  Uint32 threadId[MAX_WATCHED_THREADS];
+  struct MicroSecondTimer start_time[MAX_WATCHED_THREADS];
+  Uint32 theIntervalCheck[MAX_WATCHED_THREADS];
+  Uint32 elapsed[MAX_WATCHED_THREADS];
+
+  NdbTick_getMicroTimer(&last_time);
 
   // WatchDog for the single threaded NDB
   while (!theStop)
@@ -145,33 +196,65 @@ WatchDog::run()
     }
     last_time = now;
 
-    // Verify that the IP thread is not stuck in a loop
-    anIPValue = *theIPValue;
-    if (anIPValue != 0)
+    /*
+      Copy out all active counters under locked mutex, then check them
+      afterwards without holding the mutex.
+    */
+    NdbMutex_Lock(m_mutex);
+    numThreads = m_watchedCount;
+    for (Uint32 i = 0; i < numThreads; i++)
     {
-      oldIPValue = anIPValue;
-      globalData.incrementWatchDogCounter(0);
-      NdbTick_getMicroTimer(&start_time);
-      theIntervalCheck = theInterval;
+      counterValue[i] = *(m_watchedList[i].m_watchCounter);
+      if (counterValue[i] != 0)
+      {
+        /*
+          The thread responded since last check, so just update state until
+          next check.
+
+          There is a small race here. If the thread changes the counter
+          in-between the read and setting to zero here in the watchdog
+          thread, then gets stuck immediately after, we may report the
+          wrong action that it got stuck on.
+          But there will be no reporting of non-stuck thread because of
+          this race, nor will there be missed reporting.
+        */
+        *(m_watchedList[i].m_watchCounter) = 0;
+        m_watchedList[i].m_startTime = now;
+        m_watchedList[i].m_slowWarnDelay = theInterval;
+        m_watchedList[i].m_lastCounterValue = counterValue[i];
+      }
+      else
+      {
+        start_time[i] = m_watchedList[i].m_startTime;
+        threadId[i] = m_watchedList[i].m_threadId;
+        oldCounterValue[i] = m_watchedList[i].m_lastCounterValue;
+        theIntervalCheck[i] = m_watchedList[i].m_slowWarnDelay;
+        elapsed[i] = NdbTick_getMicrosPassed(start_time[i], now)/1000;
+        if (oldCounterValue[i] == 9 && elapsed[i] >= theIntervalCheck[i])
+          m_watchedList[i].m_slowWarnDelay += theInterval;
+      }
     }
-    else
+    NdbMutex_Unlock(m_mutex);
+
+    /*
+      Now check each watched thread if it has reported progress since previous
+      check. Warn about any stuck threads, and eventually force shutdown the
+      server.
+    */
+    for (Uint32 i = 0; i < numThreads; i++)
     {
-      int warn = 1;
-      Uint32 elapsed = NdbTick_getMicrosPassed(start_time, now)/1000;
+      if (counterValue[i] != 0)
+        continue;
+
       /*
-        oldIPValue == 9 indicates malloc going on, this can take some time
+        Counter value == 9 indicates malloc going on, this can take some time
         so only warn if we pass the watchdog interval
       */
-      if (oldIPValue == 9)
-        if (elapsed < theIntervalCheck)
-          warn = 0;
-        else
-          theIntervalCheck += theInterval;
-
-      if (warn)
+      if (oldCounterValue[i] != 9 || elapsed[i] >= theIntervalCheck[i])
       {
-        const char *last_stuck_action = get_action(oldIPValue);
-        g_eventLogger.warning("Ndb kernel is stuck in: %s", last_stuck_action);
+        const char *last_stuck_action = get_action(oldCounterValue[i]);
+        g_eventLogger.warning("Ndb kernel thread %u is stuck in: %s",
+                              threadId[i], last_stuck_action);
         {
           struct tms my_tms;
           times(&my_tms);
@@ -179,7 +262,7 @@ WatchDog::run()
                              (Uint64)my_tms.tms_utime,
                              (Uint64)my_tms.tms_stime);
         }
-        if (elapsed > 3 * theInterval)
+        if (elapsed[i] > 3 * theInterval)
         {
           shutdownSystem(last_stuck_action);
         }
diff -Nrup a/storage/ndb/src/kernel/vm/WatchDog.hpp b/storage/ndb/src/kernel/vm/WatchDog.hpp
--- a/storage/ndb/src/kernel/vm/WatchDog.hpp	2006-12-23 20:20:20 +01:00
+++ b/storage/ndb/src/kernel/vm/WatchDog.hpp	2007-09-24 17:12:18 +02:00
@@ -18,10 +18,31 @@
 
 #include <kernel_types.h>
 #include <NdbThread.h>
+#include <NdbMutex.h>
+#include <NdbTick.h>
 
 extern "C" void* runWatchDog(void* w);
 
 class WatchDog{
+  enum { MAX_WATCHED_THREADS = 64 };
+
+  struct WatchedThread {
+    Uint32 *m_watchCounter;
+    Uint32 m_threadId;
+    /* This is the time that activity was last registered from thread. */
+    MicroSecondTimer m_startTime;
+    /*
+      During slow operation (memory allocation), warnings are output less
+      frequently, and this is the point when the next warning should be given.
+     */
+    Uint32 m_slowWarnDelay;
+    /*
+      This is the last counter value update seen, telling us what the thread
+      was doing when it got stuck.
+    */
+    Uint32 m_lastCounterValue;
+  };
+
 public:
   WatchDog(Uint32 interval = 3000);
   ~WatchDog();
@@ -30,7 +51,15 @@ public:
   void doStop();
 
   Uint32 setCheckInterval(Uint32 interval);
-  
+
+  /*
+    Register a thread for being watched.
+    Returns true if ok, false if out of slots.
+  */
+  bool registerWatchedThread(Uint32 *counter, Uint32 threadId);
+  /* Remove a thread from registration, identified by thread id. */
+  void unregisterWatchedThread(Uint32 threadId);
+
 protected:
   /**
    * Thread function
@@ -44,8 +73,17 @@ protected:
   
 private:
   Uint32 theInterval;
-  const Uint32 * theIPValue;
-  
+  /*
+    List of watched threads.
+    Threads are identified by the m_threadId.
+    Active entries are kept at the start of the entries.
+    Access to the list is protected by m_mutex.
+  */
+  WatchedThread m_watchedList[MAX_WATCHED_THREADS];
+  /* Number of active entries in m_watchedList. */
+  Uint32 m_watchedCount;
+  NdbMutex *m_mutex;
+
   bool theStop;
   
   void run();
diff -Nrup a/storage/ndb/src/kernel/vm/mt/dummy_mt.cpp b/storage/ndb/src/kernel/vm/mt/dummy_mt.cpp
--- /dev/null	Wed Dec 31 16:00:00 196900
+++ b/storage/ndb/src/kernel/vm/mt/dummy_mt.cpp	2007-09-24 17:12:18 +02:00
@@ -0,0 +1,83 @@
+#include "FastScheduler.hpp"
+#include "RefConvert.hpp"
+
+#include "Emulator.hpp"
+#include "VMSignal.hpp"
+
+#include <SignalLoggerManager.hpp>
+#include <BlockNumbers.h>
+#include <GlobalSignalNumbers.h>
+#include <signaldata/EventReport.hpp>
+#include "LongSignal.hpp"
+#include <NdbTick.h>
+
+FastScheduler::FastScheduler()
+{
+}
+
+FastScheduler::~FastScheduler()
+{
+}
+
+void
+FastScheduler::clear()
+{
+}
+
+void
+FastScheduler::dumpSignalMemory(FILE* out)
+{
+  /*
+    In single-threaded ndbd, we set the watchdog counter to 4 here to note that
+    we are dumping.
+
+    But for multi-threaded ndbd, we will need to first stop all
+    threads anyway, so we might as well just de-register them from the
+    watchdog, and this will not be needed.
+  */
+}
+
+
+void bnr_error()
+{
+}
+
+void jbuf_error()
+{
+}
+
+void
+FastScheduler::prio_level_error()
+{
+}
+
+APZJobBuffer::APZJobBuffer()
+{
+}
+
+APZJobBuffer::~APZJobBuffer()
+{
+}
+
+void
+APZJobBuffer::insert(const SignalHeader * const sh,
+		     const Uint32 * const theData, const Uint32 secPtrI[3]){
+}
+
+void 
+APZJobBuffer::signal2buffer(Signal* signal,
+			    BlockNumber bnr, GlobalSignalNumber gsn,
+			    BufferEntry& buf)
+{
+}
+
+#include <TimeQueue.hpp>
+
+TimeQueue::TimeQueue()
+{
+}
+
+TimeQueue::~TimeQueue()
+{
+}
+
diff -Nrup a/storage/ndb/src/kernel/vm/mt/mt.cpp b/storage/ndb/src/kernel/vm/mt/mt.cpp
--- /dev/null	Wed Dec 31 16:00:00 196900
+++ b/storage/ndb/src/kernel/vm/mt/mt.cpp	2007-09-24 17:12:18 +02:00
@@ -0,0 +1,1202 @@
+#include <sys/uio.h>
+#include <stdio.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+#include <unistd.h>
+#include <pthread.h>
+#include <sys/time.h>
+#include <errno.h>
+#include <assert.h>
+#include <linux/futex.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include <VMSignal.hpp>
+#include <kernel_types.h>
+#include <Prio.hpp>
+#include <SignalLoggerManager.hpp>
+#include <SimulatedBlock.hpp>
+#include <ErrorHandlingMacros.hpp>
+#include <GlobalData.hpp>
+#include <WatchDog.hpp>
+#include <TransporterDefinitions.hpp>
+#include "mt.hpp"
+#include <DebuggerNames.hpp>
+
+#ifndef _GNU_SOURCE
+#define _GNU_SOURCE
+#endif
+#include <unistd.h>
+#include <sys/syscall.h>
+#include <sys/types.h>
+
+static const Uint32 NUM_THREADS = 3;
+#define MAX_THREADS 63
+
+static inline
+int
+xcng(volatile unsigned * addr, int val)
+{
+  asm volatile ("xchg %0, %1;" : "+r" (val) , "+m" (*addr));
+  return val;
+}
+
+#ifdef USE_FUTEX
+
+static inline
+int
+futex_wait(volatile unsigned * addr, int val, const struct timespec * timeout)
+{
+  return syscall(SYS_futex,
+                 FUTEX_WAIT, val, timeout, 0, 0) == 0 ? 0 : errno;
+}
+
+static inline
+int
+futex_wake(volatile unsigned * addr)
+{
+  return syscall(SYS_futex, addr, FUTEX_WAKE, 1, 0, 0, 0) == 0 ? 0 : errno;
+}
+
+struct thr_wait
+{
+  volatile unsigned m_futex_state;
+  enum {
+    FS_RUNNING = 0,
+    FS_SLEEPING = 1,
+  };
+  thr_wait() { xcng(&m_futex_state, FS_RUNNING);}
+};
+
+static
+void
+yield(struct thr_wait* wait, const struct timespec *timeout)
+{
+  volatile unsigned * val = &wait->m_futex_state;
+  int old = xcng(val, thr_wait::FS_SLEEPING);
+  assert(old == thr_wait::FS_RUNNING);
+  int ret;
+  ret = futex_wait(val, thr_wait::FS_SLEEPING, timeout);
+  if (ret == ETIMEDOUT || ret == EINTR)
+  {
+    xcng(val, thr_wait::FS_RUNNING);
+  }
+  else
+  {
+    int old = xcng(val, thr_wait::FS_RUNNING);
+    if (old != thr_wait::FS_RUNNING)
+    {
+      printf("ret %u old: %u in wakeup\n", ret, old);
+    }
+    //assert(ret == thr_wait::FS_RUNNING);
+  }
+}
+
+static
+int
+wakeup(struct thr_wait* wait)
+{
+  volatile unsigned * val = &wait->m_futex_state;
+  if (xcng(val, thr_wait::FS_RUNNING) == thr_wait::FS_SLEEPING)
+  {
+    return futex_wake(val);
+  }
+  return 0;
+}
+#else
+#include <NdbMutex.h>
+#include <NdbCondition.h>
+
+struct thr_wait
+{
+  NdbMutex *m_mutex;
+  NdbCondition *m_cond;
+  thr_wait() {
+    m_mutex = NdbMutex_Create();
+    m_cond = NdbCondition_Create();
+    NdbMutex_Lock(m_mutex);
+  }
+};
+
+static
+void
+yield(struct thr_wait* wait, const struct timespec *timeout)
+{
+  NdbCondition_WaitTimeout(wait->m_cond, wait->m_mutex, 1);
+}
+
+static
+int
+wakeup(struct thr_wait* wait)
+{
+  NdbCondition_Signal(wait->m_cond);
+  return 0;
+}
+#endif
+
+inline void require(bool x)
+{
+  if (unlikely(!(x)))
+    abort();
+}
+
+struct thr_spin_lock
+{
+  thr_spin_lock(const char * name) { m_lock = 0; m_name = name;}
+
+  const char * m_name;
+  volatile unsigned m_lock;
+};
+
+struct thr_mutex
+{
+  thr_mutex(const char * name) {
+    m_mutex = NdbMutex_Create();
+    m_name = name;
+  }
+
+  const char * m_name;
+  NdbMutex * m_mutex;
+};
+
+static
+inline
+void
+lock(struct thr_spin_lock* sl)
+{
+  volatile unsigned* val = &sl->m_lock;
+test:
+  if (likely(xcng(val, 1) == 0))
+    return;
+
+  printf("%s waiting for lock\n", sl->m_name);
+
+  while (* val == 1);
+  goto test;
+}
+
+static
+inline
+void
+unlock(struct thr_spin_lock* sl)
+{
+  sl->m_lock = 0;
+}
+
+static
+inline
+int
+trylock(struct thr_spin_lock* sl)
+{
+  volatile unsigned* val = &sl->m_lock;
+  return xcng(val, 1);
+}
+
+static
+inline
+void
+lock(struct thr_mutex* sl)
+{
+  NdbMutex_Lock(sl->m_mutex);
+}
+
+static
+inline
+void
+unlock(struct thr_mutex* sl)
+{
+  NdbMutex_Unlock(sl->m_mutex);
+}
+
+static
+inline
+int
+trylock(struct thr_mutex * sl)
+{
+  return NdbMutex_Trylock(sl->m_mutex);
+}
+
+/**
+ *
+ */
+struct thr_job_buffer // 32k
+{
+  static const unsigned SIZE = 8190;
+
+  unsigned m_len;
+  unsigned m_pos;
+  unsigned m_data[SIZE];
+};  
+
+struct thr_job_queue
+{
+  static const unsigned SIZE = 62;
+
+  unsigned m_read_index; // used by consumer
+  unsigned m_write_index; // used by producer
+  struct thr_job_buffer* m_buffers[SIZE];
+};
+
+struct thr_jba 
+{
+  static const unsigned SIZE = 4;
+  thr_jba() : m_write_lock("jbalock") {}
+
+  unsigned m_read_index;
+  volatile unsigned m_write_index;
+  struct thr_spin_lock m_write_lock;
+  struct thr_job_buffer* m_next_buffer;
+  struct thr_job_buffer* m_buffers[SIZE];
+};
+
+struct thr_tq
+{
+  static const unsigned SQ_SIZE = 512;
+  static const unsigned LQ_SIZE = 512;
+  static const unsigned PAGES = 32 * (SQ_SIZE + LQ_SIZE) / 8192;
+  
+  Uint32 m_next_timer;
+  Uint32 m_current_time;
+  Uint32 m_next_free;
+  Uint32 m_cnt[2];
+  Uint32 * m_delayed_signals[PAGES];
+  Uint32 m_short_queue[SQ_SIZE];
+  Uint32 m_long_queue[LQ_SIZE];
+};
+
+struct thr_data
+{
+  thr_wait m_waiter;
+  unsigned m_thr_no;
+  struct SimulatedBlock* m_blocks[NO_OF_BLOCKS];
+  
+  Uint64 m_time;
+  struct thr_jba m_jba;
+  struct thr_tq m_tq;
+  unsigned m_free;
+  struct thr_job_buffer* m_free_list;
+  struct thr_job_buffer* m_out_queue[MAX_THREADS];
+  struct thr_job_queue m_in_queue[MAX_THREADS];
+};
+
+#define DBG_MALLOC 0
+#define THR_TO_REP(selfptr) &g_thr_repository
+
+template<typename T>
+struct thr_safe_pool
+{
+  thr_safe_pool() : m_lock("mempool"), m_free_list(0) {}
+
+  thr_spin_lock m_lock;
+  T* m_free_list;
+
+  T* seize() {
+    T* ret = 0;
+    lock(&m_lock);
+    if (m_free_list)
+    {
+      ret = m_free_list;
+      m_free_list = *reinterpret_cast<T**>(m_free_list);
+    }
+    else
+    {
+      ret = reinterpret_cast<T*>(malloc(sizeof(T)));
+    }
+    unlock(&m_lock);
+    return ret;
+  }
+
+  void release(T* t){
+    lock(&m_lock);
+    T** nextptr = reinterpret_cast<T**>(t);
+    * nextptr = m_free_list;
+    m_free_list = t;
+    unlock(&m_lock);
+  }
+};
+
+struct thr_repository
+{
+  thr_repository()
+    : m_send_lock("sendlock"),
+      m_receive_lock("recvlock"),
+      m_section_lock("sectionlock") {}
+
+  unsigned m_thread_count;
+  struct thr_spin_lock m_send_lock;
+  struct thr_spin_lock m_receive_lock;
+  struct thr_spin_lock m_section_lock;
+  struct thr_data m_thread[MAX_THREADS];
+  struct thr_safe_pool<thr_job_buffer> m_free_list;
+};
+
+static
+thr_job_buffer*
+seize_buffer(struct thr_repository* rep, int thr_no)
+{
+  thr_job_buffer* jb;
+  thr_data* selfptr = rep->m_thread + thr_no;
+  if (likely((jb = selfptr->m_free_list) != 0))
+  {
+    selfptr->m_free --;
+    thr_job_buffer* next = *reinterpret_cast<thr_job_buffer**>(jb);
+    rep->m_thread[thr_no].m_free_list = next;
+    jb->m_len = 0;
+    jb->m_pos = 0;
+    return jb;
+  }
+  
+  /* global alloc */
+  jb = rep->m_free_list.seize();
+  jb->m_len = 0;
+  jb->m_pos = 0;
+  return jb;
+}
+
+static
+void
+release_buffer(struct thr_repository* rep, int thr_no, thr_job_buffer* jb)
+{
+  struct thr_data* selfptr = rep->m_thread + thr_no;
+
+  if (selfptr->m_free >= 32)
+  {
+    rep->m_free_list.release(jb);
+  }
+  else
+  {
+    thr_job_buffer** next = reinterpret_cast<thr_job_buffer**>(jb);
+    * next = selfptr->m_free_list;
+    selfptr->m_free_list = jb;
+    selfptr->m_free ++;
+  }
+}
+
+static
+void
+transfer_buffer(struct thr_repository* rep, unsigned from, unsigned to)
+{
+  require(from < NUM_THREADS && to < NUM_THREADS);
+  unsigned old;
+  unsigned * writeptr = &(rep->m_thread[to].m_in_queue[from].m_write_index);
+  volatile unsigned * readptr = &(rep->m_thread[to].m_in_queue[from].m_read_index);
+  unsigned readidx = * readptr;
+  unsigned writeidx = * writeptr;
+  unsigned nextidx = (writeidx + 1) % thr_job_queue::SIZE;
+  thr_job_buffer* src = rep->m_thread[from].m_out_queue[to];
+
+  if (unlikely(readidx == nextidx))
+    goto check_full;
+
+  if (0)
+    ndbout_c("transfer from: %d to: %d (read: %d write: %d)",
+	     from, to, readidx, writeidx);
+  
+do_transfer:  
+  rep->m_thread[to].m_in_queue[from].m_buffers[writeidx] = src;
+  // need storestore barrier
+  // but xcng is full barrier
+  old = xcng(writeptr, nextidx);
+  assert(old == writeidx);
+  wakeup(&rep->m_thread[to].m_waiter); // potentially wakeup
+  
+  rep->m_thread[from].m_out_queue[to] = seize_buffer(rep, from);
+  return ;
+check_full:
+  ndbout_c("sleep in transfer from %u to %u", from, to);
+  for (unsigned i = 0; i<1000; i++)
+  {
+    usleep(1000);
+    if (* readptr != nextidx)
+      goto do_transfer;
+  }
+  
+  abort();
+}
+
+static
+unsigned
+dojob(struct thr_data* selfptr, struct thr_job_buffer* ptr, struct Signal* sig,
+      Uint32 *watchDogCounter)
+{
+  unsigned cnt = 0;
+  unsigned pos = ptr->m_pos;
+  unsigned end = ptr->m_len;
+  unsigned *posptr = ptr->m_data + pos;
+  unsigned *endptr = ptr->m_data + end;
+  SimulatedBlock** blockptr = selfptr->m_blocks - MIN_BLOCK_NO;
+  
+  ptr->m_pos = end;
+  
+  while (posptr < endptr)
+  {
+    SignalHeader* s = reinterpret_cast<SignalHeader*>(posptr);
+    unsigned seccnt = s->m_noOfSections;
+    unsigned siglen = (sizeof(*s)>>2) + s->theLength;
+    unsigned bno = s->theReceiversBlockNumber;
+    unsigned gsn = s->theVerId_signalNumber;
+    SimulatedBlock * block = * (blockptr + bno);
+    *watchDogCounter = 1;
+    memcpy(&sig->header, s, 4*siglen);
+    sig->m_sectionPtrI[0] = posptr[siglen + 0];
+    sig->m_sectionPtrI[1] = posptr[siglen + 1];
+    sig->m_sectionPtrI[2] = posptr[siglen + 2];
+    block->executeFunction(gsn, sig);
+    
+    cnt++;
+    posptr += siglen + seccnt;
+  }
+  
+  return cnt;
+}
+
+static
+unsigned
+dojoba(struct thr_repository * rep, unsigned thr_no,
+       Signal* signal, unsigned write_index,
+       Uint32 *watchDogCounter)
+{
+  unsigned sum = 0;
+  struct thr_job_buffer* buffer;
+  struct thr_data* selfptr = rep->m_thread + thr_no;
+  unsigned read_index = selfptr->m_jba.m_read_index;
+  unsigned wi_buf = write_index >> 16;
+  //unsigned wi_pos = write_index & 0xFFFF;
+
+  unsigned ri_buf = read_index >> 16;
+  while (ri_buf != wi_buf)
+  {
+    buffer = * (selfptr->m_jba.m_buffers + ri_buf);
+    sum += dojob(selfptr, buffer, signal, watchDogCounter);
+    release_buffer(rep, thr_no, buffer);
+    ri_buf = (ri_buf + 1) % thr_jba::SIZE;
+  }
+  
+  buffer = * (selfptr->m_jba.m_buffers + ri_buf);
+  sum += dojob(selfptr, buffer, signal, watchDogCounter);
+
+  unsigned ri_pos = buffer->m_pos;
+  selfptr->m_jba.m_read_index = (ri_buf << 16) + ri_pos;
+
+  return sum;
+}
+
+static
+inline
+Uint32
+scan_queue(struct thr_data* selfptr, Uint32 cnt, Uint32 end, Uint32* ptr)
+{
+  Uint32 thr_no = selfptr->m_thr_no;
+  Uint32 **pages = selfptr->m_tq.m_delayed_signals;
+  Uint32 free = selfptr->m_tq.m_next_free;
+  Uint32* save = ptr;
+  for (Uint32 i = 0; i < cnt; i++, ptr++)
+  {
+    Uint32 val = * ptr;
+    if ((val & 0xFFFF) <= end)
+    {
+      Uint32 idx = val >> 16;
+      Uint32 buf = idx >> 8;
+      Uint32 pos = 32 * (idx & 0xFF);
+
+      Uint32* page = * (pages + buf);
+
+      SignalHeader* s = reinterpret_cast<SignalHeader*>(page + pos);
+      if (0)
+	ndbout_c("found %p val: %d end: %d", s, val & 0xFFFF, end);
+      sendprioa(thr_no, s->theReceiversBlockNumber, s);
+      * (page + pos) = free;
+      free = idx;
+    }
+    else if (i > 0)
+    {
+      selfptr->m_tq.m_next_free = free;
+      memmove(save, ptr, 4 * (cnt - i));
+      return i;
+    }
+    else
+    {
+      return 0;
+    }
+  }
+  selfptr->m_tq.m_next_free = free;
+  return cnt;
+}
+
+static
+void
+handle_time_wrap(struct thr_data* selfptr)
+{
+  Uint32 i;
+  struct thr_tq * tq = &selfptr->m_tq;
+  Uint32 cnt0 = tq->m_cnt[0];
+  Uint32 cnt1 = tq->m_cnt[1];
+  Uint32 tmp0 = scan_queue(selfptr, cnt0, 32767, tq->m_short_queue);
+  Uint32 tmp1 = scan_queue(selfptr, cnt1, 32767, tq->m_long_queue);
+  cnt0 -= tmp0;
+  cnt1 -= tmp1;
+  tq->m_cnt[0] = cnt0;
+  tq->m_cnt[1] = cnt1;
+  for (i = 0; i<cnt0; i++)
+  {
+    assert((tq->m_short_queue[i] & 0xFFFF) > 32767);
+    tq->m_short_queue[i] -= 32767;
+  }
+  for (i = 0; i<cnt1; i++)
+  {
+    assert((tq->m_long_queue[i] & 0xFFFF) > 32767);
+    tq->m_long_queue[i] -= 32767;
+  }
+}
+
+static
+void
+scan_time_queues(struct thr_data* selfptr)
+{
+  struct thr_tq * tq = &selfptr->m_tq;
+  NDB_TICKS now = NdbTick_CurrentMillisecond();
+  NDB_TICKS last = selfptr->m_time;
+
+  Uint32 curr = tq->m_current_time;
+  Uint32 cnt0 = tq->m_cnt[0];
+  Uint32 cnt1 = tq->m_cnt[1];
+
+  Uint64 diff = now - last;
+  if (diff == 0)
+  {
+    return;
+  }
+  else if (diff > 0)
+  {
+    Uint32 step = (Uint32)((diff > 20) ? 20 : diff);
+    Uint32 end = (curr + step);
+    if (end >= 32767)
+    {
+      handle_time_wrap(selfptr);
+      cnt0 = tq->m_cnt[0];
+      cnt1 = tq->m_cnt[1];
+      end -= 32767;
+    }
+    
+    Uint32 tmp0 = scan_queue(selfptr, cnt0, end, tq->m_short_queue);
+    Uint32 tmp1 = scan_queue(selfptr, cnt1, end, tq->m_long_queue);
+
+    tq->m_current_time = end;
+    tq->m_cnt[0] = cnt0 - tmp0;
+    tq->m_cnt[1] = cnt1 - tmp1;
+    selfptr->m_time = last + step;
+    
+    return;
+  }
+  else if (diff == 0)
+  {
+    return;
+  }
+  abort();
+}
+
+Uint32 senderThreadId;
+
+static
+void
+do_send(struct thr_repository* rep, struct thr_data* selfptr,
+        Uint32 *watchDogCounter)
+{
+  static int cnt = 0;
+
+  senderThreadId = selfptr->m_thr_no;
+
+  if (cnt == 0)
+  {
+    *watchDogCounter = 5;
+    globalTransporterRegistry.update_connections();
+  }
+  cnt = (cnt + 1) & 15;
+
+  *watchDogCounter = 6;
+  globalTransporterRegistry.performSend();
+  unlock(&rep->m_send_lock);
+}
+
+Uint32 receiverThreadId;
+
+static
+void
+do_receive(struct thr_repository*rep, struct thr_data* selfptr, unsigned delay,
+           Uint32 *watchDogCounter)
+{
+  unsigned thr_no = selfptr->m_thr_no;
+  *watchDogCounter = 7;
+  receiverThreadId = thr_no;
+  if (globalTransporterRegistry.pollReceive(delay)) 
+  {
+    *watchDogCounter = 8;
+    globalTransporterRegistry.performReceive();
+  }
+  unlock(&rep->m_receive_lock);
+  
+  unsigned cnt = rep->m_thread_count;
+  
+  /**
+   * Transfer all out buffers
+   */
+  for (unsigned i = 0; i<cnt; i++)
+  {
+    thr_job_buffer * jb = selfptr->m_out_queue[i];
+    if (jb->m_len)
+    {
+      transfer_buffer(rep, thr_no, i);
+    }
+  }
+}
+
+static
+inline
+void
+sendpacked(struct thr_data* selfptr, Signal* signal, Uint32 thr_no)
+{
+  SimulatedBlock** blockptr = selfptr->m_blocks - MIN_BLOCK_NO;
+  SimulatedBlock* b_lqh = * (blockptr + DBLQH);
+  SimulatedBlock* b_tc = * (blockptr + DBTC);
+  SimulatedBlock* b_tup = * (blockptr + DBTUP);
+  if (b_lqh)
+    b_lqh->executeFunction(GSN_SEND_PACKED, signal);
+  if (b_tc)
+    b_tc->executeFunction(GSN_SEND_PACKED, signal);
+  if (b_tup)
+    b_tup->executeFunction(GSN_SEND_PACKED, signal);
+}
+
+static inline Uint32
+block2ThreadId(Uint32 block)
+{
+  /*
+    Block assignment:
+       0 PGMAN    BACKUP            LOCAL
+       1 LGMAN    DBTC      GLOBAL       
+       2 TSMAN    DBDIH     GLOBAL       
+       3 ACC      DBLQH             LOCAL
+       4 CMVMI    DBACC             LOCAL
+       5 FS       DBTUP             LOCAL
+       6 DICT     DBDICT    GLOBAL       
+       7 DIH      NDBCNTR   GLOBAL       
+       8 LQH      QMGR      GLOBAL       
+       9 TC       NDBFS     GLOBAL       
+      10 TUP      CMVMI     GLOBAL       
+      11 NDBCNTR  TRIX      GLOBAL       
+      12 QMGR     DBUTIL    GLOBAL       
+      13 TRIX     SUMA              LOCAL
+      14 BACKUP   DBTUX             LOCAL
+      15 UTIL     TSMAN             LOCAL
+      16 SUMA     LGMAN             LOCAL
+      17 TUX      PGMAN             LOCAL
+      18 RESTORE  RESTORE           LOCAL
+
+    Thread 0 is for global, thread 1 for locals.
+  */
+  static const Uint32 locals = 0x7e039;
+  assert(block >= MIN_BLOCK_NO && block <= MAX_BLOCK_NO);
+  return (locals >> (block - MIN_BLOCK_NO)) & 1;
+}
+
+extern "C"
+void *
+mt_thr_main(void *thr_arg)
+{
+  Signal signal;
+  struct timespec nowait;
+  nowait.tv_sec = 0;
+  nowait.tv_nsec = 10 * 1000000;
+  EmulatedJamBuffer thread_jam;
+  Uint32 watchDogCounter;
+
+  thread_jam.theEmulatedJamIndex = 0;
+  thread_jam.theEmulatedJamBlockNumber = 0;
+  memset(thread_jam.theEmulatedJam, 0, sizeof(thread_jam.theEmulatedJam));
+  NdbThread_SetTlsKey(NDB_THREAD_TLS_JAM, &thread_jam);
+
+  struct thr_repository* rep = &g_thr_repository;
+  struct thr_data* selfptr = (struct thr_data *)thr_arg;
+  /*
+    The thread initialisation argument is void *, not numeric, so we obtain
+    the numeric thread id in this slightly backwards way.
+  */
+  unsigned thr_no = selfptr - &(rep->m_thread[0]);
+  pid_t tid = (unsigned)syscall(SYS_gettid);
+  ndbout_c("Tread %u started, tid=%u", thr_no, tid);
+  ndbout_c("lock to cpu %u", (thr_no & 1));
+  {
+    cpu_set_t mask;
+    CPU_ZERO(&mask);
+    CPU_SET((thr_no & 1), &mask);
+    sched_setaffinity(tid, sizeof(mask), &mask);
+  }
+
+  /*
+    Now we need to somehow assign to this thread all blocks that will run in
+    this thread.
+  */
+  for (Uint32 i = 0; i < NO_OF_BLOCKS; i++)
+  {
+    if (block2ThreadId(i + MIN_BLOCK_NO) == thr_no)
+    {
+      require(selfptr->m_blocks[i] != 0);
+      selfptr->m_blocks[i]->assignToThread(thr_no, &thread_jam,
+                                           &watchDogCounter);
+    }
+  }
+
+  /* Avoid false watchdog alarms caused by race condition. */
+  watchDogCounter = 1;
+  globalEmulatorData.theWatchDog->registerWatchedThread(&watchDogCounter,
+                                                        thr_no);
+
+  /**
+   * prio A
+   */
+  unsigned* a_re_idxptr = &selfptr->m_jba.m_read_index;
+  volatile unsigned * a_wr_idxptr = &selfptr->m_jba.m_write_index;
+
+  while (globalData.theRestartFlag != perform_stop)
+  { 
+    unsigned sum = 0;
+    unsigned cnt = rep->m_thread_count;
+
+    watchDogCounter = 2;
+    scan_time_queues(selfptr);
+    unsigned jba_read_index = * a_re_idxptr;
+    unsigned jba_write_index = * a_wr_idxptr;
+
+    /**
+     * Process each inbuffer
+     */
+    for (unsigned i = 0; i<cnt; i++)
+    {
+      if (jba_read_index != jba_write_index)
+      {
+	sum += dojoba(rep, thr_no, &signal, jba_write_index, &watchDogCounter);
+	jba_read_index = * a_re_idxptr;
+      }
+      
+      unsigned ri = selfptr->m_in_queue[i].m_read_index;
+      unsigned wi = selfptr->m_in_queue[i].m_write_index;
+      thr_job_buffer * jb = selfptr->m_in_queue[i].m_buffers[ri];
+      if (ri != wi)
+      {
+	ri = (ri + 1) % thr_job_queue::SIZE;
+	selfptr->m_in_queue[i].m_read_index = ri;
+	sum += dojob(selfptr, jb, &signal, &watchDogCounter);
+	release_buffer(rep, thr_no, jb);
+      }
+      jba_write_index = * a_wr_idxptr;
+    }
+    
+    watchDogCounter = 1;
+    sendpacked(selfptr, &signal, thr_no);
+    
+    if (sum)
+    {
+      /**
+       * Transfer all out buffers
+       */
+      watchDogCounter = 6;
+      for (unsigned i = 0; i<cnt; i++)
+      {
+	thr_job_buffer * jb = selfptr->m_out_queue[i];
+	if (jb->m_len)
+	{
+	  transfer_buffer(rep, thr_no, i);
+	}
+      }
+    }
+    
+    if (trylock(&rep->m_send_lock) == 0)
+    {
+      do_send(rep, selfptr, &watchDogCounter);
+    }
+    
+    if (
+thr_no == 2 &&
+trylock(&rep->m_receive_lock) == 0)
+    {
+      do_receive(rep, selfptr, sum ? 0 : 1, &watchDogCounter);
+    }
+    else if (sum == 0)
+    {
+      yield(&selfptr->m_waiter, &nowait);
+    }
+  }
+
+  globalEmulatorData.theWatchDog->unregisterWatchedThread(thr_no);
+  return NULL;                  // Return value not currently used
+}
+
+void
+thr_transporter_main(struct thr_repository* rep, unsigned thr_no)
+{
+}
+
+static
+inline
+int
+insert(thr_job_buffer* ptr, const struct SignalHeader* s)
+{
+  unsigned len = ptr->m_len;
+  unsigned* pos = ptr->m_data + len;
+  unsigned siglen = (sizeof(*s) >> 2) + s->theLength + s->m_noOfSections;
+  ptr->m_len = len + siglen;
+  memcpy(pos, s, 4*siglen);
+  return thr_job_buffer::SIZE - (len + siglen + 32); // > 0 not full, <=0 full
+}
+
+void
+sendlocal(Uint32 self, Uint32 block, const SignalHeader* s)
+{
+  Uint32 dst = block2ThreadId(block);
+  struct thr_repository* rep = &g_thr_repository;
+  struct thr_data * selfptr = rep->m_thread + self;
+
+  int res = insert(selfptr->m_out_queue[dst], s);
+  if (res <= 0)
+  {
+    transfer_buffer(rep, self, dst);
+  }
+}
+
+void
+sendprioa(Uint32 self, Uint32 block, const SignalHeader* s)
+{
+  Uint32 dst = block2ThreadId(block);
+  struct thr_repository* rep = &g_thr_repository;
+  struct thr_data * selfptr = rep->m_thread + self;
+  struct thr_data * dstptr = rep->m_thread + dst;
+
+  unsigned siglen = (sizeof(*s) >> 2) + s->theLength + s->m_noOfSections;
+  
+  struct thr_job_buffer* newbuffer = selfptr->m_jba.m_next_buffer;
+
+  lock(&dstptr->m_jba.m_write_lock);
+  
+  unsigned wi = dstptr->m_jba.m_write_index;
+  unsigned buf = wi >> 16;
+  unsigned newwi = wi + siglen;
+  unsigned pos = wi & 0xFFFF;
+  unsigned newpos = pos + siglen;
+
+  struct thr_job_buffer * buffer = * (dstptr->m_jba.m_buffers + buf);
+  assert(buffer->m_len == pos);
+  buffer->m_len = newpos;
+  memcpy(buffer->m_data + pos, s, 4*siglen);
+
+  if (0)
+    ndbout_c("sendprioa %s from %s to %s",
+	     getSignalName(s->theVerId_signalNumber),
+	     getBlockName(refToBlock(s->theSendersBlockRef)),
+	     getBlockName(s->theReceiversBlockNumber));
+  
+  if (unlikely(newpos + 32 >= thr_job_buffer::SIZE))
+  {
+    newwi = (buf + 1) % thr_jba::SIZE;
+    * (dstptr->m_jba.m_buffers + newwi) = newbuffer;
+    newwi = (newwi << 16);
+  }
+  dstptr->m_jba.m_write_index = newwi;
+  
+  unlock(&dstptr->m_jba.m_write_lock);
+
+  /**
+   * Note, do malloc outside of critical region
+   */
+  if (newpos + 32 >= thr_job_buffer::SIZE)
+  {
+    selfptr->m_jba.m_next_buffer = seize_buffer(rep, self);
+  }
+}
+
+static
+inline
+Uint32*
+get_free_slot(struct thr_repository* rep, 
+	      struct thr_data* selfptr, 
+	      Uint32* idxptr)
+{
+  struct thr_tq * tq = &selfptr->m_tq;
+  Uint32 idx = tq->m_next_free;
+retry:
+  Uint32 buf = idx >> 8;
+  Uint32 pos = idx & 0xFF;
+
+  if (idx != RNIL)
+  {
+    Uint32* page = * (tq->m_delayed_signals + buf);
+    Uint32* ptr = page + (32 * pos);
+    tq->m_next_free = * ptr;
+    * idxptr = idx;
+    return ptr;
+  }
+
+  Uint32 thr_no = selfptr->m_thr_no;
+  for (Uint32 i = 0; i<thr_tq::PAGES; i++)
+  {
+    if (tq->m_delayed_signals[i] == 0)
+    {
+      struct thr_job_buffer *jb = seize_buffer(rep, thr_no);
+      Uint32 * page = reinterpret_cast<Uint32*>(jb);
+      tq->m_delayed_signals[i] = page;
+   
+      ndbout_c("saving %p at %p (%d)", page, tq->m_delayed_signals+i, i);
+   
+      /**
+       * Init page
+       */
+      for (Uint32 j = 0; j<255; j ++)
+      {
+	page[j * 32] = (i << 8) + (j + 1);
+      }
+      page[255*32] = RNIL;
+      idx = (i << 8);
+      goto retry;
+    }
+  }
+  abort();
+}
+
+void
+senddelay(Uint32 thr_no, const SignalHeader* s, Uint32 delay)
+{
+  struct thr_repository* rep = &g_thr_repository;
+  struct thr_data * selfptr = rep->m_thread + thr_no;
+  unsigned siglen = (sizeof(*s) >> 2) + s->theLength + s->m_noOfSections;
+
+  Uint32 max;
+  Uint32 * cntptr;
+  Uint32 * queueptr;
+
+  Uint32 alarm = selfptr->m_tq.m_current_time + delay;
+  Uint32 nexttimer = selfptr->m_tq.m_next_timer;
+  if (delay < 100)
+  {
+    cntptr = selfptr->m_tq.m_cnt + 0;
+    queueptr = selfptr->m_tq.m_short_queue;
+    max = thr_tq::SQ_SIZE;
+  }
+  else
+  {
+    cntptr = selfptr->m_tq.m_cnt + 1;
+    queueptr = selfptr->m_tq.m_long_queue;
+    max = thr_tq::LQ_SIZE;
+  }
+
+  Uint32 idx;
+  Uint32* ptr = get_free_slot(rep, selfptr, &idx);
+  memcpy(ptr, s, 4*siglen);
+
+  if (0)
+    ndbout_c("now: %d alarm: %d send %s from %s to %s delay: %d idx: %x %p", 
+	     selfptr->m_tq.m_current_time,
+	     alarm,
+	     getSignalName(s->theVerId_signalNumber),
+	     getBlockName(refToBlock(s->theSendersBlockRef)),
+	     getBlockName(s->theReceiversBlockNumber),
+	     delay,
+	     idx, ptr);
+  
+  Uint32 i;
+  Uint32 cnt = *cntptr;
+  Uint32 newentry = (idx << 16) | (alarm & 0xFFFF);
+
+  * cntptr = cnt + 1;
+  selfptr->m_tq.m_next_timer = alarm < nexttimer ? alarm : nexttimer;
+  
+  if (cnt == 0)
+  {
+    queueptr[0] = newentry;
+    return;
+  }
+  else if (cnt < max)
+  {
+    for (i = 0; i<cnt; i++)
+    {
+      Uint32 save = queueptr[i];
+      if ((save & 0xFFFF) > alarm)
+      {
+	memmove(queueptr+i+1, queueptr+i, 4*(cnt - i));
+	queueptr[i] = newentry;
+	return;
+      }
+    }
+    assert(i == cnt);
+    queueptr[i] = newentry;
+    return;
+  }
+  else
+  {
+    abort();
+  }
+}
+
+static
+void
+init(struct thr_tq* tq)
+{
+  tq->m_next_timer = 0;
+  tq->m_current_time = 0;
+  tq->m_next_free = RNIL;
+  tq->m_cnt[0] = tq->m_cnt[1] = 0;
+  bzero(tq->m_delayed_signals, sizeof(tq->m_delayed_signals));
+}
+
+static
+void
+init(struct thr_repository* rep, struct thr_data *selfptr, unsigned int cnt,
+     unsigned thr_no)
+{
+  unsigned int i;
+
+  selfptr->m_thr_no = thr_no;
+  selfptr->m_free_list = 0;
+  for (i = 0; i<cnt; i++)
+    selfptr->m_out_queue[i] = seize_buffer(rep, thr_no);
+  
+  selfptr->m_free = 0;
+  selfptr->m_jba.m_read_index = 0;
+  selfptr->m_jba.m_write_index = 0;
+  selfptr->m_jba.m_buffers[0] = seize_buffer(rep, thr_no);
+  selfptr->m_jba.m_next_buffer = seize_buffer(rep, thr_no);
+  
+  for (i = 0; i<cnt; i++)
+  {
+    selfptr->m_in_queue[i].m_read_index = 0;
+    selfptr->m_in_queue[i].m_write_index = 0;
+  }
+
+  init(&selfptr->m_tq);
+}
+
+static
+void
+init(struct thr_repository* rep, unsigned int cnt)
+{
+  rep->m_thread_count = cnt;
+  for (unsigned int i = 0; i<cnt; i++)
+  {
+    init(rep, rep->m_thread + i, cnt, i);
+  }
+}
+
+
+/**
+ * Thread Config
+ */
+
+#include "ThreadConfig.hpp"
+#include <signaldata/StartOrd.hpp>
+
+ThreadConfig::ThreadConfig()
+{
+  init(&g_thr_repository, NUM_THREADS);
+}
+
+ThreadConfig::~ThreadConfig()
+{
+}
+
+
+void ThreadConfig::ipControlLoop()
+{
+  unsigned int i;
+  unsigned int thr_no;
+  struct thr_repository* rep = &g_thr_repository;
+  NdbThread *threads[NUM_THREADS - 1];
+
+  /*
+    Start threads for all execution threads, except for thread 0, which
+    runs in the main thread.
+  */
+  for (thr_no = 0; thr_no < NUM_THREADS; thr_no++)
+  {
+    for (i = 0; i<NO_OF_BLOCKS; i++)
+    {
+      if (block2ThreadId(i + MIN_BLOCK_NO) == thr_no)
+      {
+        rep->m_thread[thr_no].m_blocks[i] =
+          globalData.getBlock(i + MIN_BLOCK_NO);
+      }
+    }
+    rep->m_thread[thr_no].m_time = NdbTick_CurrentMillisecond();
+
+    if (thr_no == 0)
+      continue;                 // Will run in the main thread.
+    /*
+      The NdbThread_Create() takes void **, but that is cast to void * when
+      passed to the mt_thr_main(). Which is kind of strange ...
+    */
+    threads[thr_no - 1] = NdbThread_Create(mt_thr_main,
+                                           (void **)(&(rep->m_thread[thr_no])),
+                                           1024*1024,
+                                           "execute thread", //ToDo add number
+                                           NDB_THREAD_PRIO_MEAN);
+    assert(threads[thr_no - 1] != NULL);
+  }
+
+  /* Now run the main loop for thread 0 directly. */
+  mt_thr_main(&(rep->m_thread[0]));
+
+  /* Wait for all threads to shutdown. */
+  for (thr_no = 1; thr_no < NUM_THREADS; thr_no++)
+  {
+    void *dummy_return_status;
+    NdbThread_WaitFor(threads[thr_no - 1], &dummy_return_status);
+    NdbThread_Destroy(&(threads[thr_no - 1]));
+  }
+}
+
+int
+ThreadConfig::doStart(NodeState::StartLevel startLevel)
+{
+  SignalT<3> signalT;
+  memset(&signalT.header, 0, sizeof(SignalHeader));
+  
+  signalT.header.theVerId_signalNumber   = GSN_START_ORD;
+  signalT.header.theReceiversBlockNumber = CMVMI;
+  signalT.header.theSendersBlockRef      = 0;
+  signalT.header.theTrace                = 0;
+  signalT.header.theSignalId             = 0;
+  signalT.header.theLength               = StartOrd::SignalLength;
+  
+  StartOrd * const  startOrd = (StartOrd *)&signalT.theData[0];
+  startOrd->restartInfo = 0;
+  
+  senddelay(0, &signalT.header, 1);
+  return 0;
+}
+
+void
+mt_send_lock()
+{
+  lock(&(g_thr_repository.m_send_lock));
+}
+
+void
+mt_send_unlock()
+{
+  unlock(&(g_thr_repository.m_send_lock));
+}
+
+void
+mt_section_lock()
+{
+  lock(&(g_thr_repository.m_section_lock));
+}
+
+void
+mt_section_unlock()
+{
+  unlock(&(g_thr_repository.m_section_lock));
+}
+
+/**
+ * Global data
+ */
+struct thr_repository g_thr_repository;
diff -Nrup a/storage/ndb/src/kernel/vm/mt/mt.hpp b/storage/ndb/src/kernel/vm/mt/mt.hpp
--- /dev/null	Wed Dec 31 16:00:00 196900
+++ b/storage/ndb/src/kernel/vm/mt/mt.hpp	2007-09-24 17:12:18 +02:00
@@ -0,0 +1,30 @@
+#include <kernel_types.h>
+
+#ifndef ndb_mt_hpp
+#define ndb_mt_hpp
+
+
+/*
+  For now, we use locks to only have one thread at the time running in the
+  transporter as sender, and only one as receiver.
+
+  Thus, we can use a global variable to record the id of the current
+  transporter threads. Only valid while holding the transporter receive lock.
+*/
+extern Uint32 receiverThreadId;
+extern Uint32 senderThreadId;
+
+void sendlocal(Uint32 self, Uint32 dst, const struct SignalHeader*);
+void sendprioa(Uint32 self, Uint32 dst, const struct SignalHeader*);
+void senddelay(Uint32 thr_no, const struct SignalHeader*, Uint32 delay);
+
+void mt_send_lock();
+void mt_send_unlock();
+
+/**
+ * Lock/unlock pools for long signal section(s)
+ */
+void mt_section_lock();
+void mt_section_unlock();
+
+#endif
diff -Nrup a/storage/ndb/src/kernel/vm/pc.hpp b/storage/ndb/src/kernel/vm/pc.hpp
--- a/storage/ndb/src/kernel/vm/pc.hpp	2006-12-23 20:20:20 +01:00
+++ b/storage/ndb/src/kernel/vm/pc.hpp	2007-09-24 17:12:18 +02:00
@@ -27,57 +27,45 @@
 #define jamLine(line)
 #define jamEntry()
 #define jamEntryLine(line)
+#define jamBlock(block)
+#define jamBlockLine(block, line)
+#define jamEntryBlock(block)
+#define jamEntryBlockLine(block, line)
+#define jamNoBlock()
+#define jamNoBlockLine(line)
 
 #else
-#ifdef NDB_WIN32
 
-#define jam() { \
-  Uint32 tEmulatedJamIndex = theEmulatedJamIndex; \
-  *(Uint32*)(theEmulatedJam + tEmulatedJamIndex) = __LINE__; \
-  theEmulatedJamIndex = (tEmulatedJamIndex + 4) & JAM_MASK; }
-#define jamLine(line) { \
-  Uint32 tEmulatedJamIndex = theEmulatedJamIndex; \
-  *(Uint32*)(theEmulatedJam + tEmulatedJamIndex) = line; \
-  theEmulatedJamIndex = (tEmulatedJamIndex + 4) & JAM_MASK; }
-#define jamEntry() { \
-  theEmulatedJamBlockNumber = number(); \
-  Uint32 tEmulatedJamIndex = theEmulatedJamIndex; \
-  *(Uint32*)(theEmulatedJam + tEmulatedJamIndex) = \
-    ((theEmulatedJamBlockNumber << 20) | __LINE__); \
-  theEmulatedJamIndex = (tEmulatedJamIndex + 4) & JAM_MASK; }
-#define jamEntryLine(line) { \
-  theEmulatedJamBlockNumber = number(); \
-  Uint32 tEmulatedJamIndex = theEmulatedJamIndex; \
-  *(Uint32*)(theEmulatedJam + tEmulatedJamIndex) = \
-    ((theEmulatedJamBlockNumber << 20) | line); \
-  theEmulatedJamIndex = (tEmulatedJamIndex + 4) & JAM_MASK; }
-
-#else
-
-#define jam() { \
-  Uint32 tEmulatedJamIndex = theEmulatedJamIndex; \
-  *(Uint32*)((UintPtr)theEmulatedJam + (Uint32)tEmulatedJamIndex) = __LINE__; \
-  theEmulatedJamIndex = (tEmulatedJamIndex + 4) & JAM_MASK; }
-#define jamLine(line) { \
-  Uint32 tEmulatedJamIndex = theEmulatedJamIndex; \
-  *(Uint32*)((UintPtr)theEmulatedJam + (Uint32)tEmulatedJamIndex) = line; \
-  theEmulatedJamIndex = (tEmulatedJamIndex + 4) & JAM_MASK; }
-#define jamEntry() { \
-  theEmulatedJamBlockNumber = number(); \
-  Uint32 tEmulatedJamIndex = theEmulatedJamIndex; \
-  *(Uint32*)((UintPtr)theEmulatedJam + (Uint32)tEmulatedJamIndex) = \
-    ((theEmulatedJamBlockNumber << 20) | __LINE__); \
-  theEmulatedJamIndex = (tEmulatedJamIndex + 4) & JAM_MASK; }
-#define jamEntryLine(line) { \
-  theEmulatedJamBlockNumber = number(); \
-  Uint32 tEmulatedJamIndex = theEmulatedJamIndex; \
-  *(Uint32*)((UintPtr)theEmulatedJam + (Uint32)tEmulatedJamIndex) = \
-    ((theEmulatedJamBlockNumber << 20) | line); \
-  theEmulatedJamIndex = (tEmulatedJamIndex + 4) & JAM_MASK; }
+#define _jamBlockLine(block, line, jam_buf_getter) \
+  do { \
+    EmulatedJamBuffer *jamBuffer = jam_buf_getter; \
+    Uint32 jamIndex = jamBuffer->theEmulatedJamIndex; \
+    jamBuffer->theEmulatedJam[jamIndex++] = line; \
+    jamBuffer->theEmulatedJamIndex = jamIndex & JAM_MASK; \
+  } while(0)
+#define jamBlockLine(block, line) \
+   _jamBlockLine(block, line, (block)->jamBuffer())
+#define jamBlock(block) jamBlockLine(block, __LINE__)
+#define jamLine(line) jamBlockLine(this, line)
+#define jam() jamLine(__LINE__)
+#define jamBlockEntryLine(block, line) \
+  do { \
+    EmulatedJamBuffer *jamBuffer = (block)->jamBuffer(); \
+    Uint32 blockNumber= (block)->number(); \
+    Uint32 jamIndex = jamBuffer->theEmulatedJamIndex; \
+    jamBuffer->theEmulatedJam[jamIndex++] = (blockNumber << 20) | line; \
+    jamBuffer->theEmulatedJamBlockNumber = blockNumber; \
+    jamBuffer->theEmulatedJamIndex = jamIndex & JAM_MASK; \
+  } while(0)
+#define jamEntryBlock(block) jamEntryBlockLine(block, __LINE__)
+#define jamEntryLine(line) jamBlockEntryLine(this, line)
+#define jamEntry() jamEntryLine(__LINE__)
+#define jamNoBlockLine(line) _jamBlockLine \
+    (block, line, (EmulatedJamBuffer *)NdbThread_GetTlsKey(NDB_THREAD_TLS_JAM))
+#define jamNoBlock() jamNoBlockLine(__LINE__)
 
 #endif
 
-#endif
 #ifndef NDB_OPT
 #define ptrCheck(ptr, limit, rec) if (ptr.i < (limit)) ptr.p = &rec[ptr.i]; else ptr.p = NULL
 
Thread
bk commit into 5.1 tree (knielsen:1.2610)knielsen24 Sep