Below is the list of changes that have just been committed into a local
5.1 repository of knielsen. When knielsen does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html
ChangeSet@stripped, 2007-10-04 14:45:25+02:00, knielsen@ymer.(none) +14 -0
Implement trace file generation for multithreaded ndbd.
storage/ndb/include/kernel/GlobalSignalNumbers.h@stripped, 2007-10-04 14:45:19+02:00, knielsen@ymer.(none) +7 -1
Implement trace file generation for multithreaded ndbd.
storage/ndb/include/kernel/signaldata/StopForCrash.hpp@stripped, 2007-10-04 14:45:20+02:00, knielsen@ymer.(none) +41 -0
New BitKeeper file ``storage/ndb/include/kernel/signaldata/StopForCrash.hpp''
storage/ndb/include/kernel/signaldata/StopForCrash.hpp@stripped, 2007-10-04 14:45:20+02:00, knielsen@ymer.(none) +0 -0
storage/ndb/include/portlib/NdbThread.h@stripped, 2007-10-04 14:45:19+02:00, knielsen@ymer.(none) +1 -0
Implement trace file generation for multithreaded ndbd.
storage/ndb/src/common/debugger/signaldata/SignalNames.cpp@stripped, 2007-10-04 14:45:20+02:00, knielsen@ymer.(none) +1 -0
Implement trace file generation for multithreaded ndbd.
storage/ndb/src/common/portlib/NdbThread.c@stripped, 2007-10-04 14:45:20+02:00, knielsen@ymer.(none) +1 -0
Implement trace file generation for multithreaded ndbd.
storage/ndb/src/kernel/error/ErrorReporter.cpp@stripped, 2007-10-04 14:45:20+02:00, knielsen@ymer.(none) +56 -56
Implement trace file generation for multithreaded ndbd.
storage/ndb/src/kernel/error/ErrorReporter.hpp@stripped, 2007-10-04 14:45:20+02:00, knielsen@ymer.(none) +1 -1
Implement trace file generation for multithreaded ndbd.
storage/ndb/src/kernel/vm/FastScheduler.cpp@stripped, 2007-10-04 14:45:20+02:00, knielsen@ymer.(none) +39 -1
Implement trace file generation for multithreaded ndbd.
storage/ndb/src/kernel/vm/FastScheduler.hpp@stripped, 2007-10-04 14:45:20+02:00, knielsen@ymer.(none) +17 -1
Implement trace file generation for multithreaded ndbd.
storage/ndb/src/kernel/vm/SimulatedBlock.cpp@stripped, 2007-10-04 14:45:20+02:00, knielsen@ymer.(none) +9 -0
Implement trace file generation for multithreaded ndbd.
storage/ndb/src/kernel/vm/SimulatedBlock.hpp@stripped, 2007-10-04 14:45:20+02:00, knielsen@ymer.(none) +1 -0
Implement trace file generation for multithreaded ndbd.
storage/ndb/src/kernel/vm/mt/dummy_mt.cpp@stripped, 2007-10-04 14:45:20+02:00, knielsen@ymer.(none) +0 -13
Implement trace file generation for multithreaded ndbd.
storage/ndb/src/kernel/vm/mt/mt.cpp@stripped, 2007-10-04 14:45:20+02:00, knielsen@ymer.(none) +691 -79
Implement trace file generation for multithreaded ndbd.
storage/ndb/src/kernel/vm/mt/mt.hpp@stripped, 2007-10-04 14:45:20+02:00, knielsen@ymer.(none) +1 -0
Implement trace file generation for multithreaded ndbd.
diff -Nrup a/storage/ndb/include/kernel/GlobalSignalNumbers.h b/storage/ndb/include/kernel/GlobalSignalNumbers.h
--- a/storage/ndb/include/kernel/GlobalSignalNumbers.h 2007-09-12 13:30:24 +02:00
+++ b/storage/ndb/include/kernel/GlobalSignalNumbers.h 2007-10-04 14:45:19 +02:00
@@ -22,7 +22,7 @@
*
* When adding a new signal, remember to update MAX_GSN and SignalNames.cpp
*/
-const GlobalSignalNumber MAX_GSN = 730;
+const GlobalSignalNumber MAX_GSN = 731;
struct GsnName {
GlobalSignalNumber gsn;
@@ -735,6 +735,12 @@ extern const GlobalSignalNumber NO_OF_SI
*/
#define GSN_SIGNAL_DROPPED_REP 567
#define GSN_CONTINUE_FRAGMENTED 568
+
+/**
+ * In multithreaded ndbd, sent from crashing thread to other threads to make
+ * them stop prior to generating trace dump files.
+ */
+#define GSN_STOP_FOR_CRASH 731
/**
* Suma participant interface
diff -Nrup a/storage/ndb/include/kernel/signaldata/StopForCrash.hpp b/storage/ndb/include/kernel/signaldata/StopForCrash.hpp
--- /dev/null Wed Dec 31 16:00:00 196900
+++ b/storage/ndb/include/kernel/signaldata/StopForCrash.hpp 2007-10-04 14:45:20 +02:00
@@ -0,0 +1,41 @@
+/* Copyright (C) 2007 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+#ifndef STOP_FOR_CRASH_HPP
+#define STOP_FOR_CRASH_HPP
+
+#include "SignalData.hpp"
+
+/*
+ The GSN_STOP_FOR_CRASH signal is only used in multi-threaded ndbd.
+
+ It is used during crash handling, sent as a prio A signal from the
+ crashing thread to all other threads to make sure that they stop before
+ generating the crash dump (to avoid dumping an inconsistent state of jam()
+ or signal buffer).
+*/
+
+class StopForCrash {
+ friend class SimulatedBlock;
+
+ friend bool printSTOP_FOR_CRASH(FILE *,const Uint32 *, Uint32, Uint16);
+public:
+ STATIC_CONST( SignalLength = 1 );
+
+public:
+ Uint32 flags; // No information in this signal atm.
+};
+
+#endif
diff -Nrup a/storage/ndb/include/portlib/NdbThread.h b/storage/ndb/include/portlib/NdbThread.h
--- a/storage/ndb/include/portlib/NdbThread.h 2007-09-24 17:12:17 +02:00
+++ b/storage/ndb/include/portlib/NdbThread.h 2007-10-04 14:45:19 +02:00
@@ -32,6 +32,7 @@ typedef enum NDB_THREAD_PRIO_ENUM {
typedef enum NDB_THREAD_TLS_ENUM {
NDB_THREAD_TLS_JAM, /* Jam buffer pointer. */
+ NDB_THREAD_TLS_THREAD, /* Thread self pointer. */
NDB_THREAD_TLS_MAX
} NDB_THREAD_TLS;
diff -Nrup a/storage/ndb/src/common/debugger/signaldata/SignalNames.cpp b/storage/ndb/src/common/debugger/signaldata/SignalNames.cpp
--- a/storage/ndb/src/common/debugger/signaldata/SignalNames.cpp 2007-09-12 13:30:24 +02:00
+++ b/storage/ndb/src/common/debugger/signaldata/SignalNames.cpp 2007-10-04 14:45:20 +02:00
@@ -505,6 +505,7 @@ const GsnName SignalNames [] = {
,{ GSN_BACKUP_STATUS_CONF, "BACKUP_STATUS_CONF" }
,{ GSN_SIGNAL_DROPPED_REP, "SIGNAL_DROPPED_REP" }
,{ GSN_CONTINUE_FRAGMENTED, "CONTINUE_FRAGMENTED" }
+ ,{ GSN_STOP_FOR_CRASH, "STOP_FOR_CRASH" }
/** Util Block Services **/
,{ GSN_UTIL_SEQUENCE_REQ, "UTIL_SEQUENCE_REQ" }
diff -Nrup a/storage/ndb/src/common/portlib/NdbThread.c b/storage/ndb/src/common/portlib/NdbThread.c
--- a/storage/ndb/src/common/portlib/NdbThread.c 2007-09-24 17:12:17 +02:00
+++ b/storage/ndb/src/common/portlib/NdbThread.c 2007-10-04 14:45:20 +02:00
@@ -196,6 +196,7 @@ static pthread_key_t tls_keys[NDB_THREAD
void NdbThread_Init()
{
pthread_key_create(&(tls_keys[NDB_THREAD_TLS_JAM]), NULL);
+ pthread_key_create(&(tls_keys[NDB_THREAD_TLS_THREAD]), NULL);
}
void *NdbThread_GetTlsKey(NDB_THREAD_TLS key)
diff -Nrup a/storage/ndb/src/kernel/error/ErrorReporter.cpp b/storage/ndb/src/kernel/error/ErrorReporter.cpp
--- a/storage/ndb/src/kernel/error/ErrorReporter.cpp 2007-09-24 17:12:17 +02:00
+++ b/storage/ndb/src/kernel/error/ErrorReporter.cpp 2007-10-04 14:45:20 +02:00
@@ -32,10 +32,7 @@
static int WriteMessage(int thrdMessageID,
const char* thrdProblemData,
- const char* thrdObjRef,
- Uint32 thrdTheEmulatedJamIndex,
- const Uint32 thrdTheEmulatedJam[],
- Uint32 jamBlockNumber);
+ const char* thrdObjRef);
static void dumpJam(FILE* jamStream,
Uint32 thrdTheEmulatedJamIndex,
@@ -109,7 +106,7 @@ ErrorReporter::get_trace_no(){
void
-ErrorReporter::formatMessage(int faultID,
+ErrorReporter::formatMessage(Uint32 num_threads, int faultID,
const char* problemData,
const char* objRef,
const char* theNameOfTheTraceFile,
@@ -120,6 +117,7 @@ ErrorReporter::formatMessage(int faultID
const char *exit_msg = ndbd_exit_message(faultID, &cl);
const char *exit_cl_msg = ndbd_exit_classification_message(cl, &st);
const char *exit_st_msg = ndbd_exit_status_message(st);
+ int sofar;
processId = NdbHost_GetProcessId();
@@ -132,9 +130,7 @@ ErrorReporter::formatMessage(int faultID
"Error object: %s\n"
"Program: %s\n"
"Pid: %d\n"
- "Trace: %s\n"
- "Version: %s\n"
- "***EOM***\n",
+ "Trace: %s",
formatTimeStampString() ,
exit_st_msg,
exit_msg, exit_cl_msg,
@@ -143,10 +139,25 @@ ErrorReporter::formatMessage(int faultID
objRef,
my_progname,
processId,
- theNameOfTheTraceFile ? theNameOfTheTraceFile : "<no tracefile>",
- NDB_VERSION_STRING);
+ theNameOfTheTraceFile ? theNameOfTheTraceFile : "<no tracefile>");
+
+ if (theNameOfTheTraceFile)
+ {
+ for (Uint32 i = 1 ; i < num_threads; i++)
+ {
+ sofar = strlen(messptr);
+ BaseString::snprintf(messptr + sofar, MESSAGE_LENGTH - sofar,
+ " %s_t%u", theNameOfTheTraceFile, i);
+ }
+ }
- // Add trailing blanks to get a fixed lenght of the message
+ sofar = strlen(messptr);
+ BaseString::snprintf(messptr + sofar, MESSAGE_LENGTH - sofar,
+ "\n"
+ "Version: %s\n"
+ "***EOM***\n",
+ NDB_VERSION_STRING);
+ // Add trailing blanks to get a fixed length of the message
while (strlen(messptr) <= MESSAGE_LENGTH-3){
strcat(messptr, " ");
}
@@ -170,8 +181,6 @@ void
ErrorReporter::handleAssert(const char* message, const char* file, int line, int ec)
{
char refMessage[100];
- const Uint32 *jam;
- Uint32 jamIndex;
Uint32 jamBlockNumber;
#ifdef NO_EMULATED_JAM
@@ -183,15 +192,13 @@ ErrorReporter::handleAssert(const char*
#else
const EmulatedJamBuffer *jamBuffer =
(EmulatedJamBuffer *)NdbThread_GetTlsKey(NDB_THREAD_TLS_JAM);
- jam = jamBuffer->theEmulatedJam;
- jamIndex = jamBuffer->theEmulatedJamIndex;
jamBlockNumber = jamBuffer->theEmulatedJamBlockNumber;
const char *blockName = getBlockName(jamBlockNumber);
BaseString::snprintf(refMessage, 100, "%s line: %d (block: %s)",
file, line, blockName);
#endif
- WriteMessage(ec, message, refMessage, jamIndex, jam, jamBlockNumber);
+ WriteMessage(ec, message, refMessage);
childReportError(ec);
@@ -205,21 +212,7 @@ ErrorReporter::handleError(int messageID
const char* objRef,
NdbShutdownType nst)
{
- const Uint32 *jam;
- Uint32 jamIndex;
- Uint32 jamBlockNumber;
- const EmulatedJamBuffer *jamBuffer =
- (EmulatedJamBuffer *)NdbThread_GetTlsKey(NDB_THREAD_TLS_JAM);
- if (jamBuffer) {
- jam = jamBuffer->theEmulatedJam;
- jamIndex = jamBuffer->theEmulatedJamIndex;
- jamBlockNumber = jamBuffer->theEmulatedJamBlockNumber;
- } else {
- jam = NULL;
- jamIndex = 0;
- jamBlockNumber = 0;
- }
- WriteMessage(messageID, problemData, objRef, jamIndex, jam, jamBlockNumber);
+ WriteMessage(messageID, problemData, objRef);
g_eventLogger.info(problemData);
g_eventLogger.info(objRef);
@@ -237,14 +230,16 @@ ErrorReporter::handleError(int messageID
int
WriteMessage(int thrdMessageID,
- const char* thrdProblemData, const char* thrdObjRef,
- Uint32 thrdTheEmulatedJamIndex,
- const Uint32 thrdTheEmulatedJam[],
- Uint32 jamBlockNumber){
+ const char* thrdProblemData, const char* thrdObjRef){
FILE *stream;
unsigned offset;
unsigned long maxOffset; // Maximum size of file.
char theMessage[MESSAGE_LENGTH];
+ Uint32 thrdTheEmulatedJamIndex;
+ const Uint32 *thrdTheEmulatedJam;
+ Uint32 jamBlockNumber;
+
+ Uint32 threadCount = globalScheduler.traceDumpGetNumThreads();
/**
* Format trace file name
@@ -278,7 +273,7 @@ WriteMessage(int thrdMessageID,
" \n\n\n");
// ...and write the error-message...
- ErrorReporter::formatMessage(thrdMessageID,
+ ErrorReporter::formatMessage(threadCount, thrdMessageID,
thrdProblemData, thrdObjRef,
theTraceFileName, theMessage);
fprintf(stream, "%s", theMessage);
@@ -305,7 +300,7 @@ WriteMessage(int thrdMessageID,
fseek(stream, offset, SEEK_SET);
// ...and write the error-message there...
- ErrorReporter::formatMessage(thrdMessageID,
+ ErrorReporter::formatMessage(threadCount, thrdMessageID,
thrdProblemData, thrdObjRef,
theTraceFileName, theMessage);
fprintf(stream, "%s", theMessage);
@@ -331,26 +326,31 @@ WriteMessage(int thrdMessageID,
fclose(stream);
if (theTraceFileName) {
- // Open the tracefile...
- FILE *jamStream = fopen(theTraceFileName, "w");
-
- // ...and "dump the jam" there.
- // ErrorReporter::dumpJam(jamStream);
- if(thrdTheEmulatedJam != 0){
- dumpJam(jamStream, thrdTheEmulatedJamIndex,
- thrdTheEmulatedJam, jamBlockNumber);
+ /* Attempt to stop all processing to be able to dump a consistent state. */
+ globalScheduler.traceDumpPrepare();
+
+ char *traceFileEnd = theTraceFileName + strlen(theTraceFileName);
+ for (Uint32 i = 0; i < threadCount; i++)
+ {
+ // Open the tracefile...
+ if (i > 0)
+ sprintf(traceFileEnd, "_t%u", i);
+ FILE *jamStream = fopen(theTraceFileName, "w");
+
+ // ...and "dump the jam" there.
+ bool ok = globalScheduler.traceDumpGetJam(i, jamBlockNumber,
+ thrdTheEmulatedJam,
+ thrdTheEmulatedJamIndex);
+ if(ok && thrdTheEmulatedJam != 0)
+ {
+ dumpJam(jamStream, thrdTheEmulatedJamIndex,
+ thrdTheEmulatedJam, jamBlockNumber);
+ }
+
+ globalScheduler.dumpSignalMemory(i, jamStream);
+
+ fclose(jamStream);
}
-
- /* Dont print the jobBuffers until a way to copy them,
- like the other variables,
- is implemented. Otherwise when NDB keeps running,
- with this function running
- in the background, the jobBuffers will change during runtime. And when
- they're printed here, they will not be correct anymore.
- */
- globalScheduler.dumpSignalMemory(jamStream);
-
- fclose(jamStream);
}
return 0;
diff -Nrup a/storage/ndb/src/kernel/error/ErrorReporter.hpp b/storage/ndb/src/kernel/error/ErrorReporter.hpp
--- a/storage/ndb/src/kernel/error/ErrorReporter.hpp 2006-12-23 20:20:18 +01:00
+++ b/storage/ndb/src/kernel/error/ErrorReporter.hpp 2007-10-04 14:45:20 +02:00
@@ -39,7 +39,7 @@ public:
const char* problemData,
const char* objRef);
- static void formatMessage(int faultID,
+ static void formatMessage(Uint32 num_threads, int faultID,
const char* problemData,
const char* objRef,
const char* theNameOfTheTraceFile,
diff -Nrup a/storage/ndb/src/kernel/vm/FastScheduler.cpp b/storage/ndb/src/kernel/vm/FastScheduler.cpp
--- a/storage/ndb/src/kernel/vm/FastScheduler.cpp 2007-09-24 17:09:26 +02:00
+++ b/storage/ndb/src/kernel/vm/FastScheduler.cpp 2007-10-04 14:45:20 +02:00
@@ -370,7 +370,7 @@ APZJobBuffer::clear()
*/
void print_restart(FILE * output, Signal* signal, Uint32 aLevel);
-void FastScheduler::dumpSignalMemory(FILE * output)
+void FastScheduler::dumpSignalMemory(Uint32 thr_no, FILE * output)
{
SignalT<25> signalT;
Signal &signal= *(Signal*)&signalT;
@@ -378,6 +378,9 @@ void FastScheduler::dumpSignalMemory(FIL
Uint32 tJob;
Uint32 tLastJob;
+ /* Single threaded ndbd scheduler, no threads. */
+ assert(thr_no == 0);
+
fprintf(output, "\n");
if (globalData.JobLap > 4095) {
@@ -453,6 +456,41 @@ print_restart(FILE * output, Signal* sig
signal->header,
&signal->theData[0]);
}
+
+void
+FastScheduler::traceDumpPrepare()
+{
+ /* No-operation in single-threaded ndbd. */
+}
+
+Uint32
+FastScheduler::traceDumpGetNumThreads()
+{
+ return 1; // Single-threaded ndbd scheduler
+}
+
+bool
+FastScheduler::traceDumpGetJam(Uint32 thr_no, Uint32 & jamBlockNumber,
+ const Uint32 * & thrdTheEmulatedJam,
+ Uint32 & thrdTheEmulatedJamIndex)
+{
+ /* Single threaded ndbd scheduler, no threads. */
+ assert(thr_no == 0);
+
+#ifdef NO_EMULATED_JAM
+ jamBlockNumber = 0;
+ thrdTheEmulatedJam = NULL;
+ thrdTheEmulatedJamIndex = 0;
+#else
+ const EmulatedJamBuffer *jamBuffer =
+ (EmulatedJamBuffer *)NdbThread_GetTlsKey(NDB_THREAD_TLS_JAM);
+ thrdTheEmulatedJam = jamBuffer->theEmulatedJam;
+ thrdTheEmulatedJamIndex = jamBuffer->theEmulatedJamIndex;
+ jamBlockNumber = jamBuffer->theEmulatedJamBlockNumber;
+#endif
+ return true;
+}
+
/**
* This method used to be a Cmvmi member function
diff -Nrup a/storage/ndb/src/kernel/vm/FastScheduler.hpp b/storage/ndb/src/kernel/vm/FastScheduler.hpp
--- a/storage/ndb/src/kernel/vm/FastScheduler.hpp 2007-09-24 17:09:26 +02:00
+++ b/storage/ndb/src/kernel/vm/FastScheduler.hpp 2007-10-04 14:45:20 +02:00
@@ -107,7 +107,6 @@ public:
void clear();
Signal* getVMSignals();
- void dumpSignalMemory(FILE * output);
Priority highestAvailablePrio() const;
Uint32 getBOccupancy() const;
void sendPacked();
@@ -116,6 +115,23 @@ public:
GlobalSignalNumber gsn, Uint32 aIndex);
void scheduleTimeQueue(Uint32 aIndex);
+ /*
+ The following implement aspects of ErrorReporter that differs between
+ singlethreaded and multithread ndbd.
+ */
+
+ /* Called before dumping, intended to stop any still running processing. */
+ void traceDumpPrepare();
+ /* Number of threads to create trace files for (thread id 0 .. N-1). */
+ Uint32 traceDumpGetNumThreads();
+ /* Get jam() buffers etc. for specific thread. */
+ bool traceDumpGetJam(Uint32 thr_no, Uint32 & jamBlockNumber,
+ const Uint32 * & thrdTheEmulatedJam,
+ Uint32 & thrdTheEmulatedJamIndex);
+ /* Produce a signal dump. */
+ void dumpSignalMemory(Uint32 thr_no, FILE * output);
+
+
private:
void highestAvailablePrio(Priority prio);
void reportJob(Priority aPriority);
diff -Nrup a/storage/ndb/src/kernel/vm/SimulatedBlock.cpp b/storage/ndb/src/kernel/vm/SimulatedBlock.cpp
--- a/storage/ndb/src/kernel/vm/SimulatedBlock.cpp 2007-09-24 17:12:18 +02:00
+++ b/storage/ndb/src/kernel/vm/SimulatedBlock.cpp 2007-10-04 14:45:20 +02:00
@@ -150,6 +150,7 @@ SimulatedBlock::installSimulatedBlockFun
a[GSN_NDB_TAMPER] = &SimulatedBlock::execNDB_TAMPER;
a[GSN_SIGNAL_DROPPED_REP] = &SimulatedBlock::execSIGNAL_DROPPED_REP;
a[GSN_CONTINUE_FRAGMENTED]= &SimulatedBlock::execCONTINUE_FRAGMENTED;
+ a[GSN_STOP_FOR_CRASH]= &SimulatedBlock::execSTOP_FOR_CRASH;
a[GSN_UTIL_CREATE_LOCK_REF] = &SimulatedBlock::execUTIL_CREATE_LOCK_REF;
a[GSN_UTIL_CREATE_LOCK_CONF] = &SimulatedBlock::execUTIL_CREATE_LOCK_CONF;
a[GSN_UTIL_DESTROY_LOCK_REF] = &SimulatedBlock::execUTIL_DESTORY_LOCK_REF;
@@ -1324,6 +1325,14 @@ SimulatedBlock::execCONTINUE_FRAGMENTED(
ContinueFragmented * sig = (ContinueFragmented*)signal->getDataPtrSend();
sig->line = __LINE__;
sendSignal(reference(), GSN_CONTINUE_FRAGMENTED, signal, 1, JBB);
+}
+
+void
+SimulatedBlock::execSTOP_FOR_CRASH(Signal* signal)
+{
+#ifdef NDBD_MULTITHREADED
+ mt_execSTOP_FOR_CRASH();
+#endif
}
void
diff -Nrup a/storage/ndb/src/kernel/vm/SimulatedBlock.hpp b/storage/ndb/src/kernel/vm/SimulatedBlock.hpp
--- a/storage/ndb/src/kernel/vm/SimulatedBlock.hpp 2007-09-24 17:12:18 +02:00
+++ b/storage/ndb/src/kernel/vm/SimulatedBlock.hpp 2007-10-04 14:45:20 +02:00
@@ -501,6 +501,7 @@ private:
void execSIGNAL_DROPPED_REP(Signal* signal);
void execCONTINUE_FRAGMENTED(Signal* signal);
+ void execSTOP_FOR_CRASH(Signal* signal);
void execAPI_START_REP(Signal* signal);
void execNODE_START_REP(Signal* signal);
diff -Nrup a/storage/ndb/src/kernel/vm/mt/dummy_mt.cpp b/storage/ndb/src/kernel/vm/mt/dummy_mt.cpp
--- a/storage/ndb/src/kernel/vm/mt/dummy_mt.cpp 2007-09-24 17:12:18 +02:00
+++ b/storage/ndb/src/kernel/vm/mt/dummy_mt.cpp 2007-10-04 14:45:20 +02:00
@@ -24,19 +24,6 @@ FastScheduler::clear()
{
}
-void
-FastScheduler::dumpSignalMemory(FILE* out)
-{
- /*
- In single-threaded ndbd, we set the watchdog counter to 4 here to note that
- we are dumping.
-
- But for multi-threaded ndbd, we will need to first stop all
- threads anyway, so we might as well just de-register them from the
- watchdog, and this will not be needed.
- */
-}
-
void bnr_error()
{
diff -Nrup a/storage/ndb/src/kernel/vm/mt/mt.cpp b/storage/ndb/src/kernel/vm/mt/mt.cpp
--- a/storage/ndb/src/kernel/vm/mt/mt.cpp 2007-09-24 17:12:18 +02:00
+++ b/storage/ndb/src/kernel/vm/mt/mt.cpp 2007-10-04 14:45:20 +02:00
@@ -21,8 +21,10 @@
#include <GlobalData.hpp>
#include <WatchDog.hpp>
#include <TransporterDefinitions.hpp>
+#include "FastScheduler.hpp"
#include "mt.hpp"
#include <DebuggerNames.hpp>
+#include <signaldata/StopForCrash.hpp>
#ifndef _GNU_SOURCE
#define _GNU_SOURCE
@@ -222,10 +224,20 @@ trylock(struct thr_mutex * sl)
*/
struct thr_job_buffer // 32k
{
- static const unsigned SIZE = 8190;
+ static const unsigned SIZE = 8189;
+ /* Amount of signal data currently in m_data buffer. */
unsigned m_len;
+ /*
+ Used for prio A signals, where we execute partial buffers, this is the
+ position in buffer up to which signals were executed.
+ */
unsigned m_pos;
+ /*
+ Whether this buffer contained prio A or prio B signals, used when dumping
+ signals from released buffers.
+ */
+ bool m_prioa;
unsigned m_data[SIZE];
};
@@ -244,8 +256,18 @@ struct thr_jba
thr_jba() : m_write_lock("jbalock") {}
unsigned m_read_index;
+ /*
+ m_write_index, the position into which to write next prio A signal.
+ Upper 16 bits are the index into m_buffers.
+ Lower 16 bits are the index into that particular thr_job_buffer*.
+ */
volatile unsigned m_write_index;
struct thr_spin_lock m_write_lock;
+ /*
+ In m_next_buffer we keep a free buffer at all times, so that when
+ we hold the lock and find we need a new buffer, we can use this and this
+ way defer allocation to after releasing the lock.
+ */
struct thr_job_buffer* m_next_buffer;
struct thr_job_buffer* m_buffers[SIZE];
};
@@ -265,6 +287,19 @@ struct thr_tq
Uint32 m_long_queue[LQ_SIZE];
};
+/*
+ Max number of thread-local job buffers to keep before releasing to
+ global pool.
+*/
+#define THR_FREE_BUF_MAX 32
+/* Minimum number of buffers (to ensure useful trace dumps). */
+#define THR_FREE_BUF_MIN 12
+/*
+ 1/THR_FREE_BUF_BATCH is the fraction of job buffers to allocate/free
+ at a time from/to global pool.
+*/
+#define THR_FREE_BUF_BATCH 6
+
struct thr_data
{
thr_wait m_waiter;
@@ -274,10 +309,25 @@ struct thr_data
Uint64 m_time;
struct thr_jba m_jba;
struct thr_tq m_tq;
- unsigned m_free;
- struct thr_job_buffer* m_free_list;
+
+ /*
+ We keep a small number of buffers in a thread-local cyclic FIFO, so that
+ we can avoid going to the global pool in most cases, and so that we have
+ recent buffers available for dumping in trace files.
+ */
+ struct thr_job_buffer *m_free_fifo[THR_FREE_BUF_MAX];
+ /* m_first_free is the index of the entry to return next from seize(). */
+ Uint32 m_first_free;
+ /* m_first_unused is the first unused entry in m_free_fifo. */
+ Uint32 m_first_unused;
+
struct thr_job_buffer* m_out_queue[MAX_THREADS];
struct thr_job_queue m_in_queue[MAX_THREADS];
+
+ /* Jam buffers for making trace files at crashes. */
+ EmulatedJamBuffer *m_jam;
+ /* Watchdog counter for this thread. */
+ Uint32 *m_watchdog_counter;
};
#define DBG_MALLOC 0
@@ -329,28 +379,70 @@ struct thr_repository
struct thr_spin_lock m_section_lock;
struct thr_data m_thread[MAX_THREADS];
struct thr_safe_pool<thr_job_buffer> m_free_list;
+
+ /*
+ These are used to synchronize during crash / trace dumps.
+
+ ToDo: Replace pthread stuff with portable wrappers in portlib.
+ */
+ pthread_mutex_t stop_for_crash_mutex;
+ pthread_cond_t stop_for_crash_cond;
+ Uint32 stopped_threads;
};
static
thr_job_buffer*
-seize_buffer(struct thr_repository* rep, int thr_no)
+seize_buffer(struct thr_repository* rep, int thr_no, bool prioa)
{
thr_job_buffer* jb;
thr_data* selfptr = rep->m_thread + thr_no;
- if (likely((jb = selfptr->m_free_list) != 0))
+ Uint32 first_free = selfptr->m_first_free;
+ Uint32 first_unused = selfptr->m_first_unused;
+
+ /*
+ An empty FIFO is denoted by m_first_free == m_first_unused.
+ So we will never have a completely full FIFO array, at least one entry will
+ always be unused. But the code is simpler as a result.
+ */
+
+ /*
+ We never allow the fifo to become completely empty, as we want to have
+ a good number of signals available for trace files in case of a forced
+ shutdown.
+ */
+ Uint32 buffers = (first_free > first_unused ?
+ first_unused + THR_FREE_BUF_MAX - first_free :
+ first_unused - first_free);
+ if (unlikely(buffers <= THR_FREE_BUF_MIN))
{
- selfptr->m_free --;
- thr_job_buffer* next = *reinterpret_cast<thr_job_buffer**>(jb);
- rep->m_thread[thr_no].m_free_list = next;
- jb->m_len = 0;
- jb->m_pos = 0;
- return jb;
- }
-
- /* global alloc */
- jb = rep->m_free_list.seize();
+ /*
+ All used, allocate another batch from global pool.
+
+ Put the new buffers at the head of the fifo, so as not to needlessly
+ push out any existing buffers from the fifo (that would loose useful
+ data for signal dumps in trace files).
+ */
+ Uint32 batch = THR_FREE_BUF_MAX / THR_FREE_BUF_BATCH;
+ assert(batch > 0);
+ assert(batch + THR_FREE_BUF_MIN < THR_FREE_BUF_MAX);
+ do {
+ jb = rep->m_free_list.seize();
+ jb->m_len = 0;
+ jb->m_pos = 0;
+ jb->m_prioa = false;
+ first_free = (first_free ? first_free : THR_FREE_BUF_MAX) - 1;
+ selfptr->m_free_fifo[first_free] = jb;
+ batch--;
+ } while (batch > 0);
+ selfptr->m_first_free = first_free;
+ }
+
+ jb= selfptr->m_free_fifo[first_free];
+ selfptr->m_first_free = (first_free + 1) % THR_FREE_BUF_MAX;
+ /* Init here rather than in release_buffer() so signal dump will work. */
jb->m_len = 0;
jb->m_pos = 0;
+ jb->m_prioa = prioa;
return jb;
}
@@ -359,17 +451,62 @@ void
release_buffer(struct thr_repository* rep, int thr_no, thr_job_buffer* jb)
{
struct thr_data* selfptr = rep->m_thread + thr_no;
-
- if (selfptr->m_free >= 32)
+ Uint32 first_free = selfptr->m_first_free;
+ Uint32 first_unused = selfptr->m_first_unused;
+ Uint32 last_free = (first_unused ? first_unused : THR_FREE_BUF_MAX) - 1;
+ thr_job_buffer *last_jb = selfptr->m_free_fifo[last_free];
+ Uint32 len1, len2;
+
+ if (!jb->m_prioa &&
+ first_free != first_unused &&
+ !last_jb->m_prioa &&
+ (len2 = jb->m_len) <= (thr_job_buffer::SIZE / 4) &&
+ (len1 = last_jb->m_len) + len2 <= thr_job_buffer::SIZE)
{
- rep->m_free_list.release(jb);
+ /*
+ The buffer being release is fairly empty, and what data it contains fit
+ in the previously released buffer.
+
+ We want to avoid too many almost-empty buffers in the free fifo, as that
+ makes signal traces less useful due to too little data available. So in
+ this case we move the data from the buffer to be released into the
+ previous buffer, and place the to-be-released buffer at the head of the
+ fifo (to be immediately reused).
+
+ This is only done for prio B buffers, as we must not merge prio A and B
+ data (or dumps would be incorrect), and prio A buffers are in any case
+ full when released.
+ */
+ memcpy(&(last_jb->m_data[len1]), &(jb->m_data[0]),
+ len2*sizeof(jb->m_data[0]));
+ last_jb->m_len = len1 + len2;
+ last_jb->m_pos += jb->m_pos;
+ jb->m_len = 0;
+ jb->m_pos = 0;
+ first_free = (first_free ? first_free : THR_FREE_BUF_MAX) - 1;
+ selfptr->m_free_fifo[first_free] = jb;
+ selfptr->m_first_free = first_free;
}
else
{
- thr_job_buffer** next = reinterpret_cast<thr_job_buffer**>(jb);
- * next = selfptr->m_free_list;
- selfptr->m_free_list = jb;
- selfptr->m_free ++;
+ /* Just insert at the end of the fifo. */
+ selfptr->m_free_fifo[first_unused] = jb;
+ first_unused = (first_unused + 1) % THR_FREE_BUF_MAX;
+ selfptr->m_first_unused = first_unused;
+ }
+
+ if (unlikely(first_unused == first_free))
+ {
+ /* FIFO full, need to release to global pool. */
+ Uint32 batch = THR_FREE_BUF_MAX / THR_FREE_BUF_BATCH;
+ assert(batch > 0);
+ assert(batch < THR_FREE_BUF_MAX);
+ do {
+ rep->m_free_list.release(selfptr->m_free_fifo[first_free]);
+ first_free = (first_free + 1) % THR_FREE_BUF_MAX;
+ batch--;
+ } while (batch > 0);
+ selfptr->m_first_free = first_free;
}
}
@@ -401,7 +538,7 @@ do_transfer:
assert(old == writeidx);
wakeup(&rep->m_thread[to].m_waiter); // potentially wakeup
- rep->m_thread[from].m_out_queue[to] = seize_buffer(rep, from);
+ rep->m_thread[from].m_out_queue[to] = seize_buffer(rep, from, false);
return ;
check_full:
ndbout_c("sleep in transfer from %u to %u", from, to);
@@ -418,7 +555,7 @@ check_full:
static
unsigned
dojob(struct thr_data* selfptr, struct thr_job_buffer* ptr, struct Signal* sig,
- Uint32 *watchDogCounter)
+ Uint32 *watchDogCounter, Uint32 *signalIdCounter)
{
unsigned cnt = 0;
unsigned pos = ptr->m_pos;
@@ -426,9 +563,7 @@ dojob(struct thr_data* selfptr, struct t
unsigned *posptr = ptr->m_data + pos;
unsigned *endptr = ptr->m_data + end;
SimulatedBlock** blockptr = selfptr->m_blocks - MIN_BLOCK_NO;
-
- ptr->m_pos = end;
-
+
while (posptr < endptr)
{
SignalHeader* s = reinterpret_cast<SignalHeader*>(posptr);
@@ -438,10 +573,15 @@ dojob(struct thr_data* selfptr, struct t
unsigned gsn = s->theVerId_signalNumber;
SimulatedBlock * block = * (blockptr + bno);
*watchDogCounter = 1;
+ s->theSignalId = (*signalIdCounter)++;
memcpy(&sig->header, s, 4*siglen);
sig->m_sectionPtrI[0] = posptr[siglen + 0];
sig->m_sectionPtrI[1] = posptr[siglen + 1];
sig->m_sectionPtrI[2] = posptr[siglen + 2];
+
+ /* Update just before execute so signal dump can know how far we are. */
+ ptr->m_pos += siglen + seccnt;
+
block->executeFunction(gsn, sig);
cnt++;
@@ -455,7 +595,7 @@ static
unsigned
dojoba(struct thr_repository * rep, unsigned thr_no,
Signal* signal, unsigned write_index,
- Uint32 *watchDogCounter)
+ Uint32 *watchDogCounter, Uint32 *signalIdCounter)
{
unsigned sum = 0;
struct thr_job_buffer* buffer;
@@ -468,13 +608,24 @@ dojoba(struct thr_repository * rep, unsi
while (ri_buf != wi_buf)
{
buffer = * (selfptr->m_jba.m_buffers + ri_buf);
- sum += dojob(selfptr, buffer, signal, watchDogCounter);
+ sum += dojob(selfptr, buffer, signal, watchDogCounter, signalIdCounter);
release_buffer(rep, thr_no, buffer);
ri_buf = (ri_buf + 1) % thr_jba::SIZE;
}
-
+
buffer = * (selfptr->m_jba.m_buffers + ri_buf);
- sum += dojob(selfptr, buffer, signal, watchDogCounter);
+ /*
+ We need to take the write lock here to make sure we do not execute any
+ partially written signals due to memory reordering.
+
+ Actually, it would be enough to just pass a value of
+ buffer->m_len read under lock before calling dojoba(). And more
+ actually, a memory barrier in that place would also be sufficient.
+ */
+ // ToDo: This deadlocks currently, as the spinlocks are not recursive
+// lock(&selfptr->m_jba.m_write_lock);
+ sum += dojob(selfptr, buffer, signal, watchDogCounter, signalIdCounter);
+// unlock(&selfptr->m_jba.m_write_lock);
unsigned ri_pos = buffer->m_pos;
selfptr->m_jba.m_read_index = (ri_buf << 16) + ri_pos;
@@ -673,25 +824,25 @@ block2ThreadId(Uint32 block)
{
/*
Block assignment:
- 0 PGMAN BACKUP LOCAL
- 1 LGMAN DBTC GLOBAL
- 2 TSMAN DBDIH GLOBAL
- 3 ACC DBLQH LOCAL
- 4 CMVMI DBACC LOCAL
- 5 FS DBTUP LOCAL
- 6 DICT DBDICT GLOBAL
- 7 DIH NDBCNTR GLOBAL
- 8 LQH QMGR GLOBAL
- 9 TC NDBFS GLOBAL
- 10 TUP CMVMI GLOBAL
- 11 NDBCNTR TRIX GLOBAL
- 12 QMGR DBUTIL GLOBAL
- 13 TRIX SUMA LOCAL
- 14 BACKUP DBTUX LOCAL
- 15 UTIL TSMAN LOCAL
- 16 SUMA LGMAN LOCAL
- 17 TUX PGMAN LOCAL
- 18 RESTORE RESTORE LOCAL
+ 0 BACKUP LOCAL
+ 1 DBTC GLOBAL
+ 2 DBDIH GLOBAL
+ 3 DBLQH LOCAL
+ 4 DBACC LOCAL
+ 5 DBTUP LOCAL
+ 6 DBDICT GLOBAL
+ 7 NDBCNTR GLOBAL
+ 8 QMGR GLOBAL
+ 9 NDBFS GLOBAL
+ 10 CMVMI GLOBAL
+ 11 TRIX GLOBAL
+ 12 DBUTIL GLOBAL
+ 13 SUMA LOCAL
+ 14 DBTUX LOCAL
+ 15 TSMAN LOCAL
+ 16 LGMAN LOCAL
+ 17 PGMAN LOCAL
+ 18 RESTORE LOCAL
Thread 0 is for global, thread 1 for locals.
*/
@@ -710,6 +861,7 @@ mt_thr_main(void *thr_arg)
nowait.tv_nsec = 10 * 1000000;
EmulatedJamBuffer thread_jam;
Uint32 watchDogCounter;
+ Uint32 thrSignalId = 0;
thread_jam.theEmulatedJamIndex = 0;
thread_jam.theEmulatedJamBlockNumber = 0;
@@ -718,6 +870,9 @@ mt_thr_main(void *thr_arg)
struct thr_repository* rep = &g_thr_repository;
struct thr_data* selfptr = (struct thr_data *)thr_arg;
+ selfptr->m_jam = &thread_jam;
+ selfptr->m_watchdog_counter = &watchDogCounter;
+ NdbThread_SetTlsKey(NDB_THREAD_TLS_THREAD, selfptr);
/*
The thread initialisation argument is void *, not numeric, so we obtain
the numeric thread id in this slightly backwards way.
@@ -766,27 +921,47 @@ mt_thr_main(void *thr_arg)
watchDogCounter = 2;
scan_time_queues(selfptr);
unsigned jba_read_index = * a_re_idxptr;
- unsigned jba_write_index = * a_wr_idxptr;
/**
* Process each inbuffer
*/
for (unsigned i = 0; i<cnt; i++)
{
+ /*
+ We must load the prio B state _before_ executing prio A signals.
+ Otherwise a later prio B signal may race to be executed ahead of an
+ earlier prio A signal.
+ */
+ unsigned ri = selfptr->m_in_queue[i].m_read_index;
+ unsigned wi = selfptr->m_in_queue[i].m_write_index;
+
+ /*
+ Now, having loaded the state of the prio B job buffer, we must first
+ execute _all_ prio A signals, to make sure that any A signal is
+ executed here before any B signal sent later by same sender thread.
+
+ We also need to make sure to read the prio A state under the prio A
+ lock (baring any other memory barrier mechanisms), to be sure that
+ any prio A activity done by other threads before visible prio B
+ activity will be visible in this thread.
+ */
+ lock(&selfptr->m_jba.m_write_lock);
+ Uint32 jba_write_index = * a_wr_idxptr;
+ unlock(&selfptr->m_jba.m_write_lock);
+
if (jba_read_index != jba_write_index)
{
- sum += dojoba(rep, thr_no, &signal, jba_write_index, &watchDogCounter);
+ sum += dojoba(rep, thr_no, &signal, jba_write_index,
+ &watchDogCounter, &thrSignalId);
jba_read_index = * a_re_idxptr;
}
- unsigned ri = selfptr->m_in_queue[i].m_read_index;
- unsigned wi = selfptr->m_in_queue[i].m_write_index;
thr_job_buffer * jb = selfptr->m_in_queue[i].m_buffers[ri];
if (ri != wi)
{
ri = (ri + 1) % thr_job_queue::SIZE;
selfptr->m_in_queue[i].m_read_index = ri;
- sum += dojob(selfptr, jb, &signal, &watchDogCounter);
+ sum += dojob(selfptr, jb, &signal, &watchDogCounter, &thrSignalId);
release_buffer(rep, thr_no, jb);
}
jba_write_index = * a_wr_idxptr;
@@ -864,56 +1039,133 @@ sendlocal(Uint32 self, Uint32 block, con
}
}
+/* This must be called only while holding the m_jba.m_write_lock mutex. */
+static
+inline
+Uint32
+insert_prioa(thr_data *dstptr, const SignalHeader *s)
+{
+ Uint32 siglen = (sizeof(*s) >> 2) + s->theLength + s->m_noOfSections;
+ Uint32 wi = dstptr->m_jba.m_write_index;
+ Uint32 buf = wi >> 16;
+ Uint32 newwi = wi + siglen;
+ Uint32 pos = wi & 0xFFFF;
+ Uint32 newpos = pos + siglen;
+
+ struct thr_job_buffer * buffer = * (dstptr->m_jba.m_buffers + buf);
+ assert(buffer->m_len == pos);
+ buffer->m_len = newpos;
+ memcpy(buffer->m_data + pos, s, 4*siglen);
+ dstptr->m_jba.m_write_index = newwi;
+
+ return newpos;
+}
void
sendprioa(Uint32 self, Uint32 block, const SignalHeader* s)
{
+ static const Uint32 EXTRA_SPACE
+ = (sizeof(SignalHeader) >> 2) + StopForCrash::SignalLength;
Uint32 dst = block2ThreadId(block);
struct thr_repository* rep = &g_thr_repository;
struct thr_data * selfptr = rep->m_thread + self;
struct thr_data * dstptr = rep->m_thread + dst;
- unsigned siglen = (sizeof(*s) >> 2) + s->theLength + s->m_noOfSections;
-
struct thr_job_buffer* newbuffer = selfptr->m_jba.m_next_buffer;
lock(&dstptr->m_jba.m_write_lock);
-
- unsigned wi = dstptr->m_jba.m_write_index;
- unsigned buf = wi >> 16;
- unsigned newwi = wi + siglen;
- unsigned pos = wi & 0xFFFF;
- unsigned newpos = pos + siglen;
- struct thr_job_buffer * buffer = * (dstptr->m_jba.m_buffers + buf);
- assert(buffer->m_len == pos);
- buffer->m_len = newpos;
- memcpy(buffer->m_data + pos, s, 4*siglen);
+ Uint32 newpos = insert_prioa(dstptr, s);
if (0)
ndbout_c("sendprioa %s from %s to %s",
getSignalName(s->theVerId_signalNumber),
getBlockName(refToBlock(s->theSendersBlockRef)),
getBlockName(s->theReceiversBlockNumber));
-
- if (unlikely(newpos + 32 >= thr_job_buffer::SIZE))
+
+ /*
+ We reserve extra space for a GSN_STOP_FOR_CRASH signal, so that we can
+ send this signal during a crash in _any_ thread, not just threads which
+ have the infrastructure for selfptr->m_jba->m_next_buffer.
+ */
+ if (unlikely(newpos + 32 + EXTRA_SPACE >= thr_job_buffer::SIZE))
{
- newwi = (buf + 1) % thr_jba::SIZE;
+ Uint32 newwi = ((dstptr->m_jba.m_write_index >> 16) + 1) % thr_jba::SIZE;
+
+ /*
+ There seems to be a race (or missing overflow check) here, nothing
+ seems to prevent senders from overwriting not yet executed
+ previous signal data.
+
+ But that is perhaps unlikely to occur in practise, given the size
+ of buffers, the low amount of prio A signals sent, and the
+ frequency of executing prio A.
+
+ For now, I'll add an assert() ...
+ */
+ assert(newwi != (dstptr->m_jba.m_read_index >> 16));
+
* (dstptr->m_jba.m_buffers + newwi) = newbuffer;
newwi = (newwi << 16);
+ dstptr->m_jba.m_write_index = newwi;
}
- dstptr->m_jba.m_write_index = newwi;
unlock(&dstptr->m_jba.m_write_lock);
+ /* ToDo: Hm, shouldn't we possibly wake up the other thread here? */
+
/**
* Note, do malloc outside of critical region
*/
- if (newpos + 32 >= thr_job_buffer::SIZE)
+ if (newpos + 32 + EXTRA_SPACE >= thr_job_buffer::SIZE)
{
- selfptr->m_jba.m_next_buffer = seize_buffer(rep, self);
+ selfptr->m_jba.m_next_buffer = seize_buffer(rep, self, true);
}
}
+/*
+ This functions sends a prio A STOP_FOR_CRASH signal to a thread.
+
+ It works when called from any other thread, not just from job processing
+ threads. This works by having regular sendprioa() reserve space for one
+ GSN_STOP_FOR_CRASH signal.
+*/
+static
+void
+sendprioa_STOP_FOR_CRASH(Uint32 dst)
+{
+ SignalT<StopForCrash::SignalLength> signalT;
+ struct thr_repository* rep = &g_thr_repository;
+ struct thr_data * dstptr = rep->m_thread + dst;
+
+ /*
+ Currently we have two threads with fixed block assignment.
+ So we send STOP_FOR_CRASH to CMVMI for thread 0 (global) and to
+ DBLQH for thread 1 (local).
+ */
+ assert(dst == 0 || dst == 1); // ToDo when/if more threads.
+ Uint32 bno;
+ if (dst == 0)
+ bno = CMVMI;
+ else if (dst == 1)
+ bno = DBLQH;
+ assert(block2ThreadId(bno) == dst);
+
+ memset(&signalT.header, 0, sizeof(SignalHeader));
+ signalT.header.theVerId_signalNumber = GSN_STOP_FOR_CRASH;
+ signalT.header.theReceiversBlockNumber = bno;
+ signalT.header.theSendersBlockRef = 0;
+ signalT.header.theTrace = 0;
+ signalT.header.theSendersSignalId = 0;
+ signalT.header.theSignalId = 0;
+ signalT.header.theLength = StopForCrash::SignalLength;
+ StopForCrash * const stopForCrash = (StopForCrash *)&signalT.theData[0];
+ stopForCrash->flags = 0;
+
+ lock(&dstptr->m_jba.m_write_lock);
+ insert_prioa(dstptr, &(signalT.header));
+ unlock(&dstptr->m_jba.m_write_lock);
+}
+
static
inline
Uint32*
@@ -941,7 +1193,7 @@ retry:
{
if (tq->m_delayed_signals[i] == 0)
{
- struct thr_job_buffer *jb = seize_buffer(rep, thr_no);
+ struct thr_job_buffer *jb = seize_buffer(rep, thr_no, false);
Uint32 * page = reinterpret_cast<Uint32*>(jb);
tq->m_delayed_signals[i] = page;
@@ -1055,15 +1307,15 @@ init(struct thr_repository* rep, struct
unsigned int i;
selfptr->m_thr_no = thr_no;
- selfptr->m_free_list = 0;
+ selfptr->m_first_free = 0;
+ selfptr->m_first_unused = 0;
for (i = 0; i<cnt; i++)
- selfptr->m_out_queue[i] = seize_buffer(rep, thr_no);
+ selfptr->m_out_queue[i] = seize_buffer(rep, thr_no, false);
- selfptr->m_free = 0;
selfptr->m_jba.m_read_index = 0;
selfptr->m_jba.m_write_index = 0;
- selfptr->m_jba.m_buffers[0] = seize_buffer(rep, thr_no);
- selfptr->m_jba.m_next_buffer = seize_buffer(rep, thr_no);
+ selfptr->m_jba.m_buffers[0] = seize_buffer(rep, thr_no, true);
+ selfptr->m_jba.m_next_buffer = seize_buffer(rep, thr_no, true);
for (i = 0; i<cnt; i++)
{
@@ -1072,6 +1324,8 @@ init(struct thr_repository* rep, struct
}
init(&selfptr->m_tq);
+
+ selfptr->m_jam = NULL;
}
static
@@ -1083,6 +1337,10 @@ init(struct thr_repository* rep, unsigne
{
init(rep, rep->m_thread + i, cnt, i);
}
+
+ rep->stopped_threads = 0;
+ pthread_mutex_init(&rep->stop_for_crash_mutex, NULL);
+ pthread_cond_init(&rep->stop_for_crash_cond, NULL);
}
@@ -1170,6 +1428,360 @@ ThreadConfig::doStart(NodeState::StartLe
senddelay(0, &signalT.header, 1);
return 0;
+}
+
+/*
+ Compare signal ids, taking into account overflow/wrapover.
+ Return same as strcmp().
+ Eg.
+ wrap_compare(0x10,0x20) -> -1
+ wrap_compare(0x10,0xffffff20) -> 1
+ wrap_compare(0xffffff80,0xffffff20) -> 1
+ wrap_compare(0x7fffffff, 0x80000001) -> -1
+*/
+static
+inline
+int
+wrap_compare(Uint32 a, Uint32 b)
+{
+ /* Avoid dependencies on undefined C/C++ interger overflow semantics. */
+ if (a >= 0x80000000)
+ if (b >= 0x80000000)
+ return (int)(a & 0x7fffffff) - (int)(b & 0x7fffffff);
+ else
+ return (a - b) >= 0x80000000 ? -1 : 1;
+ else
+ if (b >= 0x80000000)
+ return (b - a) >= 0x80000000 ? 1 : -1;
+ else
+ return (int)a - (int)b;
+}
+
+Uint32
+FastScheduler::traceDumpGetNumThreads()
+{
+ /* The last thread is only for receiver -> no trace file. */
+ return NUM_THREADS - 1;
+}
+
+bool
+FastScheduler::traceDumpGetJam(Uint32 thr_no, Uint32 & jamBlockNumber,
+ const Uint32 * & thrdTheEmulatedJam,
+ Uint32 & thrdTheEmulatedJamIndex)
+{
+ if (thr_no >= (NUM_THREADS - 1))
+ return false;
+
+#ifdef NO_EMULATED_JAM
+ jamBlockNumber = 0;
+ thrdTheEmulatedJam = NULL;
+ thrdTheEmulatedJamIndex = 0;
+#else
+ const EmulatedJamBuffer *jamBuffer = g_thr_repository.m_thread[thr_no].m_jam;
+ thrdTheEmulatedJam = jamBuffer->theEmulatedJam;
+ thrdTheEmulatedJamIndex = jamBuffer->theEmulatedJamIndex;
+ jamBlockNumber = jamBuffer->theEmulatedJamBlockNumber;
+#endif
+ return true;
+}
+
+void
+FastScheduler::traceDumpPrepare()
+{
+ /*
+ We are about to generate trace files for all threads.
+
+ We want to stop all threads processing before we dump, as otherwise the
+ signal buffers could change while dumping, leading to inconsistent
+ results.
+
+ To stop threads, we send the GSN_STOP_FOR_CRASH signal as prio A to each
+ thread. We then wait for threads to signal they are done (but not forever,
+ so as to not have one hanging thread prevent the generation of trace
+ dumps). We also must be careful not to send to ourself if the crash is
+ being processed by one of the threads processing signals.
+
+ We do not stop the transporter thread, as it cannot receive signals (but
+ because it does not receive signals it does not really influence dumps in
+ any case).
+ */
+ void *value= NdbThread_GetTlsKey(NDB_THREAD_TLS_THREAD);
+ const thr_data *selfptr = reinterpret_cast<const thr_data *>(value);
+ /* The selfptr might be NULL, or pointer to thread that crashed. */
+
+ Uint32 waitFor_count = 0;
+ pthread_mutex_lock(&g_thr_repository.stop_for_crash_mutex);
+ g_thr_repository.stopped_threads = 0;
+
+ for (Uint32 thr_no = 0; thr_no < (NUM_THREADS - 1); thr_no++)
+ {
+ if (selfptr != NULL && selfptr->m_thr_no == thr_no)
+ {
+ /* This is own thread; we have already stopped processing. */
+ continue;
+ }
+
+ sendprioa_STOP_FOR_CRASH(thr_no);
+
+ waitFor_count++;
+ }
+
+ static const Uint32 max_wait_seconds = 2;
+ NDB_TICKS start = NdbTick_CurrentMillisecond();
+ struct timespec waittime;
+ waittime.tv_sec = 0;
+ waittime.tv_nsec = 10*1000*1000;
+ while (g_thr_repository.stopped_threads < waitFor_count)
+ {
+ pthread_cond_timedwait(&g_thr_repository.stop_for_crash_cond,
+ &g_thr_repository.stop_for_crash_mutex,
+ &waittime);
+ NDB_TICKS now = NdbTick_CurrentMillisecond();
+ if (now > start + max_wait_seconds * 1000)
+ break; // Give up
+ }
+ if (g_thr_repository.stopped_threads < waitFor_count)
+ {
+ ndbout_c("Warning: %d thread(s) did not stop before starting crash dump.",
+ waitFor_count - g_thr_repository.stopped_threads);
+ }
+ pthread_mutex_unlock(&g_thr_repository.stop_for_crash_mutex);
+
+ /* Now we are ready (or as ready as can be) for doing crash dump. */
+}
+
+void mt_execSTOP_FOR_CRASH()
+{
+ void *value= NdbThread_GetTlsKey(NDB_THREAD_TLS_THREAD);
+ const thr_data *selfptr = reinterpret_cast<const thr_data *>(value);
+ assert(selfptr != NULL);
+
+ pthread_mutex_lock(&g_thr_repository.stop_for_crash_mutex);
+ g_thr_repository.stopped_threads++;
+ pthread_cond_signal(&g_thr_repository.stop_for_crash_cond);
+ pthread_mutex_unlock(&g_thr_repository.stop_for_crash_mutex);
+
+ globalEmulatorData.theWatchDog->unregisterWatchedThread(selfptr->m_thr_no);
+
+ pthread_exit(NULL);
+}
+
+void
+FastScheduler::dumpSignalMemory(Uint32 thr_no, FILE* out)
+{
+ void *value= NdbThread_GetTlsKey(NDB_THREAD_TLS_THREAD);
+ const thr_data *selfptr = reinterpret_cast<const thr_data *>(value);
+ /*
+ The selfptr might be NULL, or pointer to thread that is doing the crash
+ jump.
+ If non-null, we should update the watchdog counter while dumping.
+ */
+ Uint32 *watchDogCounter;
+ if (selfptr)
+ watchDogCounter = selfptr->m_watchdog_counter;
+ else
+ watchDogCounter = NULL;
+
+ /*
+ We want to dump the signal buffers from last executed to first executed.
+ So we first need to find the correct sequence to output signals in, stored
+ in this arrray.
+
+ We will check any buffers in the cyclic m_free_fifo. In addition,
+ we also need to scan the already executed part of the current
+ buffer in m_jba.
+
+ Due to partial execution of prio A buffers, we will use signal ids to know
+ where to interleave prio A signals into the stream of prio B signals
+ read. So we will keep a pointer to a prio A buffer around; and while
+ scanning prio B buffers we will interleave prio A buffers from that buffer
+ when the signal id fits the sequence.
+
+ This also means that we may have to discard the earliest part of available
+ prio A signal data due to too little prio B data present, or vice versa.
+ */
+ static const Uint32 MAX_SIGNALS_TO_DUMP = 4096;
+ struct {
+ const SignalHeader *ptr;
+ bool prioa;
+ } signalSequence[MAX_SIGNALS_TO_DUMP];
+ Uint32 seq_start = 0;
+ Uint32 seq_end = 0;
+
+ const thr_data *thr_ptr = &g_thr_repository.m_thread[thr_no];
+ /*
+ ToDo: Might do some sanity check to avoid crashing on not yet initialised
+ thread.
+ */
+
+ /* Scan all available buffers with already executed signals. */
+ Uint32 jbb_start = thr_ptr->m_first_free;
+ Uint32 jb_end = thr_ptr->m_first_unused;
+ const thr_job_buffer *jbb = NULL;
+ Uint32 jbb_pos, jbb_len;
+ const thr_job_buffer *jba = NULL;
+ Uint32 jba_current = (thr_ptr->m_jba.m_read_index) >> 16;
+ Uint32 jba_start = jbb_start;
+ Uint32 jba_pos, jba_len;
+ bool no_more_prioa = false;
+
+ if (watchDogCounter)
+ *watchDogCounter = 4;
+
+ for (;;)
+ {
+ if (jba == NULL && !no_more_prioa)
+ {
+ /* Find the next released buffer containing prio A signals, if any. */
+ while (jba_start != jb_end)
+ {
+ const thr_job_buffer *tmp = thr_ptr->m_free_fifo[jba_start];
+ jba_start = (jba_start + 1) % THR_FREE_BUF_MAX;
+ if (tmp->m_pos > 0 && tmp->m_prioa)
+ {
+ jba = tmp;
+ jba_pos = 0;
+ jba_len = jba->m_pos;
+ break;
+ }
+ }
+ if (jba == NULL)
+ {
+ /*
+ No more released prio A buffers, but there might be some stuff
+ in the current buffer.
+ */
+ jba = thr_ptr->m_jba.m_buffers[jba_current];
+ /*
+ jba==NULL shouldn't be possible, but better to at least get
+ some dump in case it happens anyway.
+ */
+ if (jba && jba->m_pos > 0)
+ {
+ jba_pos = 0;
+ jba_len = jba->m_pos;
+ }
+ else
+ jba = NULL;
+ no_more_prioa = true;
+ }
+ }
+
+ if (jbb == NULL)
+ {
+ /* Find the next released buffer containing prio B signals, if any. */
+ while (jbb_start != jb_end)
+ {
+ const thr_job_buffer *tmp = thr_ptr->m_free_fifo[jbb_start];
+ jbb_start = (jbb_start + 1) % THR_FREE_BUF_MAX;
+ if (tmp->m_pos > 0 && !tmp->m_prioa)
+ {
+ jbb = tmp;
+ jbb_pos = 0;
+ jbb_len = jbb->m_pos;
+ break;
+ }
+ }
+ }
+ const SignalHeader *s_a;
+ if (jba)
+ s_a = reinterpret_cast<const SignalHeader*>(&(jba->m_data[jba_pos]));
+ else
+ s_a = NULL;
+
+ const SignalHeader *s_b;
+ if (jbb)
+ s_b = reinterpret_cast<const SignalHeader*>(&(jbb->m_data[jbb_pos]));
+ else
+ s_b = NULL;
+
+ /* Check if we're all done. */
+ if (!s_a && !s_b)
+ break;
+
+ /*
+ If we have both a prio A and a prio B, take the one with the lowest
+ signal id (taking into account that 0x00000000 is just after 0xffffffff).
+ */
+ int cmp; // <0 (a < b) -> use a, >0 -> (a > b) -> use b
+ if (!s_a)
+ cmp = 1;
+ else if (!s_b)
+ cmp = -1;
+ else
+ cmp = wrap_compare(s_a->theSignalId, s_b->theSignalId);
+
+ /* cmp=0 is impossible, unless we somehow get >2**32 signals of history. */
+ if (cmp < 0)
+ {
+ /* Next is prio A. */
+ signalSequence[seq_end].ptr = s_a;
+ signalSequence[seq_end].prioa = true;
+ jba_pos += (sizeof(*s_a)>>2) + s_a->m_noOfSections + s_a->theLength;
+ if (jba_pos >= jba_len)
+ jba = NULL; // Done with this buffer
+ }
+ else
+ {
+ /* Next is prio B. */
+ signalSequence[seq_end].ptr = s_b;
+ signalSequence[seq_end].prioa = false;
+ jbb_pos += (sizeof(*s_b)>>2) + s_b->m_noOfSections + s_b->theLength;
+ if (jbb_pos >= jbb_len)
+ jbb = NULL; // Done with this buffer
+ }
+ seq_end = (seq_end + 1) % MAX_SIGNALS_TO_DUMP;
+ /* Drop old signals if too many available in history. */
+ if (seq_end == seq_start)
+ seq_start = (seq_start + 1) % MAX_SIGNALS_TO_DUMP;
+ }
+
+ /* Now, having build the correct signal sequence, we can dump them all. */
+ fprintf(out, "\n");
+ bool first_one = true;
+ bool out_of_signals = false;
+ Uint32 lastSignalId = 0;
+ while (seq_end != seq_start)
+ {
+ if (watchDogCounter)
+ *watchDogCounter = 4;
+
+ if (seq_end == 0)
+ seq_end = MAX_SIGNALS_TO_DUMP;
+ seq_end--;
+ Signal signal;
+ const SignalHeader *s = signalSequence[seq_end].ptr;
+ unsigned siglen = (sizeof(*s)>>2) + s->theLength;
+ memcpy(&signal.header, s, 4*siglen);
+
+ const Uint32 *posptr = reinterpret_cast<const Uint32 *>(s);
+ signal.m_sectionPtrI[0] = posptr[siglen + 0];
+ signal.m_sectionPtrI[1] = posptr[siglen + 1];
+ signal.m_sectionPtrI[2] = posptr[siglen + 2];
+ bool prioa = signalSequence[seq_end].prioa;
+
+ /* Make sure to display clearly when there is a gap in the dump. */
+ if (!first_one && !out_of_signals && (s->theSignalId + 1) != lastSignalId)
+ {
+ out_of_signals = true;
+ fprintf(out, "\n\n\nNo more prio %s signals, rest of dump will be "
+ "incomplete.\n\n\n\n", prioa ? "B" : "A");
+ }
+ first_one = false;
+ lastSignalId = s->theSignalId;
+
+ fprintf(out, "--------------- Signal ----------------\n");
+ Uint32 prio = (prioa ? JBA : JBB);
+ SignalLoggerManager::printSignalHeader(out,
+ signal.header,
+ prio,
+ globalData.ownId,
+ true);
+ SignalLoggerManager::printSignalData (out,
+ signal.header,
+ &signal.theData[0]);
+ }
+ fflush(out);
}
void
diff -Nrup a/storage/ndb/src/kernel/vm/mt/mt.hpp b/storage/ndb/src/kernel/vm/mt/mt.hpp
--- a/storage/ndb/src/kernel/vm/mt/mt.hpp 2007-09-24 17:12:18 +02:00
+++ b/storage/ndb/src/kernel/vm/mt/mt.hpp 2007-10-04 14:45:20 +02:00
@@ -17,6 +17,7 @@ extern Uint32 senderThreadId;
void sendlocal(Uint32 self, Uint32 dst, const struct SignalHeader*);
void sendprioa(Uint32 self, Uint32 dst, const struct SignalHeader*);
void senddelay(Uint32 thr_no, const struct SignalHeader*, Uint32 delay);
+void mt_execSTOP_FOR_CRASH();
void mt_send_lock();
void mt_send_unlock();
| Thread |
|---|
| • bk commit into 5.1 tree (knielsen:1.2611) | knielsen | 4 Oct |