4839 jonas oreland 2012-01-31
ndb - wl-5929 - add various padding to separate variables accessed by multiple threads
modified:
storage/ndb/src/kernel/vm/mt.cpp
4838 Frazer Clement 2012-01-31
Align #include behaviour with 7.2
modified:
sql/ndb_conflict.cc
sql/ndb_conflict.h
=== modified file 'storage/ndb/src/kernel/vm/mt.cpp'
--- a/storage/ndb/src/kernel/vm/mt.cpp 2012-01-31 07:37:40 +0000
+++ b/storage/ndb/src/kernel/vm/mt.cpp 2012-01-31 15:45:02 +0000
@@ -144,6 +144,7 @@ futex_wake(volatile unsigned * addr)
struct thr_wait
{
volatile unsigned m_futex_state;
+ char padding[NDB_CL_PADSZ(sizeof(unsigned))];
enum {
FS_RUNNING = 0,
FS_SLEEPING = 1
@@ -218,9 +219,10 @@ wakeup(struct thr_wait* wait)
struct thr_wait
{
- bool m_need_wakeup;
NdbMutex *m_mutex;
NdbCondition *m_cond;
+ bool m_need_wakeup;
+ char padding[NDB_CL_PAD_SZ(sizeof(bool) + (2*sizeof(void*)))];
thr_wait() : m_need_wakeup(false), m_mutex(0), m_cond(0) {}
void init() {
@@ -891,7 +893,44 @@ struct thr_data
THR_SEND_BUFFER_MAX_FREE,
THR_SEND_BUFFER_ALLOC_SIZE) {}
- thr_wait m_waiter;
+ /**
+ * We start with the data structures that are shared globally to
+ * ensure that they get the proper padding
+ */
+ thr_wait m_waiter; /* Cacheline aligned*/
+
+ /*
+ * Prio A signal incoming queue. This area is used from many threads
+ * protected by the spin lock. Thus it is also important to protect
+ * surrounding thread-local variables from CPU cache line sharing
+ * with this part.
+ */
+ struct thr_spin_lock<64> m_jba_write_lock;
+
+ struct thr_job_queue m_jba; /* aligned */
+ struct thr_job_queue_head m_jba_head;
+ char unused_protection1[NDB_CL_PADSZ(sizeof(struct thr_job_queue_head))];
+
+ /*
+ * These are the thread input queues, where other threads deliver signals
+ * into.
+ * Protect m_in_queue_head with an empty cache line to ensure that we
+ * don't get false CPU cacheline sharing. These cache lines are going to
+ * be updated by many different CPUs all the time, whereas the neighbouring
+ * variables are thread-local.
+ */
+ struct thr_job_queue_head m_in_queue_head[MAX_BLOCK_THREADS];
+ char unused_protection2[
+ NDB_CL_PADSZ(MAX_BLOCK_THREADS*sizeof(struct thr_job_queue_head))];
+ struct thr_job_queue m_in_queue[MAX_BLOCK_THREADS];
+
+ /**
+ * The remainder of the variables in thr_data are thread-local,
+ * meaning that they are always updated by the thread that owns those
+ * data structures and thus those variables aren't shared with other
+ * CPUs.
+ */
+
unsigned m_thr_no;
/**
@@ -907,19 +946,6 @@ struct thr_data
Uint64 m_time;
struct thr_tq m_tq;
- /* Prio A signal incoming queue. */
- struct thr_spin_lock<64> m_jba_write_lock;
- struct thr_job_queue m_jba;
-
- struct thr_job_queue_head m_jba_head;
-
- /* Thread-local read state of prio A buffer. */
- struct thr_jb_read_state m_jba_read_state;
- /*
- * There is no m_jba_write_state, as we have multiple writers to the prio A
- * queue, so local state becomes invalid as soon as we release the lock.
- */
-
/*
* In m_next_buffer we keep a free buffer at all times, so that when
* we hold the lock and find we need a new buffer, we can use this and this
@@ -938,12 +964,15 @@ struct thr_data
/* m_first_unused is the first unused entry in m_free_fifo. */
Uint32 m_first_unused;
+
+ /* Thread-local read state of prio A buffer. */
+ struct thr_jb_read_state m_jba_read_state;
+
/*
- * These are the thread input queues, where other threads deliver signals
- * into.
+ * There is no m_jba_write_state, as we have multiple writers to the prio A
+ * queue, so local state becomes invalid as soon as we release the lock.
*/
- struct thr_job_queue_head m_in_queue_head[MAX_BLOCK_THREADS];
- struct thr_job_queue m_in_queue[MAX_BLOCK_THREADS];
+
/* These are the write states of m_in_queue[self] in each thread. */
struct thr_jb_write_state m_write_states[MAX_BLOCK_THREADS];
/* These are the read states of all of our own m_in_queue[]. */
@@ -994,6 +1023,13 @@ struct thr_data
NdbThread* m_thread;
};
+struct thr_data_aligned
+{
+ struct thr_data m_thr_data;
+ /* Trailing pad so each thr_data in an array starts on a cacheline boundary */
+ char unused_protection[NDB_CL_PADSZ(sizeof(struct thr_data))];
+};
+
struct mt_send_handle : public TransporterSendBufferHandle
{
struct thr_data * m_selfptr;
@@ -1034,18 +1070,29 @@ struct thr_repository
m_sb_pool("sendbufferpool")
{}
+ /**
+ * m_receive_lock, m_section_lock, m_mem_manager_lock, m_jb_pool
+ * and m_sb_pool are all variables globally shared among the threads
+ * and also heavily updated.
+ */
struct thr_spin_lock<64> m_receive_lock[MAX_NDBMT_RECEIVE_THREADS];
struct thr_spin_lock<64> m_section_lock;
struct thr_spin_lock<64> m_mem_manager_lock;
+ /* thr_safe_pool is aligned to be also 64 bytes in size */
struct thr_safe_pool<thr_job_buffer> m_jb_pool;
struct thr_safe_pool<thr_send_page> m_sb_pool;
+ /* m_mm and m_thread_count are globally shared and read only variables */
Ndbd_mem_manager * m_mm;
unsigned m_thread_count;
- struct thr_data m_thread[MAX_BLOCK_THREADS];
-
/**
- * send buffer handling
+ * Protect m_mm and m_thread_count from CPU cache misses: the first
+ * part of m_thread (struct thr_data) consists of globally shared
+ * variables, so letting these read-only variables share a cache line
+ * with them isn't a good idea.
*/
+ char protection_unused[NDB_CL_PADSZ(sizeof(void*) + sizeof(unsigned))];
+
+ struct thr_data_aligned m_thread[MAX_BLOCK_THREADS];
/* The buffers that are to be sent */
struct send_buffer
@@ -1602,7 +1649,7 @@ thr_job_buffer*
seize_buffer(struct thr_repository* rep, int thr_no, bool prioa)
{
thr_job_buffer* jb;
- thr_data* selfptr = rep->m_thread + thr_no;
+ struct thr_data* selfptr = &rep->m_thread[thr_no].m_thr_data;
Uint32 first_free = selfptr->m_first_free;
Uint32 first_unused = selfptr->m_first_unused;
@@ -1664,7 +1711,7 @@ static
void
release_buffer(struct thr_repository* rep, int thr_no, thr_job_buffer* jb)
{
- struct thr_data* selfptr = rep->m_thread + thr_no;
+ struct thr_data* selfptr = &rep->m_thread[thr_no].m_thr_data;
Uint32 first_free = selfptr->m_first_free;
Uint32 first_unused = selfptr->m_first_unused;
@@ -1900,7 +1947,7 @@ void
senddelay(Uint32 thr_no, const SignalHeader* s, Uint32 delay)
{
struct thr_repository* rep = &g_thr_repository;
- struct thr_data * selfptr = rep->m_thread + thr_no;
+ struct thr_data* selfptr = &rep->m_thread[thr_no].m_thr_data;
assert(pthread_equal(selfptr->m_thr_id, pthread_self()));
unsigned siglen = (sizeof(*s) >> 2) + s->theLength + s->m_noOfSections;
@@ -2052,11 +2099,12 @@ flush_jbb_write_state(thr_data *selfptr)
Uint32 self = selfptr->m_thr_no;
thr_jb_write_state *w = selfptr->m_write_states;
- thr_data *thrptr = g_thr_repository.m_thread;
- for (Uint32 thr_no = 0; thr_no < thr_count; thr_no++, thrptr++, w++)
+ thr_data_aligned *thr_align_ptr = g_thr_repository.m_thread;
+ for (Uint32 thr_no = 0; thr_no < thr_count; thr_no++, thr_align_ptr++, w++)
{
if (w->m_pending_signals || w->m_pending_signals_wakeup)
{
+ struct thr_data *thrptr = &thr_align_ptr->m_thr_data;
w->m_pending_signals_wakeup = MAX_SIGNALS_BEFORE_WAKEUP;
thr_job_queue_head *q_head = thrptr->m_in_queue_head + self;
flush_write_state(selfptr, thrptr, q_head, w);
@@ -2077,8 +2125,8 @@ check_job_buffers(struct thr_repository*
{
const Uint32 minfree = (1024 + MIN_SIGNALS_PER_PAGE - 1)/MIN_SIGNALS_PER_PAGE;
unsigned thr_no = first_receiver_thread_no + recv_thread_id;
- const thr_data *thrptr = rep->m_thread;
- for (unsigned i = 0; i<num_threads; i++, thrptr++)
+ const thr_data_aligned *thr_align_ptr = rep->m_thread;
+ for (unsigned i = 0; i<num_threads; i++, thr_align_ptr++)
{
/**
* NOTE: m_read_index is read wo/ lock (and updated by different thread)
@@ -2087,6 +2135,7 @@ check_job_buffers(struct thr_repository*
* function is always conservative (i.e it can be better than
* returned value, if read-index has moved but we didnt see it)
*/
+ const struct thr_data *thrptr = &thr_align_ptr->m_thr_data;
const thr_job_queue_head *q_head = thrptr->m_in_queue_head + thr_no;
unsigned ri = q_head->m_read_index;
unsigned wi = q_head->m_write_index;
@@ -2122,9 +2171,9 @@ compute_max_signals_to_execute(Uint32 th
{
Uint32 minfree = thr_job_queue::SIZE;
const struct thr_repository* rep = &g_thr_repository;
- const thr_data *thrptr = rep->m_thread;
+ const struct thr_data_aligned *thr_align_ptr = rep->m_thread;
- for (unsigned i = 0; i<num_threads; i++, thrptr++)
+ for (unsigned i = 0; i<num_threads; i++, thr_align_ptr++)
{
/**
* NOTE: m_read_index is read wo/ lock (and updated by different thread)
@@ -2133,6 +2182,7 @@ compute_max_signals_to_execute(Uint32 th
* function is always conservative (i.e it can be better than
* returned value, if read-index has moved but we didnt see it)
*/
+ const struct thr_data *thrptr = &thr_align_ptr->m_thr_data;
const thr_job_queue_head *q_head = thrptr->m_in_queue_head + thr_no;
unsigned ri = q_head->m_read_index;
unsigned wi = q_head->m_write_index;
@@ -2537,7 +2587,8 @@ trp_callback::bytes_sent(NodeId node, Ui
assert(thr_no != NO_SEND_THREAD);
if (!is_send_thread(thr_no))
{
- return ::bytes_sent(&g_thr_repository.m_thread[thr_no].m_send_buffer_pool,
+ thr_data * thrptr = &g_thr_repository.m_thread[thr_no].m_thr_data;
+ return ::bytes_sent(&thrptr->m_send_buffer_pool,
sb, bytes);
}
else
@@ -3220,7 +3271,7 @@ add_thr_map(Uint32 main, Uint32 instance
require(thr_no < num_threads);
struct thr_repository* rep = &g_thr_repository;
- thr_data* thr_ptr = rep->m_thread + thr_no;
+ struct thr_data* thr_ptr = &rep->m_thread[thr_no].m_thr_data;
/* Add to list. */
{
@@ -3818,9 +3869,9 @@ sendlocal(Uint32 self, const SignalHeade
Uint32 dst = block2ThreadId(block, instance);
struct thr_repository* rep = &g_thr_repository;
- struct thr_data * selfptr = rep->m_thread + self;
+ struct thr_data *selfptr = &rep->m_thread[self].m_thr_data;
assert(pthread_equal(selfptr->m_thr_id, pthread_self()));
- struct thr_data * dstptr = rep->m_thread + dst;
+ struct thr_data *dstptr = &rep->m_thread[dst].m_thr_data;
selfptr->m_stat.m_priob_count++;
Uint32 siglen = (sizeof(*s) >> 2) + s->theLength + s->m_noOfSections;
@@ -3846,10 +3897,10 @@ sendprioa(Uint32 self, const SignalHeade
Uint32 dst = block2ThreadId(block, instance);
struct thr_repository* rep = &g_thr_repository;
- struct thr_data *selfptr = rep->m_thread + self;
+ struct thr_data *selfptr = &rep->m_thread[self].m_thr_data;
assert(s->theVerId_signalNumber == GSN_START_ORD ||
pthread_equal(selfptr->m_thr_id, pthread_self()));
- struct thr_data *dstptr = rep->m_thread + dst;
+ struct thr_data *dstptr = &rep->m_thread[dst].m_thr_data;
selfptr->m_stat.m_prioa_count++;
Uint32 siglen = (sizeof(*s) >> 2) + s->theLength + s->m_noOfSections;
@@ -3889,7 +3940,7 @@ mt_send_remote(Uint32 self, const Signal
const LinearSectionPtr ptr[3])
{
thr_repository *rep = &g_thr_repository;
- thr_data *selfptr = rep->m_thread + self;
+ struct thr_data *selfptr = &rep->m_thread[self].m_thr_data;
SendStatus ss;
mt_send_handle handle(selfptr);
@@ -3907,7 +3958,7 @@ mt_send_remote(Uint32 self, const Signal
const SegmentedSectionPtr ptr[3])
{
thr_repository *rep = &g_thr_repository;
- thr_data *selfptr = rep->m_thread + self;
+ struct thr_data *selfptr = &rep->m_thread[self].m_thr_data;
SendStatus ss;
mt_send_handle handle(selfptr);
@@ -3940,7 +3991,7 @@ sendprioa_STOP_FOR_CRASH(const struct th
/**
* Pick any instance running in this thread
*/
- struct thr_data * dstptr = rep->m_thread + dst;
+ struct thr_data *dstptr = &rep->m_thread[dst].m_thr_data;
Uint32 bno = dstptr->m_instance_list[0];
memset(&signalT.header, 0, sizeof(SignalHeader));
@@ -4058,7 +4109,7 @@ thr_init2(struct thr_repository* rep, st
selfptr->m_write_states[i].m_write_index = 0;
selfptr->m_write_states[i].m_write_pos = 0;
selfptr->m_write_states[i].m_write_buffer =
- rep->m_thread[i].m_in_queue[thr_no].m_buffers[0];
+ rep->m_thread[i].m_thr_data.m_in_queue[thr_no].m_buffers[0];
selfptr->m_write_states[i].m_pending_signals = 0;
selfptr->m_write_states[i].m_pending_signals_wakeup = 0;
}
@@ -4097,11 +4148,11 @@ rep_init(struct thr_repository* rep, uns
rep->m_thread_count = cnt;
for (unsigned int i = 0; i<cnt; i++)
{
- thr_init(rep, rep->m_thread + i, cnt, i);
+ thr_init(rep, &rep->m_thread[i].m_thr_data, cnt, i);
}
for (unsigned int i = 0; i<cnt; i++)
{
- thr_init2(rep, rep->m_thread + i, cnt, i);
+ thr_init2(rep, &rep->m_thread[i].m_thr_data, cnt, i);
}
rep->stopped_threads = 0;
@@ -4356,7 +4407,7 @@ ThreadConfig::ipControlLoop(NdbThread* p
*/
for (thr_no = 0; thr_no < num_threads; thr_no++)
{
- rep->m_thread[thr_no].m_time = NdbTick_CurrentMillisecond();
+ rep->m_thread[thr_no].m_thr_data.m_time = NdbTick_CurrentMillisecond();
if (thr_no == first_receiver_thread_no)
continue; // Will run in the main thread.
@@ -4368,30 +4419,30 @@ ThreadConfig::ipControlLoop(NdbThread* p
if (thr_no < first_receiver_thread_no)
{
/* Start block threads */
- rep->m_thread[thr_no].m_thread =
+ rep->m_thread[thr_no].m_thr_data.m_thread =
NdbThread_Create(mt_job_thread_main,
(void **)(rep->m_thread + thr_no),
1024*1024,
"execute thread", //ToDo add number
NDB_THREAD_PRIO_MEAN);
- require(rep->m_thread[thr_no].m_thread != NULL);
+ require(rep->m_thread[thr_no].m_thr_data.m_thread != NULL);
}
else
{
/* Start a receiver thread, also block thread for TRPMAN */
- rep->m_thread[thr_no].m_thread =
+ rep->m_thread[thr_no].m_thr_data.m_thread =
NdbThread_Create(mt_receiver_thread_main,
- (void **)(rep->m_thread + thr_no),
+ (void **)(&rep->m_thread[thr_no].m_thr_data),
1024*1024,
"receive thread", //ToDo add number
NDB_THREAD_PRIO_MEAN);
- require(rep->m_thread[thr_no].m_thread != NULL);
+ require(rep->m_thread[thr_no].m_thr_data.m_thread != NULL);
}
}
/* Now run the main loop for first receiver thread directly. */
- rep->m_thread[first_receiver_thread_no].m_thread = pThis;
- mt_receiver_thread_main(&(rep->m_thread[first_receiver_thread_no]));
+ rep->m_thread[first_receiver_thread_no].m_thr_data.m_thread = pThis;
+ mt_receiver_thread_main(&(rep->m_thread[first_receiver_thread_no].m_thr_data));
/* Wait for all threads to shutdown. */
for (thr_no = 0; thr_no < num_threads; thr_no++)
@@ -4399,8 +4450,9 @@ ThreadConfig::ipControlLoop(NdbThread* p
if (thr_no == first_receiver_thread_no)
continue;
void *dummy_return_status;
- NdbThread_WaitFor(rep->m_thread[thr_no].m_thread, &dummy_return_status);
- NdbThread_Destroy(&(rep->m_thread[thr_no].m_thread));
+ NdbThread_WaitFor(rep->m_thread[thr_no].m_thr_data.m_thread,
+ &dummy_return_status);
+ NdbThread_Destroy(&(rep->m_thread[thr_no].m_thr_data.m_thread));
}
/* Delete send threads, includes waiting for threads to shutdown */
@@ -4477,7 +4529,8 @@ FastScheduler::traceDumpGetJam(Uint32 th
thrdTheEmulatedJam = NULL;
thrdTheEmulatedJamIndex = 0;
#else
- const EmulatedJamBuffer *jamBuffer = &g_thr_repository.m_thread[thr_no].m_jam;
+ const EmulatedJamBuffer *jamBuffer =
+ &g_thr_repository.m_thread[thr_no].m_thr_data.m_jam;
thrdTheEmulatedJam = jamBuffer->theEmulatedJam;
thrdTheEmulatedJamIndex = jamBuffer->theEmulatedJamIndex;
jamBlockNumber = jamBuffer->theEmulatedJamBlockNumber;
@@ -4611,7 +4664,7 @@ FastScheduler::dumpSignalMemory(Uint32 t
Uint32 seq_start = 0;
Uint32 seq_end = 0;
- const thr_data *thr_ptr = &rep->m_thread[thr_no];
+ const struct thr_data *thr_ptr = &rep->m_thread[thr_no].m_thr_data;
if (watchDogCounter)
*watchDogCounter = 4;
@@ -4905,7 +4958,7 @@ void
mt_wakeup(class SimulatedBlock* block)
{
Uint32 thr_no = block->getThreadId();
- thr_data *thrptr = g_thr_repository.m_thread + thr_no;
+ struct thr_data *thrptr = &g_thr_repository.m_thread[thr_no].m_thr_data;
wakeup(&thrptr->m_waiter);
}
@@ -4914,7 +4967,7 @@ void
mt_assert_own_thread(SimulatedBlock* block)
{
Uint32 thr_no = block->getThreadId();
- thr_data *thrptr = g_thr_repository.m_thread + thr_no;
+ struct thr_data *thrptr = &g_thr_repository.m_thread[thr_no].m_thr_data;
if (unlikely(pthread_equal(thrptr->m_thr_id, pthread_self()) == 0))
{
@@ -4930,7 +4983,7 @@ Uint32
mt_get_blocklist(SimulatedBlock * block, Uint32 arr[], Uint32 len)
{
Uint32 thr_no = block->getThreadId();
- thr_data *thr_ptr = g_thr_repository.m_thread + thr_no;
+ struct thr_data *thr_ptr = &g_thr_repository.m_thread[thr_no].m_thr_data;
for (Uint32 i = 0; i < thr_ptr->m_instance_count; i++)
{
@@ -4945,7 +4998,7 @@ mt_get_thr_stat(class SimulatedBlock * b
{
bzero(dst, sizeof(* dst));
Uint32 thr_no = block->getThreadId();
- thr_data *selfptr = g_thr_repository.m_thread + thr_no;
+ struct thr_data *selfptr = &g_thr_repository.m_thread[thr_no].m_thr_data;
THRConfigApplier & conf = globalEmulatorData.theConfiguration->m_thr_config;
dst->thr_no = thr_no;
No bundle (reason: useless for push emails).
| Thread |
|---|
| • bzr push into mysql-5.1-telco-7.0 branch (jonas.oreland:4838 to 4839) | jonas oreland | 31 Jan |