Below is the list of changes that have just been committed into a local
5.0 repository of jonas. When jonas does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html
ChangeSet@stripped, 2006-11-09 15:26:52+01:00, jonas@eel.(none) +5 -0
start to integrate mt.cpp
ndb/src/kernel/Makefile.am@stripped, 2006-11-09 15:26:49+01:00, jonas@eel.(none) +6 -2
build 2 binaries ndbd & ndbmtd
ndb/src/kernel/vm/Makefile.am@stripped, 2006-11-09 15:26:49+01:00, jonas@eel.(none) +11 -6
split into 2 libraries, one single threaded and 1 multi threade
ndb/src/kernel/vm/mt/dummy_mt.cpp@stripped, 2006-11-09 15:26:49+01:00, jonas@eel.(none) +58
-0
New BitKeeper file ``ndb/src/kernel/vm/mt/dummy_mt.cpp''
ndb/src/kernel/vm/mt/dummy_mt.cpp@stripped, 2006-11-09 15:26:49+01:00, jonas@eel.(none) +0 -0
ndb/src/kernel/vm/mt/mt.cpp@stripped, 2006-11-09 15:26:49+01:00, jonas@eel.(none) +583 -107
updates to integrate with real vm
ndb/src/kernel/vm/mt/mt.cpp@stripped, 2006-11-09 12:11:23+01:00, jonas@eel.(none) +0 -0
Rename: ndb/src/kernel/vm/mt.cpp -> ndb/src/kernel/vm/mt/mt.cpp
ndb/src/kernel/vm/mt/sb_mt.cpp@stripped, 2006-11-09 15:26:49+01:00, jonas@eel.(none) +1993 -0
New BitKeeper file ``ndb/src/kernel/vm/mt/sb_mt.cpp''
ndb/src/kernel/vm/mt/sb_mt.cpp@stripped, 2006-11-09 15:26:49+01:00, jonas@eel.(none) +0 -0
# This is a BitKeeper patch. What follows are the unified diffs for the
# set of deltas contained in the patch. The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User: jonas
# Host: eel.(none)
# Root: /home/jonas/src/50-atrt
--- 1.1/ndb/src/kernel/vm/mt.cpp 2006-11-09 15:26:58 +01:00
+++ 1.3/ndb/src/kernel/vm/mt/mt.cpp 2006-11-09 15:26:58 +01:00
@@ -12,8 +12,15 @@
#include <stdlib.h>
#include <string.h>
-#define likely(x) x
-#define unlikely(x) x
+#include <VMSignal.hpp>
+#include <kernel_types.h>
+#include <Prio.hpp>
+#include <SignalLoggerManager.hpp>
+#include <SimulatedBlock.hpp>
+#include <ErrorHandlingMacros.hpp>
+#include <GlobalData.hpp>
+#include <TransporterDefinitions.hpp>
+#include <prefetch.h>
static inline
int
@@ -94,74 +101,33 @@
return 0;
}
-extern "C"
-void *
-run(void * arg)
-{
- thr_wait* wait = (thr_wait*)arg;
- sleep(1);
- printf("wakeup: %d\n", wakeup(wait));
- sleep(1);
-}
-
-struct signalheader
-{
- unsigned len;
- unsigned seccount;
- unsigned bno;
- unsigned gsn;
- unsigned header[3];
- unsigned data[1]; // As specified in len
-};
-
-struct Signal
-{
- struct signalheader m_header;
-};
-
-struct SimulatedBlock
-{
- unsigned m_thr_no;
- struct thr_data* m_thr_data;
- struct thread_repository* m_thr_repository;
-
- void execute(unsigned gsn, struct Signal*);
- void sendsignal(Signal*, unsigned ref_thr);
-};
-
-struct globaldata
-{
- SimulatedBlock* get(unsigned);
-};
-
-struct job_buffer // 32k
+struct thr_job_buffer // 32k
{
unsigned m_len;
unsigned m_data[8191];
- inline int insert(struct signalheader* s) {
+ inline int insert(struct SignalHeader* s) {
unsigned len = m_len;
unsigned* pos = m_data + len;
-
- unsigned siglen = s->len + s->seccount;
+ unsigned siglen = s->theLength + s->m_noOfSections;
memcpy(pos, s, 4*siglen);
m_len = len + siglen;
return (len + siglen + 32) - 8191; // > 0 not full, <=0 full
}
- unsigned dojob(struct globaldata* g, struct Signal* sig){
+ unsigned dojob(struct GlobalData* g, struct Signal* sig){
unsigned cnt = 0;
unsigned pos = 0;
unsigned len = m_len;
while (pos < len)
{
- signalheader* s = reinterpret_cast<signalheader*>(m_data + pos);
- unsigned siglen = s->len + s->seccount;
- unsigned bno = s->bno;
- unsigned gsn = s->gsn;
- SimulatedBlock * block = g->get(bno);
+ SignalHeader* s = reinterpret_cast<SignalHeader*>(m_data + pos);
+ unsigned siglen = s->theLength + s->m_noOfSections;
+ unsigned bno = s->theReceiversBlockNumber;
+ unsigned gsn = s->theVerId_signalNumber;
+ SimulatedBlock * block = g->getBlock(bno);
memcpy(sig, s, 4*siglen);
- block->execute(gsn, sig);
+ block->executeFunction(gsn, sig);
cnt++;
pos += siglen;
@@ -170,44 +136,43 @@
}
};
-struct job_queue
+struct thr_job_queue
{
static const unsigned SIZE = 62;
unsigned m_read_index; // used by consumer
unsigned m_write_index; // used by producer
- struct job_buffer* m_buffers[SIZE];
+ struct thr_job_buffer* m_buffers[SIZE];
};
struct thr_data
{
thr_wait m_waiter;
- struct job_buffer* m_out_queue[MAX_THREADS];
- struct job_buffer* m_free_list;
- struct job_queue m_in_queue[MAX_THREADS];
+ struct thr_job_buffer* m_out_queue[MAX_THREADS];
+ struct thr_job_buffer* m_free_list;
+ struct thr_job_queue m_in_queue[MAX_THREADS];
};
template<typename T>
struct thr_safe_pool
{
- T* seize();
+ T* seize() { return reinterpret_cast<T*>(malloc(sizeof(T)));}
};
-struct thread_repository
+struct thr_repository
{
int m_thread_count;
struct thr_data m_thread[MAX_THREADS];
- struct thr_safe_pool<job_buffer> m_free_list;
+ struct thr_safe_pool<thr_job_buffer> m_free_list;
};
-#ifndef NOCODE
static
-job_buffer*
-seize_buffer(struct thread_repository* rep, int thr_no)
+thr_job_buffer*
+seize_buffer(struct thr_repository* rep, int thr_no)
{
- job_buffer* jb;
+ thr_job_buffer* jb;
if (likely((jb = rep->m_thread[thr_no].m_free_list)))
{
- job_buffer* next = reinterpret_cast<job_buffer*>(jb->m_len);
+ thr_job_buffer* next = reinterpret_cast<thr_job_buffer*>(jb->m_len);
rep->m_thread[thr_no].m_free_list = next;
return jb;
}
@@ -218,24 +183,24 @@
static
void
-release_buffer(struct thread_repository* rep, int thr_no, job_buffer* jb)
+release_buffer(struct thr_repository* rep, int thr_no, thr_job_buffer* jb)
{
- job_buffer* next = reinterpret_cast<job_buffer*>(jb->m_len);
+ thr_job_buffer* next = reinterpret_cast<thr_job_buffer*>(jb->m_len);
next = rep->m_thread[thr_no].m_free_list;
rep->m_thread[thr_no].m_free_list = jb;
}
static
void
-transfer_buffer(struct thread_repository* rep, int from, int to)
+transfer_buffer(struct thr_repository* rep, int from, int to)
{
unsigned old;
unsigned * writeptr = &(rep->m_thread[to].m_in_queue[from].m_write_index);
volatile unsigned * readptr =
&(rep->m_thread[to].m_in_queue[from].m_read_index);
unsigned readidx = * writeptr;
unsigned writeidx = * readptr;
- unsigned nextidx = (writeidx + 1) % job_queue::SIZE;
- job_buffer* src = rep->m_thread[from].m_out_queue[to];
+ unsigned nextidx = (writeidx + 1) % thr_job_queue::SIZE;
+ thr_job_buffer* src = rep->m_thread[from].m_out_queue[to];
if (unlikely(readidx == nextidx))
goto check_full;
@@ -259,11 +224,11 @@
}
void
-thr_main(struct thread_repository* rep, unsigned thr_no)
+thr_main(struct thr_repository* rep, unsigned thr_no)
{
- struct Signal * signal;
- while (true)
- {
+ Signal signal;
+ while (globalData.theRestartFlag != perform_stop)
+ {
unsigned sum = 0;
unsigned cnt = rep->m_thread_count;
/**
@@ -273,12 +238,12 @@
{
unsigned ri = rep->m_thread[thr_no].m_in_queue[i].m_read_index;
unsigned wi = rep->m_thread[thr_no].m_in_queue[i].m_write_index;
- job_buffer * jb = rep->m_thread[thr_no].m_in_queue[i].m_buffers[ri];
+ thr_job_buffer * jb = rep->m_thread[thr_no].m_in_queue[i].m_buffers[ri];
if (ri != wi)
{
- ri = (ri + 1) % job_queue::SIZE;
+ ri = (ri + 1) % thr_job_queue::SIZE;
rep->m_thread[thr_no].m_in_queue[i].m_read_index = ri;
- sum += jb->dojob(0, signal);
+ sum += jb->dojob(0, &signal);
release_buffer(rep, thr_no, jb);
}
}
@@ -289,7 +254,7 @@
*/
for (unsigned i = 0; i<cnt; i++)
{
- job_buffer * jb = rep->m_thread[thr_no].m_out_queue[i];
+ thr_job_buffer * jb = rep->m_thread[thr_no].m_out_queue[i];
if (jb->m_len)
{
transfer_buffer(rep, thr_no, i);
@@ -304,45 +269,556 @@
}
void
-SimulatedBlock::sendsignal(Signal* sig, unsigned thr_no)
+thr_transporter_main(struct thr_repository* rep, unsigned thr_no)
{
- struct thread_repository* rep = m_thr_repository;
- int res = m_thr_data->m_out_queue[thr_no]->insert(&sig->m_header);
- if (res <= 0)
- {
- transfer_buffer(rep, m_thr_no, thr_no);
- }
}
-#endif
+/**
+ * Thread Config
+ */
-void
-sizes()
+#include "ThreadConfig.hpp"
+#include <signaldata/StartOrd.hpp>
+
+ThreadConfig::ThreadConfig()
{
- printf("sizeof(thr_wait): %d\n", sizeof(thr_wait));
- printf("sizeof(job_buffer): %d\n", sizeof(job_buffer));
- printf("sizeof(job_queue): %d\n", sizeof(job_queue));
- printf("sizeof(thr_data): %d\n", sizeof(thr_data));
- printf("sizeof(thread_repository): %d\n", sizeof(thread_repository));
}
-int main(void)
+ThreadConfig::~ThreadConfig()
+{
+}
+
+
+void ThreadConfig::ipControlLoop()
+{
+}
+
+int
+ThreadConfig::doStart(NodeState::StartLevel startLevel)
{
#if 0
- thr_wait wait;
- pthread_t thr;
- wait.wakeup();
- pthread_create(&thr, 0, run, &wait);
- wait.yield(0);
+ SignalHeader sh;
+ memset(&sh, 0, sizeof(SignalHeader));
+
+ sh.theVerId_signalNumber = GSN_START_ORD;
+ sh.theReceiversBlockNumber = CMVMI;
+ sh.theSendersBlockRef = 0;
+ sh.theTrace = 0;
+ sh.theSignalId = 0;
+ sh.theLength = StartOrd::SignalLength;
+
+ Uint32 theData[25];
+ StartOrd * const startOrd = (StartOrd *)&theData[0];
+ startOrd->restartInfo = 0;
+
+ Uint32 secPtrI[3];
+ globalScheduler.execute(&sh, JBA, theData, secPtrI);
+ return 0;
+#endif
+}
+
+/**
+ * Transporter part
+ */
+#include <TransporterCallback.hpp>
+#include <TransporterRegistry.hpp>
+#include <FastScheduler.hpp>
+#include <Emulator.hpp>
+#include <ErrorHandlingMacros.hpp>
+
+#include "LongSignal.hpp"
+
+#include <signaldata/EventReport.hpp>
+#include <signaldata/TestOrd.hpp>
+#include <signaldata/SignalDroppedRep.hpp>
+#include <signaldata/DisconnectRep.hpp>
+
+#include "VMSignal.hpp"
+#include <NdbOut.hpp>
+#include "DataBuffer.hpp"
+
+/**
+ * The instance
+ */
+SectionSegmentPool g_sectionSegmentPool;
+
+struct ConnectionError
+{
+ enum TransporterError err;
+ const char *text;
+};
+
+static const ConnectionError connectionError[] =
+{
+ { TE_NO_ERROR, "No error"},
+ { TE_SHM_UNABLE_TO_CREATE_SEGMENT, "Unable to create shared memory segment"},
+ { (enum TransporterError) -1, "No connection error message available (please report a
bug)"}
+};
+
+const char *lookupConnectionError(Uint32 err)
+{
+ int i= 0;
+ while ((Uint32)connectionError[i].err != err &&
+ (Uint32)connectionError[i].err != -1)
+ i++;
+ return connectionError[i].text;
+}
+
+bool
+import(Ptr<SectionSegment> & first, const Uint32 * src, Uint32 len){
+ /**
+ * Dummy data used when setting prev.m_nextSegment for first segment of a
+ * section
+ */
+ Uint32 dummyPrev[4];
+
+ first.p = 0;
+ if(g_sectionSegmentPool.seize(first)){
+ ;
+ } else {
+ return false;
+ }
+
+ first.p->m_sz = len;
+ first.p->m_ownerRef = 0;
+
+ Ptr<SectionSegment> prevPtr = { (SectionSegment *)&dummyPrev[0], 0 };
+ Ptr<SectionSegment> currPtr = first;
+
+ while(len > SectionSegment::DataLength){
+ prevPtr.p->m_nextSegment = currPtr.i;
+ memcpy(&currPtr.p->theData[0], src, 4 * SectionSegment::DataLength);
+ src += SectionSegment::DataLength;
+ len -= SectionSegment::DataLength;
+ prevPtr = currPtr;
+ if(g_sectionSegmentPool.seize(currPtr)){
+ ;
+ } else {
+ first.p->m_lastSegment = prevPtr.i;
+ return false;
+ }
+ }
+
+ first.p->m_lastSegment = currPtr.i;
+ currPtr.p->m_nextSegment = RNIL;
+ memcpy(&currPtr.p->theData[0], src, 4 * len);
+ return true;
+}
+
+void
+linkSegments(Uint32 head, Uint32 tail){
+
+ Ptr<SectionSegment> headPtr;
+ g_sectionSegmentPool.getPtr(headPtr, head);
+
+ Ptr<SectionSegment> tailPtr;
+ g_sectionSegmentPool.getPtr(tailPtr, tail);
+
+ Ptr<SectionSegment> oldTailPtr;
+ g_sectionSegmentPool.getPtr(oldTailPtr, headPtr.p->m_lastSegment);
+
+ headPtr.p->m_lastSegment = tailPtr.p->m_lastSegment;
+ headPtr.p->m_sz += tailPtr.p->m_sz;
+
+ oldTailPtr.p->m_nextSegment = tailPtr.i;
+}
+
+void
+copy(Uint32 * & insertPtr,
+ class SectionSegmentPool & thePool, const SegmentedSectionPtr & _ptr){
+
+ Uint32 len = _ptr.sz;
+ SectionSegment * ptrP = _ptr.p;
+
+ while(len > 60){
+ memcpy(insertPtr, &ptrP->theData[0], 4 * 60);
+ len -= 60;
+ insertPtr += 60;
+ ptrP = thePool.getPtr(ptrP->m_nextSegment);
+ }
+ memcpy(insertPtr, &ptrP->theData[0], 4 * len);
+ insertPtr += len;
+}
+
+void
+copy(Uint32 * dst, SegmentedSectionPtr src){
+ copy(dst, g_sectionSegmentPool, src);
+}
+
+void
+getSections(Uint32 secCount, SegmentedSectionPtr ptr[3]){
+ Uint32 tSec0 = ptr[0].i;
+ Uint32 tSec1 = ptr[1].i;
+ Uint32 tSec2 = ptr[2].i;
+ SectionSegment * p;
+ switch(secCount){
+ case 3:
+ p = g_sectionSegmentPool.getPtr(tSec2);
+ ptr[2].p = p;
+ ptr[2].sz = p->m_sz;
+ case 2:
+ p = g_sectionSegmentPool.getPtr(tSec1);
+ ptr[1].p = p;
+ ptr[1].sz = p->m_sz;
+ case 1:
+ p = g_sectionSegmentPool.getPtr(tSec0);
+ ptr[0].p = p;
+ ptr[0].sz = p->m_sz;
+ case 0:
+ return;
+ }
+ char msg[40];
+ sprintf(msg, "secCount=%d", secCount);
+ ErrorReporter::handleAssert(msg, __FILE__, __LINE__);
+}
+
+void
+getSection(SegmentedSectionPtr & ptr, Uint32 i){
+ ptr.i = i;
+ SectionSegment * p = g_sectionSegmentPool.getPtr(i);
+ ptr.p = p;
+ ptr.sz = p->m_sz;
+}
+
+#define relSz(x) ((x + SectionSegment::DataLength - 1) / SectionSegment::DataLength)
+
+void
+release(SegmentedSectionPtr & ptr){
+ g_sectionSegmentPool.releaseList(relSz(ptr.sz),
+ ptr.i,
+ ptr.p->m_lastSegment);
+}
+
+void
+releaseSections(Uint32 secCount, SegmentedSectionPtr ptr[3]){
+ Uint32 tSec0 = ptr[0].i;
+ Uint32 tSz0 = ptr[0].sz;
+ Uint32 tSec1 = ptr[1].i;
+ Uint32 tSz1 = ptr[1].sz;
+ Uint32 tSec2 = ptr[2].i;
+ Uint32 tSz2 = ptr[2].sz;
+ switch(secCount){
+ case 3:
+ g_sectionSegmentPool.releaseList(relSz(tSz2), tSec2,
+ ptr[2].p->m_lastSegment);
+ case 2:
+ g_sectionSegmentPool.releaseList(relSz(tSz1), tSec1,
+ ptr[1].p->m_lastSegment);
+ case 1:
+ g_sectionSegmentPool.releaseList(relSz(tSz0), tSec0,
+ ptr[0].p->m_lastSegment);
+ case 0:
+ return;
+ }
+ char msg[40];
+ sprintf(msg, "secCount=%d", secCount);
+ ErrorReporter::handleAssert(msg, __FILE__, __LINE__);
+}
- sleep(5);
+#include <DebuggerNames.hpp>
+
+void
+execute(void * callbackObj,
+ SignalHeader * const header,
+ Uint8 prio,
+ Uint32 * const theData,
+ LinearSectionPtr ptr[3]){
+
+ const Uint32 secCount = header->m_noOfSections;
+ const Uint32 length = header->theLength;
+
+#ifdef TRACE_DISTRIBUTED
+ ndbout_c("recv: %s(%d) from (%s, %d)",
+ getSignalName(header->theVerId_signalNumber),
+ header->theVerId_signalNumber,
+ getBlockName(refToBlock(header->theSendersBlockRef)),
+ refToNode(header->theSendersBlockRef));
#endif
-#ifdef NOCODE
- sizes();
-#else
- thread_repository rep;
- thr_main(&rep, 0);
+
+ bool ok = true;
+ Ptr<SectionSegment> secPtr[3];
+ switch(secCount){
+ case 3:
+ ok &= import(secPtr[2], ptr[2].p, ptr[2].sz);
+ case 2:
+ ok &= import(secPtr[1], ptr[1].p, ptr[1].sz);
+ case 1:
+ ok &= import(secPtr[0], ptr[0].p, ptr[0].sz);
+ }
- return 0;
+ /**
+ * Check that we haven't received a too long signal
+ */
+ ok &= (length + secCount <= 25);
+
+ Uint32 secPtrI[3];
+ if(ok){
+ /**
+ * Normal path
+ */
+ secPtrI[0] = secPtr[0].i;
+ secPtrI[1] = secPtr[1].i;
+ secPtrI[2] = secPtr[2].i;
+
+ globalScheduler.execute(header, prio, theData, secPtrI);
+ return;
+ }
+
+ /**
+ * Out of memory
+ */
+ for(Uint32 i = 0; i<secCount; i++){
+ if(secPtr[i].p != 0){
+ g_sectionSegmentPool.releaseList(relSz(ptr[i].sz), secPtr[i].i,
+ secPtr[i].p->m_lastSegment);
+ }
+ }
+ Uint32 gsn = header->theVerId_signalNumber;
+ Uint32 len = header->theLength;
+ Uint32 newLen= (len > 22 ? 22 : len);
+ SignalDroppedRep * rep = (SignalDroppedRep*)theData;
+ memmove(rep->originalData, theData, (4 * newLen));
+ rep->originalGsn = gsn;
+ rep->originalLength = len;
+ rep->originalSectionCount = secCount;
+ header->theVerId_signalNumber = GSN_SIGNAL_DROPPED_REP;
+ header->theLength = newLen + 3;
+ header->m_noOfSections = 0;
+ globalScheduler.execute(header, prio, theData, secPtrI);
+}
+
+NdbOut &
+operator<<(NdbOut& out, const SectionSegment & ss){
+ out << "[ last= " << ss.m_lastSegment << " next= " <<
ss.nextPool << " ]";
+ return out;
+}
+
+void
+print(SectionSegment * s, Uint32 len, FILE* out){
+ for(Uint32 i = 0; i<len; i++){
+ fprintf(out, "H\'0x%.8x ", s->theData[i]);
+ if(((i + 1) % 6) == 0)
+ fprintf(out, "\n");
+ }
+}
+
+void
+print(SegmentedSectionPtr ptr, FILE* out){
+
+ ptr.p = g_sectionSegmentPool.getPtr(ptr.i);
+ Uint32 len = ptr.p->m_sz;
+
+ fprintf(out, "ptr.i = %d(%p) ptr.sz = %d(%d)\n", ptr.i, ptr.p, len, ptr.sz);
+ while(len > SectionSegment::DataLength){
+ print(ptr.p, SectionSegment::DataLength, out);
+
+ len -= SectionSegment::DataLength;
+ fprintf(out, "ptr.i = %d\n", ptr.p->m_nextSegment);
+ ptr.p = g_sectionSegmentPool.getPtr(ptr.p->m_nextSegment);
+ }
+
+ print(ptr.p, len, out);
+ fprintf(out, "\n");
+}
+
+int
+checkJobBuffer() {
+ /**
+ * Check to see if jobbbuffers are starting to get full
+ * and if so call doJob
+ */
+ return 0; //
+ //return globalScheduler.checkDoJob();
+}
+
+void
+reportError(void * callbackObj, NodeId nodeId,
+ TransporterError errorCode, const char *info)
+{
+#ifdef DEBUG_TRANSPORTER
+ ndbout_c("reportError (%d, 0x%x) %s", nodeId, errorCode, info ? info : "")
#endif
+
+ DBUG_ENTER("reportError");
+ DBUG_PRINT("info",("nodeId %d errorCode: 0x%x info: %s",
+ nodeId, errorCode, info));
+
+ switch (errorCode)
+ {
+ case TE_SIGNAL_LOST_SEND_BUFFER_FULL:
+ {
+ char msg[64];
+ snprintf(msg, sizeof(msg), "Remote note id %d.%s%s", nodeId,
+ info ? " " : "", info ? info : "");
+ ErrorReporter::handleError(NDBD_EXIT_SIGNAL_LOST_SEND_BUFFER_FULL,
+ msg, __FILE__, NST_ErrorHandler);
+ }
+ case TE_SIGNAL_LOST:
+ {
+ char msg[64];
+ snprintf(msg, sizeof(msg), "Remote node id %d,%s%s", nodeId,
+ info ? " " : "", info ? info : "");
+ ErrorReporter::handleError(NDBD_EXIT_SIGNAL_LOST,
+ msg, __FILE__, NST_ErrorHandler);
+ }
+ case TE_SHM_IPC_PERMANENT:
+ {
+ char msg[128];
+ snprintf(msg, sizeof(msg),
+ "Remote node id %d.%s%s",
+ nodeId, info ? " " : "", info ? info : "");
+ ErrorReporter::handleError(NDBD_EXIT_CONNECTION_SETUP_FAILED,
+ msg, __FILE__, NST_ErrorHandler);
+ }
+ default:
+ break;
+ }
+
+ if(errorCode & TE_DO_DISCONNECT){
+ reportDisconnect(callbackObj, nodeId, errorCode);
+ }
+
+ SignalT<3> signalT;
+ Signal &signal= *(Signal*)&signalT;
+ memset(&signal.header, 0, sizeof(signal.header));
+
+
+ if(errorCode & TE_DO_DISCONNECT)
+ signal.theData[0] = NDB_LE_TransporterError;
+ else
+ signal.theData[0] = NDB_LE_TransporterWarning;
+
+ signal.theData[1] = nodeId;
+ signal.theData[2] = errorCode;
+
+ signal.header.theLength = 3;
+ signal.header.theSendersSignalId = 0;
+ signal.header.theSendersBlockRef = numberToRef(0, globalData.ownId);
+ globalScheduler.execute(&signal, JBA, CMVMI, GSN_EVENT_REP);
+
+ DBUG_VOID_RETURN;
+}
+
+/**
+ * Report average send length in bytes (4096 last sends)
+ */
+void
+reportSendLen(void * callbackObj,
+ NodeId nodeId, Uint32 count, Uint64 bytes){
+
+ SignalT<3> signalT;
+ Signal &signal= *(Signal*)&signalT;
+ memset(&signal.header, 0, sizeof(signal.header));
+
+ signal.header.theLength = 3;
+ signal.header.theSendersSignalId = 0;
+ signal.header.theSendersBlockRef = numberToRef(0, globalData.ownId);
+ signal.theData[0] = NDB_LE_SendBytesStatistic;
+ signal.theData[1] = nodeId;
+ signal.theData[2] = (bytes/count);
+ globalScheduler.execute(&signal, JBA, CMVMI, GSN_EVENT_REP);
+}
+
+/**
+ * Report average receive length in bytes (4096 last receives)
+ */
+void
+reportReceiveLen(void * callbackObj,
+ NodeId nodeId, Uint32 count, Uint64 bytes){
+
+ SignalT<3> signalT;
+ Signal &signal= *(Signal*)&signalT;
+ memset(&signal.header, 0, sizeof(signal.header));
+
+ signal.header.theLength = 3;
+ signal.header.theSendersSignalId = 0;
+ signal.header.theSendersBlockRef = numberToRef(0, globalData.ownId);
+ signal.theData[0] = NDB_LE_ReceiveBytesStatistic;
+ signal.theData[1] = nodeId;
+ signal.theData[2] = (bytes/count);
+ globalScheduler.execute(&signal, JBA, CMVMI, GSN_EVENT_REP);
+}
+
+/**
+ * Report connection established
+ */
+
+void
+reportConnect(void * callbackObj, NodeId nodeId){
+
+ SignalT<1> signalT;
+ Signal &signal= *(Signal*)&signalT;
+ memset(&signal.header, 0, sizeof(signal.header));
+
+ signal.header.theLength = 1;
+ signal.header.theSendersSignalId = 0;
+ signal.header.theSendersBlockRef = numberToRef(0, globalData.ownId);
+ signal.theData[0] = nodeId;
+
+ globalScheduler.execute(&signal, JBA, CMVMI, GSN_CONNECT_REP);
+}
+
+/**
+ * Report connection broken
+ */
+void
+reportDisconnect(void * callbackObj, NodeId nodeId, Uint32 errNo){
+
+ DBUG_ENTER("reportDisconnect");
+
+ SignalT<sizeof(DisconnectRep)/4> signalT;
+ Signal &signal= *(Signal*)&signalT;
+ memset(&signal.header, 0, sizeof(signal.header));
+
+ signal.header.theLength = DisconnectRep::SignalLength;
+ signal.header.theSendersSignalId = 0;
+ signal.header.theSendersBlockRef = numberToRef(0, globalData.ownId);
+ signal.header.theTrace = TestOrd::TraceDisconnect;
+
+ DisconnectRep * const rep = (DisconnectRep *)&signal.theData[0];
+ rep->nodeId = nodeId;
+ rep->err = errNo;
+
+ globalScheduler.execute(&signal, JBA, CMVMI, GSN_DISCONNECT_REP);
+
+ DBUG_VOID_RETURN;
+}
+
+void
+SignalLoggerManager::printSegmentedSection(FILE * output,
+ const SignalHeader & sh,
+ const SegmentedSectionPtr ptr[3],
+ unsigned i)
+{
+ fprintf(output, "SECTION %u type=segmented", i);
+ if (i >= 3) {
+ fprintf(output, " *** invalid ***\n");
+ return;
+ }
+ const Uint32 len = ptr[i].sz;
+ SectionSegment * ssp = ptr[i].p;
+ Uint32 pos = 0;
+ fprintf(output, " size=%u\n", (unsigned)len);
+ while (pos < len) {
+ if (pos > 0 && pos % SectionSegment::DataLength == 0) {
+ ssp = g_sectionSegmentPool.getPtr(ssp->m_nextSegment);
+ }
+ printDataWord(output, pos, ssp->theData[pos % SectionSegment::DataLength]);
+ }
+ if (len > 0)
+ putc('\n', output);
+}
+
+void
+transporter_recv_from(void * callbackObj, NodeId nodeId){
+ globalData.m_nodeInfo[nodeId].m_heartbeat_cnt= 0;
+ return;
+}
+
+/**
+ * Fast sched
+ */
+void
+FastScheduler::dumpSignalMemory(FILE* out)
+{
}
--- New file ---
+++ ndb/src/kernel/vm/mt/dummy_mt.cpp 06/11/09 15:26:49
#include "FastScheduler.hpp"
#include "RefConvert.hpp"
#include "Emulator.hpp"
#include "VMSignal.hpp"
#include <SignalLoggerManager.hpp>
#include <BlockNumbers.h>
#include <GlobalSignalNumbers.h>
#include <signaldata/EventReport.hpp>
#include "LongSignal.hpp"
#include <NdbTick.h>
FastScheduler::FastScheduler()
{
}
FastScheduler::~FastScheduler()
{
}
void
FastScheduler::clear()
{
}
void bnr_error()
{
}
void jbuf_error()
{
}
void
FastScheduler::prio_level_error()
{
}
APZJobBuffer::APZJobBuffer()
{
}
APZJobBuffer::~APZJobBuffer()
{
}
void
APZJobBuffer::insert(const SignalHeader * const sh,
const Uint32 * const theData, const Uint32 secPtrI[3]){
}
void
APZJobBuffer::signal2buffer(Signal* signal,
BlockNumber bnr, GlobalSignalNumber gsn,
BufferEntry& buf)
{
}
--- New file ---
+++ ndb/src/kernel/vm/mt/sb_mt.cpp 06/11/09 15:26:49
/* Copyright (C) 2003 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include <ndb_global.h>
#include "SimulatedBlock.hpp"
#include <NdbOut.hpp>
#include <GlobalData.hpp>
#include <Emulator.hpp>
#include <ErrorHandlingMacros.hpp>
#include <TimeQueue.hpp>
#include <TransporterRegistry.hpp>
#include <SignalLoggerManager.hpp>
#include <FastScheduler.hpp>
#include "ndbd_malloc.hpp"
#include <signaldata/EventReport.hpp>
#include <signaldata/ContinueFragmented.hpp>
#include <signaldata/NodeStateSignalData.hpp>
#include <signaldata/FsRef.hpp>
#include <signaldata/SignalDroppedRep.hpp>
#include <DebuggerNames.hpp>
#include "LongSignal.hpp"
#include <Properties.hpp>
#include "Configuration.hpp"
#define ljamEntry() jamEntryLine(30000 + __LINE__)
#define ljam() jamLine(30000 + __LINE__)
//
// Constructor, Destructor
//
SimulatedBlock::SimulatedBlock(BlockNumber blockNumber,
const class Configuration & conf)
: theNodeId(globalData.ownId),
theNumber(blockNumber),
theReference(numberToRef(blockNumber, globalData.ownId)),
theConfiguration(conf),
c_fragmentInfoHash(c_fragmentInfoPool),
c_linearFragmentSendList(c_fragmentSendPool),
c_segmentedFragmentSendList(c_fragmentSendPool),
c_mutexMgr(* this),
c_counterMgr(* this),
c_ptrMetaDataCommon(0)
{
NewVarRef = 0;
globalData.setBlock(blockNumber, this);
c_fragmentIdCounter = 1;
c_fragSenderRunning = false;
Properties tmp;
const Properties * p = &tmp;
ndbrequire(p != 0);
Uint32 count = 10;
char buf[255];
count = 10;
BaseString::snprintf(buf, 255, "%s.FragmentSendPool", getBlockName(blockNumber));
if(!p->get(buf, &count))
p->get("FragmentSendPool", &count);
c_fragmentSendPool.setSize(count);
count = 10;
BaseString::snprintf(buf, 255, "%s.FragmentInfoPool", getBlockName(blockNumber));
if(!p->get(buf, &count))
p->get("FragmentInfoPool", &count);
c_fragmentInfoPool.setSize(count);
count = 10;
BaseString::snprintf(buf, 255, "%s.FragmentInfoHash", getBlockName(blockNumber));
if(!p->get(buf, &count))
p->get("FragmentInfoHash", &count);
c_fragmentInfoHash.setSize(count);
count = 5;
BaseString::snprintf(buf, 255, "%s.ActiveMutexes", getBlockName(blockNumber));
if(!p->get(buf, &count))
p->get("ActiveMutexes", &count);
c_mutexMgr.setSize(count);
c_counterMgr.setSize(5);
#ifdef VM_TRACE_TIME
clearTimes();
#endif
for(GlobalSignalNumber i = 0; i<=MAX_GSN; i++)
theExecArray[i] = 0;
installSimulatedBlockFunctions();
UpgradeStartup::installEXEC(this);
CLEAR_ERROR_INSERT_VALUE;
#ifdef VM_TRACE
m_global_variables = new Ptr<void> * [1];
m_global_variables[0] = 0;
#endif
}
SimulatedBlock::~SimulatedBlock()
{
freeBat();
#ifdef VM_TRACE_TIME
printTimes(stdout);
#endif
#ifdef VM_TRACE
delete [] m_global_variables;
#endif
}
void
SimulatedBlock::installSimulatedBlockFunctions(){
ExecFunction * a = theExecArray;
a[GSN_NODE_STATE_REP] = &SimulatedBlock::execNODE_STATE_REP;
a[GSN_CHANGE_NODE_STATE_REQ] = &SimulatedBlock::execCHANGE_NODE_STATE_REQ;
a[GSN_NDB_TAMPER] = &SimulatedBlock::execNDB_TAMPER;
a[GSN_SIGNAL_DROPPED_REP] = &SimulatedBlock::execSIGNAL_DROPPED_REP;
a[GSN_CONTINUE_FRAGMENTED]= &SimulatedBlock::execCONTINUE_FRAGMENTED;
a[GSN_UTIL_CREATE_LOCK_REF] = &SimulatedBlock::execUTIL_CREATE_LOCK_REF;
a[GSN_UTIL_CREATE_LOCK_CONF] = &SimulatedBlock::execUTIL_CREATE_LOCK_CONF;
a[GSN_UTIL_DESTROY_LOCK_REF] = &SimulatedBlock::execUTIL_DESTORY_LOCK_REF;
a[GSN_UTIL_DESTROY_LOCK_CONF] = &SimulatedBlock::execUTIL_DESTORY_LOCK_CONF;
a[GSN_UTIL_LOCK_REF] = &SimulatedBlock::execUTIL_LOCK_REF;
a[GSN_UTIL_LOCK_CONF] = &SimulatedBlock::execUTIL_LOCK_CONF;
a[GSN_UTIL_UNLOCK_REF] = &SimulatedBlock::execUTIL_UNLOCK_REF;
a[GSN_UTIL_UNLOCK_CONF] = &SimulatedBlock::execUTIL_UNLOCK_CONF;
a[GSN_FSOPENREF] = &SimulatedBlock::execFSOPENREF;
a[GSN_FSCLOSEREF] = &SimulatedBlock::execFSCLOSEREF;
a[GSN_FSWRITEREF] = &SimulatedBlock::execFSWRITEREF;
a[GSN_FSREADREF] = &SimulatedBlock::execFSREADREF;
a[GSN_FSREMOVEREF] = &SimulatedBlock::execFSREMOVEREF;
a[GSN_FSSYNCREF] = &SimulatedBlock::execFSSYNCREF;
a[GSN_FSAPPENDREF] = &SimulatedBlock::execFSAPPENDREF;
a[GSN_NODE_START_REP] = &SimulatedBlock::execNODE_START_REP;
}
void
SimulatedBlock::addRecSignalImpl(GlobalSignalNumber gsn,
ExecFunction f, bool force){
if(gsn > MAX_GSN || (!force && theExecArray[gsn] != 0)){
char errorMsg[255];
BaseString::snprintf(errorMsg, 255,
"GSN %d(%d))", gsn, MAX_GSN);
ERROR_SET(fatal, NDBD_EXIT_ILLEGAL_SIGNAL, errorMsg, errorMsg);
}
theExecArray[gsn] = f;
}
void
SimulatedBlock::signal_error(Uint32 gsn, Uint32 len, Uint32 recBlockNo,
const char* filename, int lineno) const
{
char objRef[255];
BaseString::snprintf(objRef, 255, "%s:%d", filename, lineno);
char probData[255];
BaseString::snprintf(probData, 255,
"Signal (GSN: %d, Length: %d, Rec Block No: %d)",
gsn, len, recBlockNo);
ErrorReporter::handleError(NDBD_EXIT_BLOCK_BNR_ZERO,
probData,
objRef);
}
extern class SectionSegmentPool g_sectionSegmentPool;
void
SimulatedBlock::sendSignal(BlockReference ref,
GlobalSignalNumber gsn,
Signal* signal,
Uint32 length,
JobBufferLevel jobBuffer) const {
BlockNumber sendBnr = number();
BlockReference sendBRef = reference();
Uint32 noOfSections = signal->header.m_noOfSections;
Uint32 recBlock = refToBlock(ref);
Uint32 recNode = refToNode(ref);
Uint32 ourProcessor = globalData.ownId;
signal->header.theLength = length;
signal->header.theVerId_signalNumber = gsn;
signal->header.theReceiversBlockNumber = recBlock;
Uint32 tSignalId = signal->header.theSignalId;
if ((length == 0) || (length + noOfSections > 25) || (recBlock == 0)) {
signal_error(gsn, length, recBlock, __FILE__, __LINE__);
return;
}//if
#ifdef VM_TRACE
if(globalData.testOn){
Uint16 proc =
(recNode == 0 ? globalData.ownId : recNode);
signal->header.theSendersBlockRef = sendBRef;
globalSignalLoggers.sendSignal(signal->header,
jobBuffer,
&signal->theData[0],
proc,
signal->m_sectionPtr,
signal->header.m_noOfSections);
}
#endif
if(recNode == ourProcessor || recNode == 0) {
signal->header.theSendersSignalId = tSignalId;
signal->header.theSendersBlockRef = sendBRef;
signal->header.theLength = length;
globalScheduler.execute(signal, jobBuffer, recBlock,
gsn);
signal->header.m_noOfSections = 0;
signal->header.m_fragmentInfo = 0;
return;
} else {
// send distributed Signal
SignalHeader sh;
Uint32 tTrace = signal->getTrace();
sh.theVerId_signalNumber = gsn;
sh.theReceiversBlockNumber = recBlock;
sh.theSendersBlockRef = sendBnr;
sh.theLength = length;
sh.theTrace = tTrace;
sh.theSignalId = tSignalId;
sh.m_noOfSections = noOfSections;
sh.m_fragmentInfo = 0;
#ifdef TRACE_DISTRIBUTED
ndbout_c("send: %s(%d) to (%s, %d)",
getSignalName(gsn), gsn, getBlockName(recBlock),
recNode);
#endif
SendStatus ss = globalTransporterRegistry.prepareSend(&sh, jobBuffer,
&signal->theData[0],
recNode,
g_sectionSegmentPool,
signal->m_sectionPtr);
ndbrequire(ss == SEND_OK || ss == SEND_BLOCKED || ss == SEND_DISCONNECTED);
::releaseSections(noOfSections, signal->m_sectionPtr);
signal->header.m_noOfSections = 0;
}
return;
}
void
SimulatedBlock::sendSignal(NodeReceiverGroup rg,
GlobalSignalNumber gsn,
Signal* signal,
Uint32 length,
JobBufferLevel jobBuffer) const {
Uint32 noOfSections = signal->header.m_noOfSections;
Uint32 tSignalId = signal->header.theSignalId;
Uint32 tTrace = signal->getTrace();
Uint32 tFragInf = signal->header.m_fragmentInfo;
Uint32 ourProcessor = globalData.ownId;
Uint32 recBlock = rg.m_block;
signal->header.theLength = length;
signal->header.theVerId_signalNumber = gsn;
signal->header.theReceiversBlockNumber = recBlock;
signal->header.theSendersSignalId = tSignalId;
signal->header.theSendersBlockRef = reference();
if ((length == 0) || (length + noOfSections > 25) || (recBlock == 0)) {
signal_error(gsn, length, recBlock, __FILE__, __LINE__);
return;
}//if
SignalHeader sh;
sh.theVerId_signalNumber = gsn;
sh.theReceiversBlockNumber = recBlock;
sh.theSendersBlockRef = number();
sh.theLength = length;
sh.theTrace = tTrace;
sh.theSignalId = tSignalId;
sh.m_noOfSections = noOfSections;
sh.m_fragmentInfo = tFragInf;
/**
* Check own node
*/
bool release = true;
if(rg.m_nodes.get(0) || rg.m_nodes.get(ourProcessor)){
#ifdef VM_TRACE
if(globalData.testOn){
globalSignalLoggers.sendSignal(signal->header,
jobBuffer,
&signal->theData[0],
ourProcessor,
signal->m_sectionPtr,
signal->header.m_noOfSections);
}
#endif
globalScheduler.execute(signal, jobBuffer, recBlock, gsn);
rg.m_nodes.clear((Uint32)0);
rg.m_nodes.clear(ourProcessor);
release = false;
}
/**
* Do the big loop
*/
Uint32 recNode = 0;
while(!rg.m_nodes.isclear()){
recNode = rg.m_nodes.find(recNode + 1);
rg.m_nodes.clear(recNode);
#ifdef VM_TRACE
if(globalData.testOn){
globalSignalLoggers.sendSignal(signal->header,
jobBuffer,
&signal->theData[0],
recNode,
signal->m_sectionPtr,
signal->header.m_noOfSections);
}
#endif
#ifdef TRACE_DISTRIBUTED
ndbout_c("send: %s(%d) to (%s, %d)",
getSignalName(gsn), gsn, getBlockName(recBlock),
recNode);
#endif
SendStatus ss = globalTransporterRegistry.prepareSend(&sh, jobBuffer,
&signal->theData[0],
recNode,
g_sectionSegmentPool,
signal->m_sectionPtr);
ndbrequire(ss == SEND_OK || ss == SEND_BLOCKED || ss == SEND_DISCONNECTED);
}
if(release){
::releaseSections(noOfSections, signal->m_sectionPtr);
}
signal->header.m_noOfSections = 0;
signal->header.m_fragmentInfo = 0;
return;
}
bool import(Ptr<SectionSegment> & first, const Uint32 * src, Uint32 len);
void
SimulatedBlock::sendSignal(BlockReference ref,
GlobalSignalNumber gsn,
Signal* signal,
Uint32 length,
JobBufferLevel jobBuffer,
LinearSectionPtr ptr[3],
Uint32 noOfSections) const {
BlockNumber sendBnr = number();
BlockReference sendBRef = reference();
Uint32 recBlock = refToBlock(ref);
Uint32 recNode = refToNode(ref);
Uint32 ourProcessor = globalData.ownId;
::releaseSections(signal->header.m_noOfSections, signal->m_sectionPtr);
signal->header.theLength = length;
signal->header.theVerId_signalNumber = gsn;
signal->header.theReceiversBlockNumber = recBlock;
signal->header.m_noOfSections = noOfSections;
Uint32 tSignalId = signal->header.theSignalId;
Uint32 tFragInfo = signal->header.m_fragmentInfo;
if ((length == 0) || (length + noOfSections > 25) || (recBlock == 0)) {
signal_error(gsn, length, recBlock, __FILE__, __LINE__);
return;
}//if
#ifdef VM_TRACE
if(globalData.testOn){
Uint16 proc =
(recNode == 0 ? globalData.ownId : recNode);
signal->header.theSendersBlockRef = sendBRef;
globalSignalLoggers.sendSignal(signal->header,
jobBuffer,
&signal->theData[0],
proc,
ptr, noOfSections);
}
#endif
if(recNode == ourProcessor || recNode == 0) {
signal->header.theSendersSignalId = tSignalId;
signal->header.theSendersBlockRef = sendBRef;
/**
* We have to copy the data
*/
Ptr<SectionSegment> segptr[3];
for(Uint32 i = 0; i<noOfSections; i++){
ndbrequire(import(segptr[i], ptr[i].p, ptr[i].sz));
signal->m_sectionPtr[i].i = segptr[i].i;
}
globalScheduler.execute(signal, jobBuffer, recBlock,
gsn);
signal->header.m_noOfSections = 0;
return;
} else {
// send distributed Signal
SignalHeader sh;
Uint32 tTrace = signal->getTrace();
Uint32 noOfSections = signal->header.m_noOfSections;
sh.theVerId_signalNumber = gsn;
sh.theReceiversBlockNumber = recBlock;
sh.theSendersBlockRef = sendBnr;
sh.theLength = length;
sh.theTrace = tTrace;
sh.theSignalId = tSignalId;
sh.m_noOfSections = noOfSections;
sh.m_fragmentInfo = tFragInfo;
#ifdef TRACE_DISTRIBUTED
ndbout_c("send: %s(%d) to (%s, %d)",
getSignalName(gsn), gsn, getBlockName(recBlock),
recNode);
#endif
SendStatus ss = globalTransporterRegistry.prepareSend(&sh, jobBuffer,
&signal->theData[0],
recNode,
ptr);
ndbrequire(ss == SEND_OK || ss == SEND_BLOCKED || ss == SEND_DISCONNECTED);
}
signal->header.m_noOfSections = 0;
signal->header.m_fragmentInfo = 0;
return;
}
void
SimulatedBlock::sendSignal(NodeReceiverGroup rg,
GlobalSignalNumber gsn,
Signal* signal,
Uint32 length,
JobBufferLevel jobBuffer,
LinearSectionPtr ptr[3],
Uint32 noOfSections) const {
Uint32 tSignalId = signal->header.theSignalId;
Uint32 tTrace = signal->getTrace();
Uint32 tFragInfo = signal->header.m_fragmentInfo;
Uint32 ourProcessor = globalData.ownId;
Uint32 recBlock = rg.m_block;
::releaseSections(signal->header.m_noOfSections, signal->m_sectionPtr);
signal->header.theLength = length;
signal->header.theVerId_signalNumber = gsn;
signal->header.theReceiversBlockNumber = recBlock;
signal->header.theSendersSignalId = tSignalId;
signal->header.theSendersBlockRef = reference();
signal->header.m_noOfSections = noOfSections;
if ((length == 0) || (length + noOfSections > 25) || (recBlock == 0)) {
signal_error(gsn, length, recBlock, __FILE__, __LINE__);
return;
}//if
SignalHeader sh;
sh.theVerId_signalNumber = gsn;
sh.theReceiversBlockNumber = recBlock;
sh.theSendersBlockRef = number();
sh.theLength = length;
sh.theTrace = tTrace;
sh.theSignalId = tSignalId;
sh.m_noOfSections = noOfSections;
sh.m_fragmentInfo = tFragInfo;
/**
* Check own node
*/
if(rg.m_nodes.get(0) || rg.m_nodes.get(ourProcessor)){
#ifdef VM_TRACE
if(globalData.testOn){
globalSignalLoggers.sendSignal(signal->header,
jobBuffer,
&signal->theData[0],
ourProcessor,
ptr, noOfSections);
}
#endif
/**
* We have to copy the data
*/
Ptr<SectionSegment> segptr[3];
for(Uint32 i = 0; i<noOfSections; i++){
ndbrequire(import(segptr[i], ptr[i].p, ptr[i].sz));
signal->m_sectionPtr[i].i = segptr[i].i;
}
globalScheduler.execute(signal, jobBuffer, recBlock, gsn);
rg.m_nodes.clear((Uint32)0);
rg.m_nodes.clear(ourProcessor);
}
/**
* Do the big loop
*/
Uint32 recNode = 0;
while(!rg.m_nodes.isclear()){
recNode = rg.m_nodes.find(recNode + 1);
rg.m_nodes.clear(recNode);
#ifdef VM_TRACE
if(globalData.testOn){
globalSignalLoggers.sendSignal(signal->header,
jobBuffer,
&signal->theData[0],
recNode,
ptr, noOfSections);
}
#endif
#ifdef TRACE_DISTRIBUTED
ndbout_c("send: %s(%d) to (%s, %d)",
getSignalName(gsn), gsn, getBlockName(recBlock),
recNode);
#endif
SendStatus ss = globalTransporterRegistry.prepareSend(&sh, jobBuffer,
&signal->theData[0],
recNode,
ptr);
ndbrequire(ss == SEND_OK || ss == SEND_BLOCKED || ss == SEND_DISCONNECTED);
}
signal->header.m_noOfSections = 0;
signal->header.m_fragmentInfo = 0;
return;
}
void
SimulatedBlock::sendSignalWithDelay(BlockReference ref,
GlobalSignalNumber gsn,
Signal* signal,
Uint32 delayInMilliSeconds,
Uint32 length) const {
BlockNumber bnr = refToBlock(ref);
//BlockNumber sendBnr = number();
BlockReference sendBRef = reference();
if (bnr == 0) {
bnr_error();
}//if
signal->header.theLength = length;
signal->header.theSendersSignalId = signal->header.theSignalId;
signal->header.theSendersBlockRef = sendBRef;
signal->header.theVerId_signalNumber = gsn;
signal->header.theReceiversBlockNumber = bnr;
#ifdef VM_TRACE
{
if(globalData.testOn){
globalSignalLoggers.sendSignalWithDelay(delayInMilliSeconds,
signal->header,
0,
&signal->theData[0],
globalData.ownId,
signal->m_sectionPtr,
signal->header.m_noOfSections);
}
}
#endif
globalTimeQueue.insert(signal, bnr, gsn, delayInMilliSeconds);
signal->header.m_noOfSections = 0;
signal->header.m_fragmentInfo = 0;
// befor 2nd parameter to globalTimeQueue.insert
// (Priority)theSendSig[sigIndex].jobBuffer
}
void
SimulatedBlock::releaseSections(Signal* signal){
::releaseSections(signal->header.m_noOfSections, signal->m_sectionPtr);
signal->header.m_noOfSections = 0;
}
class SectionSegmentPool&
SimulatedBlock::getSectionSegmentPool(){
return g_sectionSegmentPool;
}
NewVARIABLE *
SimulatedBlock::allocateBat(int batSize){
NewVARIABLE* bat = NewVarRef;
bat = (NewVARIABLE*)realloc(bat, batSize * sizeof(NewVARIABLE));
NewVarRef = bat;
theBATSize = batSize;
return bat;
}
void
SimulatedBlock::freeBat(){
if(NewVarRef != 0){
free(NewVarRef);
NewVarRef = 0;
}
}
const NewVARIABLE *
SimulatedBlock::getBat(Uint16 blockNo){
SimulatedBlock * sb = globalData.getBlock(blockNo);
if(sb == 0)
return 0;
return sb->NewVarRef;
}
Uint16
SimulatedBlock::getBatSize(Uint16 blockNo){
SimulatedBlock * sb = globalData.getBlock(blockNo);
if(sb == 0)
return 0;
return sb->theBATSize;
}
void*
SimulatedBlock::allocRecord(const char * type, size_t s, size_t n, bool clear)
{
void * p = NULL;
size_t size = n*s;
refresh_watch_dog();
if (size > 0){
#ifdef VM_TRACE_MEM
ndbout_c("%s::allocRecord(%s, %u, %u) = %u bytes",
getBlockName(number()),
type,
s,
n,
size);
#endif
p = ndbd_malloc(size);
if (p == NULL){
char buf1[255];
char buf2[255];
BaseString::snprintf(buf1, sizeof(buf1), "%s could not allocate memory for %s",
getBlockName(number()), type);
BaseString::snprintf(buf2, sizeof(buf2), "Requested: %ux%u = %u bytes",
(Uint32)s, (Uint32)n, (Uint32)size);
ERROR_SET(fatal, NDBD_EXIT_MEMALLOC, buf1, buf2);
}
if(clear){
char * ptr = (char*)p;
const Uint32 chunk = 128 * 1024;
while(size > chunk){
refresh_watch_dog();
memset(ptr, 0, chunk);
ptr += chunk;
size -= chunk;
}
refresh_watch_dog();
memset(ptr, 0, size);
}
}
return p;
}
void
SimulatedBlock::deallocRecord(void ** ptr,
const char * type, size_t s, size_t n){
(void)type;
if(* ptr != 0){
ndbd_free(* ptr, n*s);
* ptr = 0;
}
}
void
SimulatedBlock::refresh_watch_dog()
{
globalData.incrementWatchDogCounter(1);
}
void
SimulatedBlock::progError(int line, int err_code, const char* extra) const {
jamLine(line);
const char *aBlockName = getBlockName(number(), "VM Kernel");
// Pack status of interesting config variables
// so that we can print them in error.log
int magicStatus =
(theConfiguration.stopOnError()<<1) +
(theConfiguration.getInitialStart()<<2) +
(theConfiguration.getDaemonMode()<<3);
/* Add line number to block name */
char buf[100];
BaseString::snprintf(&buf[0], 100, "%s (Line: %d) 0x%.8x",
aBlockName, line, magicStatus);
ErrorReporter::handleError(err_code, extra, buf);
}
void
SimulatedBlock::infoEvent(const char * msg, ...) const {
if(msg == 0)
return;
Uint32 theData[25];
theData[0] = NDB_LE_InfoEvent;
char * buf = (char *)&(theData[1]);
va_list ap;
va_start(ap, msg);
BaseString::vsnprintf(buf, 96, msg, ap); // 96 = 100 - 4
va_end(ap);
int len = strlen(buf) + 1;
if(len > 96){
len = 96;
buf[95] = 0;
}
/**
* Init and put it into the job buffer
*/
SignalHeader sh;
memset(&sh, 0, sizeof(SignalHeader));
const Signal * signal = globalScheduler.getVMSignals();
Uint32 tTrace = signal->header.theTrace;
Uint32 tSignalId = signal->header.theSignalId;
sh.theVerId_signalNumber = GSN_EVENT_REP;
sh.theReceiversBlockNumber = CMVMI;
sh.theSendersBlockRef = reference();
sh.theTrace = tTrace;
sh.theSignalId = tSignalId;
sh.theLength = ((len+3)/4)+1;
Uint32 secPtrI[3]; // Dummy
globalScheduler.execute(&sh, JBB, theData, secPtrI);
}
void
SimulatedBlock::warningEvent(const char * msg, ...) const {
if(msg == 0)
return;
Uint32 theData[25];
theData[0] = NDB_LE_WarningEvent;
char * buf = (char *)&(theData[1]);
va_list ap;
va_start(ap, msg);
BaseString::vsnprintf(buf, 96, msg, ap); // 96 = 100 - 4
va_end(ap);
int len = strlen(buf) + 1;
if(len > 96){
len = 96;
buf[95] = 0;
}
/**
* Init and put it into the job buffer
*/
SignalHeader sh;
memset(&sh, 0, sizeof(SignalHeader));
const Signal * signal = globalScheduler.getVMSignals();
Uint32 tTrace = signal->header.theTrace;
Uint32 tSignalId = signal->header.theSignalId;
sh.theVerId_signalNumber = GSN_EVENT_REP;
sh.theReceiversBlockNumber = CMVMI;
sh.theSendersBlockRef = reference();
sh.theTrace = tTrace;
sh.theSignalId = tSignalId;
sh.theLength = ((len+3)/4)+1;
Uint32 secPtrI[3]; // Dummy
globalScheduler.execute(&sh, JBB, theData, secPtrI);
}
void
SimulatedBlock::execNODE_STATE_REP(Signal* signal){
const NodeStateRep * const rep = (NodeStateRep *)&signal->theData[0];
this->theNodeState = rep->nodeState;
}
void
SimulatedBlock::execCHANGE_NODE_STATE_REQ(Signal* signal){
const ChangeNodeStateReq * const req =
(ChangeNodeStateReq *)&signal->theData[0];
this->theNodeState = req->nodeState;
const Uint32 senderData = req->senderData;
const BlockReference senderRef = req->senderRef;
/**
* Pack return signal
*/
ChangeNodeStateConf * const conf =
(ChangeNodeStateConf *)&signal->theData[0];
conf->senderData = senderData;
sendSignal(senderRef, GSN_CHANGE_NODE_STATE_CONF, signal,
ChangeNodeStateConf::SignalLength, JBB);
}
void
SimulatedBlock::execNDB_TAMPER(Signal * signal){
SET_ERROR_INSERT_VALUE(signal->theData[0]);
}
void
SimulatedBlock::execSIGNAL_DROPPED_REP(Signal * signal){
char msg[64];
const SignalDroppedRep * const rep = (SignalDroppedRep *)&signal->theData[0];
snprintf(msg, sizeof(msg), "%s GSN: %u (%u,%u)", getBlockName(number()),
rep->originalGsn, rep->originalLength,rep->originalSectionCount);
ErrorReporter::handleError(NDBD_EXIT_OUT_OF_LONG_SIGNAL_MEMORY,
msg,
__FILE__,
NST_ErrorHandler);
}
void
SimulatedBlock::execCONTINUE_FRAGMENTED(Signal * signal){
ljamEntry();
Ptr<FragmentSendInfo> fragPtr;
c_segmentedFragmentSendList.first(fragPtr);
for(; !fragPtr.isNull();){
ljam();
Ptr<FragmentSendInfo> copyPtr = fragPtr;
c_segmentedFragmentSendList.next(fragPtr);
sendNextSegmentedFragment(signal, * copyPtr.p);
if(copyPtr.p->m_status == FragmentSendInfo::SendComplete){
ljam();
if(copyPtr.p->m_callback.m_callbackFunction != 0) {
ljam();
execute(signal, copyPtr.p->m_callback, 0);
}//if
c_segmentedFragmentSendList.release(copyPtr);
}
}
c_linearFragmentSendList.first(fragPtr);
for(; !fragPtr.isNull();){
ljam();
Ptr<FragmentSendInfo> copyPtr = fragPtr;
c_linearFragmentSendList.next(fragPtr);
sendNextLinearFragment(signal, * copyPtr.p);
if(copyPtr.p->m_status == FragmentSendInfo::SendComplete){
ljam();
if(copyPtr.p->m_callback.m_callbackFunction != 0) {
ljam();
execute(signal, copyPtr.p->m_callback, 0);
}//if
c_linearFragmentSendList.release(copyPtr);
}
}
if(c_segmentedFragmentSendList.isEmpty() &&
c_linearFragmentSendList.isEmpty()){
ljam();
c_fragSenderRunning = false;
return;
}
ContinueFragmented * sig = (ContinueFragmented*)signal->getDataPtrSend();
sig->line = __LINE__;
sendSignal(reference(), GSN_CONTINUE_FRAGMENTED, signal, 1, JBB);
}
void
SimulatedBlock::execNODE_START_REP(Signal* signal)
{
// common stuff for all blocks
// block specific stuff by virtual method override (default empty)
exec_node_start_rep(signal);
}
void
SimulatedBlock::exec_node_start_rep(Signal* signal)
{
}
#ifdef VM_TRACE_TIME
void
SimulatedBlock::clearTimes() {
for(Uint32 i = 0; i <= MAX_GSN; i++){
m_timeTrace[i].cnt = 0;
m_timeTrace[i].sum = 0;
m_timeTrace[i].sub = 0;
}
}
void
SimulatedBlock::printTimes(FILE * output){
fprintf(output, "-- %s --\n", getBlockName(number()));
Uint64 sum = 0;
for(Uint32 i = 0; i <= MAX_GSN; i++){
Uint32 n = m_timeTrace[i].cnt;
if(n != 0){
double dn = n;
double avg = m_timeTrace[i].sum;
double avg2 = avg - m_timeTrace[i].sub;
avg /= dn;
avg2 /= dn;
fprintf(output,
//name ; cnt ; loc ; acc
"%s ; #%d ; %dus ; %dus ; %dms\n",
getSignalName(i), n, (Uint32)avg, (Uint32)avg2,
(Uint32)((m_timeTrace[i].sum - m_timeTrace[i].sub + 500)/ 1000));
sum += (m_timeTrace[i].sum - m_timeTrace[i].sub);
}
}
sum = (sum + 500)/ 1000;
fprintf(output, "-- %s : %d --\n", getBlockName(number()), sum);
fprintf(output, "\n");
fflush(output);
}
#endif
void release(SegmentedSectionPtr & ptr);
SimulatedBlock::FragmentInfo::FragmentInfo(Uint32 fragId, Uint32 sender){
m_fragmentId = fragId;
m_senderRef = sender;
m_sectionPtrI[0] = RNIL;
m_sectionPtrI[1] = RNIL;
m_sectionPtrI[2] = RNIL;
}
SimulatedBlock::FragmentSendInfo::FragmentSendInfo()
{
}
bool
SimulatedBlock::assembleFragments(Signal * signal){
Uint32 sigLen = signal->length() - 1;
Uint32 fragId = signal->theData[sigLen];
Uint32 fragInfo = signal->header.m_fragmentInfo;
Uint32 senderRef = signal->getSendersBlockRef();
if(fragInfo == 0){
return true;
}
const Uint32 secs = signal->header.m_noOfSections;
const Uint32 * const secNos = &signal->theData[sigLen - secs];
if(fragInfo == 1){
/**
* First in train
*/
Ptr<FragmentInfo> fragPtr;
if(!c_fragmentInfoHash.seize(fragPtr)){
ndbrequire(false);
return false;
}
new (fragPtr.p)FragmentInfo(fragId, senderRef);
c_fragmentInfoHash.add(fragPtr);
for(Uint32 i = 0; i<secs; i++){
Uint32 sectionNo = secNos[i];
ndbassert(sectionNo < 3);
fragPtr.p->m_sectionPtrI[sectionNo] = signal->m_sectionPtr[i].i;
}
/**
* Don't release allocated segments
*/
signal->header.m_noOfSections = 0;
return false;
}
FragmentInfo key(fragId, senderRef);
Ptr<FragmentInfo> fragPtr;
if(c_fragmentInfoHash.find(fragPtr, key)){
/**
* FragInfo == 2 or 3
*/
Uint32 i;
for(i = 0; i<secs; i++){
Uint32 sectionNo = secNos[i];
ndbassert(sectionNo < 3);
Uint32 sectionPtrI = signal->m_sectionPtr[i].i;
if(fragPtr.p->m_sectionPtrI[sectionNo] != RNIL){
linkSegments(fragPtr.p->m_sectionPtrI[sectionNo], sectionPtrI);
} else {
fragPtr.p->m_sectionPtrI[sectionNo] = sectionPtrI;
}
}
/**
* fragInfo = 2
*/
if(fragInfo == 2){
signal->header.m_noOfSections = 0;
return false;
}
/**
* fragInfo = 3
*/
for(i = 0; i<3; i++){
Uint32 ptrI = fragPtr.p->m_sectionPtrI[i];
if(ptrI != RNIL){
signal->m_sectionPtr[i].i = ptrI;
} else {
break;
}
}
signal->setLength(sigLen - i);
signal->header.m_noOfSections = i;
signal->header.m_fragmentInfo = 0;
getSections(i, signal->m_sectionPtr);
c_fragmentInfoHash.release(fragPtr);
return true;
}
/**
* Unable to find fragment
*/
ndbrequire(false);
return false;
}
bool
SimulatedBlock::sendFirstFragment(FragmentSendInfo & info,
NodeReceiverGroup rg,
GlobalSignalNumber gsn,
Signal* signal,
Uint32 length,
JobBufferLevel jbuf,
Uint32 messageSize){
info.m_sectionPtr[0].m_segmented.i = RNIL;
info.m_sectionPtr[1].m_segmented.i = RNIL;
info.m_sectionPtr[2].m_segmented.i = RNIL;
Uint32 totalSize = 0;
SectionSegment * p;
switch(signal->header.m_noOfSections){
case 3:
p = signal->m_sectionPtr[2].p;
info.m_sectionPtr[2].m_segmented.p = p;
info.m_sectionPtr[2].m_segmented.i = signal->m_sectionPtr[2].i;
totalSize += p->m_sz;
case 2:
p = signal->m_sectionPtr[1].p;
info.m_sectionPtr[1].m_segmented.p = p;
info.m_sectionPtr[1].m_segmented.i = signal->m_sectionPtr[1].i;
totalSize += p->m_sz;
case 1:
p = signal->m_sectionPtr[0].p;
info.m_sectionPtr[0].m_segmented.p = p;
info.m_sectionPtr[0].m_segmented.i = signal->m_sectionPtr[0].i;
totalSize += p->m_sz;
}
if(totalSize <= messageSize + SectionSegment::DataLength){
/**
* Send signal directly
*/
sendSignal(rg, gsn, signal, length, jbuf);
info.m_status = FragmentSendInfo::SendComplete;
return true;
}
/**
* Consume sections
*/
signal->header.m_noOfSections = 0;
/**
* Setup info object
*/
info.m_status = FragmentSendInfo::SendNotComplete;
info.m_prio = (Uint8)jbuf;
info.m_gsn = gsn;
info.m_fragInfo = 1;
info.m_messageSize = messageSize;
info.m_fragmentId = c_fragmentIdCounter++;
info.m_nodeReceiverGroup = rg;
info.m_callback.m_callbackFunction = 0;
Ptr<SectionSegment> tmp;
if(!import(tmp, &signal->theData[0], length)){
ndbrequire(false);
return false;
}
info.m_theDataSection.p = &tmp.p->theData[0];
info.m_theDataSection.sz = length;
tmp.p->theData[length] = tmp.i;
sendNextSegmentedFragment(signal, info);
if(c_fragmentIdCounter == 0){
/**
* Fragment id 0 is invalid
*/
c_fragmentIdCounter = 1;
}
return true;
}
#if 0
#define lsout(x) x
#else
#define lsout(x)
#endif
void
SimulatedBlock::sendNextSegmentedFragment(Signal* signal,
FragmentSendInfo & info){
/**
* Store "theData"
*/
const Uint32 sigLen = info.m_theDataSection.sz;
memcpy(&signal->theData[0], info.m_theDataSection.p, 4 * sigLen);
Uint32 sz = 0;
Uint32 maxSz = info.m_messageSize;
Int32 secNo = 2;
Uint32 secCount = 0;
Uint32 * secNos = &signal->theData[sigLen];
enum { Unknown = 0, Full = 1 } loop = Unknown;
for(; secNo >= 0 && secCount < 3; secNo--){
Uint32 ptrI = info.m_sectionPtr[secNo].m_segmented.i;
if(ptrI == RNIL)
continue;
info.m_sectionPtr[secNo].m_segmented.i = RNIL;
SectionSegment * ptrP = info.m_sectionPtr[secNo].m_segmented.p;
const Uint32 size = ptrP->m_sz;
signal->m_sectionPtr[secCount].i = ptrI;
signal->m_sectionPtr[secCount].p = ptrP;
signal->m_sectionPtr[secCount].sz = size;
secNos[secCount] = secNo;
secCount++;
const Uint32 sizeLeft = maxSz - sz;
if(size <= sizeLeft){
/**
* The section fits
*/
sz += size;
lsout(ndbout_c("section %d saved as %d", secNo, secCount-1));
continue;
}
const Uint32 overflow = size - sizeLeft; // > 0
if(overflow <= SectionSegment::DataLength){
/**
* Only one segment left to send
* send even if sizeLeft <= size
*/
lsout(ndbout_c("section %d saved as %d but full over: %d",
secNo, secCount-1, overflow));
secNo--;
break;
}
// size >= 61
if(sizeLeft < SectionSegment::DataLength){
/**
* Less than one segment left (space)
* dont bother sending
*/
secCount--;
info.m_sectionPtr[secNo].m_segmented.i = ptrI;
loop = Full;
lsout(ndbout_c("section %d not saved", secNo));
break;
}
/**
* Split list
* 1) Find place to split
* 2) Rewrite header (the part that will be sent)
* 3) Write new header (for remaining part)
* 4) Store new header on FragmentSendInfo - record
*/
// size >= 61 && sizeLeft >= 60
Uint32 sum = SectionSegment::DataLength;
Uint32 prevPtrI = ptrI;
ptrI = ptrP->m_nextSegment;
const Uint32 fill = sizeLeft - SectionSegment::DataLength;
while(sum < fill){
prevPtrI = ptrI;
ptrP = g_sectionSegmentPool.getPtr(ptrI);
ptrI = ptrP->m_nextSegment;
sum += SectionSegment::DataLength;
}
/**
* Rewrite header w.r.t size and last
*/
Uint32 prev = secCount - 1;
const Uint32 last = signal->m_sectionPtr[prev].p->m_lastSegment;
signal->m_sectionPtr[prev].p->m_lastSegment = prevPtrI;
signal->m_sectionPtr[prev].p->m_sz = sum;
signal->m_sectionPtr[prev].sz = sum;
/**
* Write "new" list header
*/
ptrP = g_sectionSegmentPool.getPtr(ptrI);
ptrP->m_lastSegment = last;
ptrP->m_sz = size - sum;
/**
* And store it on info-record
*/
info.m_sectionPtr[secNo].m_segmented.i = ptrI;
info.m_sectionPtr[secNo].m_segmented.p = ptrP;
loop = Full;
lsout(ndbout_c("section %d split into %d", secNo, prev));
break;
}
lsout(ndbout_c("loop: %d secNo: %d secCount: %d sz: %d",
loop, secNo, secCount, sz));
/**
* Store fragment id
*/
secNos[secCount] = info.m_fragmentId;
Uint32 fragInfo = info.m_fragInfo;
info.m_fragInfo = 2;
switch(loop){
case Unknown:
if(secNo >= 0){
lsout(ndbout_c("Unknown - Full"));
/**
* Not finished
*/
break;
}
// Fall through
lsout(ndbout_c("Unknown - Done"));
info.m_status = FragmentSendInfo::SendComplete;
ndbassert(fragInfo == 2);
fragInfo = 3;
case Full:
break;
}
signal->header.m_fragmentInfo = fragInfo;
signal->header.m_noOfSections = secCount;
sendSignal(info.m_nodeReceiverGroup,
info.m_gsn,
signal,
sigLen + secCount + 1,
(JobBufferLevel)info.m_prio);
if(fragInfo == 3){
/**
* This is the last signal
*/
g_sectionSegmentPool.release(info.m_theDataSection.p[sigLen]);
}
}
bool
SimulatedBlock::sendFirstFragment(FragmentSendInfo & info,
NodeReceiverGroup rg,
GlobalSignalNumber gsn,
Signal* signal,
Uint32 length,
JobBufferLevel jbuf,
LinearSectionPtr ptr[3],
Uint32 noOfSections,
Uint32 messageSize){
::releaseSections(signal->header.m_noOfSections, signal->m_sectionPtr);
signal->header.m_noOfSections = 0;
info.m_sectionPtr[0].m_linear.p = NULL;
info.m_sectionPtr[1].m_linear.p = NULL;
info.m_sectionPtr[2].m_linear.p = NULL;
Uint32 totalSize = 0;
switch(noOfSections){
case 3:
info.m_sectionPtr[2].m_linear = ptr[2];
totalSize += ptr[2].sz;
case 2:
info.m_sectionPtr[1].m_linear = ptr[1];
totalSize += ptr[1].sz;
case 1:
info.m_sectionPtr[0].m_linear = ptr[0];
totalSize += ptr[0].sz;
}
if(totalSize <= messageSize + SectionSegment::DataLength){
/**
* Send signal directly
*/
sendSignal(rg, gsn, signal, length, jbuf, ptr, noOfSections);
info.m_status = FragmentSendInfo::SendComplete;
/**
* Indicate to sendLinearSignalFragment
* that we'r already done
*/
return true;
}
/**
* Setup info object
*/
info.m_status = FragmentSendInfo::SendNotComplete;
info.m_prio = (Uint8)jbuf;
info.m_gsn = gsn;
info.m_messageSize = messageSize;
info.m_fragInfo = 1;
info.m_fragmentId = c_fragmentIdCounter++;
info.m_nodeReceiverGroup = rg;
info.m_callback.m_callbackFunction = 0;
Ptr<SectionSegment> tmp;
if(!import(tmp, &signal->theData[0], length)){
ndbrequire(false);
return false;
}
info.m_theDataSection.p = &tmp.p->theData[0];
info.m_theDataSection.sz = length;
tmp.p->theData[length] = tmp.i;
sendNextLinearFragment(signal, info);
if(c_fragmentIdCounter == 0){
/**
* Fragment id 0 is invalid
*/
c_fragmentIdCounter = 1;
}
return true;
}
void
SimulatedBlock::sendNextLinearFragment(Signal* signal,
FragmentSendInfo & info){
/**
* Store "theData"
*/
const Uint32 sigLen = info.m_theDataSection.sz;
memcpy(&signal->theData[0], info.m_theDataSection.p, 4 * sigLen);
Uint32 sz = 0;
Uint32 maxSz = info.m_messageSize;
Int32 secNo = 2;
Uint32 secCount = 0;
Uint32 * secNos = &signal->theData[sigLen];
LinearSectionPtr signalPtr[3];
enum { Unknown = 0, Full = 2 } loop = Unknown;
for(; secNo >= 0 && secCount < 3; secNo--){
Uint32 * ptrP = info.m_sectionPtr[secNo].m_linear.p;
if(ptrP == NULL)
continue;
info.m_sectionPtr[secNo].m_linear.p = NULL;
const Uint32 size = info.m_sectionPtr[secNo].m_linear.sz;
signalPtr[secCount].p = ptrP;
signalPtr[secCount].sz = size;
secNos[secCount] = secNo;
secCount++;
const Uint32 sizeLeft = maxSz - sz;
if(size <= sizeLeft){
/**
* The section fits
*/
sz += size;
lsout(ndbout_c("section %d saved as %d", secNo, secCount-1));
continue;
}
const Uint32 overflow = size - sizeLeft; // > 0
if(overflow <= SectionSegment::DataLength){
/**
* Only one segment left to send
* send even if sizeLeft <= size
*/
lsout(ndbout_c("section %d saved as %d but full over: %d",
secNo, secCount-1, overflow));
secNo--;
break;
}
// size >= 61
if(sizeLeft < SectionSegment::DataLength){
/**
* Less than one segment left (space)
* dont bother sending
*/
secCount--;
info.m_sectionPtr[secNo].m_linear.p = ptrP;
loop = Full;
lsout(ndbout_c("section %d not saved", secNo));
break;
}
/**
* Split list
* 1) Find place to split
* 2) Rewrite header (the part that will be sent)
* 3) Write new header (for remaining part)
* 4) Store new header on FragmentSendInfo - record
*/
Uint32 sum = sizeLeft;
sum /= SectionSegment::DataLength;
sum *= SectionSegment::DataLength;
/**
* Rewrite header w.r.t size
*/
Uint32 prev = secCount - 1;
signalPtr[prev].sz = sum;
/**
* Write/store "new" header
*/
info.m_sectionPtr[secNo].m_linear.p = ptrP + sum;
info.m_sectionPtr[secNo].m_linear.sz = size - sum;
loop = Full;
lsout(ndbout_c("section %d split into %d", secNo, prev));
break;
}
lsout(ndbout_c("loop: %d secNo: %d secCount: %d sz: %d",
loop, secNo, secCount, sz));
/**
* Store fragment id
*/
secNos[secCount] = info.m_fragmentId;
Uint32 fragInfo = info.m_fragInfo;
info.m_fragInfo = 2;
switch(loop){
case Unknown:
if(secNo >= 0){
lsout(ndbout_c("Unknown - Full"));
/**
* Not finished
*/
break;
}
// Fall through
lsout(ndbout_c("Unknown - Done"));
info.m_status = FragmentSendInfo::SendComplete;
ndbassert(fragInfo == 2);
fragInfo = 3;
case Full:
break;
}
signal->header.m_noOfSections = 0;
signal->header.m_fragmentInfo = fragInfo;
sendSignal(info.m_nodeReceiverGroup,
info.m_gsn,
signal,
sigLen + secCount + 1,
(JobBufferLevel)info.m_prio,
signalPtr,
secCount);
if(fragInfo == 3){
/**
* This is the last signal
*/
g_sectionSegmentPool.release(info.m_theDataSection.p[sigLen]);
}
}
void
SimulatedBlock::sendFragmentedSignal(BlockReference ref,
GlobalSignalNumber gsn,
Signal* signal,
Uint32 length,
JobBufferLevel jbuf,
Callback & c,
Uint32 messageSize){
bool res = true;
Ptr<FragmentSendInfo> ptr;
res = c_segmentedFragmentSendList.seize(ptr);
ndbrequire(res);
res = sendFirstFragment(* ptr.p,
NodeReceiverGroup(ref),
gsn,
signal,
length,
jbuf,
messageSize);
ndbrequire(res);
if(ptr.p->m_status == FragmentSendInfo::SendComplete){
c_segmentedFragmentSendList.release(ptr);
if(c.m_callbackFunction != 0)
execute(signal, c, 0);
return;
}
ptr.p->m_callback = c;
if(!c_fragSenderRunning){
c_fragSenderRunning = true;
ContinueFragmented * sig = (ContinueFragmented*)signal->getDataPtrSend();
sig->line = __LINE__;
sendSignal(reference(), GSN_CONTINUE_FRAGMENTED, signal, 1, JBB);
}
}
void
SimulatedBlock::sendFragmentedSignal(NodeReceiverGroup rg,
GlobalSignalNumber gsn,
Signal* signal,
Uint32 length,
JobBufferLevel jbuf,
Callback & c,
Uint32 messageSize){
bool res = true;
Ptr<FragmentSendInfo> ptr;
res = c_segmentedFragmentSendList.seize(ptr);
ndbrequire(res);
res = sendFirstFragment(* ptr.p,
rg,
gsn,
signal,
length,
jbuf,
messageSize);
ndbrequire(res);
if(ptr.p->m_status == FragmentSendInfo::SendComplete){
c_segmentedFragmentSendList.release(ptr);
if(c.m_callbackFunction != 0)
execute(signal, c, 0);
return;
}
ptr.p->m_callback = c;
if(!c_fragSenderRunning){
c_fragSenderRunning = true;
ContinueFragmented * sig = (ContinueFragmented*)signal->getDataPtrSend();
sig->line = __LINE__;
sendSignal(reference(), GSN_CONTINUE_FRAGMENTED, signal, 1, JBB);
}
}
SimulatedBlock::Callback SimulatedBlock::TheEmptyCallback = {0, 0};
void
SimulatedBlock::sendFragmentedSignal(BlockReference ref,
GlobalSignalNumber gsn,
Signal* signal,
Uint32 length,
JobBufferLevel jbuf,
LinearSectionPtr ptr[3],
Uint32 noOfSections,
Callback & c,
Uint32 messageSize){
bool res = true;
Ptr<FragmentSendInfo> tmp;
res = c_linearFragmentSendList.seize(tmp);
ndbrequire(res);
res = sendFirstFragment(* tmp.p,
NodeReceiverGroup(ref),
gsn,
signal,
length,
jbuf,
ptr,
noOfSections,
messageSize);
ndbrequire(res);
if(tmp.p->m_status == FragmentSendInfo::SendComplete){
c_linearFragmentSendList.release(tmp);
if(c.m_callbackFunction != 0)
execute(signal, c, 0);
return;
}
tmp.p->m_callback = c;
if(!c_fragSenderRunning){
c_fragSenderRunning = true;
ContinueFragmented * sig = (ContinueFragmented*)signal->getDataPtrSend();
sig->line = __LINE__;
sendSignal(reference(), GSN_CONTINUE_FRAGMENTED, signal, 1, JBB);
}
}
void
SimulatedBlock::sendFragmentedSignal(NodeReceiverGroup rg,
GlobalSignalNumber gsn,
Signal* signal,
Uint32 length,
JobBufferLevel jbuf,
LinearSectionPtr ptr[3],
Uint32 noOfSections,
Callback & c,
Uint32 messageSize){
bool res = true;
Ptr<FragmentSendInfo> tmp;
res = c_linearFragmentSendList.seize(tmp);
ndbrequire(res);
res = sendFirstFragment(* tmp.p,
rg,
gsn,
signal,
length,
jbuf,
ptr,
noOfSections,
messageSize);
ndbrequire(res);
if(tmp.p->m_status == FragmentSendInfo::SendComplete){
c_linearFragmentSendList.release(tmp);
if(c.m_callbackFunction != 0)
execute(signal, c, 0);
return;
}
tmp.p->m_callback = c;
if(!c_fragSenderRunning){
c_fragSenderRunning = true;
ContinueFragmented * sig = (ContinueFragmented*)signal->getDataPtrSend();
sig->line = __LINE__;
sendSignal(reference(), GSN_CONTINUE_FRAGMENTED, signal, 1, JBB);
}
}
NodeInfo &
SimulatedBlock::setNodeInfo(NodeId nodeId) {
ndbrequire(nodeId > 0 && nodeId < MAX_NODES);
return globalData.m_nodeInfo[nodeId];
}
void
SimulatedBlock::execUTIL_CREATE_LOCK_REF(Signal* signal){
ljamEntry();
c_mutexMgr.execUTIL_CREATE_LOCK_REF(signal);
}
void SimulatedBlock::execUTIL_CREATE_LOCK_CONF(Signal* signal){
ljamEntry();
c_mutexMgr.execUTIL_CREATE_LOCK_CONF(signal);
}
void SimulatedBlock::execUTIL_DESTORY_LOCK_REF(Signal* signal){
ljamEntry();
c_mutexMgr.execUTIL_DESTORY_LOCK_REF(signal);
}
void SimulatedBlock::execUTIL_DESTORY_LOCK_CONF(Signal* signal){
ljamEntry();
c_mutexMgr.execUTIL_DESTORY_LOCK_CONF(signal);
}
void SimulatedBlock::execUTIL_LOCK_REF(Signal* signal){
ljamEntry();
c_mutexMgr.execUTIL_LOCK_REF(signal);
}
void SimulatedBlock::execUTIL_LOCK_CONF(Signal* signal){
ljamEntry();
c_mutexMgr.execUTIL_LOCK_CONF(signal);
}
void SimulatedBlock::execUTIL_UNLOCK_REF(Signal* signal){
ljamEntry();
c_mutexMgr.execUTIL_UNLOCK_REF(signal);
}
void SimulatedBlock::execUTIL_UNLOCK_CONF(Signal* signal){
ljamEntry();
c_mutexMgr.execUTIL_UNLOCK_CONF(signal);
}
void
SimulatedBlock::ignoreMutexUnlockCallback(Signal* signal,
Uint32 ptrI, Uint32 retVal){
c_mutexMgr.release(ptrI);
}
void
UpgradeStartup::installEXEC(SimulatedBlock* block){
SimulatedBlock::ExecFunction * a = block->theExecArray;
switch(block->number()){
case QMGR:
a[UpgradeStartup::GSN_CM_APPCHG] = &SimulatedBlock::execUPGRADE;
break;
case CNTR:
a[UpgradeStartup::GSN_CNTR_MASTERREF] = &SimulatedBlock::execUPGRADE;
a[UpgradeStartup::GSN_CNTR_MASTERCONF] = &SimulatedBlock::execUPGRADE;
break;
}
}
void
SimulatedBlock::execUPGRADE(Signal* signal){
Uint32 gsn = signal->header.theVerId_signalNumber;
switch(gsn){
case UpgradeStartup::GSN_CM_APPCHG:
UpgradeStartup::execCM_APPCHG(* this, signal);
break;
case UpgradeStartup::GSN_CNTR_MASTERREF:
UpgradeStartup::execCNTR_MASTER_REPLY(* this, signal);
break;
case UpgradeStartup::GSN_CNTR_MASTERCONF:
UpgradeStartup::execCNTR_MASTER_REPLY(* this, signal);
break;
}
}
void
SimulatedBlock::fsRefError(Signal* signal, Uint32 line, const char *msg)
{
const FsRef *fsRef = (FsRef*)signal->getDataPtr();
Uint32 errorCode = fsRef->errorCode;
Uint32 osErrorCode = fsRef->osErrorCode;
char msg2[100];
sprintf(msg2, "%s: %s. OS errno: %u", getBlockName(number()), msg, osErrorCode);
progError(line, errorCode, msg2);
}
void
SimulatedBlock::execFSWRITEREF(Signal* signal)
{
fsRefError(signal, __LINE__, "File system write failed");
}
void
SimulatedBlock::execFSREADREF(Signal* signal)
{
fsRefError(signal, __LINE__, "File system read failed");
}
void
SimulatedBlock::execFSCLOSEREF(Signal* signal)
{
fsRefError(signal, __LINE__, "File system close failed");
}
void
SimulatedBlock::execFSOPENREF(Signal* signal)
{
fsRefError(signal, __LINE__, "File system open failed");
}
void
SimulatedBlock::execFSREMOVEREF(Signal* signal)
{
fsRefError(signal, __LINE__, "File system remove failed");
}
void
SimulatedBlock::execFSSYNCREF(Signal* signal)
{
fsRefError(signal, __LINE__, "File system sync failed");
}
void
SimulatedBlock::execFSAPPENDREF(Signal* signal)
{
fsRefError(signal, __LINE__, "File system append failed");
}
#ifdef VM_TRACE
void
SimulatedBlock::clear_global_variables(){
Ptr<void> ** tmp = m_global_variables;
while(* tmp != 0){
(* tmp)->i = RNIL;
(* tmp)->p = 0;
tmp++;
}
}
void
SimulatedBlock::init_globals_list(void ** tmp, size_t cnt){
m_global_variables = new Ptr<void> * [cnt+1];
for(size_t i = 0; i<cnt; i++){
m_global_variables[i] = (Ptr<void>*)tmp[i];
}
m_global_variables[cnt] = 0;
}
#endif
#include "KeyDescriptor.hpp"
Uint32
SimulatedBlock::xfrm_key(Uint32 tab, const Uint32* src,
Uint32 *dst, Uint32 dstSize,
Uint32 keyPartLen[MAX_ATTRIBUTES_IN_INDEX]) const
{
const KeyDescriptor * desc = g_key_descriptor_pool.getPtr(tab);
const Uint32 noOfKeyAttr = desc->noOfKeyAttr;
Uint32 i = 0;
Uint32 srcPos = 0;
Uint32 dstPos = 0;
while (i < noOfKeyAttr)
{
const KeyDescriptor::KeyAttr& keyAttr = desc->keyAttr[i];
Uint32 dstWords =
xfrm_attr(keyAttr.attributeDescriptor, keyAttr.charsetInfo,
src, srcPos, dst, dstPos, dstSize);
keyPartLen[i++] = dstWords;
if (unlikely(dstWords == 0))
return 0;
}
return dstPos;
}
Uint32
SimulatedBlock::xfrm_attr(Uint32 attrDesc, CHARSET_INFO* cs,
const Uint32* src, Uint32 & srcPos,
Uint32* dst, Uint32 & dstPos, Uint32 dstSize) const
{
Uint32 srcBytes = AttributeDescriptor::getSizeInBytes(attrDesc);
Uint32 srcWords = (srcBytes + 3) / 4;
Uint32 dstWords = ~0;
uchar* dstPtr = (uchar*)&dst[dstPos];
const uchar* srcPtr = (const uchar*)&src[srcPos];
if (cs == NULL)
{
jam();
memcpy(dstPtr, srcPtr, srcWords << 2);
dstWords = srcWords;
}
else
{
jam();
Uint32 typeId = AttributeDescriptor::getType(attrDesc);
Uint32 lb, len;
bool ok = NdbSqlUtil::get_var_length(typeId, srcPtr, srcBytes, lb, len);
if (unlikely(!ok))
return 0;
Uint32 xmul = cs->strxfrm_multiply;
if (xmul == 0)
xmul = 1;
/*
* Varchar end-spaces are ignored in comparisons. To get same hash
* we blank-pad to maximum length via strnxfrm.
*/
Uint32 dstLen = xmul * (srcBytes - lb);
ndbrequire(dstLen <= ((dstSize - dstPos) << 2));
int n = NdbSqlUtil::strnxfrm_bug7284(cs, dstPtr, dstLen, srcPtr + lb, len);
if (unlikely(n == -1))
return 0;
while ((n & 3) != 0)
{
dstPtr[n++] = 0;
}
dstWords = (n >> 2);
}
dstPos += dstWords;
srcPos += srcWords;
return dstWords;
}
Uint32
SimulatedBlock::create_distr_key(Uint32 tableId,
Uint32 *data,
const Uint32
keyPartLen[MAX_ATTRIBUTES_IN_INDEX]) const
{
const KeyDescriptor* desc = g_key_descriptor_pool.getPtr(tableId);
const Uint32 noOfKeyAttr = desc->noOfKeyAttr;
Uint32 noOfDistrKeys = desc->noOfDistrKeys;
Uint32 *src = data;
Uint32 *dst = data;
Uint32 i = 0;
Uint32 dstPos = 0;
if(keyPartLen)
{
while (i < noOfKeyAttr && noOfDistrKeys)
{
Uint32 attr = desc->keyAttr[i].attributeDescriptor;
Uint32 len = keyPartLen[i];
if(AttributeDescriptor::getDKey(attr))
{
noOfDistrKeys--;
memmove(dst+dstPos, src, len << 2);
dstPos += len;
}
src += len;
i++;
}
}
else
{
while (i < noOfKeyAttr && noOfDistrKeys)
{
Uint32 attr = desc->keyAttr[i].attributeDescriptor;
Uint32 len = AttributeDescriptor::getSizeInWords(attr);
if(AttributeDescriptor::getDKey(attr))
{
noOfDistrKeys--;
memmove(dst+dstPos, src, len << 2);
dstPos += len;
}
src += len;
i++;
}
}
return dstPos;
}
--- 1.12/ndb/src/kernel/Makefile.am 2006-11-09 15:26:58 +01:00
+++ 1.13/ndb/src/kernel/Makefile.am 2006-11-09 15:26:58 +01:00
@@ -2,9 +2,10 @@
include $(top_srcdir)/ndb/config/common.mk.am
-ndbbin_PROGRAMS = ndbd
+ndbbin_PROGRAMS = ndbd ndbmtd
ndbd_SOURCES = main.cpp SimBlockList.cpp
+ndbmtd_SOURCES = main.cpp SimBlockList.cpp
include $(top_srcdir)/ndb/config/type_kernel.mk.am
@@ -25,7 +26,7 @@
-Iblocks/suma \
-Iblocks/dbtux
-LDADD += \
+LIBS= \
blocks/cmvmi/libcmvmi.a \
blocks/dbacc/libdbacc.a \
blocks/dbdict/libdbdict.a \
@@ -54,6 +55,9 @@
$(top_builddir)/dbug/libdbug.a \
$(top_builddir)/mysys/libmysys.a \
$(top_builddir)/strings/libmystrings.a @NDB_SCI_LIBS@
+
+ndbd_LDADD = $(LIBS) vm/libsched.a
+ndbmtd_LDADD = $(LIBS) vm/libsched_mt.a
# Don't update the files from bitkeeper
%::SCCS/s.%
--- 1.8/ndb/src/kernel/vm/Makefile.am 2006-11-09 15:26:58 +01:00
+++ 1.9/ndb/src/kernel/vm/Makefile.am 2006-11-09 15:26:58 +01:00
@@ -3,15 +3,10 @@
#DIRS += testLongSig
#endif
-noinst_LIBRARIES = libkernel.a
+noinst_LIBRARIES = libkernel.a libsched.a libsched_mt.a
libkernel_a_SOURCES = \
- SimulatedBlock.cpp \
- FastScheduler.cpp \
- TimeQueue.cpp \
VMSignal.cpp \
- ThreadConfig.cpp \
- TransporterCallback.cpp \
Emulator.cpp \
Configuration.cpp \
WatchDog.cpp \
@@ -19,6 +14,16 @@
SectionReader.cpp \
MetaData.cpp \
Mutex.cpp SafeCounter.cpp ndbd_malloc.cpp
+
+libsched_a_SOURCES = TimeQueue.cpp \
+ ThreadConfig.cpp \
+ FastScheduler.cpp \
+ TransporterCallback.cpp \
+ SimulatedBlock.cpp
+
+libsched_mt_a_SOURCES = mt/mt.cpp \
+ mt/sb_mt.cpp \
+ mt/dummy_mt.cpp
INCLUDES_LOC = -I$(top_srcdir)/ndb/src/mgmapi
| Thread |
|---|
| • bk commit into 5.0 tree (jonas:1.2260) | jonas | 9 Nov |