Below is the list of changes that have just been committed into a local
5.1 repository of knielsen. When knielsen does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html
ChangeSet@stripped, 2007-12-14 10:17:50+01:00, knielsen@ymer.(none) +1 -0
WL#1498: Multi-threaded ndbd.
Fix race condition in yield/wakeup.
storage/ndb/src/kernel/vm/mt/mt.cpp@stripped, 2007-12-14 10:17:47+01:00, knielsen@ymer.(none) +62 -6
Fix race condition in yield/wakeup.
diff -Nrup a/storage/ndb/src/kernel/vm/mt/mt.cpp b/storage/ndb/src/kernel/vm/mt/mt.cpp
--- a/storage/ndb/src/kernel/vm/mt/mt.cpp 2007-12-12 11:53:03 +01:00
+++ b/storage/ndb/src/kernel/vm/mt/mt.cpp 2007-12-14 10:17:47 +01:00
@@ -50,6 +50,7 @@ static const Uint32 MAX_SIGNALS_PER_JB =
#define NDB_MT_LOCK_TO_CPU
+#define USE_FUTEX
static const Uint32 NUM_THREADS = 3;
static const Uint32 RECEIVER_THREAD_NO = 2;
@@ -99,14 +100,35 @@ struct thr_wait
thr_wait() { xcng(&m_futex_state, FS_RUNNING);}
};
+/**
+ * Sleep until woken up or timeout occurs.
+ *
+ * Will call check_callback(check_arg) after proper synchronisation, and only
+ * if that returns true will it actually sleep, else it will return
+ * immediately. This is needed to avoid races with wakeup.
+ */
static
void
-yield(struct thr_wait* wait, const struct timespec *timeout)
+yield(struct thr_wait* wait, const struct timespec *timeout,
+ bool (*check_callback)(void *), void *check_arg)
{
volatile unsigned * val = &wait->m_futex_state;
int old = xcng(val, thr_wait::FS_SLEEPING);
assert(old == thr_wait::FS_RUNNING);
- int ret = futex_wait(val, thr_wait::FS_SLEEPING, timeout);
+
+ /**
+ * At this point, we need to re-check the condition that made us decide to
+ * sleep, and skip sleeping if it changed.
+ *
+ * Otherwise, the condition may have changed, and the thread making the
+ * change may have already decided not to wake us, as our state was
+ * FS_RUNNING at the time.
+ *
+ * Also need a memory barrier to ensure this extra check is race-free.
+ */
+ mb();
+ if ((*check_callback)(check_arg))
+ futex_wait(val, thr_wait::FS_SLEEPING, timeout);
xcng(val, thr_wait::FS_RUNNING);
}
@@ -115,6 +137,14 @@ int
wakeup(struct thr_wait* wait)
{
volatile unsigned * val = &wait->m_futex_state;
+ /**
+ * Need a memory barrier here to avoid races causing us to lose the wakeup.
+ *
+ * We must ensure that any state updates (new data in buffers...) are visible
+ * to the other thread before we can look at the sleep state of that other
+ * thread.
+ */
+ mb();
if (xcng(val, thr_wait::FS_RUNNING) == thr_wait::FS_SLEEPING)
{
return futex_wake(val);
@@ -132,22 +162,27 @@ struct thr_wait
thr_wait() {
m_mutex = NdbMutex_Create();
m_cond = NdbCondition_Create();
- NdbMutex_Lock(m_mutex);
}
};
static
void
-yield(struct thr_wait* wait, const struct timespec *timeout)
+yield(struct thr_wait* wait, const struct timespec *timeout,
+ bool (*check_callback)(void *), void *check_arg)
{
- NdbCondition_WaitTimeout(wait->m_cond, wait->m_mutex, 1);
+ NdbMutex_Lock(wait->m_mutex);
+ if ((*check_callback)(check_arg))
+ NdbCondition_WaitTimeout(wait->m_cond, wait->m_mutex, 1);
+ NdbMutex_Unlock(wait->m_mutex);
}
static
int
wakeup(struct thr_wait* wait)
{
+ NdbMutex_Lock(wait->m_mutex);
NdbCondition_Signal(wait->m_cond);
+ NdbMutex_Unlock(wait->m_mutex);
return 0;
}
#endif
@@ -1019,6 +1054,27 @@ read_jba_state(thr_data *selfptr)
selfptr->m_jba_read_state.m_write_pos = jba->m_buffers[index]->m_len;
}
+/* Check all job queues, return true only if all are empty. */
+static bool
+check_queues_empty(void *data)
+{
+ Uint32 thr_count = g_thr_repository.m_thread_count;
+ thr_data *selfptr = reinterpret_cast<thr_data *>(data);
+
+ read_jbb_state(selfptr, thr_count);
+ read_jba_state(selfptr);
+ const thr_jb_read_state *r = &(selfptr->m_jba_read_state);
+ if (r->m_read_index < r->m_write_index || r->m_read_pos < r->m_write_pos)
+ return false;
+ for (Uint32 i = 0; i < thr_count; i++)
+ {
+ r = selfptr->m_read_states + i;;
+ if (r->m_read_index < r->m_write_index || r->m_read_pos < r->m_write_pos)
+ return false;
+ }
+ return true;
+}
+
/*
* Execute at most MAX_SIGNALS signals from one job queue, updating local read
* state as appropriate.
@@ -1385,7 +1441,7 @@ mt_job_thread_main(void *thr_arg)
if (sum == 0)
{
- yield(&selfptr->m_waiter, &nowait);
+ yield(&selfptr->m_waiter, &nowait, check_queues_empty, selfptr);
}
}
| Thread |
|---|
| • bk commit into 5.1 tree (knielsen:1.2634) | knielsen | 14 Dec |