Below is the list of changes that have just been committed into a local
5.1 repository of knielsen. When knielsen does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html
ChangeSet@stripped, 2007-09-24 17:12:21+02:00, knielsen@ymer.(none) +41 -0
ndbmtd import 1
configure.in@stripped, 2007-09-24 17:12:17+02:00, knielsen@ymer.(none) +15 -14
ndbmtd import 1
storage/ndb/include/portlib/NdbThread.h@stripped, 2007-09-24 17:12:17+02:00, knielsen@ymer.(none) +13 -9
ndbmtd import 1
storage/ndb/src/common/portlib/NdbThread.c@stripped, 2007-09-24 17:12:17+02:00, knielsen@ymer.(none) +17 -0
ndbmtd import 1
storage/ndb/src/kernel/Makefile.am@stripped, 2007-09-24 17:12:17+02:00, knielsen@ymer.(none) +6 -2
ndbmtd import 1
storage/ndb/src/kernel/blocks/backup/Backup.cpp@stripped, 2007-09-24 17:12:17+02:00, knielsen@ymer.(none) +2 -2
ndbmtd import 1
storage/ndb/src/kernel/blocks/backup/Backup.hpp@stripped, 2007-09-24 17:12:17+02:00, knielsen@ymer.(none) +3 -0
ndbmtd import 1
storage/ndb/src/kernel/blocks/cmvmi/Cmvmi.cpp@stripped, 2007-09-24 17:12:17+02:00, knielsen@ymer.(none) +2 -5
ndbmtd import 1
storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp@stripped, 2007-09-24 17:12:17+02:00, knielsen@ymer.(none) +15 -13
ndbmtd import 1
storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp@stripped, 2007-09-24 17:12:17+02:00, knielsen@ymer.(none) +2 -2
ndbmtd import 1
storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp@stripped, 2007-09-24 17:12:17+02:00, knielsen@ymer.(none) +1 -0
ndbmtd import 1
storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp@stripped, 2007-09-24 17:12:17+02:00, knielsen@ymer.(none) +1 -1
ndbmtd import 1
storage/ndb/src/kernel/blocks/dbtc/DbtcInit.cpp@stripped, 2007-09-24 17:12:17+02:00, knielsen@ymer.(none) +1 -0
ndbmtd import 1
storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp@stripped, 2007-09-24 17:12:17+02:00, knielsen@ymer.(none) +2 -2
ndbmtd import 1
storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp@stripped, 2007-09-24 17:12:17+02:00, knielsen@ymer.(none) +1 -0
ndbmtd import 1
storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp@stripped, 2007-09-24 17:12:17+02:00, knielsen@ymer.(none) +1 -1
ndbmtd import 1
storage/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp@stripped, 2007-09-24 17:12:17+02:00, knielsen@ymer.(none) +1 -0
ndbmtd import 1
storage/ndb/src/kernel/blocks/dbutil/DbUtil.cpp@stripped, 2007-09-24 17:12:17+02:00, knielsen@ymer.(none) +1 -1
ndbmtd import 1
storage/ndb/src/kernel/blocks/ndbcntr/Ndbcntr.hpp@stripped, 2007-09-24 17:12:17+02:00, knielsen@ymer.(none) +2 -0
ndbmtd import 1
storage/ndb/src/kernel/blocks/ndbcntr/NdbcntrMain.cpp@stripped, 2007-09-24 17:12:17+02:00, knielsen@ymer.(none) +1 -1
ndbmtd import 1
storage/ndb/src/kernel/blocks/suma/Suma.cpp@stripped, 2007-09-24 17:12:17+02:00, knielsen@ymer.(none) +22 -23
ndbmtd import 1
storage/ndb/src/kernel/blocks/suma/Suma.hpp@stripped, 2007-09-24 17:12:17+02:00, knielsen@ymer.(none) +3 -0
ndbmtd import 1
storage/ndb/src/kernel/error/ErrorReporter.cpp@stripped, 2007-09-24 17:12:17+02:00, knielsen@ymer.(none) +41 -17
ndbmtd import 1
storage/ndb/src/kernel/main.cpp@stripped, 2007-09-24 17:12:17+02:00, knielsen@ymer.(none) +1 -0
ndbmtd import 1
storage/ndb/src/kernel/vm/Emulator.cpp@stripped, 2007-09-24 17:12:18+02:00, knielsen@ymer.(none) +12 -3
ndbmtd import 1
storage/ndb/src/kernel/vm/Emulator.hpp@stripped, 2007-09-24 17:12:18+02:00, knielsen@ymer.(none) +9 -8
ndbmtd import 1
storage/ndb/src/kernel/vm/GlobalData.hpp@stripped, 2007-09-24 17:12:18+02:00, knielsen@ymer.(none) +2 -2
ndbmtd import 1
storage/ndb/src/kernel/vm/Makefile.am@stripped, 2007-09-24 17:12:18+02:00, knielsen@ymer.(none) +17 -9
ndbmtd import 1
storage/ndb/src/kernel/vm/SimulatedBlock.cpp@stripped, 2007-09-24 17:12:18+02:00, knielsen@ymer.(none) +103 -1
ndbmtd import 1
storage/ndb/src/kernel/vm/SimulatedBlock.hpp@stripped, 2007-09-24 17:12:18+02:00, knielsen@ymer.(none) +29 -1
ndbmtd import 1
storage/ndb/src/kernel/vm/SimulatedBlock_mt.cpp@stripped, 2007-09-24 17:12:18+02:00, knielsen@ymer.(none) +19 -0
New BitKeeper file ``storage/ndb/src/kernel/vm/SimulatedBlock_mt.cpp''
storage/ndb/src/kernel/vm/SimulatedBlock_mt.cpp@stripped, 2007-09-24 17:12:18+02:00, knielsen@ymer.(none) +0 -0
storage/ndb/src/kernel/vm/SimulatedBlock_nonmt.cpp@stripped, 2007-09-24 17:12:18+02:00, knielsen@ymer.(none) +18 -0
New BitKeeper file ``storage/ndb/src/kernel/vm/SimulatedBlock_nonmt.cpp''
storage/ndb/src/kernel/vm/SimulatedBlock_nonmt.cpp@stripped, 2007-09-24 17:12:18+02:00, knielsen@ymer.(none) +0 -0
storage/ndb/src/kernel/vm/ThreadConfig.cpp@stripped, 2007-09-24 17:12:18+02:00, knielsen@ymer.(none) +7 -0
ndbmtd import 1
storage/ndb/src/kernel/vm/TransporterCallback.cpp@stripped, 2007-09-24 17:12:18+02:00, knielsen@ymer.(none) +108 -2
ndbmtd import 1
storage/ndb/src/kernel/vm/TransporterCallback_mt.cpp@stripped, 2007-09-24 17:12:18+02:00, knielsen@ymer.(none) +19 -0
New BitKeeper file ``storage/ndb/src/kernel/vm/TransporterCallback_mt.cpp''
storage/ndb/src/kernel/vm/TransporterCallback_mt.cpp@stripped, 2007-09-24 17:12:18+02:00, knielsen@ymer.(none) +0 -0
storage/ndb/src/kernel/vm/TransporterCallback_nonmt.cpp@stripped, 2007-09-24 17:12:18+02:00, knielsen@ymer.(none) +18 -0
New BitKeeper file ``storage/ndb/src/kernel/vm/TransporterCallback_nonmt.cpp''
storage/ndb/src/kernel/vm/TransporterCallback_nonmt.cpp@stripped, 2007-09-24 17:12:18+02:00, knielsen@ymer.(none) +0 -0
storage/ndb/src/kernel/vm/WatchDog.cpp@stripped, 2007-09-24 17:12:18+02:00, knielsen@ymer.(none) +111 -28
ndbmtd import 1
storage/ndb/src/kernel/vm/WatchDog.hpp@stripped, 2007-09-24 17:12:18+02:00, knielsen@ymer.(none) +41 -3
ndbmtd import 1
storage/ndb/src/kernel/vm/mt/dummy_mt.cpp@stripped, 2007-09-24 17:12:18+02:00, knielsen@ymer.(none) +83 -0
New BitKeeper file ``storage/ndb/src/kernel/vm/mt/dummy_mt.cpp''
storage/ndb/src/kernel/vm/mt/dummy_mt.cpp@stripped, 2007-09-24 17:12:18+02:00, knielsen@ymer.(none) +0 -0
storage/ndb/src/kernel/vm/mt/mt.cpp@stripped, 2007-09-24 17:12:18+02:00, knielsen@ymer.(none) +1202 -0
New BitKeeper file ``storage/ndb/src/kernel/vm/mt/mt.cpp''
storage/ndb/src/kernel/vm/mt/mt.cpp@stripped, 2007-09-24 17:12:18+02:00, knielsen@ymer.(none) +0 -0
storage/ndb/src/kernel/vm/mt/mt.hpp@stripped, 2007-09-24 17:12:18+02:00, knielsen@ymer.(none) +30 -0
New BitKeeper file ``storage/ndb/src/kernel/vm/mt/mt.hpp''
storage/ndb/src/kernel/vm/mt/mt.hpp@stripped, 2007-09-24 17:12:18+02:00, knielsen@ymer.(none) +0 -0
storage/ndb/src/kernel/vm/pc.hpp@stripped, 2007-09-24 17:12:18+02:00, knielsen@ymer.(none) +33 -45
ndbmtd import 1
diff -Nrup a/configure.in b/configure.in
--- a/configure.in 2007-09-14 11:58:44 +02:00
+++ b/configure.in 2007-09-24 17:12:17 +02:00
@@ -884,6 +884,21 @@ then
AC_CHECK_FUNC(gtty, , AC_CHECK_LIB(compat, gtty))
fi
+#
+#
+#
+case "$target" in
+ *-*-aix4* | *-*-sco*)
+ # (grr) aix 4.3 has a stub for clock_gettime, (returning ENOSYS)
+ # and using AC_TRY_RUN is hard when cross-compiling
+ # We also disable for SCO for the time being, the headers for the
+ # thread library we use conflicts with other headers.
+ ;;
+ *) AC_CHECK_LIB(rt, clock_gettime)
+ AC_CHECK_FUNCS(clock_gettime)
+ ;;
+esac
+
# We make a special variable for non-threaded version of LIBS to avoid
# including thread libs into non-threaded version of MySQL client library.
# Later in this script LIBS will be augmented with a threads library.
@@ -1942,20 +1957,6 @@ AC_CHECK_FUNCS(alarm bcmp bfill bmove bs
snprintf socket stpcpy strcasecmp strerror strsignal strnlen strpbrk strstr \
strtol strtoll strtoul strtoull tell tempnam thr_setconcurrency vidattr \
posix_fallocate)
-
-#
-#
-#
-case "$target" in
- *-*-aix4* | *-*-sco*)
- # (grr) aix 4.3 has a stub for clock_gettime, (returning ENOSYS)
- # and using AC_TRY_RUN is hard when cross-compiling
- # We also disable for SCO for the time being, the headers for the
- # thread library we use conflicts with other headers.
- ;;
- *) AC_CHECK_FUNCS(clock_gettime)
- ;;
-esac
# Check that isinf() is available in math.h and can be used in both C and C++
# code
diff -Nrup a/storage/ndb/include/portlib/NdbThread.h b/storage/ndb/include/portlib/NdbThread.h
--- a/storage/ndb/include/portlib/NdbThread.h 2006-12-23 20:20:08 +01:00
+++ b/storage/ndb/include/portlib/NdbThread.h 2007-09-24 17:12:17 +02:00
@@ -30,6 +30,11 @@ typedef enum NDB_THREAD_PRIO_ENUM {
NDB_THREAD_PRIO_LOWEST
} NDB_THREAD_PRIO;
+typedef enum NDB_THREAD_TLS_ENUM {
+ NDB_THREAD_TLS_JAM, /* Jam buffer pointer. */
+ NDB_THREAD_TLS_MAX
+} NDB_THREAD_TLS;
+
typedef void* (NDB_THREAD_FUNC)(void*);
typedef void* NDB_THREAD_ARG;
typedef size_t NDB_THREAD_STACKSIZE;
@@ -92,18 +97,17 @@ void NdbThread_Exit(void *status);
*/
int NdbThread_SetConcurrencyLevel(int level);
+/**
+ * Fetch and set thread-local storage entry.
+ */
+void *NdbThread_GetTlsKey(NDB_THREAD_TLS key);
+void NdbThread_SetTlsKey(NDB_THREAD_TLS key, void *value);
+
+void NdbThread_Init();
+
#ifdef __cplusplus
}
#endif
#endif
-
-
-
-
-
-
-
-
-
diff -Nrup a/storage/ndb/src/common/portlib/NdbThread.c b/storage/ndb/src/common/portlib/NdbThread.c
--- a/storage/ndb/src/common/portlib/NdbThread.c 2006-12-23 20:20:12 +01:00
+++ b/storage/ndb/src/common/portlib/NdbThread.c 2007-09-24 17:12:17 +02:00
@@ -190,3 +190,20 @@ int NdbThread_SetConcurrencyLevel(int le
return 0;
#endif
}
+
+static pthread_key_t tls_keys[NDB_THREAD_TLS_MAX];
+
+void NdbThread_Init()
+{
+ pthread_key_create(&(tls_keys[NDB_THREAD_TLS_JAM]), NULL);
+}
+
+void *NdbThread_GetTlsKey(NDB_THREAD_TLS key)
+{
+ return pthread_getspecific(tls_keys[key]);
+}
+
+void NdbThread_SetTlsKey(NDB_THREAD_TLS key, void *value)
+{
+ pthread_setspecific(tls_keys[key], value);
+}
diff -Nrup a/storage/ndb/src/kernel/Makefile.am b/storage/ndb/src/kernel/Makefile.am
--- a/storage/ndb/src/kernel/Makefile.am 2006-12-31 01:26:56 +01:00
+++ b/storage/ndb/src/kernel/Makefile.am 2007-09-24 17:12:17 +02:00
@@ -17,9 +17,10 @@ SUBDIRS = vm error blocks
include $(top_srcdir)/storage/ndb/config/common.mk.am
-ndbbin_PROGRAMS = ndbd
+ndbbin_PROGRAMS = ndbd ndbmtd
ndbd_SOURCES = main.cpp SimBlockList.cpp
+ndbmtd_SOURCES = main.cpp SimBlockList.cpp
include $(top_srcdir)/storage/ndb/config/type_kernel.mk.am
@@ -41,7 +42,7 @@ INCLUDES += \
-I$(srcdir)/blocks/dbtux \
-I$(srcdir)/blocks
-LDADD += \
+LIBS= \
blocks/libblocks.a \
vm/libkernel.a \
error/liberror.a \
@@ -56,6 +57,9 @@ LDADD += \
$(top_builddir)/dbug/libdbug.a \
$(top_builddir)/mysys/libmysys.a \
$(top_builddir)/strings/libmystrings.a @NDB_SCI_LIBS@
+
+ndbd_LDADD = $(LIBS) vm/libsched.a
+ndbmtd_LDADD = $(LIBS) vm/libsched_mt.a
windoze-dsp: ndbd.dsp
diff -Nrup a/storage/ndb/src/kernel/blocks/backup/Backup.cpp b/storage/ndb/src/kernel/blocks/backup/Backup.cpp
--- a/storage/ndb/src/kernel/blocks/backup/Backup.cpp 2007-09-24 17:09:25 +02:00
+++ b/storage/ndb/src/kernel/blocks/backup/Backup.cpp 2007-09-24 17:12:17 +02:00
@@ -576,10 +576,10 @@ static Uint32 xps(Uint64 x, Uint64 ms)
float fs = ms;
if(ms == 0 || x == 0) {
- jam();
+ jamNoBlock();
return 0;
}//if
- jam();
+ jamNoBlock();
return ((Uint32)(1000.0f * (fx + fs/2.1f))) / ((Uint32)fs);
}
diff -Nrup a/storage/ndb/src/kernel/blocks/backup/Backup.hpp b/storage/ndb/src/kernel/blocks/backup/Backup.hpp
--- a/storage/ndb/src/kernel/blocks/backup/Backup.hpp 2007-06-09 07:25:43 +02:00
+++ b/storage/ndb/src/kernel/blocks/backup/Backup.hpp 2007-09-24 17:12:17 +02:00
@@ -312,6 +312,7 @@ public:
Backup & backup;
BlockNumber number() const { return backup.number(); }
+ EmulatedJamBuffer *jamBuffer() const { return backup.jamBuffer(); }
void progError(int line, int cause, const char * extra) {
backup.progError(line, cause, extra);
}
@@ -404,6 +405,7 @@ public:
void forceState(State s);
BlockNumber number() const { return backup.number(); }
+ EmulatedJamBuffer *jamBuffer() const { return backup.jamBuffer(); }
void progError(int line, int cause, const char * extra) {
backup.progError(line, cause, extra);
}
@@ -532,6 +534,7 @@ public:
Backup & backup;
BlockNumber number() const { return backup.number(); }
+ EmulatedJamBuffer *jamBuffer() const { return backup.jamBuffer(); }
void progError(int line, int cause, const char * extra) {
backup.progError(line, cause, extra);
}
diff -Nrup a/storage/ndb/src/kernel/blocks/cmvmi/Cmvmi.cpp b/storage/ndb/src/kernel/blocks/cmvmi/Cmvmi.cpp
--- a/storage/ndb/src/kernel/blocks/cmvmi/Cmvmi.cpp 2007-09-24 17:09:25 +02:00
+++ b/storage/ndb/src/kernel/blocks/cmvmi/Cmvmi.cpp 2007-09-24 17:12:17 +02:00
@@ -808,7 +808,7 @@ Cmvmi::execSTART_ORD(Signal* signal) {
}
if(globalData.theRestartFlag == system_started){
- jam()
+ jam();
/**
* START_ORD received when already started(ignored)
*/
@@ -817,7 +817,7 @@ Cmvmi::execSTART_ORD(Signal* signal) {
}
if(globalData.theRestartFlag == perform_stop){
- jam()
+ jam();
/**
* START_ORD received when stopping(ignored)
*/
@@ -866,9 +866,6 @@ Cmvmi::execSTART_ORD(Signal* signal) {
*
* Do Restart
*/
-
- globalScheduler.clear();
- globalTimeQueue.clear();
// Disconnect all nodes as part of the system restart.
// We need to ensure that we are starting up
diff -Nrup a/storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp b/storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp
--- a/storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp 2007-09-24 17:09:25 +02:00
+++ b/storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp 2007-09-24 17:12:17 +02:00
@@ -5710,14 +5710,14 @@ Dbdict::createTab_dih(Signal* signal,
static
void
calcLHbits(Uint32 * lhPageBits, Uint32 * lhDistrBits,
- Uint32 fid, Uint32 totalFragments)
+ Uint32 fid, Uint32 totalFragments, const Dbdict *dict)
{
Uint32 distrBits = 0;
Uint32 pageBits = 0;
Uint32 tmp = 1;
while (tmp < totalFragments) {
- jam();
+ jamBlock(dict);
tmp <<= 1;
distrBits++;
}//while
@@ -5772,7 +5772,7 @@ Dbdict::execADD_FRAGREQ(Signal* signal)
*/
Uint32 lhDistrBits = 0;
Uint32 lhPageBits = 0;
- ::calcLHbits(&lhPageBits, &lhDistrBits, fragId, fragCount);
+ ::calcLHbits(&lhPageBits, &lhDistrBits, fragId, fragCount, this);
Uint64 maxRows = tabPtr.p->maxRowsLow +
(((Uint64)tabPtr.p->maxRowsHigh) << 32);
@@ -9663,42 +9663,43 @@ Dbdict::createEventComplete_RT_USER_CREA
*/
void interpretUtilPrepareErrorCode(UtilPrepareRef::ErrorCode errorCode,
- Uint32& error, Uint32& line)
+ Uint32& error, Uint32& line,
+ const Dbdict *dict)
{
DBUG_ENTER("interpretUtilPrepareErrorCode");
switch (errorCode) {
case UtilPrepareRef::NO_ERROR:
- jam();
+ jamBlock(dict);
error = 1;
line = __LINE__;
DBUG_VOID_RETURN;
case UtilPrepareRef::PREPARE_SEIZE_ERROR:
- jam();
+ jamBlock(dict);
error = 748;
line = __LINE__;
DBUG_VOID_RETURN;
case UtilPrepareRef::PREPARE_PAGES_SEIZE_ERROR:
- jam();
+ jamBlock(dict);
error = 1;
line = __LINE__;
DBUG_VOID_RETURN;
case UtilPrepareRef::PREPARED_OPERATION_SEIZE_ERROR:
- jam();
+ jamBlock(dict);
error = 1;
line = __LINE__;
DBUG_VOID_RETURN;
case UtilPrepareRef::DICT_TAB_INFO_ERROR:
- jam();
+ jamBlock(dict);
error = 1;
line = __LINE__;
DBUG_VOID_RETURN;
case UtilPrepareRef::MISSING_PROPERTIES_SECTION:
- jam();
+ jamBlock(dict);
error = 1;
line = __LINE__;
DBUG_VOID_RETURN;
default:
- jam();
+ jamBlock(dict);
error = 1;
line = __LINE__;
DBUG_VOID_RETURN;
@@ -9774,7 +9775,7 @@ Dbdict::createEventUTIL_PREPARE(Signal*
ndbrequire((evntRecPtr.p = c_opCreateEvent.getPtr(evntRecPtr.i)) != NULL);
interpretUtilPrepareErrorCode(errorCode, evntRecPtr.p->m_errorCode,
- evntRecPtr.p->m_errorLine);
+ evntRecPtr.p->m_errorLine, this);
evntRecPtr.p->m_errorNode = reference();
createEvent_sendReply(signal, evntRecPtr);
@@ -11264,7 +11265,8 @@ Dbdict::dropEventUtilPrepareRef(Signal*
ndbrequire((evntRecPtr.p = c_opDropEvent.getPtr(evntRecPtr.i)) != NULL);
interpretUtilPrepareErrorCode((UtilPrepareRef::ErrorCode)ref->getErrorCode(),
- evntRecPtr.p->m_errorCode, evntRecPtr.p->m_errorLine);
+ evntRecPtr.p->m_errorCode,
+ evntRecPtr.p->m_errorLine, this);
evntRecPtr.p->m_errorNode = reference();
dropEvent_sendReply(signal, evntRecPtr);
diff -Nrup a/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp b/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp
--- a/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp 2007-09-24 17:09:26 +02:00
+++ b/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp 2007-09-24 17:12:17 +02:00
@@ -16041,8 +16041,8 @@ void Dbdih::checkWaitGCPMaster(Signal* s
c_waitGCPMasterList.next(ptr);
if (nodeId == failedNodeId) {
- jam()
- c_waitGCPMasterList.release(i);
+ jam();
+ c_waitGCPMasterList.release(i);
}//if
}//while
}//Dbdih::checkWaitGCPMaster()
diff -Nrup a/storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp b/storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp 2007-06-05 18:15:19 +02:00
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp 2007-09-24 17:12:17 +02:00
@@ -60,6 +60,7 @@ void Dblqh::initData()
cLqhTimeOutCount = 0;
cLqhTimeOutCheckCount = 0;
cbookedAccOps = 0;
+ cpackedListIndex = 0;
m_backup_ptr = RNIL;
clogFileSize = 16;
cmaxLogFilesInPageZero = 40;
diff -Nrup a/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp b/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp 2007-09-12 13:35:57 +02:00
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp 2007-09-24 17:12:17 +02:00
@@ -12756,7 +12756,7 @@ void Dblqh::execFSREADREF(Signal* signal
jam();
break;
case LogFileOperationRecord::READ_SR_INVALIDATE_PAGES:
- jam()
+ jam();
break;
default:
jam();
diff -Nrup a/storage/ndb/src/kernel/blocks/dbtc/DbtcInit.cpp b/storage/ndb/src/kernel/blocks/dbtc/DbtcInit.cpp
--- a/storage/ndb/src/kernel/blocks/dbtc/DbtcInit.cpp 2007-02-14 18:24:37 +01:00
+++ b/storage/ndb/src/kernel/blocks/dbtc/DbtcInit.cpp 2007-09-24 17:12:17 +02:00
@@ -292,6 +292,7 @@ Dbtc::Dbtc(Block_context& ctx):
tcFailRecord = 0;
c_apiConTimer = 0;
c_apiConTimer_line = 0;
+ cpackedListIndex = 0;
#ifdef VM_TRACE
{
diff -Nrup a/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp b/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp
--- a/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp 2007-09-24 17:09:26 +02:00
+++ b/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp 2007-09-24 17:12:17 +02:00
@@ -7712,7 +7712,7 @@ Dbtc::sendTCKEY_FAILCONF(Signal* signal,
const Uint32 nodeId = refToNode(ref);
if(ref != 0)
{
- jam()
+ jam();
failConf->apiConnectPtr = regApiPtr->ndbapiConnect | (marker != RNIL);
failConf->transId1 = regApiPtr->transid[0];
failConf->transId2 = regApiPtr->transid[1];
@@ -9025,7 +9025,7 @@ void Dbtc::execSCAN_TABREQ(Signal* signa
SCAN_error_check:
if (aiLength == 0) {
- jam()
+ jam();
errCode = ZSCAN_AI_LEN_ERROR;
goto SCAN_TAB_error;
}//if
diff -Nrup a/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp b/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp
--- a/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp 2007-09-05 15:19:57 +02:00
+++ b/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp 2007-09-24 17:12:17 +02:00
@@ -1991,6 +1991,7 @@ private:
//------------------------------------------------------------------
//------------------------------------------------------------------
+ Uint32 brancher(Uint32, Uint32);
int interpreterNextLab(Signal* signal,
KeyReqStruct *req_struct,
Uint32* logMemory,
diff -Nrup a/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp b/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp 2007-09-05 15:19:57 +02:00
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp 2007-09-24 17:12:17 +02:00
@@ -2000,7 +2000,7 @@ void Dbtup::sendLogAttrinfo(Signal* sign
inline
Uint32
-brancher(Uint32 TheInstruction, Uint32 TprogramCounter)
+Dbtup::brancher(Uint32 TheInstruction, Uint32 TprogramCounter)
{
Uint32 TbranchDirection= TheInstruction >> 31;
Uint32 TbranchLength= (TheInstruction >> 16) & 0x7fff;
diff -Nrup a/storage/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp b/storage/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp 2007-06-14 19:08:12 +02:00
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp 2007-09-24 17:12:17 +02:00
@@ -46,6 +46,7 @@ void Dbtup::initData()
// Records with constant sizes
init_list_sizes();
+ cpackedListIndex = 0;
}//Dbtup::initData()
Dbtup::Dbtup(Block_context& ctx, Pgman* pgman)
diff -Nrup a/storage/ndb/src/kernel/blocks/dbutil/DbUtil.cpp b/storage/ndb/src/kernel/blocks/dbutil/DbUtil.cpp
--- a/storage/ndb/src/kernel/blocks/dbutil/DbUtil.cpp 2007-09-24 17:09:26 +02:00
+++ b/storage/ndb/src/kernel/blocks/dbutil/DbUtil.cpp 2007-09-24 17:12:17 +02:00
@@ -313,7 +313,7 @@ DbUtil::execDUMP_STATE_ORD(Signal* signa
*/
const Uint32 tCase = signal->theData[0];
if(tCase == 200){
- jam()
+ jam();
ndbout << "--------------------------------------------------" << endl;
UtilSequenceReq * req = (UtilSequenceReq*)signal->getDataPtrSend();
Uint32 seqId = 1;
diff -Nrup a/storage/ndb/src/kernel/blocks/ndbcntr/Ndbcntr.hpp b/storage/ndb/src/kernel/blocks/ndbcntr/Ndbcntr.hpp
--- a/storage/ndb/src/kernel/blocks/ndbcntr/Ndbcntr.hpp 2007-02-26 08:11:56 +01:00
+++ b/storage/ndb/src/kernel/blocks/ndbcntr/Ndbcntr.hpp 2007-09-24 17:12:17 +02:00
@@ -336,6 +336,7 @@ public:
void checkLqhTimeout_2(Signal* signal);
BlockNumber number() const { return cntr.number(); }
+ EmulatedJamBuffer *jamBuffer() const { return cntr.jamBuffer(); }
void progError(int line, int cause, const char * extra) {
cntr.progError(line, cause, extra);
}
@@ -368,6 +369,7 @@ private:
void sendNextREAD_CONFIG_REQ(Signal* signal);
BlockNumber number() const { return cntr.number(); }
+ EmulatedJamBuffer *jamBuffer() const { return cntr.jamBuffer(); }
void progError(int line, int cause, const char * extra) {
cntr.progError(line, cause, extra);
}
diff -Nrup a/storage/ndb/src/kernel/blocks/ndbcntr/NdbcntrMain.cpp b/storage/ndb/src/kernel/blocks/ndbcntr/NdbcntrMain.cpp
--- a/storage/ndb/src/kernel/blocks/ndbcntr/NdbcntrMain.cpp 2007-09-13 13:59:44 +02:00
+++ b/storage/ndb/src/kernel/blocks/ndbcntr/NdbcntrMain.cpp 2007-09-24 17:12:17 +02:00
@@ -2975,7 +2975,7 @@ void
UpgradeStartup::sendCmAppChg(Ndbcntr& cntr, Signal* signal, Uint32 startLevel){
if(cntr.getNodeInfo(cntr.cmasterNodeId).m_version >= MAKE_VERSION(3,5,0)){
- jam();
+ jamNoBlock();
return;
}
diff -Nrup a/storage/ndb/src/kernel/blocks/suma/Suma.cpp b/storage/ndb/src/kernel/blocks/suma/Suma.cpp
--- a/storage/ndb/src/kernel/blocks/suma/Suma.cpp 2007-09-24 17:09:26 +02:00
+++ b/storage/ndb/src/kernel/blocks/suma/Suma.cpp 2007-09-24 17:12:17 +02:00
@@ -1791,14 +1791,14 @@ Suma::Table::parseTable(SegmentedSection
DictTabInfo::TableMappingSize,
true, true);
- jam();
+ jamBlock(&suma);
suma.suma_ndbrequire(s == SimpleProperties::Break);
#if 0
//ToDo handle this
if(table_version_major(m_schemaVersion) !=
table_version_major(tableDesc.TableVersion)){
- jam();
+ jamBlock(&suma);
release(* this);
@@ -1815,25 +1815,25 @@ Suma::Table::parseTable(SegmentedSection
c_subscriptions.first(i_subPtr);
SubscriptionPtr subPtr;
for(;!i_subPtr.isNull();c_subscriptions.next(i_subPtr)){
- jam();
+ jamBlock(&suma);
c_subscriptions.getPtr(subPtr, i_subPtr.curr.i);
SyncRecord* tmp = c_syncPool.getPtr(subPtr.p->m_syncPtrI);
if (tmp == syncPtr_p) {
- jam();
+ jamBlock(&suma);
continue;
}
if (subPtr.p->m_tables.get(tableId)) {
- jam();
+ jamBlock(&suma);
subPtr.p->m_tables.clear(tableId); // remove this old table reference
TableList::DataBufferIterator it;
for(tmp->m_tableList.first(it);!it.isNull();tmp->m_tableList.next(it)) {
- jam();
+ jamBlock(&suma);
if (*it.data == tableId){
- jam();
+ jamBlock(&suma);
Uint32 *pdata = it.data;
tmp->m_tableList.next(it);
for(;!it.isNull();tmp->m_tableList.next(it)) {
- jam();
+ jamBlock(&suma);
*pdata = *it.data;
pdata = it.data;
}
@@ -2874,7 +2874,7 @@ int
Suma::Table::setupTrigger(Signal* signal,
Suma &suma)
{
- jam();
+ jamBlock(&suma);
DBUG_ENTER("Suma::Table::setupTrigger");
int ret= 0;
@@ -2921,7 +2921,7 @@ void
Suma::Table::createAttributeMask(AttributeMask& mask,
Suma &suma)
{
- jam();
+ jamBlock(&suma);
mask.clear();
for(Uint32 i = 0; i<m_noOfAttributes; i++)
mask.set(i);
@@ -2995,17 +2995,17 @@ Suma::execCREATE_TRIG_REF(Signal* signal
void
Suma::Table::dropTrigger(Signal* signal,Suma& suma)
{
- jam();
+ jamBlock(&suma);
DBUG_ENTER("Suma::dropTrigger");
m_hasOutstandingTriggerReq[0] =
m_hasOutstandingTriggerReq[1] =
m_hasOutstandingTriggerReq[2] = 1;
for(Uint32 j = 0; j<3; j++){
- jam();
+ jamBlock(&suma);
suma.suma_ndbrequire(m_triggerIds[j] != ILLEGAL_TRIGGER_ID);
if(m_hasTriggerDefined[j] == 1) {
- jam();
+ jamBlock(&suma);
DropTrigReq * const req = (DropTrigReq*)signal->getDataPtrSend();
req->setConnectionPtr(m_ptrI);
@@ -3028,7 +3028,7 @@ Suma::Table::dropTrigger(Signal* signal,
suma.sendSignal(DBTUP_REF, GSN_DROP_TRIG_REQ,
signal, DropTrigReq::SignalLength, JBB);
} else {
- jam();
+ jamBlock(&suma);
suma.suma_ndbrequire(m_hasTriggerDefined[j] > 1);
runDropTrigger(signal,m_triggerIds[j],suma);
}
@@ -3074,7 +3074,7 @@ Suma::Table::runDropTrigger(Signal* sign
Uint32 triggerId,
Suma &suma)
{
- jam();
+ jamBlock(&suma);
Uint32 type = (triggerId >> 16) & 0x3;
suma.suma_ndbrequire(type < 3);
@@ -3085,7 +3085,7 @@ Suma::Table::runDropTrigger(Signal* sign
m_hasOutstandingTriggerReq[type] = 0;
if (m_hasTriggerDefined[type] == 0)
{
- jam();
+ jamBlock(&suma);
m_triggerIds[type] = ILLEGAL_TRIGGER_ID;
}
if( m_hasOutstandingTriggerReq[0] ||
@@ -3093,7 +3093,7 @@ Suma::Table::runDropTrigger(Signal* sign
m_hasOutstandingTriggerReq[2])
{
// more to come
- jam();
+ jamBlock(&suma);
return;
}
@@ -3116,11 +3116,11 @@ void Suma::suma_ndbrequire(bool v) { ndb
void
Suma::Table::checkRelease(Suma &suma)
{
- jam();
+ jamBlock(&suma);
DBUG_ENTER("Suma::Table::checkRelease");
if (n_subscribers == 0)
{
- jam();
+ jamBlock(&suma);
suma.suma_ndbrequire(m_hasTriggerDefined[0] == 0);
suma.suma_ndbrequire(m_hasTriggerDefined[1] == 0);
suma.suma_ndbrequire(m_hasTriggerDefined[2] == 0);
@@ -3132,7 +3132,7 @@ Suma::Table::checkRelease(Suma &suma)
for (subscribers.first(subbPtr);!subbPtr.isNull();
subscribers.next(subbPtr))
{
- jam();
+ jamBlock(&suma);
DBUG_PRINT("info",("subscriber: %u", subbPtr.i));
}
suma.suma_ndbrequire(false);
@@ -3145,7 +3145,7 @@ Suma::Table::checkRelease(Suma &suma)
for (syncRecords.first(syncPtr);!syncPtr.isNull();
syncRecords.next(syncPtr))
{
- jam();
+ jamBlock(&suma);
DBUG_PRINT("info",("syncRecord: %u", syncPtr.i));
}
suma.suma_ndbrequire(false);
@@ -3390,7 +3390,6 @@ reformat(Signal* signal, LinearSectionPt
ptr[1].p = dst;
while(sz_1 > 0){
- jam();
Uint32 tmp = * src_1 ++;
* headers ++ = tmp;
Uint32 len = AttributeHeader::getDataSize(tmp);
@@ -4089,7 +4088,7 @@ Suma::sendSubRemoveRef(Signal* signal, c
void
Suma::Table::release(Suma & suma){
- jam();
+ jamBlock(&suma);
LocalDataBuffer<15> fragBuf(suma.c_dataBufferPool, m_fragments);
fragBuf.release();
diff -Nrup a/storage/ndb/src/kernel/blocks/suma/Suma.hpp b/storage/ndb/src/kernel/blocks/suma/Suma.hpp
--- a/storage/ndb/src/kernel/blocks/suma/Suma.hpp 2007-09-24 17:07:27 +02:00
+++ b/storage/ndb/src/kernel/blocks/suma/Suma.hpp 2007-09-24 17:12:17 +02:00
@@ -236,6 +236,7 @@ public:
UintR &cerrorInsert;
#endif
BlockNumber number() const { return suma.number(); }
+ EmulatedJamBuffer *jamBuffer() const { return suma.jamBuffer(); }
void progError(int line, int cause, const char * extra) {
suma.progError(line, cause, extra);
}
@@ -461,6 +462,8 @@ public:
Restart(Suma& s);
Suma & suma;
+ BlockNumber number() const { return suma.number(); }
+ EmulatedJamBuffer *jamBuffer() const { return suma.jamBuffer(); }
Uint32 nodeId;
DLHashTable<Subscription>::Iterator c_subIt;
diff -Nrup a/storage/ndb/src/kernel/error/ErrorReporter.cpp b/storage/ndb/src/kernel/error/ErrorReporter.cpp
--- a/storage/ndb/src/kernel/error/ErrorReporter.cpp 2007-02-05 16:44:42 +01:00
+++ b/storage/ndb/src/kernel/error/ErrorReporter.cpp 2007-09-24 17:12:17 +02:00
@@ -34,11 +34,13 @@ static int WriteMessage(int thrdMessageI
const char* thrdProblemData,
const char* thrdObjRef,
Uint32 thrdTheEmulatedJamIndex,
- Uint8 thrdTheEmulatedJam[]);
+ const Uint32 thrdTheEmulatedJam[],
+ Uint32 jamBlockNumber);
static void dumpJam(FILE* jamStream,
Uint32 thrdTheEmulatedJamIndex,
- Uint8 thrdTheEmulatedJam[]);
+ const Uint32 thrdTheEmulatedJam[],
+ Uint32 aBlockNumber);
extern EventLogger g_eventLogger;
const char*
@@ -168,19 +170,28 @@ void
ErrorReporter::handleAssert(const char* message, const char* file, int line, int ec)
{
char refMessage[100];
+ const Uint32 *jam;
+ Uint32 jamIndex;
+ Uint32 jamBlockNumber;
#ifdef NO_EMULATED_JAM
BaseString::snprintf(refMessage, 100, "file: %s lineNo: %d",
file, line);
+ jam = NULL;
+ jamIndex = 0;
+ jamBlockNumber = 0;
#else
- const Uint32 blockNumber = theEmulatedJamBlockNumber;
- const char *blockName = getBlockName(blockNumber);
+ const EmulatedJamBuffer *jamBuffer =
+ (EmulatedJamBuffer *)NdbThread_GetTlsKey(NDB_THREAD_TLS_JAM);
+ jam = jamBuffer->theEmulatedJam;
+ jamIndex = jamBuffer->theEmulatedJamIndex;
+ jamBlockNumber = jamBuffer->theEmulatedJamBlockNumber;
+ const char *blockName = getBlockName(jamBlockNumber);
BaseString::snprintf(refMessage, 100, "%s line: %d (block: %s)",
file, line, blockName);
#endif
- WriteMessage(ec, message, refMessage,
- theEmulatedJamIndex, theEmulatedJam);
+ WriteMessage(ec, message, refMessage, jamIndex, jam, jamBlockNumber);
childReportError(ec);
@@ -194,8 +205,21 @@ ErrorReporter::handleError(int messageID
const char* objRef,
NdbShutdownType nst)
{
- WriteMessage(messageID, problemData,
- objRef, theEmulatedJamIndex, theEmulatedJam);
+ const Uint32 *jam;
+ Uint32 jamIndex;
+ Uint32 jamBlockNumber;
+ const EmulatedJamBuffer *jamBuffer =
+ (EmulatedJamBuffer *)NdbThread_GetTlsKey(NDB_THREAD_TLS_JAM);
+ if (jamBuffer) {
+ jam = jamBuffer->theEmulatedJam;
+ jamIndex = jamBuffer->theEmulatedJamIndex;
+ jamBlockNumber = jamBuffer->theEmulatedJamBlockNumber;
+ } else {
+ jam = NULL;
+ jamIndex = 0;
+ jamBlockNumber = 0;
+ }
+ WriteMessage(messageID, problemData, objRef, jamIndex, jam, jamBlockNumber);
g_eventLogger.info(problemData);
g_eventLogger.info(objRef);
@@ -215,7 +239,8 @@ int
WriteMessage(int thrdMessageID,
const char* thrdProblemData, const char* thrdObjRef,
Uint32 thrdTheEmulatedJamIndex,
- Uint8 thrdTheEmulatedJam[]){
+ const Uint32 thrdTheEmulatedJam[],
+ Uint32 jamBlockNumber){
FILE *stream;
unsigned offset;
unsigned long maxOffset; // Maximum size of file.
@@ -312,7 +337,8 @@ WriteMessage(int thrdMessageID,
// ...and "dump the jam" there.
// ErrorReporter::dumpJam(jamStream);
if(thrdTheEmulatedJam != 0){
- dumpJam(jamStream, thrdTheEmulatedJamIndex, thrdTheEmulatedJam);
+ dumpJam(jamStream, thrdTheEmulatedJamIndex,
+ thrdTheEmulatedJam, jamBlockNumber);
}
/* Dont print the jobBuffers until a way to copy them,
@@ -333,7 +359,8 @@ WriteMessage(int thrdMessageID,
void
dumpJam(FILE *jamStream,
Uint32 thrdTheEmulatedJamIndex,
- Uint8 thrdTheEmulatedJam[]) {
+ const Uint32 thrdTheEmulatedJam[],
+ Uint32 aBlockNumber) {
#ifndef NO_EMULATED_JAM
// print header
const int maxaddr = 8;
@@ -343,16 +370,14 @@ dumpJam(FILE *jamStream,
fprintf(jamStream, "%-6s ", "ADDR");
fprintf(jamStream, "\n");
- // treat as array of Uint32
- const Uint32 *base = (Uint32 *)thrdTheEmulatedJam;
- const int first = thrdTheEmulatedJamIndex / sizeof(Uint32); // oldest
+ const int first = thrdTheEmulatedJamIndex; // oldest
int cnt, idx;
// look for first block entry
for (cnt = 0, idx = first; cnt < EMULATED_JAM_SIZE; cnt++, idx++) {
if (idx >= EMULATED_JAM_SIZE)
idx = 0;
- const Uint32 aJamEntry = base[idx];
+ const Uint32 aJamEntry = thrdTheEmulatedJam[idx];
if (aJamEntry > (1 << 20))
break;
}
@@ -366,7 +391,6 @@ dumpJam(FILE *jamStream,
else if (cnt < EMULATED_JAM_SIZE)
fprintf(jamStream, "%-7s?", "");
else {
- const Uint32 aBlockNumber = theEmulatedJamBlockNumber;
const char *aBlockName = getBlockName(aBlockNumber);
if (aBlockName != 0)
fprintf(jamStream, "%-7s?", aBlockName);
@@ -380,7 +404,7 @@ dumpJam(FILE *jamStream,
globalData.incrementWatchDogCounter(4); // watchdog not to kill us ?
if (idx >= EMULATED_JAM_SIZE)
idx = 0;
- const Uint32 aJamEntry = base[idx];
+ const Uint32 aJamEntry = thrdTheEmulatedJam[idx];
if (aJamEntry > (1 << 20)) {
const Uint32 aBlockNumber = aJamEntry >> 20;
const char *aBlockName = getBlockName(aBlockNumber);
diff -Nrup a/storage/ndb/src/kernel/main.cpp b/storage/ndb/src/kernel/main.cpp
--- a/storage/ndb/src/kernel/main.cpp 2007-01-06 01:21:21 +01:00
+++ b/storage/ndb/src/kernel/main.cpp 2007-09-24 17:12:17 +02:00
@@ -239,6 +239,7 @@ int main(int argc, char** argv)
g_eventLogger.m_logLevel.setLogLevel(LogLevel::llStartUp, 15);
+ NdbThread_Init();
globalEmulatorData.create();
// Parse command line options
diff -Nrup a/storage/ndb/src/kernel/vm/Emulator.cpp b/storage/ndb/src/kernel/vm/Emulator.cpp
--- a/storage/ndb/src/kernel/vm/Emulator.cpp 2006-12-23 20:20:19 +01:00
+++ b/storage/ndb/src/kernel/vm/Emulator.cpp 2007-09-24 17:12:18 +02:00
@@ -51,9 +51,12 @@ extern Uint32 g_currentStartPhase;
*/
#ifndef NO_EMULATED_JAM
-Uint8 theEmulatedJam[EMULATED_JAM_SIZE * 4];
-Uint32 theEmulatedJamIndex = 0;
-Uint32 theEmulatedJamBlockNumber = 0;
+/*
+ This is the jam buffer used for non-threaded ndbd (but present also
+ in threaded ndbd to allow sharing of object files among the two
+ binaries).
+ */
+EmulatedJamBuffer theEmulatedJamBuffer;
#endif
GlobalData globalData;
@@ -87,6 +90,12 @@ ndb_new_handler_impl(){
void
EmulatorData::create(){
+ /*
+ Global jam() buffer, for non-multithreaded operation.
+ For multithreaded ndbd, each thread will set a local jam buffer later.
+ */
+ NdbThread_SetTlsKey(NDB_THREAD_TLS_JAM, (void *)&theEmulatedJamBuffer);
+
NdbMem_Create();
theConfiguration = new Configuration();
diff -Nrup a/storage/ndb/src/kernel/vm/Emulator.hpp b/storage/ndb/src/kernel/vm/Emulator.hpp
--- a/storage/ndb/src/kernel/vm/Emulator.hpp 2006-12-23 20:20:19 +01:00
+++ b/storage/ndb/src/kernel/vm/Emulator.hpp 2007-09-24 17:12:18 +02:00
@@ -31,22 +31,23 @@ extern class TimeQueue global
extern class FastScheduler globalScheduler;
extern class TransporterRegistry globalTransporterRegistry;
extern struct GlobalData globalData;
+extern struct thr_repository g_thr_repository;
#ifdef VM_TRACE
extern class SignalLoggerManager globalSignalLoggers;
#endif
#ifndef NO_EMULATED_JAM
- #define EMULATED_JAM_SIZE 1024
- #define JAM_MASK ((EMULATED_JAM_SIZE * 4) - 1)
+/* EMULATED_JAM_SIZE must be a power of two, so JAM_MASK will work. */
+#define EMULATED_JAM_SIZE 1024
+#define JAM_MASK (EMULATED_JAM_SIZE - 1)
- extern Uint8 theEmulatedJam[];
- extern Uint32 theEmulatedJamIndex;
+struct EmulatedJamBuffer {
+ Uint32 theEmulatedJamIndex;
// last block entry, used in dumpJam() if jam contains no block entries
- extern Uint32 theEmulatedJamBlockNumber;
-#else
- const Uint8 theEmulatedJam[]=0;
- const Uint32 theEmulatedJamIndex=0;
+ Uint32 theEmulatedJamBlockNumber;
+ Uint32 theEmulatedJam[EMULATED_JAM_SIZE];
+};
#endif
struct EmulatorData {
diff -Nrup a/storage/ndb/src/kernel/vm/GlobalData.hpp b/storage/ndb/src/kernel/vm/GlobalData.hpp
--- a/storage/ndb/src/kernel/vm/GlobalData.hpp 2007-09-05 15:19:57 +02:00
+++ b/storage/ndb/src/kernel/vm/GlobalData.hpp 2007-09-24 17:12:18 +02:00
@@ -82,7 +82,7 @@ struct GlobalData {
SimulatedBlock * getBlock(BlockNumber blockNo);
void incrementWatchDogCounter(Uint32 place);
- const Uint32 * getWatchDogPtr();
+ Uint32 * getWatchDogPtr();
private:
Uint32 watchDog;
@@ -136,7 +136,7 @@ GlobalData::incrementWatchDogCounter(Uin
}
inline
-const Uint32 *
+Uint32 *
GlobalData::getWatchDogPtr(){
return &watchDog;
}
diff -Nrup a/storage/ndb/src/kernel/vm/Makefile.am b/storage/ndb/src/kernel/vm/Makefile.am
--- a/storage/ndb/src/kernel/vm/Makefile.am 2006-12-31 01:06:42 +01:00
+++ b/storage/ndb/src/kernel/vm/Makefile.am 2007-09-24 17:12:18 +02:00
@@ -18,15 +18,10 @@
#DIRS += testLongSig
#endif
-noinst_LIBRARIES = libkernel.a
+noinst_LIBRARIES = libkernel.a libsched.a libsched_mt.a
libkernel_a_SOURCES = \
- SimulatedBlock.cpp \
- FastScheduler.cpp \
- TimeQueue.cpp \
VMSignal.cpp \
- ThreadConfig.cpp \
- TransporterCallback.cpp \
Emulator.cpp \
Configuration.cpp \
WatchDog.cpp \
@@ -34,9 +29,22 @@ libkernel_a_SOURCES = \
SectionReader.cpp \
Mutex.cpp SafeCounter.cpp \
Rope.cpp \
- ndbd_malloc.cpp ndbd_malloc_impl.cpp \
- Pool.cpp WOPool.cpp RWPool.cpp \
- DynArr256.cpp
+ ndbd_malloc.cpp \
+ ndbd_malloc_impl.cpp \
+ Pool.cpp WOPool.cpp RWPool.cpp DynArr256.cpp
+
+libsched_a_SOURCES = TimeQueue.cpp \
+ ThreadConfig.cpp \
+ FastScheduler.cpp \
+ TransporterCallback_nonmt.cpp \
+ SimulatedBlock_nonmt.cpp
+
+libsched_mt_a_SOURCES = SimulatedBlock_mt.cpp \
+ TransporterCallback_mt.cpp \
+ mt/mt.cpp \
+ mt/dummy_mt.cpp
+
+EXTRA_DIST=SimulatedBlock.cpp TransporterCallback.cpp
INCLUDES_LOC = -I$(top_srcdir)/storage/ndb/src/mgmapi
diff -Nrup a/storage/ndb/src/kernel/vm/SimulatedBlock.cpp b/storage/ndb/src/kernel/vm/SimulatedBlock.cpp
--- a/storage/ndb/src/kernel/vm/SimulatedBlock.cpp 2007-09-24 17:09:26 +02:00
+++ b/storage/ndb/src/kernel/vm/SimulatedBlock.cpp 2007-09-24 17:12:18 +02:00
@@ -13,6 +13,13 @@
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+/*
+ This file is used to build both the multithreaded and the singlethreaded
+ ndbd. It is built twice, included from either SimulatedBlock_mt.cpp (with
+ the macro NDBD_MULTITHREADED defined) or SimulatedBlock_nonmt.cpp (with the
+ macro not defined).
+*/
+
#include <ndb_global.h>
#include "SimulatedBlock.hpp"
@@ -61,8 +68,11 @@ SimulatedBlock::SimulatedBlock(BlockNumb
c_linearFragmentSendList(c_fragmentSendPool),
c_segmentedFragmentSendList(c_fragmentSendPool),
c_mutexMgr(* this),
- c_counterMgr(* this)
+ c_counterMgr(* this),
+ m_threadId(0),
+ m_watchDogCounter(NULL)
{
+ m_jamBuffer = (EmulatedJamBuffer *)NdbThread_GetTlsKey(NDB_THREAD_TLS_JAM);
NewVarRef = 0;
globalData.setBlock(blockNumber, this);
@@ -172,6 +182,15 @@ SimulatedBlock::addRecSignalImpl(GlobalS
}
void
+SimulatedBlock::assignToThread(Uint32 threadId, EmulatedJamBuffer *jamBuffer,
+ Uint32 *watchDogCounter)
+{
+ m_threadId = threadId;
+ m_jamBuffer = jamBuffer;
+ m_watchDogCounter = watchDogCounter;
+}
+
+void
SimulatedBlock::signal_error(Uint32 gsn, Uint32 len, Uint32 recBlockNo,
const char* filename, int lineno) const
{
@@ -232,6 +251,15 @@ SimulatedBlock::handle_lingering_section
}
+#ifdef NDBD_MULTITHREADED
+#define MT_SEND_LOCK mt_send_lock();
+#define MT_SEND_UNLOCK mt_send_unlock();
+#else
+#define MT_SEND_LOCK
+#define MT_SEND_UNLOCK
+#endif
+
+
void
SimulatedBlock::sendSignal(BlockReference ref,
GlobalSignalNumber gsn,
@@ -275,8 +303,14 @@ SimulatedBlock::sendSignal(BlockReferenc
if(recNode == ourProcessor || recNode == 0) {
signal->header.theSendersSignalId = tSignalId;
signal->header.theSendersBlockRef = sendBRef;
+#ifdef NDBD_MULTITHREADED
+ jobBuffer == JBB ?
+ sendlocal(m_threadId, recBlock, &signal->header) :
+ sendprioa(m_threadId, recBlock, &signal->header);
+#else
globalScheduler.execute(signal, jobBuffer, recBlock,
gsn);
+#endif
return;
} else {
// send distributed Signal
@@ -298,10 +332,12 @@ SimulatedBlock::sendSignal(BlockReferenc
getSignalName(gsn), gsn, getBlockName(recBlock),
recNode);
#endif
+ MT_SEND_LOCK
SendStatus ss = globalTransporterRegistry.prepareSend(&sh, jobBuffer,
&signal->theData[0],
recNode,
(LinearSectionPtr*)0);
+ MT_SEND_UNLOCK
ndbrequire(ss == SEND_OK || ss == SEND_BLOCKED || ss == SEND_DISCONNECTED);
}
@@ -359,7 +395,14 @@ SimulatedBlock::sendSignal(NodeReceiverG
ourProcessor);
}
#endif
+
+#ifdef NDBD_MULTITHREADED
+ jobBuffer == JBB ?
+ sendlocal(m_threadId, recBlock, &signal->header) :
+ sendprioa(m_threadId, recBlock, &signal->header);
+#else
globalScheduler.execute(signal, jobBuffer, recBlock, gsn);
+#endif
rg.m_nodes.clear((Uint32)0);
rg.m_nodes.clear(ourProcessor);
@@ -387,10 +430,12 @@ SimulatedBlock::sendSignal(NodeReceiverG
recNode);
#endif
+ MT_SEND_LOCK
SendStatus ss = globalTransporterRegistry.prepareSend(&sh, jobBuffer,
&signal->theData[0],
recNode,
(LinearSectionPtr*)0);
+ MT_SEND_UNLOCK
ndbrequire(ss == SEND_OK || ss == SEND_BLOCKED || ss == SEND_DISCONNECTED);
}
@@ -455,8 +500,14 @@ SimulatedBlock::sendSignal(BlockReferenc
signal->theData[length+i] = segptr[i].i;
}
+#ifdef NDBD_MULTITHREADED
+ jobBuffer == JBB ?
+ sendlocal(m_threadId, recBlock, &signal->header) :
+ sendprioa(m_threadId, recBlock, &signal->header);
+#else
globalScheduler.execute(signal, jobBuffer, recBlock,
gsn);
+#endif
signal->header.m_noOfSections = 0;
return;
} else {
@@ -481,10 +532,12 @@ SimulatedBlock::sendSignal(BlockReferenc
recNode);
#endif
+ MT_SEND_LOCK
SendStatus ss = globalTransporterRegistry.prepareSend(&sh, jobBuffer,
&signal->theData[0],
recNode,
ptr);
+ MT_SEND_UNLOCK
ndbrequire(ss == SEND_OK || ss == SEND_BLOCKED || ss == SEND_DISCONNECTED);
}
@@ -554,7 +607,14 @@ SimulatedBlock::sendSignal(NodeReceiverG
ndbrequire(import(segptr[i], ptr[i].p, ptr[i].sz));
signal->theData[length+i] = segptr[i].i;
}
+
+#ifdef NDBD_MULTITHREADED
+ jobBuffer == JBB ?
+ sendlocal(m_threadId, recBlock, &signal->header) :
+ sendprioa(m_threadId, recBlock, &signal->header);
+#else
globalScheduler.execute(signal, jobBuffer, recBlock, gsn);
+#endif
rg.m_nodes.clear((Uint32)0);
rg.m_nodes.clear(ourProcessor);
@@ -584,10 +644,12 @@ SimulatedBlock::sendSignal(NodeReceiverG
recNode);
#endif
+ MT_SEND_LOCK
SendStatus ss = globalTransporterRegistry.prepareSend(&sh, jobBuffer,
&signal->theData[0],
recNode,
ptr);
+ MT_SEND_UNLOCK
ndbrequire(ss == SEND_OK || ss == SEND_BLOCKED || ss == SEND_DISCONNECTED);
}
@@ -652,7 +714,13 @@ SimulatedBlock::sendSignal(BlockReferenc
* dst ++ = sections->m_ptr[1].i;
* dst ++ = sections->m_ptr[2].i;
+#ifdef NDBD_MULTITHREADED
+ jobBuffer == JBB ?
+ sendlocal(m_threadId, recBlock, &signal->header) :
+ sendprioa(m_threadId, recBlock, &signal->header);
+#else
globalScheduler.execute(signal, jobBuffer, recBlock, gsn);
+#endif
} else {
// send distributed Signal
SignalHeader sh;
@@ -674,11 +742,13 @@ SimulatedBlock::sendSignal(BlockReferenc
recNode);
#endif
+ MT_SEND_LOCK
SendStatus ss = globalTransporterRegistry.prepareSend(&sh, jobBuffer,
&signal->theData[0],
recNode,
g_sectionSegmentPool,
sections->m_ptr);
+ MT_SEND_UNLOCK
ndbrequire(ss == SEND_OK || ss == SEND_BLOCKED || ss == SEND_DISCONNECTED);
::releaseSections(noOfSections, sections->m_ptr);
}
@@ -752,7 +822,13 @@ SimulatedBlock::sendSignal(NodeReceiverG
* dst ++ = sections->m_ptr[0].i;
* dst ++ = sections->m_ptr[1].i;
* dst ++ = sections->m_ptr[2].i;
+#ifdef NDBD_MULTITHREADED
+ jobBuffer == JBB ?
+ sendlocal(m_threadId, recBlock, &signal->header) :
+ sendprioa(m_threadId, recBlock, &signal->header);
+#else
globalScheduler.execute(signal, jobBuffer, recBlock, gsn);
+#endif
rg.m_nodes.clear((Uint32)0);
rg.m_nodes.clear(ourProcessor);
@@ -782,11 +858,13 @@ SimulatedBlock::sendSignal(NodeReceiverG
recNode);
#endif
+ MT_SEND_LOCK
SendStatus ss = globalTransporterRegistry.prepareSend(&sh, jobBuffer,
&signal->theData[0],
recNode,
g_sectionSegmentPool,
sections->m_ptr);
+ MT_SEND_UNLOCK
ndbrequire(ss == SEND_OK || ss == SEND_BLOCKED || ss == SEND_DISCONNECTED);
}
@@ -830,7 +908,12 @@ SimulatedBlock::sendSignalWithDelay(Bloc
}
}
#endif
+
+#ifdef NDBD_MULTITHREADED
+ senddelay(m_threadId, &signal->header, delayInMilliSeconds);
+#else
globalTimeQueue.insert(signal, bnr, gsn, delayInMilliSeconds);
+#endif
// befor 2nd parameter to globalTimeQueue.insert
// (Priority)theSendSig[sigIndex].jobBuffer
@@ -879,7 +962,12 @@ SimulatedBlock::sendSignalWithDelay(Bloc
}
}
#endif
+
+#ifdef NDBD_MULTITHREADED
+ senddelay(m_threadId, &signal->header, delayInMilliSeconds);
+#else
globalTimeQueue.insert(signal, bnr, gsn, delayInMilliSeconds);
+#endif
signal->header.m_noOfSections = 0;
signal->header.m_fragmentInfo = 0;
@@ -1014,7 +1102,11 @@ SimulatedBlock::deallocRecord(void ** pt
void
SimulatedBlock::refresh_watch_dog(Uint32 place)
{
+#ifdef NDBD_MULTITHREADED
+ (*m_watchDogCounter) = place;
+#else
globalData.incrementWatchDogCounter(place);
+#endif
}
void
@@ -1083,8 +1175,13 @@ SimulatedBlock::infoEvent(const char * m
signalT.header.theSignalId = tSignalId;
signalT.header.theLength = ((len+3)/4)+1;
+#ifdef NDBD_MULTITHREADED
+ sendlocal(m_threadId, signalT.header.theReceiversBlockNumber,
+ &signalT.header);
+#else
globalScheduler.execute(&signalT.header, JBB, signalT.theData,
signalT.m_sectionPtrI);
+#endif
}
void
@@ -1123,8 +1220,13 @@ SimulatedBlock::warningEvent(const char
signalT.header.theSignalId = tSignalId;
signalT.header.theLength = ((len+3)/4)+1;
+#ifdef NDBD_MULTITHREADED
+ sendlocal(m_threadId, signalT.header.theReceiversBlockNumber,
+ &signalT.header);
+#else
globalScheduler.execute(&signalT.header, JBB, signalT.theData,
signalT.m_sectionPtrI);
+#endif
}
void
diff -Nrup a/storage/ndb/src/kernel/vm/SimulatedBlock.hpp b/storage/ndb/src/kernel/vm/SimulatedBlock.hpp
--- a/storage/ndb/src/kernel/vm/SimulatedBlock.hpp 2007-09-24 17:09:26 +02:00
+++ b/storage/ndb/src/kernel/vm/SimulatedBlock.hpp 2007-09-24 17:12:18 +02:00
@@ -120,6 +120,12 @@ public:
*
*/
inline void executeFunction(GlobalSignalNumber gsn, Signal* signal);
+
+ /* Setup state of a block object for executing in a particular thread. */
+ void assignToThread(Uint32 threadId, EmulatedJamBuffer *jamBuffer,
+ Uint32 *watchDogCounter);
+ /* For multithreaded ndbd, get the id of owning thread. */
+ uint32 getThreadId() const { return m_threadId; }
public:
typedef void (SimulatedBlock::* CallbackFunction)(class Signal*,
Uint32 callbackData,
@@ -358,6 +364,10 @@ protected:
void sendNextLinearFragment(Signal* signal, FragmentSendInfo & info);
BlockNumber number() const;
+public:
+ /* Must be public so that we can jam() outside of block scope. */
+ EmulatedJamBuffer *jamBuffer() const;
+protected:
BlockReference reference() const;
NodeId getOwnNodeId() const;
@@ -380,7 +390,19 @@ private:
const NodeId theNodeId;
const BlockNumber theNumber;
const BlockReference theReference;
-
+ /*
+ Thread id currently executing this block.
+ Not used in singlethreaded ndbd.
+ */
+ Uint32 m_threadId;
+ /*
+ Jam buffer reference.
+ In multithreaded ndbd, this is different in each thread, and must be
+ updated if migrating the block to another thread.
+ */
+ EmulatedJamBuffer *m_jamBuffer;
+ /* For multithreaded ndb, the thread-specific watchdog counter. */
+ Uint32 *m_watchDogCounter;
protected:
Block_context m_ctx;
NewVARIABLE* allocateBat(int batSize);
@@ -646,6 +668,12 @@ inline
BlockNumber
SimulatedBlock::number() const {
return theNumber;
+}
+
+inline
+EmulatedJamBuffer *
+SimulatedBlock::jamBuffer() const {
+ return m_jamBuffer;
}
inline
diff -Nrup a/storage/ndb/src/kernel/vm/SimulatedBlock_mt.cpp b/storage/ndb/src/kernel/vm/SimulatedBlock_mt.cpp
--- /dev/null Wed Dec 31 16:00:00 196900
+++ b/storage/ndb/src/kernel/vm/SimulatedBlock_mt.cpp 2007-09-24 17:12:18 +02:00
@@ -0,0 +1,19 @@
+/* Copyright (C) 2007 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+/* This stub file defines the SimulatedBlock class for multithreaded NDBD. */
+#define NDBD_MULTITHREADED
+#include "mt/mt.hpp"
+#include "SimulatedBlock.cpp"
diff -Nrup a/storage/ndb/src/kernel/vm/SimulatedBlock_nonmt.cpp b/storage/ndb/src/kernel/vm/SimulatedBlock_nonmt.cpp
--- /dev/null Wed Dec 31 16:00:00 196900
+++ b/storage/ndb/src/kernel/vm/SimulatedBlock_nonmt.cpp 2007-09-24 17:12:18 +02:00
@@ -0,0 +1,18 @@
+/* Copyright (C) 2007 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+/* This stub file defines the SimulatedBlock class for singlethreaded NDBD. */
+#undef NDBD_MULTITHREADED
+#include "SimulatedBlock.cpp"
diff -Nrup a/storage/ndb/src/kernel/vm/ThreadConfig.cpp b/storage/ndb/src/kernel/vm/ThreadConfig.cpp
--- a/storage/ndb/src/kernel/vm/ThreadConfig.cpp 2006-12-23 20:20:20 +01:00
+++ b/storage/ndb/src/kernel/vm/ThreadConfig.cpp 2007-09-24 17:12:18 +02:00
@@ -27,6 +27,7 @@
#include <NdbSleep.h>
#include <NdbTick.h>
#include <NdbOut.hpp>
+#include <WatchDog.hpp>
#include <signaldata/StartOrd.hpp>
@@ -104,6 +105,10 @@ void ThreadConfig::ipControlLoop()
// initialise the counter that keeps track of the current millisecond
//--------------------------------------------------------------------
globalData.internalMillisecCounter = NdbTick_CurrentMillisecond();
+
+ Uint32 *watchCounter = globalData.getWatchDogPtr();
+ globalEmulatorData.theWatchDog->registerWatchedThread(watchCounter, 0);
+
Uint32 i = 0;
while (globalData.theRestartFlag != perform_stop) {
@@ -155,6 +160,8 @@ void ThreadConfig::ipControlLoop()
globalData.incrementWatchDogCounter(6);
globalTransporterRegistry.performSend();
+
+ globalEmulatorData.theWatchDog->unregisterWatchedThread(0);
}//ThreadConfig::ipControlLoop()
diff -Nrup a/storage/ndb/src/kernel/vm/TransporterCallback.cpp b/storage/ndb/src/kernel/vm/TransporterCallback.cpp
--- a/storage/ndb/src/kernel/vm/TransporterCallback.cpp 2006-12-23 20:20:20 +01:00
+++ b/storage/ndb/src/kernel/vm/TransporterCallback.cpp 2007-09-24 17:12:18 +02:00
@@ -69,9 +69,16 @@ import(Ptr<SectionSegment> & first, cons
Uint32 dummyPrev[4];
first.p = 0;
+#ifdef NDBD_MULTITHREADED
+ mt_section_lock();
+#endif
if(g_sectionSegmentPool.seize(first)){
;
} else {
+#ifdef NDBD_MULTITHREADED
+ mt_section_unlock();
+#endif
+ ndbout_c("here");
return false;
}
@@ -91,9 +98,16 @@ import(Ptr<SectionSegment> & first, cons
;
} else {
first.p->m_lastSegment = prevPtr.i;
+#ifdef NDBD_MULTITHREADED
+ mt_section_unlock();
+#endif
+ ndbout_c("hera");
return false;
}
}
+#ifdef NDBD_MULTITHREADED
+ mt_section_unlock();
+#endif
first.p->m_lastSegment = currPtr.i;
currPtr.p->m_nextSegment = RNIL;
@@ -180,9 +194,15 @@ getSection(SegmentedSectionPtr & ptr, Ui
void
release(SegmentedSectionPtr & ptr){
+#ifdef NDBD_MULTITHREADED
+ mt_section_lock();
+#endif
g_sectionSegmentPool.releaseList(relSz(ptr.sz),
ptr.i,
ptr.p->m_lastSegment);
+#ifdef NDBD_MULTITHREADED
+ mt_section_unlock();
+#endif
}
void
@@ -193,6 +213,9 @@ releaseSections(Uint32 secCount, Segment
Uint32 tSz1 = ptr[1].sz;
Uint32 tSec2 = ptr[2].i;
Uint32 tSz2 = ptr[2].sz;
+#ifdef NDBD_MULTITHREADED
+ mt_section_lock();
+#endif
switch(secCount){
case 3:
g_sectionSegmentPool.releaseList(relSz(tSz2), tSec2,
@@ -204,6 +227,9 @@ releaseSections(Uint32 secCount, Segment
g_sectionSegmentPool.releaseList(relSz(tSz0), tSec0,
ptr[0].p->m_lastSegment);
case 0:
+#ifdef NDBD_MULTITHREADED
+ mt_section_unlock();
+#endif
return;
}
char msg[40];
@@ -230,7 +256,7 @@ execute(void * callbackObj,
getBlockName(refToBlock(header->theSendersBlockRef)),
refToNode(header->theSendersBlockRef));
#endif
-
+
bool ok = true;
Ptr<SectionSegment> secPtr[3];
switch(secCount){
@@ -247,7 +273,14 @@ execute(void * callbackObj,
*/
ok &= (length + secCount <= 25);
+#ifndef NDBD_MULTITHREADED
Uint32 secPtrI[3];
+#else
+ SignalT<25> signalT;
+ Uint32 * secPtrI = signalT.theData + length;
+ signalT.header = * header;
+ memcpy(signalT.theData, theData, 4*length);
+#endif
if(ok){
/**
* Normal path
@@ -256,31 +289,66 @@ execute(void * callbackObj,
secPtrI[1] = secPtr[1].i;
secPtrI[2] = secPtr[2].i;
+#ifndef NDBD_MULTITHREADED
globalScheduler.execute(header, prio, theData, secPtrI);
+#else
+ if (prio == JBB)
+ sendlocal(receiverThreadId, signalT.header.theReceiversBlockNumber,
+ &signalT.header);
+ else
+ sendprioa(receiverThreadId, signalT.header.theReceiversBlockNumber,
+ &signalT.header);
+
+#endif
return;
}
/**
* Out of memory
*/
+#ifdef NDBD_MULTITHREADED
+ mt_section_lock();
+#endif
for(Uint32 i = 0; i<secCount; i++){
if(secPtr[i].p != 0){
g_sectionSegmentPool.releaseList(relSz(ptr[i].sz), secPtr[i].i,
secPtr[i].p->m_lastSegment);
}
}
+#ifdef NDBD_MULTITHREADED
+ mt_section_unlock();
+#endif
+
+
+#ifndef NDBD_MULTITHREADED
+ SignalDroppedRep * rep = (SignalDroppedRep*)theData;
+#else
+ SignalDroppedRep * rep = (SignalDroppedRep*)signalT.theData;
+#endif
Uint32 gsn = header->theVerId_signalNumber;
Uint32 len = header->theLength;
Uint32 newLen= (len > 22 ? 22 : len);
- SignalDroppedRep * rep = (SignalDroppedRep*)theData;
memmove(rep->originalData, theData, (4 * newLen));
rep->originalGsn = gsn;
rep->originalLength = len;
rep->originalSectionCount = secCount;
+#ifndef NDBD_MULTITHREADED
header->theVerId_signalNumber = GSN_SIGNAL_DROPPED_REP;
header->theLength = newLen + 3;
header->m_noOfSections = 0;
globalScheduler.execute(header, prio, theData, secPtrI);
+#else
+ signalT.header.theVerId_signalNumber = GSN_SIGNAL_DROPPED_REP;
+ signalT.header.theLength = newLen + 3;
+ signalT.header.m_noOfSections = 0;
+ if (prio == JBB)
+ sendlocal(receiverThreadId, signalT.header.theReceiversBlockNumber,
+ &signalT.header);
+ else
+ sendprioa(receiverThreadId, signalT.header.theReceiversBlockNumber,
+ &signalT.header);
+
+#endif
}
NdbOut &
@@ -323,7 +391,11 @@ checkJobBuffer() {
* Check to see if jobbbuffers are starting to get full
* and if so call doJob
*/
+#ifndef NDBD_MULTITHREADED
return globalScheduler.checkDoJob();
+#else
+ return 0;
+#endif
}
void
@@ -389,7 +461,14 @@ reportError(void * callbackObj, NodeId n
signal.header.theLength = 3;
signal.header.theSendersSignalId = 0;
signal.header.theSendersBlockRef = numberToRef(0, globalData.ownId);
+#ifndef NDBD_MULTITHREADED
globalScheduler.execute(&signal, JBA, CMVMI, GSN_EVENT_REP);
+#else
+ signal.header.theVerId_signalNumber = GSN_EVENT_REP;
+ signal.header.theReceiversBlockNumber = CMVMI;
+// sendprioa(THREAD LOCAL STORAGE, signal.header.theReceiversBlockNumber, &signalT.header);
+ assert(false);
+#endif
DBUG_VOID_RETURN;
}
@@ -411,7 +490,14 @@ reportSendLen(void * callbackObj,
signal.theData[0] = NDB_LE_SendBytesStatistic;
signal.theData[1] = nodeId;
signal.theData[2] = (bytes/count);
+#ifndef NDBD_MULTITHREADED
globalScheduler.execute(&signal, JBA, CMVMI, GSN_EVENT_REP);
+#else
+ signal.header.theVerId_signalNumber = GSN_EVENT_REP;
+ signal.header.theReceiversBlockNumber = CMVMI;
+ sendprioa(senderThreadId, signal.header.theReceiversBlockNumber,
+ &signalT.header);
+#endif
}
/**
@@ -431,7 +517,14 @@ reportReceiveLen(void * callbackObj,
signal.theData[0] = NDB_LE_ReceiveBytesStatistic;
signal.theData[1] = nodeId;
signal.theData[2] = (bytes/count);
+#ifndef NDBD_MULTITHREADED
globalScheduler.execute(&signal, JBA, CMVMI, GSN_EVENT_REP);
+#else
+ signal.header.theVerId_signalNumber = GSN_EVENT_REP;
+ signal.header.theReceiversBlockNumber = CMVMI;
+ sendprioa(receiverThreadId, signal.header.theReceiversBlockNumber,
+ &signalT.header);
+#endif
}
/**
@@ -450,7 +543,14 @@ reportConnect(void * callbackObj, NodeId
signal.header.theSendersBlockRef = numberToRef(0, globalData.ownId);
signal.theData[0] = nodeId;
+#ifndef NDBD_MULTITHREADED
globalScheduler.execute(&signal, JBA, CMVMI, GSN_CONNECT_REP);
+#else
+ signal.header.theVerId_signalNumber = GSN_CONNECT_REP;
+ signal.header.theReceiversBlockNumber = CMVMI;
+ sendprioa(senderThreadId, signal.header.theReceiversBlockNumber,
+ &signalT.header);
+#endif
}
/**
@@ -474,7 +574,13 @@ reportDisconnect(void * callbackObj, Nod
rep->nodeId = nodeId;
rep->err = errNo;
+#ifndef NDBD_MULTITHREADED
globalScheduler.execute(&signal, JBA, CMVMI, GSN_DISCONNECT_REP);
+#else
+ signal.header.theVerId_signalNumber = GSN_DISCONNECT_REP;
+ signal.header.theReceiversBlockNumber = CMVMI;
+ sendlocal(0, signal.header.theReceiversBlockNumber, &signalT.header);
+#endif
DBUG_VOID_RETURN;
}
diff -Nrup a/storage/ndb/src/kernel/vm/TransporterCallback_mt.cpp b/storage/ndb/src/kernel/vm/TransporterCallback_mt.cpp
--- /dev/null Wed Dec 31 16:00:00 196900
+++ b/storage/ndb/src/kernel/vm/TransporterCallback_mt.cpp 2007-09-24 17:12:18 +02:00
@@ -0,0 +1,19 @@
+/* Copyright (C) 2007 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+/* This stub file defines the SimulatedBlock class for multithreaded NDBD. */
+#define NDBD_MULTITHREADED
+#include "mt/mt.hpp"
+#include "TransporterCallback.cpp"
diff -Nrup a/storage/ndb/src/kernel/vm/TransporterCallback_nonmt.cpp b/storage/ndb/src/kernel/vm/TransporterCallback_nonmt.cpp
--- /dev/null Wed Dec 31 16:00:00 196900
+++ b/storage/ndb/src/kernel/vm/TransporterCallback_nonmt.cpp 2007-09-24 17:12:18 +02:00
@@ -0,0 +1,18 @@
+/* Copyright (C) 2007 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+/* This stub file defines the SimulatedBlock class for singlethreaded NDBD. */
+#undef NDBD_MULTITHREADED
+#include "TransporterCallback.cpp"
diff -Nrup a/storage/ndb/src/kernel/vm/WatchDog.cpp b/storage/ndb/src/kernel/vm/WatchDog.cpp
--- a/storage/ndb/src/kernel/vm/WatchDog.cpp 2007-06-09 07:25:44 +02:00
+++ b/storage/ndb/src/kernel/vm/WatchDog.cpp 2007-09-24 17:12:18 +02:00
@@ -37,15 +37,17 @@ runWatchDog(void* w){
}
WatchDog::WatchDog(Uint32 interval) :
- theIPValue(globalData.getWatchDogPtr())
+ m_watchedCount(0)
{
setCheckInterval(interval);
+ m_mutex = NdbMutex_Create();
theStop = false;
theThreadPtr = 0;
}
WatchDog::~WatchDog(){
doStop();
+ NdbMutex_Destroy(m_mutex);
}
Uint32
@@ -54,6 +56,50 @@ WatchDog::setCheckInterval(Uint32 interv
return theInterval = (interval < 70 ? 70 : interval);
}
+bool
+WatchDog::registerWatchedThread(Uint32 *counter, Uint32 threadId)
+{
+ bool ret;
+
+ NdbMutex_Lock(m_mutex);
+
+ if (m_watchedCount >= MAX_WATCHED_THREADS)
+ {
+ ret = false;
+ }
+ else
+ {
+ m_watchedList[m_watchedCount].m_watchCounter = counter;
+ m_watchedList[m_watchedCount].m_threadId = threadId;
+ NdbTick_getMicroTimer(&(m_watchedList[m_watchedCount].m_startTime));
+ m_watchedList[m_watchedCount].m_slowWarnDelay = theInterval;
+ m_watchedList[m_watchedCount].m_lastCounterValue = 0;
+ ++m_watchedCount;
+ ret = true;
+ }
+
+ NdbMutex_Unlock(m_mutex);
+ return ret;
+}
+
+void
+WatchDog::unregisterWatchedThread(Uint32 threadId)
+{
+ Uint32 i;
+ NdbMutex_Lock(m_mutex);
+
+ for (i = 0; i < m_watchedCount; i++)
+ {
+ if (threadId == m_watchedList[i].m_threadId)
+ break;
+ }
+ assert(i < m_watchedCount);
+ m_watchedList[i] = m_watchedList[m_watchedCount - 1];
+ --m_watchedCount;
+
+ NdbMutex_Unlock(m_mutex);
+}
+
void
WatchDog::doStart(){
theStop = false;
@@ -115,12 +161,17 @@ const char *get_action(Uint32 IPValue)
void
WatchDog::run()
{
- unsigned int anIPValue, sleep_time;
- unsigned int oldIPValue = 0;
- unsigned int theIntervalCheck = theInterval;
- struct MicroSecondTimer start_time, last_time, now;
- NdbTick_getMicroTimer(&start_time);
- last_time = start_time;
+ unsigned int sleep_time;
+ struct MicroSecondTimer last_time, now;
+ Uint32 numThreads;
+ Uint32 counterValue[MAX_WATCHED_THREADS];
+ Uint32 oldCounterValue[MAX_WATCHED_THREADS];
+ Uint32 threadId[MAX_WATCHED_THREADS];
+ struct MicroSecondTimer start_time[MAX_WATCHED_THREADS];
+ Uint32 theIntervalCheck[MAX_WATCHED_THREADS];
+ Uint32 elapsed[MAX_WATCHED_THREADS];
+
+ NdbTick_getMicroTimer(&last_time);
// WatchDog for the single threaded NDB
while (!theStop)
@@ -145,33 +196,65 @@ WatchDog::run()
}
last_time = now;
- // Verify that the IP thread is not stuck in a loop
- anIPValue = *theIPValue;
- if (anIPValue != 0)
+ /*
+ Copy out all active counters under locked mutex, then check them
+ afterwards without holding the mutex.
+ */
+ NdbMutex_Lock(m_mutex);
+ numThreads = m_watchedCount;
+ for (Uint32 i = 0; i < numThreads; i++)
{
- oldIPValue = anIPValue;
- globalData.incrementWatchDogCounter(0);
- NdbTick_getMicroTimer(&start_time);
- theIntervalCheck = theInterval;
+ counterValue[i] = *(m_watchedList[i].m_watchCounter);
+ if (counterValue[i] != 0)
+ {
+ /*
+ The thread responded since last check, so just update state until
+ next check.
+
+ There is a small race here. If the thread changes the counter
+ in-between the read and setting to zero here in the watchdog
+ thread, then gets stuck immediately after, we may report the
+ wrong action that it got stuck on.
+ But there will be no reporting of non-stuck thread because of
+ this race, nor will there be missed reporting.
+ */
+ *(m_watchedList[i].m_watchCounter) = 0;
+ m_watchedList[i].m_startTime = now;
+ m_watchedList[i].m_slowWarnDelay = theInterval;
+ m_watchedList[i].m_lastCounterValue = counterValue[i];
+ }
+ else
+ {
+ start_time[i] = m_watchedList[i].m_startTime;
+ threadId[i] = m_watchedList[i].m_threadId;
+ oldCounterValue[i] = m_watchedList[i].m_lastCounterValue;
+ theIntervalCheck[i] = m_watchedList[i].m_slowWarnDelay;
+ elapsed[i] = NdbTick_getMicrosPassed(start_time[i], now)/1000;
+ if (oldCounterValue[i] == 9 && elapsed[i] >= theIntervalCheck[i])
+ m_watchedList[i].m_slowWarnDelay += theInterval;
+ }
}
- else
+ NdbMutex_Unlock(m_mutex);
+
+ /*
+ Now check each watched thread if it has reported progress since previous
+ check. Warn about any stuck threads, and eventually force shutdown the
+ server.
+ */
+ for (Uint32 i = 0; i < numThreads; i++)
{
- int warn = 1;
- Uint32 elapsed = NdbTick_getMicrosPassed(start_time, now)/1000;
+ if (counterValue[i] != 0)
+ continue;
+
/*
- oldIPValue == 9 indicates malloc going on, this can take some time
+ Counter value == 9 indicates malloc going on, this can take some time
so only warn if we pass the watchdog interval
*/
- if (oldIPValue == 9)
- if (elapsed < theIntervalCheck)
- warn = 0;
- else
- theIntervalCheck += theInterval;
-
- if (warn)
+ if (oldCounterValue[i] != 9 || elapsed[i] >= theIntervalCheck[i])
{
- const char *last_stuck_action = get_action(oldIPValue);
- g_eventLogger.warning("Ndb kernel is stuck in: %s", last_stuck_action);
+ const char *last_stuck_action = get_action(oldCounterValue[i]);
+ g_eventLogger.warning("Ndb kernel thread %u is stuck in: %s",
+ threadId[i], last_stuck_action);
{
struct tms my_tms;
times(&my_tms);
@@ -179,7 +262,7 @@ WatchDog::run()
(Uint64)my_tms.tms_utime,
(Uint64)my_tms.tms_stime);
}
- if (elapsed > 3 * theInterval)
+ if (elapsed[i] > 3 * theInterval)
{
shutdownSystem(last_stuck_action);
}
diff -Nrup a/storage/ndb/src/kernel/vm/WatchDog.hpp b/storage/ndb/src/kernel/vm/WatchDog.hpp
--- a/storage/ndb/src/kernel/vm/WatchDog.hpp 2006-12-23 20:20:20 +01:00
+++ b/storage/ndb/src/kernel/vm/WatchDog.hpp 2007-09-24 17:12:18 +02:00
@@ -18,10 +18,31 @@
#include <kernel_types.h>
#include <NdbThread.h>
+#include <NdbMutex.h>
+#include <NdbTick.h>
extern "C" void* runWatchDog(void* w);
class WatchDog{
+ enum { MAX_WATCHED_THREADS = 64 };
+
+ struct WatchedThread {
+ Uint32 *m_watchCounter;
+ Uint32 m_threadId;
+ /* This is the time that activity was last registered from thread. */
+ MicroSecondTimer m_startTime;
+ /*
+ During slow operation (memory allocation), warnings are output less
+ frequently, and this is the point when the next warning should be given.
+ */
+ Uint32 m_slowWarnDelay;
+ /*
+ This is the last counter value update seen, telling us what the thread
+ was doing when it got stuck.
+ */
+ Uint32 m_lastCounterValue;
+ };
+
public:
WatchDog(Uint32 interval = 3000);
~WatchDog();
@@ -30,7 +51,15 @@ public:
void doStop();
Uint32 setCheckInterval(Uint32 interval);
-
+
+ /*
+ Register a thread for being watched.
+ Returns true if ok, false if out of slots.
+ */
+ bool registerWatchedThread(Uint32 *counter, Uint32 threadId);
+ /* Remove a thread from registration, identified by thread id. */
+ void unregisterWatchedThread(Uint32 threadId);
+
protected:
/**
* Thread function
@@ -44,8 +73,17 @@ protected:
private:
Uint32 theInterval;
- const Uint32 * theIPValue;
-
+ /*
+ List of watched threads.
+ Threads are identified by the m_threadId.
+ Active entries are kept at the start of the entries.
+ Access to the list is protected by m_mutex.
+ */
+ WatchedThread m_watchedList[MAX_WATCHED_THREADS];
+ /* Number of active entries in m_watchedList. */
+ Uint32 m_watchedCount;
+ NdbMutex *m_mutex;
+
bool theStop;
void run();
diff -Nrup a/storage/ndb/src/kernel/vm/mt/dummy_mt.cpp b/storage/ndb/src/kernel/vm/mt/dummy_mt.cpp
--- /dev/null Wed Dec 31 16:00:00 196900
+++ b/storage/ndb/src/kernel/vm/mt/dummy_mt.cpp 2007-09-24 17:12:18 +02:00
@@ -0,0 +1,83 @@
+#include "FastScheduler.hpp"
+#include "RefConvert.hpp"
+
+#include "Emulator.hpp"
+#include "VMSignal.hpp"
+
+#include <SignalLoggerManager.hpp>
+#include <BlockNumbers.h>
+#include <GlobalSignalNumbers.h>
+#include <signaldata/EventReport.hpp>
+#include "LongSignal.hpp"
+#include <NdbTick.h>
+
+FastScheduler::FastScheduler()
+{
+}
+
+FastScheduler::~FastScheduler()
+{
+}
+
+void
+FastScheduler::clear()
+{
+}
+
+void
+FastScheduler::dumpSignalMemory(FILE* out)
+{
+ /*
+ In single-threaded ndbd, we set the watchdog counter to 4 here to note that
+ we are dumping.
+
+ But for multi-threaded ndbd, we will need to first stop all
+ threads anyway, so we might as well just de-register them from the
+ watchdog, and this will not be needed.
+ */
+}
+
+
+void bnr_error()
+{
+}
+
+void jbuf_error()
+{
+}
+
+void
+FastScheduler::prio_level_error()
+{
+}
+
+APZJobBuffer::APZJobBuffer()
+{
+}
+
+APZJobBuffer::~APZJobBuffer()
+{
+}
+
+void
+APZJobBuffer::insert(const SignalHeader * const sh,
+ const Uint32 * const theData, const Uint32 secPtrI[3]){
+}
+
+void
+APZJobBuffer::signal2buffer(Signal* signal,
+ BlockNumber bnr, GlobalSignalNumber gsn,
+ BufferEntry& buf)
+{
+}
+
+#include <TimeQueue.hpp>
+
+TimeQueue::TimeQueue()
+{
+}
+
+TimeQueue::~TimeQueue()
+{
+}
+
diff -Nrup a/storage/ndb/src/kernel/vm/mt/mt.cpp b/storage/ndb/src/kernel/vm/mt/mt.cpp
--- /dev/null Wed Dec 31 16:00:00 196900
+++ b/storage/ndb/src/kernel/vm/mt/mt.cpp 2007-09-24 17:12:18 +02:00
@@ -0,0 +1,1202 @@
+#include <sys/uio.h>
+#include <stdio.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+#include <unistd.h>
+#include <pthread.h>
+#include <sys/time.h>
+#include <errno.h>
+#include <assert.h>
+#include <linux/futex.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include <VMSignal.hpp>
+#include <kernel_types.h>
+#include <Prio.hpp>
+#include <SignalLoggerManager.hpp>
+#include <SimulatedBlock.hpp>
+#include <ErrorHandlingMacros.hpp>
+#include <GlobalData.hpp>
+#include <WatchDog.hpp>
+#include <TransporterDefinitions.hpp>
+#include "mt.hpp"
+#include <DebuggerNames.hpp>
+
+#ifndef _GNU_SOURCE
+#define _GNU_SOURCE
+#endif
+#include <unistd.h>
+#include <sys/syscall.h>
+#include <sys/types.h>
+
+static const Uint32 NUM_THREADS = 3;
+#define MAX_THREADS 63
+
+static inline
+int
+xcng(volatile unsigned * addr, int val)
+{
+ asm volatile ("xchg %0, %1;" : "+r" (val) , "+m" (*addr));
+ return val;
+}
+
+#ifdef USE_FUTEX
+
+static inline
+int
+futex_wait(volatile unsigned * addr, int val, const struct timespec * timeout)
+{
+ return syscall(SYS_futex,
+ FUTEX_WAIT, val, timeout, 0, 0) == 0 ? 0 : errno;
+}
+
+static inline
+int
+futex_wake(volatile unsigned * addr)
+{
+ return syscall(SYS_futex, addr, FUTEX_WAKE, 1, 0, 0, 0) == 0 ? 0 : errno;
+}
+
+struct thr_wait
+{
+ volatile unsigned m_futex_state;
+ enum {
+ FS_RUNNING = 0,
+ FS_SLEEPING = 1,
+ };
+ thr_wait() { xcng(&m_futex_state, FS_RUNNING);}
+};
+
+static
+void
+yield(struct thr_wait* wait, const struct timespec *timeout)
+{
+ volatile unsigned * val = &wait->m_futex_state;
+ int old = xcng(val, thr_wait::FS_SLEEPING);
+ assert(old == thr_wait::FS_RUNNING);
+ int ret;
+ ret = futex_wait(val, thr_wait::FS_SLEEPING, timeout);
+ if (ret == ETIMEDOUT || ret == EINTR)
+ {
+ xcng(val, thr_wait::FS_RUNNING);
+ }
+ else
+ {
+ int old = xcng(val, thr_wait::FS_RUNNING);
+ if (old != thr_wait::FS_RUNNING)
+ {
+ printf("ret %u old: %u in wakeup\n", ret, old);
+ }
+ //assert(ret == thr_wait::FS_RUNNING);
+ }
+}
+
+static
+int
+wakeup(struct thr_wait* wait)
+{
+ volatile unsigned * val = &wait->m_futex_state;
+ if (xcng(val, thr_wait::FS_RUNNING) == thr_wait::FS_SLEEPING)
+ {
+ return futex_wake(val);
+ }
+ return 0;
+}
+#else
+#include <NdbMutex.h>
+#include <NdbCondition.h>
+
+struct thr_wait
+{
+ NdbMutex *m_mutex;
+ NdbCondition *m_cond;
+ thr_wait() {
+ m_mutex = NdbMutex_Create();
+ m_cond = NdbCondition_Create();
+ NdbMutex_Lock(m_mutex);
+ }
+};
+
+static
+void
+yield(struct thr_wait* wait, const struct timespec *timeout)
+{
+ NdbCondition_WaitTimeout(wait->m_cond, wait->m_mutex, 1);
+}
+
+static
+int
+wakeup(struct thr_wait* wait)
+{
+ NdbCondition_Signal(wait->m_cond);
+ return 0;
+}
+#endif
+
+inline void require(bool x)
+{
+ if (unlikely(!(x)))
+ abort();
+}
+
+struct thr_spin_lock
+{
+ thr_spin_lock(const char * name) { m_lock = 0; m_name = name;}
+
+ const char * m_name;
+ volatile unsigned m_lock;
+};
+
+struct thr_mutex
+{
+ thr_mutex(const char * name) {
+ m_mutex = NdbMutex_Create();
+ m_name = name;
+ }
+
+ const char * m_name;
+ NdbMutex * m_mutex;
+};
+
+static
+inline
+void
+lock(struct thr_spin_lock* sl)
+{
+ volatile unsigned* val = &sl->m_lock;
+test:
+ if (likely(xcng(val, 1) == 0))
+ return;
+
+ printf("%s waiting for lock\n", sl->m_name);
+
+ while (* val == 1);
+ goto test;
+}
+
+static
+inline
+void
+unlock(struct thr_spin_lock* sl)
+{
+ sl->m_lock = 0;
+}
+
+static
+inline
+int
+trylock(struct thr_spin_lock* sl)
+{
+ volatile unsigned* val = &sl->m_lock;
+ return xcng(val, 1);
+}
+
+static
+inline
+void
+lock(struct thr_mutex* sl)
+{
+ NdbMutex_Lock(sl->m_mutex);
+}
+
+static
+inline
+void
+unlock(struct thr_mutex* sl)
+{
+ NdbMutex_Unlock(sl->m_mutex);
+}
+
+static
+inline
+int
+trylock(struct thr_mutex * sl)
+{
+ return NdbMutex_Trylock(sl->m_mutex);
+}
+
+/**
+ *
+ */
+struct thr_job_buffer // 32k
+{
+ static const unsigned SIZE = 8190;
+
+ unsigned m_len;
+ unsigned m_pos;
+ unsigned m_data[SIZE];
+};
+
+struct thr_job_queue
+{
+ static const unsigned SIZE = 62;
+
+ unsigned m_read_index; // used by consumer
+ unsigned m_write_index; // used by producer
+ struct thr_job_buffer* m_buffers[SIZE];
+};
+
+struct thr_jba
+{
+ static const unsigned SIZE = 4;
+ thr_jba() : m_write_lock("jbalock") {}
+
+ unsigned m_read_index;
+ volatile unsigned m_write_index;
+ struct thr_spin_lock m_write_lock;
+ struct thr_job_buffer* m_next_buffer;
+ struct thr_job_buffer* m_buffers[SIZE];
+};
+
+struct thr_tq
+{
+ static const unsigned SQ_SIZE = 512;
+ static const unsigned LQ_SIZE = 512;
+ static const unsigned PAGES = 32 * (SQ_SIZE + LQ_SIZE) / 8192;
+
+ Uint32 m_next_timer;
+ Uint32 m_current_time;
+ Uint32 m_next_free;
+ Uint32 m_cnt[2];
+ Uint32 * m_delayed_signals[PAGES];
+ Uint32 m_short_queue[SQ_SIZE];
+ Uint32 m_long_queue[LQ_SIZE];
+};
+
+struct thr_data
+{
+ thr_wait m_waiter;
+ unsigned m_thr_no;
+ struct SimulatedBlock* m_blocks[NO_OF_BLOCKS];
+
+ Uint64 m_time;
+ struct thr_jba m_jba;
+ struct thr_tq m_tq;
+ unsigned m_free;
+ struct thr_job_buffer* m_free_list;
+ struct thr_job_buffer* m_out_queue[MAX_THREADS];
+ struct thr_job_queue m_in_queue[MAX_THREADS];
+};
+
+#define DBG_MALLOC 0
+#define THR_TO_REP(selfptr) &g_thr_repository
+
+template<typename T>
+struct thr_safe_pool
+{
+ thr_safe_pool() : m_lock("mempool"), m_free_list(0) {}
+
+ thr_spin_lock m_lock;
+ T* m_free_list;
+
+ T* seize() {
+ T* ret = 0;
+ lock(&m_lock);
+ if (m_free_list)
+ {
+ ret = m_free_list;
+ m_free_list = *reinterpret_cast<T**>(m_free_list);
+ }
+ else
+ {
+ ret = reinterpret_cast<T*>(malloc(sizeof(T)));
+ }
+ unlock(&m_lock);
+ return ret;
+ }
+
+ void release(T* t){
+ lock(&m_lock);
+ T** nextptr = reinterpret_cast<T**>(t);
+ * nextptr = m_free_list;
+ m_free_list = t;
+ unlock(&m_lock);
+ }
+};
+
+struct thr_repository
+{
+ thr_repository()
+ : m_send_lock("sendlock"),
+ m_receive_lock("recvlock"),
+ m_section_lock("sectionlock") {}
+
+ unsigned m_thread_count;
+ struct thr_spin_lock m_send_lock;
+ struct thr_spin_lock m_receive_lock;
+ struct thr_spin_lock m_section_lock;
+ struct thr_data m_thread[MAX_THREADS];
+ struct thr_safe_pool<thr_job_buffer> m_free_list;
+};
+
+static
+thr_job_buffer*
+seize_buffer(struct thr_repository* rep, int thr_no)
+{
+ thr_job_buffer* jb;
+ thr_data* selfptr = rep->m_thread + thr_no;
+ if (likely((jb = selfptr->m_free_list) != 0))
+ {
+ selfptr->m_free --;
+ thr_job_buffer* next = *reinterpret_cast<thr_job_buffer**>(jb);
+ rep->m_thread[thr_no].m_free_list = next;
+ jb->m_len = 0;
+ jb->m_pos = 0;
+ return jb;
+ }
+
+ /* global alloc */
+ jb = rep->m_free_list.seize();
+ jb->m_len = 0;
+ jb->m_pos = 0;
+ return jb;
+}
+
+static
+void
+release_buffer(struct thr_repository* rep, int thr_no, thr_job_buffer* jb)
+{
+ struct thr_data* selfptr = rep->m_thread + thr_no;
+
+ if (selfptr->m_free >= 32)
+ {
+ rep->m_free_list.release(jb);
+ }
+ else
+ {
+ thr_job_buffer** next = reinterpret_cast<thr_job_buffer**>(jb);
+ * next = selfptr->m_free_list;
+ selfptr->m_free_list = jb;
+ selfptr->m_free ++;
+ }
+}
+
+static
+void
+transfer_buffer(struct thr_repository* rep, unsigned from, unsigned to)
+{
+ require(from < NUM_THREADS && to < NUM_THREADS);
+ unsigned old;
+ unsigned * writeptr = &(rep->m_thread[to].m_in_queue[from].m_write_index);
+ volatile unsigned * readptr = &(rep->m_thread[to].m_in_queue[from].m_read_index);
+ unsigned readidx = * readptr;
+ unsigned writeidx = * writeptr;
+ unsigned nextidx = (writeidx + 1) % thr_job_queue::SIZE;
+ thr_job_buffer* src = rep->m_thread[from].m_out_queue[to];
+
+ if (unlikely(readidx == nextidx))
+ goto check_full;
+
+ if (0)
+ ndbout_c("transfer from: %d to: %d (read: %d write: %d)",
+ from, to, readidx, writeidx);
+
+do_transfer:
+ rep->m_thread[to].m_in_queue[from].m_buffers[writeidx] = src;
+ // need storestore barrier
+ // but xcng is full barrier
+ old = xcng(writeptr, nextidx);
+ assert(old == writeidx);
+ wakeup(&rep->m_thread[to].m_waiter); // potentially wakeup
+
+ rep->m_thread[from].m_out_queue[to] = seize_buffer(rep, from);
+ return ;
+check_full:
+ ndbout_c("sleep in transfer from %u to %u", from, to);
+ for (unsigned i = 0; i<1000; i++)
+ {
+ usleep(1000);
+ if (* readptr != nextidx)
+ goto do_transfer;
+ }
+
+ abort();
+}
+
+static
+unsigned
+dojob(struct thr_data* selfptr, struct thr_job_buffer* ptr, struct Signal* sig,
+ Uint32 *watchDogCounter)
+{
+ unsigned cnt = 0;
+ unsigned pos = ptr->m_pos;
+ unsigned end = ptr->m_len;
+ unsigned *posptr = ptr->m_data + pos;
+ unsigned *endptr = ptr->m_data + end;
+ SimulatedBlock** blockptr = selfptr->m_blocks - MIN_BLOCK_NO;
+
+ ptr->m_pos = end;
+
+ while (posptr < endptr)
+ {
+ SignalHeader* s = reinterpret_cast<SignalHeader*>(posptr);
+ unsigned seccnt = s->m_noOfSections;
+ unsigned siglen = (sizeof(*s)>>2) + s->theLength;
+ unsigned bno = s->theReceiversBlockNumber;
+ unsigned gsn = s->theVerId_signalNumber;
+ SimulatedBlock * block = * (blockptr + bno);
+ *watchDogCounter = 1;
+ memcpy(&sig->header, s, 4*siglen);
+ sig->m_sectionPtrI[0] = posptr[siglen + 0];
+ sig->m_sectionPtrI[1] = posptr[siglen + 1];
+ sig->m_sectionPtrI[2] = posptr[siglen + 2];
+ block->executeFunction(gsn, sig);
+
+ cnt++;
+ posptr += siglen + seccnt;
+ }
+
+ return cnt;
+}
+
+static
+unsigned
+dojoba(struct thr_repository * rep, unsigned thr_no,
+ Signal* signal, unsigned write_index,
+ Uint32 *watchDogCounter)
+{
+ unsigned sum = 0;
+ struct thr_job_buffer* buffer;
+ struct thr_data* selfptr = rep->m_thread + thr_no;
+ unsigned read_index = selfptr->m_jba.m_read_index;
+ unsigned wi_buf = write_index >> 16;
+ //unsigned wi_pos = write_index & 0xFFFF;
+
+ unsigned ri_buf = read_index >> 16;
+ while (ri_buf != wi_buf)
+ {
+ buffer = * (selfptr->m_jba.m_buffers + ri_buf);
+ sum += dojob(selfptr, buffer, signal, watchDogCounter);
+ release_buffer(rep, thr_no, buffer);
+ ri_buf = (ri_buf + 1) % thr_jba::SIZE;
+ }
+
+ buffer = * (selfptr->m_jba.m_buffers + ri_buf);
+ sum += dojob(selfptr, buffer, signal, watchDogCounter);
+
+ unsigned ri_pos = buffer->m_pos;
+ selfptr->m_jba.m_read_index = (ri_buf << 16) + ri_pos;
+
+ return sum;
+}
+
+static
+inline
+Uint32
+scan_queue(struct thr_data* selfptr, Uint32 cnt, Uint32 end, Uint32* ptr)
+{
+ Uint32 thr_no = selfptr->m_thr_no;
+ Uint32 **pages = selfptr->m_tq.m_delayed_signals;
+ Uint32 free = selfptr->m_tq.m_next_free;
+ Uint32* save = ptr;
+ for (Uint32 i = 0; i < cnt; i++, ptr++)
+ {
+ Uint32 val = * ptr;
+ if ((val & 0xFFFF) <= end)
+ {
+ Uint32 idx = val >> 16;
+ Uint32 buf = idx >> 8;
+ Uint32 pos = 32 * (idx & 0xFF);
+
+ Uint32* page = * (pages + buf);
+
+ SignalHeader* s = reinterpret_cast<SignalHeader*>(page + pos);
+ if (0)
+ ndbout_c("found %p val: %d end: %d", s, val & 0xFFFF, end);
+ sendprioa(thr_no, s->theReceiversBlockNumber, s);
+ * (page + pos) = free;
+ free = idx;
+ }
+ else if (i > 0)
+ {
+ selfptr->m_tq.m_next_free = free;
+ memmove(save, ptr, 4 * (cnt - i));
+ return i;
+ }
+ else
+ {
+ return 0;
+ }
+ }
+ selfptr->m_tq.m_next_free = free;
+ return cnt;
+}
+
+static
+void
+handle_time_wrap(struct thr_data* selfptr)
+{
+ Uint32 i;
+ struct thr_tq * tq = &selfptr->m_tq;
+ Uint32 cnt0 = tq->m_cnt[0];
+ Uint32 cnt1 = tq->m_cnt[1];
+ Uint32 tmp0 = scan_queue(selfptr, cnt0, 32767, tq->m_short_queue);
+ Uint32 tmp1 = scan_queue(selfptr, cnt1, 32767, tq->m_long_queue);
+ cnt0 -= tmp0;
+ cnt1 -= tmp1;
+ tq->m_cnt[0] = cnt0;
+ tq->m_cnt[1] = cnt1;
+ for (i = 0; i<cnt0; i++)
+ {
+ assert((tq->m_short_queue[i] & 0xFFFF) > 32767);
+ tq->m_short_queue[i] -= 32767;
+ }
+ for (i = 0; i<cnt1; i++)
+ {
+ assert((tq->m_long_queue[i] & 0xFFFF) > 32767);
+ tq->m_long_queue[i] -= 32767;
+ }
+}
+
+static
+void
+scan_time_queues(struct thr_data* selfptr)
+{
+ struct thr_tq * tq = &selfptr->m_tq;
+ NDB_TICKS now = NdbTick_CurrentMillisecond();
+ NDB_TICKS last = selfptr->m_time;
+
+ Uint32 curr = tq->m_current_time;
+ Uint32 cnt0 = tq->m_cnt[0];
+ Uint32 cnt1 = tq->m_cnt[1];
+
+ Uint64 diff = now - last;
+ if (diff == 0)
+ {
+ return;
+ }
+ else if (diff > 0)
+ {
+ Uint32 step = (Uint32)((diff > 20) ? 20 : diff);
+ Uint32 end = (curr + step);
+ if (end >= 32767)
+ {
+ handle_time_wrap(selfptr);
+ cnt0 = tq->m_cnt[0];
+ cnt1 = tq->m_cnt[1];
+ end -= 32767;
+ }
+
+ Uint32 tmp0 = scan_queue(selfptr, cnt0, end, tq->m_short_queue);
+ Uint32 tmp1 = scan_queue(selfptr, cnt1, end, tq->m_long_queue);
+
+ tq->m_current_time = end;
+ tq->m_cnt[0] = cnt0 - tmp0;
+ tq->m_cnt[1] = cnt1 - tmp1;
+ selfptr->m_time = last + step;
+
+ return;
+ }
+ else if (diff == 0)
+ {
+ return;
+ }
+ abort();
+}
+
+Uint32 senderThreadId;
+
+static
+void
+do_send(struct thr_repository* rep, struct thr_data* selfptr,
+ Uint32 *watchDogCounter)
+{
+ static int cnt = 0;
+
+ senderThreadId = selfptr->m_thr_no;
+
+ if (cnt == 0)
+ {
+ *watchDogCounter = 5;
+ globalTransporterRegistry.update_connections();
+ }
+ cnt = (cnt + 1) & 15;
+
+ *watchDogCounter = 6;
+ globalTransporterRegistry.performSend();
+ unlock(&rep->m_send_lock);
+}
+
+Uint32 receiverThreadId;
+
+static
+void
+do_receive(struct thr_repository*rep, struct thr_data* selfptr, unsigned delay,
+ Uint32 *watchDogCounter)
+{
+ unsigned thr_no = selfptr->m_thr_no;
+ *watchDogCounter = 7;
+ receiverThreadId = thr_no;
+ if (globalTransporterRegistry.pollReceive(delay))
+ {
+ *watchDogCounter = 8;
+ globalTransporterRegistry.performReceive();
+ }
+ unlock(&rep->m_receive_lock);
+
+ unsigned cnt = rep->m_thread_count;
+
+ /**
+ * Transfer all out buffers
+ */
+ for (unsigned i = 0; i<cnt; i++)
+ {
+ thr_job_buffer * jb = selfptr->m_out_queue[i];
+ if (jb->m_len)
+ {
+ transfer_buffer(rep, thr_no, i);
+ }
+ }
+}
+
+static
+inline
+void
+sendpacked(struct thr_data* selfptr, Signal* signal, Uint32 thr_no)
+{
+ SimulatedBlock** blockptr = selfptr->m_blocks - MIN_BLOCK_NO;
+ SimulatedBlock* b_lqh = * (blockptr + DBLQH);
+ SimulatedBlock* b_tc = * (blockptr + DBTC);
+ SimulatedBlock* b_tup = * (blockptr + DBTUP);
+ if (b_lqh)
+ b_lqh->executeFunction(GSN_SEND_PACKED, signal);
+ if (b_tc)
+ b_tc->executeFunction(GSN_SEND_PACKED, signal);
+ if (b_tup)
+ b_tup->executeFunction(GSN_SEND_PACKED, signal);
+}
+
+static inline Uint32
+block2ThreadId(Uint32 block)
+{
+ /*
+ Block assignment:
+ 0 PGMAN BACKUP LOCAL
+ 1 LGMAN DBTC GLOBAL
+ 2 TSMAN DBDIH GLOBAL
+ 3 ACC DBLQH LOCAL
+ 4 CMVMI DBACC LOCAL
+ 5 FS DBTUP LOCAL
+ 6 DICT DBDICT GLOBAL
+ 7 DIH NDBCNTR GLOBAL
+ 8 LQH QMGR GLOBAL
+ 9 TC NDBFS GLOBAL
+ 10 TUP CMVMI GLOBAL
+ 11 NDBCNTR TRIX GLOBAL
+ 12 QMGR DBUTIL GLOBAL
+ 13 TRIX SUMA LOCAL
+ 14 BACKUP DBTUX LOCAL
+ 15 UTIL TSMAN LOCAL
+ 16 SUMA LGMAN LOCAL
+ 17 TUX PGMAN LOCAL
+ 18 RESTORE RESTORE LOCAL
+
+ Thread 0 is for global, thread 1 for locals.
+ */
+ static const Uint32 locals = 0x7e039;
+ assert(block >= MIN_BLOCK_NO && block <= MAX_BLOCK_NO);
+ return (locals >> (block - MIN_BLOCK_NO)) & 1;
+}
+
+extern "C"
+void *
+mt_thr_main(void *thr_arg)
+{
+ Signal signal;
+ struct timespec nowait;
+ nowait.tv_sec = 0;
+ nowait.tv_nsec = 10 * 1000000;
+ EmulatedJamBuffer thread_jam;
+ Uint32 watchDogCounter;
+
+ thread_jam.theEmulatedJamIndex = 0;
+ thread_jam.theEmulatedJamBlockNumber = 0;
+ memset(thread_jam.theEmulatedJam, 0, sizeof(thread_jam.theEmulatedJam));
+ NdbThread_SetTlsKey(NDB_THREAD_TLS_JAM, &thread_jam);
+
+ struct thr_repository* rep = &g_thr_repository;
+ struct thr_data* selfptr = (struct thr_data *)thr_arg;
+ /*
+ The thread initialisation argument is void *, not numeric, so we obtain
+ the numeric thread id in this slightly backwards way.
+ */
+ unsigned thr_no = selfptr - &(rep->m_thread[0]);
+ pid_t tid = (unsigned)syscall(SYS_gettid);
+ ndbout_c("Tread %u started, tid=%u", thr_no, tid);
+ ndbout_c("lock to cpu %u", (thr_no & 1));
+ {
+ cpu_set_t mask;
+ CPU_ZERO(&mask);
+ CPU_SET((thr_no & 1), &mask);
+ sched_setaffinity(tid, sizeof(mask), &mask);
+ }
+
+ /*
+ Now we need to somehow assign to this thread all blocks that will run in
+ this thread.
+ */
+ for (Uint32 i = 0; i < NO_OF_BLOCKS; i++)
+ {
+ if (block2ThreadId(i + MIN_BLOCK_NO) == thr_no)
+ {
+ require(selfptr->m_blocks[i] != 0);
+ selfptr->m_blocks[i]->assignToThread(thr_no, &thread_jam,
+ &watchDogCounter);
+ }
+ }
+
+ /* Avoid false watchdog alarms caused by race condition. */
+ watchDogCounter = 1;
+ globalEmulatorData.theWatchDog->registerWatchedThread(&watchDogCounter,
+ thr_no);
+
+ /**
+ * prio A
+ */
+ unsigned* a_re_idxptr = &selfptr->m_jba.m_read_index;
+ volatile unsigned * a_wr_idxptr = &selfptr->m_jba.m_write_index;
+
+ while (globalData.theRestartFlag != perform_stop)
+ {
+ unsigned sum = 0;
+ unsigned cnt = rep->m_thread_count;
+
+ watchDogCounter = 2;
+ scan_time_queues(selfptr);
+ unsigned jba_read_index = * a_re_idxptr;
+ unsigned jba_write_index = * a_wr_idxptr;
+
+ /**
+ * Process each inbuffer
+ */
+ for (unsigned i = 0; i<cnt; i++)
+ {
+ if (jba_read_index != jba_write_index)
+ {
+ sum += dojoba(rep, thr_no, &signal, jba_write_index, &watchDogCounter);
+ jba_read_index = * a_re_idxptr;
+ }
+
+ unsigned ri = selfptr->m_in_queue[i].m_read_index;
+ unsigned wi = selfptr->m_in_queue[i].m_write_index;
+ thr_job_buffer * jb = selfptr->m_in_queue[i].m_buffers[ri];
+ if (ri != wi)
+ {
+ ri = (ri + 1) % thr_job_queue::SIZE;
+ selfptr->m_in_queue[i].m_read_index = ri;
+ sum += dojob(selfptr, jb, &signal, &watchDogCounter);
+ release_buffer(rep, thr_no, jb);
+ }
+ jba_write_index = * a_wr_idxptr;
+ }
+
+ watchDogCounter = 1;
+ sendpacked(selfptr, &signal, thr_no);
+
+ if (sum)
+ {
+ /**
+ * Transfer all out buffers
+ */
+ watchDogCounter = 6;
+ for (unsigned i = 0; i<cnt; i++)
+ {
+ thr_job_buffer * jb = selfptr->m_out_queue[i];
+ if (jb->m_len)
+ {
+ transfer_buffer(rep, thr_no, i);
+ }
+ }
+ }
+
+ if (trylock(&rep->m_send_lock) == 0)
+ {
+ do_send(rep, selfptr, &watchDogCounter);
+ }
+
+ if (
+thr_no == 2 &&
+trylock(&rep->m_receive_lock) == 0)
+ {
+ do_receive(rep, selfptr, sum ? 0 : 1, &watchDogCounter);
+ }
+ else if (sum == 0)
+ {
+ yield(&selfptr->m_waiter, &nowait);
+ }
+ }
+
+ globalEmulatorData.theWatchDog->unregisterWatchedThread(thr_no);
+ return NULL; // Return value not currently used
+}
+
+void
+thr_transporter_main(struct thr_repository* rep, unsigned thr_no)
+{
+}
+
+static
+inline
+int
+insert(thr_job_buffer* ptr, const struct SignalHeader* s)
+{
+ unsigned len = ptr->m_len;
+ unsigned* pos = ptr->m_data + len;
+ unsigned siglen = (sizeof(*s) >> 2) + s->theLength + s->m_noOfSections;
+ ptr->m_len = len + siglen;
+ memcpy(pos, s, 4*siglen);
+ return thr_job_buffer::SIZE - (len + siglen + 32); // > 0 not full, <=0 full
+}
+
+void
+sendlocal(Uint32 self, Uint32 block, const SignalHeader* s)
+{
+ Uint32 dst = block2ThreadId(block);
+ struct thr_repository* rep = &g_thr_repository;
+ struct thr_data * selfptr = rep->m_thread + self;
+
+ int res = insert(selfptr->m_out_queue[dst], s);
+ if (res <= 0)
+ {
+ transfer_buffer(rep, self, dst);
+ }
+}
+
+void
+sendprioa(Uint32 self, Uint32 block, const SignalHeader* s)
+{
+ Uint32 dst = block2ThreadId(block);
+ struct thr_repository* rep = &g_thr_repository;
+ struct thr_data * selfptr = rep->m_thread + self;
+ struct thr_data * dstptr = rep->m_thread + dst;
+
+ unsigned siglen = (sizeof(*s) >> 2) + s->theLength + s->m_noOfSections;
+
+ struct thr_job_buffer* newbuffer = selfptr->m_jba.m_next_buffer;
+
+ lock(&dstptr->m_jba.m_write_lock);
+
+ unsigned wi = dstptr->m_jba.m_write_index;
+ unsigned buf = wi >> 16;
+ unsigned newwi = wi + siglen;
+ unsigned pos = wi & 0xFFFF;
+ unsigned newpos = pos + siglen;
+
+ struct thr_job_buffer * buffer = * (dstptr->m_jba.m_buffers + buf);
+ assert(buffer->m_len == pos);
+ buffer->m_len = newpos;
+ memcpy(buffer->m_data + pos, s, 4*siglen);
+
+ if (0)
+ ndbout_c("sendprioa %s from %s to %s",
+ getSignalName(s->theVerId_signalNumber),
+ getBlockName(refToBlock(s->theSendersBlockRef)),
+ getBlockName(s->theReceiversBlockNumber));
+
+ if (unlikely(newpos + 32 >= thr_job_buffer::SIZE))
+ {
+ newwi = (buf + 1) % thr_jba::SIZE;
+ * (dstptr->m_jba.m_buffers + newwi) = newbuffer;
+ newwi = (newwi << 16);
+ }
+ dstptr->m_jba.m_write_index = newwi;
+
+ unlock(&dstptr->m_jba.m_write_lock);
+
+ /**
+ * Note, do malloc outside of critical region
+ */
+ if (newpos + 32 >= thr_job_buffer::SIZE)
+ {
+ selfptr->m_jba.m_next_buffer = seize_buffer(rep, self);
+ }
+}
+
+static
+inline
+Uint32*
+get_free_slot(struct thr_repository* rep,
+ struct thr_data* selfptr,
+ Uint32* idxptr)
+{
+ struct thr_tq * tq = &selfptr->m_tq;
+ Uint32 idx = tq->m_next_free;
+retry:
+ Uint32 buf = idx >> 8;
+ Uint32 pos = idx & 0xFF;
+
+ if (idx != RNIL)
+ {
+ Uint32* page = * (tq->m_delayed_signals + buf);
+ Uint32* ptr = page + (32 * pos);
+ tq->m_next_free = * ptr;
+ * idxptr = idx;
+ return ptr;
+ }
+
+ Uint32 thr_no = selfptr->m_thr_no;
+ for (Uint32 i = 0; i<thr_tq::PAGES; i++)
+ {
+ if (tq->m_delayed_signals[i] == 0)
+ {
+ struct thr_job_buffer *jb = seize_buffer(rep, thr_no);
+ Uint32 * page = reinterpret_cast<Uint32*>(jb);
+ tq->m_delayed_signals[i] = page;
+
+ ndbout_c("saving %p at %p (%d)", page, tq->m_delayed_signals+i, i);
+
+ /**
+ * Init page
+ */
+ for (Uint32 j = 0; j<255; j ++)
+ {
+ page[j * 32] = (i << 8) + (j + 1);
+ }
+ page[255*32] = RNIL;
+ idx = (i << 8);
+ goto retry;
+ }
+ }
+ abort();
+}
+
+void
+senddelay(Uint32 thr_no, const SignalHeader* s, Uint32 delay)
+{
+ struct thr_repository* rep = &g_thr_repository;
+ struct thr_data * selfptr = rep->m_thread + thr_no;
+ unsigned siglen = (sizeof(*s) >> 2) + s->theLength + s->m_noOfSections;
+
+ Uint32 max;
+ Uint32 * cntptr;
+ Uint32 * queueptr;
+
+ Uint32 alarm = selfptr->m_tq.m_current_time + delay;
+ Uint32 nexttimer = selfptr->m_tq.m_next_timer;
+ if (delay < 100)
+ {
+ cntptr = selfptr->m_tq.m_cnt + 0;
+ queueptr = selfptr->m_tq.m_short_queue;
+ max = thr_tq::SQ_SIZE;
+ }
+ else
+ {
+ cntptr = selfptr->m_tq.m_cnt + 1;
+ queueptr = selfptr->m_tq.m_long_queue;
+ max = thr_tq::LQ_SIZE;
+ }
+
+ Uint32 idx;
+ Uint32* ptr = get_free_slot(rep, selfptr, &idx);
+ memcpy(ptr, s, 4*siglen);
+
+ if (0)
+ ndbout_c("now: %d alarm: %d send %s from %s to %s delay: %d idx: %x %p",
+ selfptr->m_tq.m_current_time,
+ alarm,
+ getSignalName(s->theVerId_signalNumber),
+ getBlockName(refToBlock(s->theSendersBlockRef)),
+ getBlockName(s->theReceiversBlockNumber),
+ delay,
+ idx, ptr);
+
+ Uint32 i;
+ Uint32 cnt = *cntptr;
+ Uint32 newentry = (idx << 16) | (alarm & 0xFFFF);
+
+ * cntptr = cnt + 1;
+ selfptr->m_tq.m_next_timer = alarm < nexttimer ? alarm : nexttimer;
+
+ if (cnt == 0)
+ {
+ queueptr[0] = newentry;
+ return;
+ }
+ else if (cnt < max)
+ {
+ for (i = 0; i<cnt; i++)
+ {
+ Uint32 save = queueptr[i];
+ if ((save & 0xFFFF) > alarm)
+ {
+ memmove(queueptr+i+1, queueptr+i, 4*(cnt - i));
+ queueptr[i] = newentry;
+ return;
+ }
+ }
+ assert(i == cnt);
+ queueptr[i] = newentry;
+ return;
+ }
+ else
+ {
+ abort();
+ }
+}
+
+static
+void
+init(struct thr_tq* tq)
+{
+ tq->m_next_timer = 0;
+ tq->m_current_time = 0;
+ tq->m_next_free = RNIL;
+ tq->m_cnt[0] = tq->m_cnt[1] = 0;
+ bzero(tq->m_delayed_signals, sizeof(tq->m_delayed_signals));
+}
+
+static
+void
+init(struct thr_repository* rep, struct thr_data *selfptr, unsigned int cnt,
+ unsigned thr_no)
+{
+ unsigned int i;
+
+ selfptr->m_thr_no = thr_no;
+ selfptr->m_free_list = 0;
+ for (i = 0; i<cnt; i++)
+ selfptr->m_out_queue[i] = seize_buffer(rep, thr_no);
+
+ selfptr->m_free = 0;
+ selfptr->m_jba.m_read_index = 0;
+ selfptr->m_jba.m_write_index = 0;
+ selfptr->m_jba.m_buffers[0] = seize_buffer(rep, thr_no);
+ selfptr->m_jba.m_next_buffer = seize_buffer(rep, thr_no);
+
+ for (i = 0; i<cnt; i++)
+ {
+ selfptr->m_in_queue[i].m_read_index = 0;
+ selfptr->m_in_queue[i].m_write_index = 0;
+ }
+
+ init(&selfptr->m_tq);
+}
+
+static
+void
+init(struct thr_repository* rep, unsigned int cnt)
+{
+ rep->m_thread_count = cnt;
+ for (unsigned int i = 0; i<cnt; i++)
+ {
+ init(rep, rep->m_thread + i, cnt, i);
+ }
+}
+
+
+/**
+ * Thread Config
+ */
+
+#include "ThreadConfig.hpp"
+#include <signaldata/StartOrd.hpp>
+
+ThreadConfig::ThreadConfig()
+{
+ init(&g_thr_repository, NUM_THREADS);
+}
+
+ThreadConfig::~ThreadConfig()
+{
+}
+
+
+void ThreadConfig::ipControlLoop()
+{
+ unsigned int i;
+ unsigned int thr_no;
+ struct thr_repository* rep = &g_thr_repository;
+ NdbThread *threads[NUM_THREADS - 1];
+
+ /*
+ Start threads for all execution threads, except for thread 0, which
+ runs in the main thread.
+ */
+ for (thr_no = 0; thr_no < NUM_THREADS; thr_no++)
+ {
+ for (i = 0; i<NO_OF_BLOCKS; i++)
+ {
+ if (block2ThreadId(i + MIN_BLOCK_NO) == thr_no)
+ {
+ rep->m_thread[thr_no].m_blocks[i] =
+ globalData.getBlock(i + MIN_BLOCK_NO);
+ }
+ }
+ rep->m_thread[thr_no].m_time = NdbTick_CurrentMillisecond();
+
+ if (thr_no == 0)
+ continue; // Will run in the main thread.
+ /*
+ The NdbThread_Create() takes void **, but that is cast to void * when
+ passed to the mt_thr_main(). Which is kind of strange ...
+ */
+ threads[thr_no - 1] = NdbThread_Create(mt_thr_main,
+ (void **)(&(rep->m_thread[thr_no])),
+ 1024*1024,
+ "execute thread", //ToDo add number
+ NDB_THREAD_PRIO_MEAN);
+ assert(threads[thr_no - 1] != NULL);
+ }
+
+ /* Now run the main loop for thread 0 directly. */
+ mt_thr_main(&(rep->m_thread[0]));
+
+ /* Wait for all threads to shutdown. */
+ for (thr_no = 1; thr_no < NUM_THREADS; thr_no++)
+ {
+ void *dummy_return_status;
+ NdbThread_WaitFor(threads[thr_no - 1], &dummy_return_status);
+ NdbThread_Destroy(&(threads[thr_no - 1]));
+ }
+}
+
+int
+ThreadConfig::doStart(NodeState::StartLevel startLevel)
+{
+ SignalT<3> signalT;
+ memset(&signalT.header, 0, sizeof(SignalHeader));
+
+ signalT.header.theVerId_signalNumber = GSN_START_ORD;
+ signalT.header.theReceiversBlockNumber = CMVMI;
+ signalT.header.theSendersBlockRef = 0;
+ signalT.header.theTrace = 0;
+ signalT.header.theSignalId = 0;
+ signalT.header.theLength = StartOrd::SignalLength;
+
+ StartOrd * const startOrd = (StartOrd *)&signalT.theData[0];
+ startOrd->restartInfo = 0;
+
+ senddelay(0, &signalT.header, 1);
+ return 0;
+}
+
+void
+mt_send_lock()
+{
+ lock(&(g_thr_repository.m_send_lock));
+}
+
+void
+mt_send_unlock()
+{
+ unlock(&(g_thr_repository.m_send_lock));
+}
+
+void
+mt_section_lock()
+{
+ lock(&(g_thr_repository.m_section_lock));
+}
+
+void
+mt_section_unlock()
+{
+ unlock(&(g_thr_repository.m_section_lock));
+}
+
+/**
+ * Global data
+ */
+struct thr_repository g_thr_repository;
diff -Nrup a/storage/ndb/src/kernel/vm/mt/mt.hpp b/storage/ndb/src/kernel/vm/mt/mt.hpp
--- /dev/null Wed Dec 31 16:00:00 196900
+++ b/storage/ndb/src/kernel/vm/mt/mt.hpp 2007-09-24 17:12:18 +02:00
@@ -0,0 +1,30 @@
+#include <kernel_types.h>
+
+#ifndef ndb_mt_hpp
+#define ndb_mt_hpp
+
+
+/*
+ For now, we use locks to only have one thread at the time running in the
+ transporter as sender, and only one as receiver.
+
+ Thus, we can use a global variable to record the id of the current
+ transporter threads. Only valid while holding the transporter receive lock.
+*/
+extern Uint32 receiverThreadId;
+extern Uint32 senderThreadId;
+
+void sendlocal(Uint32 self, Uint32 dst, const struct SignalHeader*);
+void sendprioa(Uint32 self, Uint32 dst, const struct SignalHeader*);
+void senddelay(Uint32 thr_no, const struct SignalHeader*, Uint32 delay);
+
+void mt_send_lock();
+void mt_send_unlock();
+
+/**
+ * Lock/unlock pools for long signal section(s)
+ */
+void mt_section_lock();
+void mt_section_unlock();
+
+#endif
diff -Nrup a/storage/ndb/src/kernel/vm/pc.hpp b/storage/ndb/src/kernel/vm/pc.hpp
--- a/storage/ndb/src/kernel/vm/pc.hpp 2006-12-23 20:20:20 +01:00
+++ b/storage/ndb/src/kernel/vm/pc.hpp 2007-09-24 17:12:18 +02:00
@@ -27,57 +27,45 @@
#define jamLine(line)
#define jamEntry()
#define jamEntryLine(line)
+#define jamBlock(block)
+#define jamBlockLine(block, line)
+#define jamEntryBlock(block)
+#define jamEntryBlockLine(block, line)
+#define jamNoBlock()
+#define jamNoBlockLine(line)
#else
-#ifdef NDB_WIN32
-#define jam() { \
- Uint32 tEmulatedJamIndex = theEmulatedJamIndex; \
- *(Uint32*)(theEmulatedJam + tEmulatedJamIndex) = __LINE__; \
- theEmulatedJamIndex = (tEmulatedJamIndex + 4) & JAM_MASK; }
-#define jamLine(line) { \
- Uint32 tEmulatedJamIndex = theEmulatedJamIndex; \
- *(Uint32*)(theEmulatedJam + tEmulatedJamIndex) = line; \
- theEmulatedJamIndex = (tEmulatedJamIndex + 4) & JAM_MASK; }
-#define jamEntry() { \
- theEmulatedJamBlockNumber = number(); \
- Uint32 tEmulatedJamIndex = theEmulatedJamIndex; \
- *(Uint32*)(theEmulatedJam + tEmulatedJamIndex) = \
- ((theEmulatedJamBlockNumber << 20) | __LINE__); \
- theEmulatedJamIndex = (tEmulatedJamIndex + 4) & JAM_MASK; }
-#define jamEntryLine(line) { \
- theEmulatedJamBlockNumber = number(); \
- Uint32 tEmulatedJamIndex = theEmulatedJamIndex; \
- *(Uint32*)(theEmulatedJam + tEmulatedJamIndex) = \
- ((theEmulatedJamBlockNumber << 20) | line); \
- theEmulatedJamIndex = (tEmulatedJamIndex + 4) & JAM_MASK; }
-
-#else
-
-#define jam() { \
- Uint32 tEmulatedJamIndex = theEmulatedJamIndex; \
- *(Uint32*)((UintPtr)theEmulatedJam + (Uint32)tEmulatedJamIndex) = __LINE__; \
- theEmulatedJamIndex = (tEmulatedJamIndex + 4) & JAM_MASK; }
-#define jamLine(line) { \
- Uint32 tEmulatedJamIndex = theEmulatedJamIndex; \
- *(Uint32*)((UintPtr)theEmulatedJam + (Uint32)tEmulatedJamIndex) = line; \
- theEmulatedJamIndex = (tEmulatedJamIndex + 4) & JAM_MASK; }
-#define jamEntry() { \
- theEmulatedJamBlockNumber = number(); \
- Uint32 tEmulatedJamIndex = theEmulatedJamIndex; \
- *(Uint32*)((UintPtr)theEmulatedJam + (Uint32)tEmulatedJamIndex) = \
- ((theEmulatedJamBlockNumber << 20) | __LINE__); \
- theEmulatedJamIndex = (tEmulatedJamIndex + 4) & JAM_MASK; }
-#define jamEntryLine(line) { \
- theEmulatedJamBlockNumber = number(); \
- Uint32 tEmulatedJamIndex = theEmulatedJamIndex; \
- *(Uint32*)((UintPtr)theEmulatedJam + (Uint32)tEmulatedJamIndex) = \
- ((theEmulatedJamBlockNumber << 20) | line); \
- theEmulatedJamIndex = (tEmulatedJamIndex + 4) & JAM_MASK; }
+#define _jamBlockLine(block, line, jam_buf_getter) \
+ do { \
+ EmulatedJamBuffer *jamBuffer = jam_buf_getter; \
+ Uint32 jamIndex = jamBuffer->theEmulatedJamIndex; \
+ jamBuffer->theEmulatedJam[jamIndex++] = line; \
+ jamBuffer->theEmulatedJamIndex = jamIndex & JAM_MASK; \
+ } while(0)
+#define jamBlockLine(block, line) \
+ _jamBlockLine(block, line, (block)->jamBuffer())
+#define jamBlock(block) jamBlockLine(block, __LINE__)
+#define jamLine(line) jamBlockLine(this, line)
+#define jam() jamLine(__LINE__)
+#define jamBlockEntryLine(block, line) \
+ do { \
+ EmulatedJamBuffer *jamBuffer = (block)->jamBuffer(); \
+ Uint32 blockNumber= (block)->number(); \
+ Uint32 jamIndex = jamBuffer->theEmulatedJamIndex; \
+ jamBuffer->theEmulatedJam[jamIndex++] = (blockNumber << 20) | line; \
+ jamBuffer->theEmulatedJamBlockNumber = blockNumber; \
+ jamBuffer->theEmulatedJamIndex = jamIndex & JAM_MASK; \
+ } while(0)
+#define jamEntryBlock(block) jamEntryBlockLine(block, __LINE__)
+#define jamEntryLine(line) jamBlockEntryLine(this, line)
+#define jamEntry() jamEntryLine(__LINE__)
+#define jamNoBlockLine(line) _jamBlockLine \
+ (block, line, (EmulatedJamBuffer *)NdbThread_GetTlsKey(NDB_THREAD_TLS_JAM))
+#define jamNoBlock() jamNoBlockLine(__LINE__)
#endif
-#endif
#ifndef NDB_OPT
#define ptrCheck(ptr, limit, rec) if (ptr.i < (limit)) ptr.p = &rec[ptr.i]; else ptr.p = NULL
| Thread |
|---|
| • bk commit into 5.1 tree (knielsen:1.2610) | knielsen | 24 Sep |