Below is the list of changes that have just been committed into a local
5.1 repository of knielsen. When knielsen does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html
ChangeSet@stripped, 2007-11-19 10:45:00+01:00, knielsen@ymer.(none) +9 -0
WL#1498: Multi-threaded ndbd.
Split sendlock into per-node lock.
storage/ndb/include/transporter/TransporterRegistry.hpp@stripped, 2007-11-19 10:44:56+01:00, knielsen@ymer.(none) +1 -0
Split sendlock into per-node lock.
storage/ndb/src/common/transporter/SHM_Transporter.cpp@stripped, 2007-11-19 10:44:56+01:00, knielsen@ymer.(none) +2 -1
Split sendlock into per-node lock.
storage/ndb/src/common/transporter/SHM_Transporter.hpp@stripped, 2007-11-19 10:44:56+01:00, knielsen@ymer.(none) +3 -1
Split sendlock into per-node lock.
storage/ndb/src/common/transporter/Transporter.hpp@stripped, 2007-11-19 10:44:56+01:00, knielsen@ymer.(none) +3 -0
Split sendlock into per-node lock.
storage/ndb/src/common/transporter/TransporterRegistry.cpp@stripped, 2007-11-19 10:44:56+01:00, knielsen@ymer.(none) +8 -0
Split sendlock into per-node lock.
storage/ndb/src/kernel/vm/SimulatedBlock.cpp@stripped, 2007-11-19 10:44:56+01:00, knielsen@ymer.(none) +62 -47
Split sendlock into per-node lock.
storage/ndb/src/kernel/vm/TransporterCallback.cpp@stripped, 2007-11-19 10:44:56+01:00, knielsen@ymer.(none) +18 -30
Cleanup locking #ifdef.
storage/ndb/src/kernel/vm/mt/mt.cpp@stripped, 2007-11-19 10:44:57+01:00, knielsen@ymer.(none) +145 -38
Split sendlock into per-node lock.
Small cleanups.
storage/ndb/src/kernel/vm/mt/mt.hpp@stripped, 2007-11-19 10:44:57+01:00, knielsen@ymer.(none) +8 -3
Split sendlock into per-node lock.
diff -Nrup a/storage/ndb/include/transporter/TransporterRegistry.hpp b/storage/ndb/include/transporter/TransporterRegistry.hpp
--- a/storage/ndb/include/transporter/TransporterRegistry.hpp 2007-08-28 11:43:22 +02:00
+++ b/storage/ndb/include/transporter/TransporterRegistry.hpp 2007-11-19 10:44:56 +01:00
@@ -248,6 +248,7 @@ public:
Uint32 pollReceive(Uint32 timeOutMillis);
void performReceive();
+ void performSend(NodeId nodeId);
void performSend();
/**
diff -Nrup a/storage/ndb/src/common/transporter/SHM_Transporter.cpp b/storage/ndb/src/common/transporter/SHM_Transporter.cpp
--- a/storage/ndb/src/common/transporter/SHM_Transporter.cpp 2007-01-24 18:57:02 +01:00
+++ b/storage/ndb/src/common/transporter/SHM_Transporter.cpp 2007-11-19 10:44:56 +01:00
@@ -359,7 +359,7 @@ SHM_Transporter::connect_common(NDB_SOCK
return false;
}
-void
+bool
SHM_Transporter::doSend()
{
if(m_last_signal)
@@ -367,6 +367,7 @@ SHM_Transporter::doSend()
m_last_signal = 0;
kill(m_remote_pid, g_ndb_shm_signum);
}
+ return true;
}
Uint32
diff -Nrup a/storage/ndb/src/common/transporter/SHM_Transporter.hpp b/storage/ndb/src/common/transporter/SHM_Transporter.hpp
--- a/storage/ndb/src/common/transporter/SHM_Transporter.hpp 2006-12-23 20:20:12 +01:00
+++ b/storage/ndb/src/common/transporter/SHM_Transporter.hpp 2007-11-19 10:44:56 +01:00
@@ -131,10 +131,12 @@ protected:
*/
void setupBuffers();
+ bool hasDataToSend() const { return TRUE; }
+
/**
* doSend (i.e signal receiver)
*/
- void doSend();
+ bool doSend();
int m_remote_pid;
Uint32 m_last_signal;
Uint32 m_signal_threshold;
diff -Nrup a/storage/ndb/src/common/transporter/Transporter.hpp b/storage/ndb/src/common/transporter/Transporter.hpp
--- a/storage/ndb/src/common/transporter/Transporter.hpp 2007-08-28 11:39:43 +02:00
+++ b/storage/ndb/src/common/transporter/Transporter.hpp 2007-11-19 10:44:56 +01:00
@@ -91,6 +91,9 @@ public:
m_transporter_registry.set_status_overloaded(remoteNodeId, val);
}
+ virtual bool hasDataToSend() const = 0;
+ virtual bool doSend() = 0;
+
protected:
Transporter(TransporterRegistry &,
TransporterType,
diff -Nrup a/storage/ndb/src/common/transporter/TransporterRegistry.cpp b/storage/ndb/src/common/transporter/TransporterRegistry.cpp
--- a/storage/ndb/src/common/transporter/TransporterRegistry.cpp 2007-10-11 14:55:28 +02:00
+++ b/storage/ndb/src/common/transporter/TransporterRegistry.cpp 2007-11-19 10:44:56 +01:00
@@ -876,6 +876,14 @@ TransporterRegistry::performReceive()
}
void
+TransporterRegistry::performSend(NodeId nodeId)
+{
+ Transporter *t = get_transporter(nodeId);
+ if (t && t->hasDataToSend() && t->isConnected() && is_connected(nodeId))
+ t->doSend();
+}
+
+void
TransporterRegistry::performSend()
{
int i;
diff -Nrup a/storage/ndb/src/kernel/vm/SimulatedBlock.cpp b/storage/ndb/src/kernel/vm/SimulatedBlock.cpp
--- a/storage/ndb/src/kernel/vm/SimulatedBlock.cpp 2007-11-14 14:46:10 +01:00
+++ b/storage/ndb/src/kernel/vm/SimulatedBlock.cpp 2007-11-19 10:44:56 +01:00
@@ -252,15 +252,6 @@ SimulatedBlock::handle_lingering_section
}
-#ifdef NDBD_MULTITHREADED
-#define MT_SEND_LOCK mt_send_lock();
-#define MT_SEND_UNLOCK mt_send_unlock();
-#else
-#define MT_SEND_LOCK
-#define MT_SEND_UNLOCK
-#endif
-
-
void
SimulatedBlock::sendSignal(BlockReference ref,
GlobalSignalNumber gsn,
@@ -333,12 +324,16 @@ SimulatedBlock::sendSignal(BlockReferenc
getSignalName(gsn), gsn, getBlockName(recBlock),
recNode);
#endif
- MT_SEND_LOCK
- SendStatus ss = globalTransporterRegistry.prepareSend(&sh, jobBuffer,
- &signal->theData[0],
- recNode,
- (LinearSectionPtr*)0);
- MT_SEND_UNLOCK
+
+ SendStatus ss;
+#ifdef NDBD_MULTITHREADED
+ ss = mt_send_remote(m_threadId, &sh, jobBuffer, &signal->theData[0],
+ recNode, 0);
+#else
+ ss = globalTransporterRegistry.prepareSend(&sh, jobBuffer,
+ &signal->theData[0], recNode,
+ (LinearSectionPtr*)0);
+#endif
ndbrequire(ss == SEND_OK || ss == SEND_BLOCKED || ss == SEND_DISCONNECTED);
}
@@ -492,12 +487,16 @@ SimulatedBlock::sendSignal(NodeReceiverG
recNode);
#endif
- MT_SEND_LOCK
- SendStatus ss = globalTransporterRegistry.prepareSend(&sh, jobBuffer,
- &signal->theData[0],
- recNode,
- (LinearSectionPtr*)0);
- MT_SEND_UNLOCK
+ SendStatus ss;
+#ifdef NDBD_MULTITHREADED
+ ss = mt_send_remote(m_threadId, &sh, jobBuffer, &signal->theData[0],
+ recNode, 0);
+#else
+ ss = globalTransporterRegistry.prepareSend(&sh, jobBuffer,
+ &signal->theData[0], recNode,
+ (LinearSectionPtr*)0);
+#endif
+
ndbrequire(ss == SEND_OK || ss == SEND_BLOCKED || ss == SEND_DISCONNECTED);
}
@@ -594,12 +593,16 @@ SimulatedBlock::sendSignal(BlockReferenc
recNode);
#endif
- MT_SEND_LOCK
- SendStatus ss = globalTransporterRegistry.prepareSend(&sh, jobBuffer,
- &signal->theData[0],
- recNode,
- ptr);
- MT_SEND_UNLOCK
+ SendStatus ss;
+#ifdef NDBD_MULTITHREADED
+ ss = mt_send_remote(m_threadId, &sh, jobBuffer, &signal->theData[0],
+ recNode, ptr);
+#else
+ ss = globalTransporterRegistry.prepareSend(&sh, jobBuffer,
+ &signal->theData[0], recNode,
+ ptr);
+#endif
+
ndbrequire(ss == SEND_OK || ss == SEND_BLOCKED || ss == SEND_DISCONNECTED);
}
@@ -706,12 +709,16 @@ SimulatedBlock::sendSignal(NodeReceiverG
recNode);
#endif
- MT_SEND_LOCK
- SendStatus ss = globalTransporterRegistry.prepareSend(&sh, jobBuffer,
- &signal->theData[0],
- recNode,
- ptr);
- MT_SEND_UNLOCK
+ SendStatus ss;
+#ifdef NDBD_MULTITHREADED
+ ss = mt_send_remote(m_threadId, &sh, jobBuffer, &signal->theData[0],
+ recNode, ptr);
+#else
+ ss = globalTransporterRegistry.prepareSend(&sh, jobBuffer,
+ &signal->theData[0], recNode,
+ ptr);
+#endif
+
ndbrequire(ss == SEND_OK || ss == SEND_BLOCKED || ss == SEND_DISCONNECTED);
}
@@ -804,13 +811,17 @@ SimulatedBlock::sendSignal(BlockReferenc
recNode);
#endif
- MT_SEND_LOCK
- SendStatus ss = globalTransporterRegistry.prepareSend(&sh, jobBuffer,
- &signal->theData[0],
- recNode,
- g_sectionSegmentPool,
- sections->m_ptr);
- MT_SEND_UNLOCK
+ SendStatus ss;
+#ifdef NDBD_MULTITHREADED
+ ss = mt_send_remote(m_threadId, &sh, jobBuffer, &signal->theData[0],
+ recNode, &g_sectionSegmentPool, sections->m_ptr);
+#else
+ ss = globalTransporterRegistry.prepareSend(&sh, jobBuffer,
+ &signal->theData[0], recNode,
+ g_sectionSegmentPool,
+ sections->m_ptr);
+#endif
+
ndbrequire(ss == SEND_OK || ss == SEND_BLOCKED || ss == SEND_DISCONNECTED);
::releaseSections(noOfSections, sections->m_ptr);
}
@@ -920,13 +931,17 @@ SimulatedBlock::sendSignal(NodeReceiverG
recNode);
#endif
- MT_SEND_LOCK
- SendStatus ss = globalTransporterRegistry.prepareSend(&sh, jobBuffer,
- &signal->theData[0],
- recNode,
- g_sectionSegmentPool,
- sections->m_ptr);
- MT_SEND_UNLOCK
+ SendStatus ss;
+#ifdef NDBD_MULTITHREADED
+ ss = mt_send_remote(m_threadId, &sh, jobBuffer, &signal->theData[0],
+ recNode, &g_sectionSegmentPool, sections->m_ptr);
+#else
+ ss = globalTransporterRegistry.prepareSend(&sh, jobBuffer,
+ &signal->theData[0], recNode,
+ g_sectionSegmentPool,
+ sections->m_ptr);
+#endif
+
ndbrequire(ss == SEND_OK || ss == SEND_BLOCKED || ss == SEND_DISCONNECTED);
}
diff -Nrup a/storage/ndb/src/kernel/vm/TransporterCallback.cpp b/storage/ndb/src/kernel/vm/TransporterCallback.cpp
--- a/storage/ndb/src/kernel/vm/TransporterCallback.cpp 2007-09-24 17:12:18 +02:00
+++ b/storage/ndb/src/kernel/vm/TransporterCallback.cpp 2007-11-19 10:44:56 +01:00
@@ -60,6 +60,14 @@ const char *lookupConnectionError(Uint32
return connectionError[i].text;
}
+#ifdef NDBD_MULTITHREADED
+#define MT_SECTION_LOCK mt_section_lock();
+#define MT_SECTION_UNLOCK mt_section_unlock();
+#else
+#define MT_SECTION_LOCK
+#define MT_SECTION_UNLOCK
+#endif
+
bool
import(Ptr<SectionSegment> & first, const Uint32 * src, Uint32 len){
/**
@@ -69,15 +77,11 @@ import(Ptr<SectionSegment> & first, cons
Uint32 dummyPrev[4];
first.p = 0;
-#ifdef NDBD_MULTITHREADED
- mt_section_lock();
-#endif
+ MT_SECTION_LOCK
if(g_sectionSegmentPool.seize(first)){
;
} else {
-#ifdef NDBD_MULTITHREADED
- mt_section_unlock();
-#endif
+ MT_SECTION_UNLOCK
ndbout_c("here");
return false;
}
@@ -98,16 +102,12 @@ import(Ptr<SectionSegment> & first, cons
;
} else {
first.p->m_lastSegment = prevPtr.i;
-#ifdef NDBD_MULTITHREADED
- mt_section_unlock();
-#endif
+ MT_SECTION_LOCK
ndbout_c("hera");
return false;
}
}
-#ifdef NDBD_MULTITHREADED
- mt_section_unlock();
-#endif
+ MT_SECTION_UNLOCK
first.p->m_lastSegment = currPtr.i;
currPtr.p->m_nextSegment = RNIL;
@@ -194,15 +194,11 @@ getSection(SegmentedSectionPtr & ptr, Ui
void
release(SegmentedSectionPtr & ptr){
-#ifdef NDBD_MULTITHREADED
- mt_section_lock();
-#endif
+ MT_SECTION_LOCK
g_sectionSegmentPool.releaseList(relSz(ptr.sz),
ptr.i,
ptr.p->m_lastSegment);
-#ifdef NDBD_MULTITHREADED
- mt_section_unlock();
-#endif
+ MT_SECTION_UNLOCK
}
void
@@ -213,9 +209,7 @@ releaseSections(Uint32 secCount, Segment
Uint32 tSz1 = ptr[1].sz;
Uint32 tSec2 = ptr[2].i;
Uint32 tSz2 = ptr[2].sz;
-#ifdef NDBD_MULTITHREADED
- mt_section_lock();
-#endif
+ MT_SECTION_LOCK
switch(secCount){
case 3:
g_sectionSegmentPool.releaseList(relSz(tSz2), tSec2,
@@ -227,9 +221,7 @@ releaseSections(Uint32 secCount, Segment
g_sectionSegmentPool.releaseList(relSz(tSz0), tSec0,
ptr[0].p->m_lastSegment);
case 0:
-#ifdef NDBD_MULTITHREADED
- mt_section_unlock();
-#endif
+ MT_SECTION_UNLOCK
return;
}
char msg[40];
@@ -306,18 +298,14 @@ execute(void * callbackObj,
/**
* Out of memory
*/
-#ifdef NDBD_MULTITHREADED
- mt_section_lock();
-#endif
+ MT_SECTION_LOCK
for(Uint32 i = 0; i<secCount; i++){
if(secPtr[i].p != 0){
g_sectionSegmentPool.releaseList(relSz(ptr[i].sz), secPtr[i].i,
secPtr[i].p->m_lastSegment);
}
}
-#ifdef NDBD_MULTITHREADED
- mt_section_unlock();
-#endif
+ MT_SECTION_UNLOCK
#ifndef NDBD_MULTITHREADED
diff -Nrup a/storage/ndb/src/kernel/vm/mt/mt.cpp b/storage/ndb/src/kernel/vm/mt/mt.cpp
--- a/storage/ndb/src/kernel/vm/mt/mt.cpp 2007-11-16 14:13:49 +01:00
+++ b/storage/ndb/src/kernel/vm/mt/mt.cpp 2007-11-19 10:44:57 +01:00
@@ -157,7 +157,7 @@ inline void require(bool x)
struct thr_spin_lock
{
- thr_spin_lock(const char * name)
+ thr_spin_lock(const char * name = 0)
{
m_lock = 0;
m_name = name;
@@ -166,7 +166,7 @@ struct thr_spin_lock
const char * m_name;
Uint32 m_contended_count;
- volatile unsigned m_lock;
+ volatile Uint32 m_lock;
};
struct thr_mutex
@@ -438,10 +438,20 @@ struct thr_data
EmulatedJamBuffer *m_jam;
/* Watchdog counter for this thread. */
Uint32 *m_watchdog_counter;
+ /* Signal delivery statistics. */
Uint32 m_prioa_count;
Uint32 m_prioa_size;
Uint32 m_priob_count;
Uint32 m_priob_size;
+ /* Array of node ids with pending remote send data. */
+ Uint8 m_pending_send_nodes[MAX_NTRANSPORTERS];
+ /* Number of node ids in m_pending_send_nodes. */
+ Uint32 m_pending_send_count;
+ /**
+ * Bitmap of pending node ids with send data.
+ * Used to quickly check if a node id is already in m_pending_send_nodes.
+ */
+ Bitmask<(MAX_NTRANSPORTERS+31)/32> m_pending_send_mask;
};
template<typename T>
@@ -486,13 +496,11 @@ struct thr_safe_pool
struct thr_repository
{
thr_repository()
- : m_send_lock("sendlock"),
- m_receive_lock("recvlock"),
+ : m_receive_lock("recvlock"),
m_section_lock("sectionlock"),
m_mem_manager_lock("memmanagerlock") {}
unsigned m_thread_count;
- struct thr_spin_lock m_send_lock;
struct thr_spin_lock m_receive_lock;
struct thr_spin_lock m_section_lock;
struct thr_spin_lock m_mem_manager_lock;
@@ -507,6 +515,11 @@ struct thr_repository
pthread_mutex_t stop_for_crash_mutex;
pthread_cond_t stop_for_crash_cond;
Uint32 stopped_threads;
+
+ /**
+ * Send locks for the transporters, one per possible remote node.
+ */
+ thr_spin_lock m_send_locks[MAX_NTRANSPORTERS];
};
static
@@ -800,16 +813,74 @@ flush_jbb_write_state(thr_data *selfptr)
Uint32 senderThreadId;
+static inline
+void
+register_pending_send(thr_data *selfptr, Uint32 nodeId)
+{
+ /* Mark that this node has pending send data. */
+ if (!selfptr->m_pending_send_mask.get(nodeId))
+ {
+ selfptr->m_pending_send_mask.set(nodeId, 1);
+ Uint32 i = selfptr->m_pending_send_count;
+ selfptr->m_pending_send_nodes[i] = nodeId;
+ selfptr->m_pending_send_count = i + 1;
+ }
+}
+
+/**
+ * Send any pending data to remote nodes.
+ *
+ * If MUST_SEND is false, will only try to lock the send lock, but if it would
+ * block, that node is skipped, to be tried again next time round.
+ *
+ * If MUST_SEND is true, will always take the lock, waiting on it if needed.
+ *
+ * Currently, the list of pending nodes to send to is thread-local, but the
+ * per-node send buffer is shared by all threads. Thus we might skip a node
+ * for which another thread has pending send data, and we might send pending
+ * data also for another thread without clearing the node from the pending
+ * list of that other thread (but we will never loose signals due to this).
+ *
+ * Later, when we might move to per-thread-per-node send buffers, eliminating
+ * these issues and significantly reducing the send lock contentions.
+ */
static
void
do_send(struct thr_repository* rep, struct thr_data* selfptr,
- Uint32 *watchDogCounter)
+ Uint32 *watchDogCounter, bool must_send)
{
- senderThreadId = selfptr->m_thr_no;
+ Uint32 i;
+ Uint32 count = selfptr->m_pending_send_count;
+ Uint8 *nodes = selfptr->m_pending_send_nodes;
+
+ if (count == 0)
+ return;
+ /* Clear the pending list. */
+ selfptr->m_pending_send_mask.clear();
+ selfptr->m_pending_send_count = 0;
+
+ for (i = 0; i < count; i++)
+ {
+ NodeId nodeId = nodes[i];
+ *watchDogCounter = 6;
+ if (must_send)
+ lock(&rep->m_send_locks[nodeId]);
+ else if (trylock(&rep->m_send_locks[nodeId]) != 0)
+ {
+ /* Not doing this node now, re-add to pending list. */
+ register_pending_send(selfptr, nodeId);
+ continue;
+ }
- *watchDogCounter = 6;
- globalTransporterRegistry.performSend();
- unlock(&rep->m_send_lock);
+ /**
+ * The senderThreadId global is set so that TransporterCallback can know
+ * which thread queue to use to deliver local signals for transporter
+ * state change signals.
+ */
+ senderThreadId = selfptr->m_thr_no;
+ globalTransporterRegistry.performSend(nodeId);
+ unlock(&rep->m_send_locks[nodeId]);
+ }
}
static
@@ -1270,14 +1341,13 @@ mt_job_thread_main(void *thr_arg)
if (sum == 0)
{
/* About to sleep, _must_ send now. */
- lock(&rep->m_send_lock);
- do_send(rep, selfptr, &watchDogCounter);
+ do_send(rep, selfptr, &watchDogCounter, TRUE);
send_sum = 0;
}
- else if (send_sum > MAX_SIGNALS_BEFORE_SEND &&
- trylock(&rep->m_send_lock) == 0)
+ else if (send_sum > MAX_SIGNALS_BEFORE_SEND)
{
- do_send(rep, selfptr, &watchDogCounter);
+ /* Try to send, but skip for now in case of lock contention. */
+ do_send(rep, selfptr, &watchDogCounter, FALSE);
send_sum = 0;
}
}
@@ -1352,6 +1422,45 @@ sendprioa(Uint32 self, Uint32 block, con
selfptr->m_next_buffer = seize_buffer(rep, self, true);
}
+/**
+ * Send a signal to a remote node.
+ *
+ * (The signal is only queued here, and actually sent later in do_send()).
+ */
+SendStatus
+mt_send_remote(Uint32 self, const SignalHeader *sh, Uint8 prio,
+ const Uint32 * data, NodeId nodeId,
+ const LinearSectionPtr ptr[3])
+{
+ thr_repository *rep = &g_thr_repository;
+ thr_data *selfptr = rep->m_thread + self;
+ SendStatus ss;
+
+ register_pending_send(selfptr, nodeId);
+ lock(&rep->m_send_locks[nodeId]);
+ ss = globalTransporterRegistry.prepareSend(sh, prio, data, nodeId, ptr);
+ unlock(&rep->m_send_locks[nodeId]);
+ return ss;
+}
+
+SendStatus
+mt_send_remote(Uint32 self, const SignalHeader *sh, Uint8 prio,
+ const Uint32 *data, NodeId nodeId,
+ class SectionSegmentPool *thePool,
+ const SegmentedSectionPtr ptr[3])
+{
+ thr_repository *rep = &g_thr_repository;
+ thr_data *selfptr = rep->m_thread + self;
+ SendStatus ss;
+
+ register_pending_send(selfptr, nodeId);
+ lock(&rep->m_send_locks[nodeId]);
+ ss = globalTransporterRegistry.prepareSend(sh, prio, data, nodeId,
+ *thePool, ptr);
+ unlock(&rep->m_send_locks[nodeId]);
+ return ss;
+}
+
/*
* This functions sends a prio A STOP_FOR_CRASH signal to a thread.
*
@@ -1537,7 +1646,7 @@ senddelay(Uint32 thr_no, const SignalHea
static
void
-init(struct thr_tq* tq)
+queue_init(struct thr_tq* tq)
{
tq->m_next_timer = 0;
tq->m_current_time = 0;
@@ -1548,10 +1657,10 @@ init(struct thr_tq* tq)
static
void
-init(struct thr_repository* rep, struct thr_data *selfptr, unsigned int cnt,
- unsigned thr_no)
+thr_init(struct thr_repository* rep, struct thr_data *selfptr, unsigned int cnt,
+ unsigned thr_no)
{
- unsigned int i;
+ Uint32 i;
selfptr->m_thr_no = thr_no;
selfptr->m_first_free = 0;
@@ -1581,7 +1690,7 @@ init(struct thr_repository* rep, struct
selfptr->m_read_states[i].m_write_pos = 0;
}
- init(&selfptr->m_tq);
+ queue_init(&selfptr->m_tq);
selfptr->m_jam = NULL;
@@ -1589,13 +1698,16 @@ init(struct thr_repository* rep, struct
selfptr->m_prioa_size = 0;
selfptr->m_priob_count = 0;
selfptr->m_priob_size = 0;
+
+ selfptr->m_pending_send_count = 0;
+ selfptr->m_pending_send_mask.clear();
}
/* Have to do this after init of all m_in_queues is done. */
static
void
-init2(struct thr_repository* rep, struct thr_data *selfptr, unsigned int cnt,
- unsigned thr_no)
+thr_init2(struct thr_repository* rep, struct thr_data *selfptr, unsigned int cnt,
+ unsigned thr_no)
{
for (Uint32 i = 0; i<cnt; i++)
{
@@ -1609,23 +1721,30 @@ init2(struct thr_repository* rep, struct
static
void
-init(struct thr_repository* rep, unsigned int cnt, Ndbd_mem_manager *mm)
+rep_init(struct thr_repository* rep, unsigned int cnt, Ndbd_mem_manager *mm)
{
rep->m_free_list.m_mm = mm;
rep->m_thread_count = cnt;
for (unsigned int i = 0; i<cnt; i++)
{
- init(rep, rep->m_thread + i, cnt, i);
+ thr_init(rep, rep->m_thread + i, cnt, i);
}
for (unsigned int i = 0; i<cnt; i++)
{
- init2(rep, rep->m_thread + i, cnt, i);
+ thr_init2(rep, rep->m_thread + i, cnt, i);
}
rep->stopped_threads = 0;
pthread_mutex_init(&rep->stop_for_crash_mutex, NULL);
pthread_cond_init(&rep->stop_for_crash_cond, NULL);
+
+ for (int i = 0 ; i < MAX_NTRANSPORTERS; i++)
+ {
+ char buf[100];
+ snprintf(buf, sizeof(buf), "send lock node %d", i);
+ rep->m_send_locks[i].m_name = strdup(buf);
+ }
}
@@ -1651,7 +1770,7 @@ ThreadConfig::~ThreadConfig()
void
ThreadConfig::init(EmulatorData *emulatorData)
{
- ::init(&g_thr_repository, NUM_THREADS, emulatorData->m_mem_manager);
+ ::rep_init(&g_thr_repository, NUM_THREADS, emulatorData->m_mem_manager);
}
void ThreadConfig::ipControlLoop(Uint32 thread_index)
@@ -2060,18 +2179,6 @@ FastScheduler::dumpSignalMemory(Uint32 t
&signal.theData[0]);
}
fflush(out);
-}
-
-void
-mt_send_lock()
-{
- lock(&(g_thr_repository.m_send_lock));
-}
-
-void
-mt_send_unlock()
-{
- unlock(&(g_thr_repository.m_send_lock));
}
void
diff -Nrup a/storage/ndb/src/kernel/vm/mt/mt.hpp b/storage/ndb/src/kernel/vm/mt/mt.hpp
--- a/storage/ndb/src/kernel/vm/mt/mt.hpp 2007-11-13 13:42:10 +01:00
+++ b/storage/ndb/src/kernel/vm/mt/mt.hpp 2007-11-19 10:44:57 +01:00
@@ -1,4 +1,5 @@
#include <kernel_types.h>
+#include <TransporterDefinitions.hpp>
#ifndef ndb_mt_hpp
#define ndb_mt_hpp
@@ -19,9 +20,13 @@ void sendprioa(Uint32 self, Uint32 dst,
void senddelay(Uint32 thr_no, const struct SignalHeader*, Uint32 delay);
void mt_execSTOP_FOR_CRASH();
-void mt_send_lock();
-void mt_send_unlock();
-
+SendStatus mt_send_remote(Uint32 self, const SignalHeader *sh, Uint8 prio,
+ const Uint32 *data, NodeId nodeId,
+ const LinearSectionPtr ptr[3]);
+SendStatus mt_send_remote(Uint32 self, const SignalHeader *sh, Uint8 prio,
+ const Uint32 *data, NodeId nodeId,
+ class SectionSegmentPool *thePool,
+ const SegmentedSectionPtr ptr[3]);
void mt_receive_lock();
void mt_receive_unlock();
| Thread |
|---|
| • bk commit into 5.1 tree (knielsen:1.2625) | knielsen | 19 Nov |