Below is the list of changes that have just been committed into a local
5.1 repository of knielsen. When knielsen does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html
ChangeSet@stripped, 2007-11-14 14:46:13+01:00, knielsen@ymer.(none) +2 -0
WL#1498: Multi-threaded ndbd.
Split thread_main for receiver thread and job execution threads.
storage/ndb/src/kernel/vm/SimulatedBlock.cpp@stripped, 2007-11-14 14:46:10+01:00, knielsen@ymer.(none) +2 -2
Small fix.
storage/ndb/src/kernel/vm/mt/mt.cpp@stripped, 2007-11-14 14:46:10+01:00, knielsen@ymer.(none) +104 -71
Split thread_main for receiver thread and job execution threads.
diff -Nrup a/storage/ndb/src/kernel/vm/SimulatedBlock.cpp b/storage/ndb/src/kernel/vm/SimulatedBlock.cpp
--- a/storage/ndb/src/kernel/vm/SimulatedBlock.cpp 2007-11-13 13:42:10 +01:00
+++ b/storage/ndb/src/kernel/vm/SimulatedBlock.cpp 2007-11-14 14:46:10 +01:00
@@ -400,8 +400,8 @@ SimulatedBlock::sendSignalFromReceiver(B
/* Now lock the receiver and deliver into queue 2 (receiver thread). */
mt_receive_lock();
jobBuffer == JBB ?
- sendlocal(2, recBlock, &signal->header) :
- sendprioa(2, recBlock, &signal->header);
+ sendlocal(receiverThreadId, recBlock, &signal->header) :
+ sendprioa(receiverThreadId, recBlock, &signal->header);
mt_receive_unlock();
#endif
}
diff -Nrup a/storage/ndb/src/kernel/vm/mt/mt.cpp b/storage/ndb/src/kernel/vm/mt/mt.cpp
--- a/storage/ndb/src/kernel/vm/mt/mt.cpp 2007-11-14 13:26:46 +01:00
+++ b/storage/ndb/src/kernel/vm/mt/mt.cpp 2007-11-14 14:46:10 +01:00
@@ -49,6 +49,7 @@ static const Uint32 MAX_SIGNALS_PER_JB =
#define NDB_MT_LOCK_TO_CPU
static const Uint32 NUM_THREADS = 3;
+static const Uint32 RECEIVER_THREAD_NO = 2;
#define MAX_THREADS 4
static inline
@@ -820,26 +821,6 @@ do_send(struct thr_repository* rep, stru
unlock(&rep->m_send_lock);
}
-Uint32 receiverThreadId;
-
-static
-void
-do_receive(struct thr_repository*rep, struct thr_data* selfptr, unsigned delay,
- Uint32 *watchDogCounter)
-{
- unsigned thr_no = selfptr->m_thr_no;
- *watchDogCounter = 7;
- receiverThreadId = thr_no;
- if (globalTransporterRegistry.pollReceive(delay))
- {
- *watchDogCounter = 8;
- globalTransporterRegistry.performReceive();
- }
- unlock(&rep->m_receive_lock);
-
- flush_jbb_write_state(selfptr);
-}
-
static
inline
void
@@ -1145,9 +1126,95 @@ update_sched_stats(thr_data *selfptr)
}
}
+static void
+init_thread(thr_data *selfptr, EmulatedJamBuffer *jam, Uint32 *watchDogCounter)
+{
+ jam->theEmulatedJamIndex = 0;
+ jam->theEmulatedJamBlockNumber = 0;
+ memset(jam->theEmulatedJam, 0, sizeof(jam->theEmulatedJam));
+ NdbThread_SetTlsKey(NDB_THREAD_TLS_JAM, jam);
+ selfptr->m_jam = jam;
+
+ selfptr->m_watchdog_counter = watchDogCounter;
+ unsigned thr_no = selfptr->m_thr_no;
+ globalEmulatorData.theWatchDog->registerWatchedThread(watchDogCounter,
+ thr_no);
+
+ NdbThread_SetTlsKey(NDB_THREAD_TLS_THREAD, selfptr);
+
+ pid_t tid = (unsigned)syscall(SYS_gettid);
+ ndbout_c("Tread %u started, tid=%u", thr_no, tid);
+#ifdef NDB_MT_LOCK_TO_CPU
+ uint cpu_no = 1 + (thr_no % 3);
+ cpu_no = (cpu_no >= 2 ? 5 - cpu_no : cpu_no);
+ ndbout_c("lock to cpu %u", cpu_no);
+ {
+ cpu_set_t mask;
+ CPU_ZERO(&mask);
+ CPU_SET(cpu_no, &mask);
+ sched_setaffinity(tid, sizeof(mask), &mask);
+ }
+#endif
+}
+
+Uint32 receiverThreadId;
+
+/*
+ * We only do receive in thread 2, which _only_ does receive.
+ *
+ * Otherwise we have a problem waking up a thread that is sleeping in
+ * the transporter
+ */
extern "C"
void *
-mt_thr_main(void *thr_arg)
+mt_receiver_thread_main(void *thr_arg)
+{
+ struct thr_repository* rep = &g_thr_repository;
+ struct thr_data* selfptr = (struct thr_data *)thr_arg;
+ unsigned thr_no = selfptr->m_thr_no;
+ EmulatedJamBuffer thread_jam;
+ Uint32 watchDogCounter;
+
+ init_thread(selfptr, &thread_jam, &watchDogCounter);
+ receiverThreadId = thr_no;
+
+ while (globalData.theRestartFlag != perform_stop)
+ {
+ watchDogCounter = 7;
+
+ update_sched_stats(selfptr);
+
+ /*
+ * Hm, this sucks, actually.
+ *
+ * We are sleeping in the transporter with the receiver lock held,
+ * and not doing much else.
+ *
+ * So without some kind of lock fairness (which we don't have), we could
+ * be locking out other threads from the receiver for a _long_ time
+ * (currently this means node failure handling in CMVMI).
+ *
+ * We may need to have the transporter release the lock before it does
+ * select()/poll(), and re-aquire...
+ */
+ lock(&rep->m_receive_lock);
+ if (globalTransporterRegistry.pollReceive(1))
+ {
+ watchDogCounter = 8;
+ globalTransporterRegistry.performReceive();
+ }
+ unlock(&rep->m_receive_lock);
+
+ flush_jbb_write_state(selfptr);
+ }
+
+ globalEmulatorData.theWatchDog->unregisterWatchedThread(thr_no);
+ return NULL; // Return value not currently used
+}
+
+extern "C"
+void *
+mt_job_thread_main(void *thr_arg)
{
unsigned char signal_buf[sizeof(Signal) + 63 + 256 * MAX_THREADS];
Signal *signal;
@@ -1158,21 +1225,11 @@ mt_thr_main(void *thr_arg)
Uint32 watchDogCounter;
Uint32 thrSignalId = 0;
- thread_jam.theEmulatedJamIndex = 0;
- thread_jam.theEmulatedJamBlockNumber = 0;
- memset(thread_jam.theEmulatedJam, 0, sizeof(thread_jam.theEmulatedJam));
- NdbThread_SetTlsKey(NDB_THREAD_TLS_JAM, &thread_jam);
-
struct thr_repository* rep = &g_thr_repository;
struct thr_data* selfptr = (struct thr_data *)thr_arg;
- selfptr->m_jam = &thread_jam;
- selfptr->m_watchdog_counter = &watchDogCounter;
- NdbThread_SetTlsKey(NDB_THREAD_TLS_THREAD, selfptr);
- /*
- * The thread initialisation argument is void *, not numeric, so we obtain
- * the numeric thread id in this slightly backwards way.
- */
- unsigned thr_no = selfptr - &(rep->m_thread[0]);
+ init_thread(selfptr, &thread_jam, &watchDogCounter);
+
+ unsigned thr_no = selfptr->m_thr_no;
/*
* Align signal buffer for better cache performance.
* Also skew it a litte for each thread to avoid cache pollution.
@@ -1182,20 +1239,6 @@ mt_thr_main(void *thr_arg)
sigtmp+= thr_no*256;
signal = (Signal *)sigtmp;
- pid_t tid = (unsigned)syscall(SYS_gettid);
- ndbout_c("Tread %u started, tid=%u", thr_no, tid);
-#ifdef NDB_MT_LOCK_TO_CPU
- uint cpu_no = 1 + (thr_no % 3);
- cpu_no = (cpu_no >= 2 ? 5 - cpu_no : cpu_no);
- ndbout_c("lock to cpu %u", cpu_no);
- {
- cpu_set_t mask;
- CPU_ZERO(&mask);
- CPU_SET(cpu_no, &mask);
- sched_setaffinity(tid, sizeof(mask), &mask);
- }
-#endif
-
/*
* Now we need to somehow assign to this thread all blocks that will run in
* this thread.
@@ -1212,8 +1255,6 @@ mt_thr_main(void *thr_arg)
/* Avoid false watchdog alarms caused by race condition. */
watchDogCounter = 1;
- globalEmulatorData.theWatchDog->registerWatchedThread(&watchDogCounter,
- thr_no);
Uint32 send_sum = 0;
while (globalData.theRestartFlag != perform_stop)
@@ -1254,17 +1295,7 @@ mt_thr_main(void *thr_arg)
}
}
- /*
- * We only do receive in thread 2, which _only_ does receive.
- *
- * Otherwise we have a problem waking up a thread that is sleeping in the
- * transporter
- */
- if (thr_no == 2 && trylock(&rep->m_receive_lock) == 0)
- {
- do_receive(rep, selfptr, sum ? 0 : 1, &watchDogCounter);
- }
- else if (sum == 0)
+ if (sum == 0)
{
yield(&selfptr->m_waiter, &nowait);
}
@@ -1282,7 +1313,7 @@ sendlocal(Uint32 self, Uint32 block, con
* to the other thread.
* This parameter found to be reasonable by benchmarking.
*/
- Uint32 MAX_SIGNALS_BEFORE_FLUSH = (self == 2 ? 2 : 20);
+ Uint32 MAX_SIGNALS_BEFORE_FLUSH = (self == RECEIVER_THREAD_NO ? 2 : 20);
Uint32 dst = block2ThreadId(block);
struct thr_repository* rep = &g_thr_repository;
struct thr_data * selfptr = rep->m_thread + self;
@@ -1641,11 +1672,11 @@ void ThreadConfig::ipControlLoop(Uint32
unsigned int i;
unsigned int thr_no;
struct thr_repository* rep = &g_thr_repository;
- NdbThread *threads[NUM_THREADS - 1];
+ NdbThread *threads[NUM_THREADS];
/*
- * Start threads for all execution threads, except for thread 0, which
- * runs in the main thread.
+ * Start threads for all execution threads, except for the receiver
+ * thread, which runs in the main thread.
*/
for (thr_no = 0; thr_no < NUM_THREADS; thr_no++)
{
@@ -1659,26 +1690,28 @@ void ThreadConfig::ipControlLoop(Uint32
}
rep->m_thread[thr_no].m_time = NdbTick_CurrentMillisecond();
- if (thr_no == 0)
+ if (thr_no == RECEIVER_THREAD_NO)
continue; // Will run in the main thread.
/*
* The NdbThread_Create() takes void **, but that is cast to void * when
- * passed to the mt_thr_main(). Which is kind of strange ...
+ * passed to the thread function. Which is kind of strange ...
*/
- threads[thr_no - 1] = NdbThread_Create(mt_thr_main,
+ threads[thr_no] = NdbThread_Create(mt_job_thread_main,
(void **)(rep->m_thread + thr_no),
1024*1024,
"execute thread", //ToDo add number
NDB_THREAD_PRIO_MEAN);
- assert(threads[thr_no - 1] != NULL);
+ assert(threads[thr_no] != NULL);
}
/* Now run the main loop for thread 0 directly. */
- mt_thr_main(&(rep->m_thread[0]));
+ mt_receiver_thread_main(&(rep->m_thread[RECEIVER_THREAD_NO]));
/* Wait for all threads to shutdown. */
- for (thr_no = 1; thr_no < NUM_THREADS; thr_no++)
+ for (thr_no = 0; thr_no < NUM_THREADS; thr_no++)
{
+ if (thr_no == RECEIVER_THREAD_NO)
+ continue;
void *dummy_return_status;
NdbThread_WaitFor(threads[thr_no - 1], &dummy_return_status);
NdbThread_Destroy(&(threads[thr_no - 1]));
| Thread |
|---|
| • bk commit into 5.1 tree (knielsen:1.2621) | knielsen | 14 Nov |