3692 Mikael Ronstrom 2011-12-29
Fixed another place where number of receive threads is bigger than 1
modified:
storage/ndb/src/kernel/vm/SimulatedBlock.hpp
3691 Mikael Ronstrom 2011-12-29
Add a new check
modified:
storage/ndb/src/kernel/vm/mt.cpp
3690 Mikael Ronstrom 2011-12-29
Also receiver threads are block threads and must be accounted for in MAX_BLOCK_THREADS
modified:
storage/ndb/src/kernel/vm/mt.cpp
3689 Mikael Ronstrom 2011-12-29
Don't update bitmask from every thread, only update when it actually needs to toggle a bit
modified:
storage/ndb/include/transporter/TransporterRegistry.hpp
storage/ndb/src/common/transporter/TransporterRegistry.cpp
storage/ndb/src/kernel/vm/mt.cpp
3688 Mikael Ronstrom 2011-12-29
Added debug info
modified:
storage/ndb/src/common/transporter/TransporterRegistry.cpp
storage/ndb/src/kernel/vm/mt.cpp
3687 Mikael Ronstrom 2011-12-29
One more protection against CPU cache misses
modified:
storage/ndb/src/kernel/vm/mt.cpp
3686 Mikael Ronstrom 2011-12-29
Avoid unnecessary protection variables
modified:
storage/ndb/src/kernel/vm/mt.cpp
3685 Mikael Ronstrom 2011-12-29
More protection against false CPU cacheline sharing
modified:
storage/ndb/src/kernel/vm/mt.cpp
3684 Mikael Ronstrom 2011-12-29
minor fix
modified:
storage/ndb/src/kernel/vm/mt.cpp
3683 Mikael Ronstrom 2011-12-29
Avoid false CPU cacheline sharing and move some CPU cache reloads to CPU calculations instead
modified:
storage/ndb/src/kernel/vm/mt.cpp
3682 Mikael Ronstrom 2011-12-27
Fix for platforms with JDK installed but not Java
modified:
storage/ndb/CMakeLists.txt
=== modified file 'storage/ndb/include/transporter/TransporterRegistry.hpp'
--- a/storage/ndb/include/transporter/TransporterRegistry.hpp revid:mikael.ronstrom@stripped
+++ b/storage/ndb/include/transporter/TransporterRegistry.hpp revid:mikael.ronstrom@stripped
@@ -568,7 +568,8 @@ inline void
TransporterRegistry::set_status_overloaded(Uint32 nodeId, bool val)
{
assert(nodeId < MAX_NODES);
- m_status_overloaded.set(nodeId, val);
+ if (val != m_status_overloaded.get(nodeId))
+ m_status_overloaded.set(nodeId, val);
}
inline const NodeBitmask&
=== modified file 'storage/ndb/src/kernel/vm/SimulatedBlock.hpp'
--- a/storage/ndb/src/kernel/vm/SimulatedBlock.hpp revid:mikael.ronstrom@stripped
+++ b/storage/ndb/src/kernel/vm/SimulatedBlock.hpp revid:mikael.ronstrom@stripped
@@ -635,7 +635,10 @@ private:
* are real LQHs run by multiple threads.
*/
protected:
- enum { MaxInstances = 3 + MAX_NDBMT_TC_THREADS + MAX_NDBMT_LQH_WORKERS + 1 };
+ enum { MaxInstances = 3 +
+ MAX_NDBMT_TC_THREADS +
+ MAX_NDBMT_LQH_WORKERS +
+ MAX_NDBMT_RECEIVE_THREADS };
private:
SimulatedBlock** theInstanceList; // set in main, indexed by instance
SimulatedBlock* theMainInstance; // set in all
=== modified file 'storage/ndb/src/kernel/vm/mt.cpp'
--- a/storage/ndb/src/kernel/vm/mt.cpp revid:mikael.ronstrom@stripped
+++ b/storage/ndb/src/kernel/vm/mt.cpp revid:mikael.ronstrom@stripped
@@ -82,7 +82,8 @@ executes thus no signals.
*/
#define MAX_BLOCK_THREADS (NUM_MAIN_THREADS + \
MAX_NDBMT_LQH_THREADS + \
- MAX_NDBMT_TC_THREADS + 1)
+ MAX_NDBMT_TC_THREADS + \
+ MAX_NDBMT_RECEIVE_THREADS)
#define MAX_BLOCK_INSTANCES (MAX_BLOCK_THREADS+1)
/* If this is too small it crashes before first signal. */
@@ -692,9 +693,8 @@ struct thr_job_queue_head
struct thr_job_queue
{
- static const unsigned SIZE = 31;
+ static const unsigned SIZE = 32;
- struct thr_job_queue_head* m_head;
struct thr_job_buffer* m_buffers[SIZE];
};
@@ -885,19 +885,6 @@ struct thr_data
Uint64 m_time;
struct thr_tq m_tq;
- /* Prio A signal incoming queue. */
- struct thr_spin_lock<64> m_jba_write_lock;
- struct thr_job_queue m_jba;
-
- struct thr_job_queue_head m_jba_head;
-
- /* Thread-local read state of prio A buffer. */
- struct thr_jb_read_state m_jba_read_state;
- /*
- * There is no m_jba_write_state, as we have multiple writers to the prio A
- * queue, so local state becomes invalid as soon as we release the lock.
- */
-
/*
* In m_next_buffer we keep a free buffer at all times, so that when
* we hold the lock and find we need a new buffer, we can use this and this
@@ -917,10 +904,35 @@ struct thr_data
Uint32 m_first_unused;
/*
+ * Prio A signal incoming queue. This area is used from many threads
+ * protected by the spin lock. Thus it is also important to protect
+ * surrounding thread-local variables from CPU cache line sharing
+ * with this part.
+ */
+ char unused_protection1[NDB_CL];
+ struct thr_spin_lock<64> m_jba_write_lock;
+ struct thr_job_queue m_jba;
+
+ struct thr_job_queue_head m_jba_head;
+
+ /* Thread-local read state of prio A buffer. */
+ struct thr_jb_read_state m_jba_read_state;
+ /*
+ * There is no m_jba_write_state, as we have multiple writers to the prio A
+ * queue, so local state becomes invalid as soon as we release the lock.
+ */
+
+ /*
* These are the thread input queues, where other threads deliver signals
* into.
+ * Protect the m_in_queue_head by empty cache line to ensure that we don't
+ * get false CPU cacheline sharing. These cache lines are going to be
+ * updated by many different CPU's all the time whereas other neighbour
+ * variables are thread-local variables.
*/
+ char unused_protection2[NDB_CL];
struct thr_job_queue_head m_in_queue_head[MAX_BLOCK_THREADS];
+ char unused_protection3[NDB_CL];
struct thr_job_queue m_in_queue[MAX_BLOCK_THREADS];
/* These are the write states of m_in_queue[self] in each thread. */
struct thr_jb_write_state m_write_states[MAX_BLOCK_THREADS];
@@ -1020,6 +1032,8 @@ struct thr_repository
struct thr_safe_pool<thr_send_page> m_sb_pool;
Ndbd_mem_manager * m_mm;
unsigned m_thread_count;
+ /* Protect m_mm and m_thread_count from CPU cache misses */
+ char protection_unused[NDB_CL];
struct thr_data m_thread[MAX_BLOCK_THREADS];
/**
@@ -2793,7 +2807,8 @@ mt_send_handle::updateWritePtr(NodeId no
*/
static inline
bool
-insert_signal(thr_job_queue *q, thr_jb_write_state *w, Uint32 prioa,
+insert_signal(thr_job_queue *q, thr_job_queue_head *h,
+ thr_jb_write_state *w, Uint32 prioa,
const SignalHeader* sh, const Uint32 *data,
const Uint32 secPtr[3], thr_job_buffer *new_buffer)
{
@@ -2846,7 +2861,7 @@ insert_signal(thr_job_queue *q, thr_jb_w
*
* Or alternatively, ndbrequire() ?
*/
- if (unlikely(write_index == q->m_head->m_read_index))
+ if (unlikely(write_index == h->m_read_index))
{
job_buffer_full(0);
}
@@ -2869,19 +2884,22 @@ read_jbb_state(thr_data *selfptr, Uint32
thr_jb_read_state *r = selfptr->m_read_states;
const thr_job_queue *q = selfptr->m_in_queue;
- for (Uint32 i = 0; i < count; i++,r++,q++)
+ const thr_job_queue_head *h = selfptr->m_in_queue_head;
+ for (Uint32 i = 0; i < count; i++,r++)
{
Uint32 read_index = r->m_read_index;
/**
* Optimization: Only reload when possibly empty.
* Avoid cache reload of shared thr_job_queue_head
+ * Read head directly to avoid unnecessary cache
+ * load of first cache line of m_in_queue entry.
*/
if (r->m_write_index == read_index)
{
- r->m_write_index = q->m_head->m_write_index;
+ r->m_write_index = h[i].m_write_index;
read_barrier_depends();
- r->m_read_end = q->m_buffers[read_index]->m_len;
+ r->m_read_end = q[i].m_buffers[read_index]->m_len;
}
}
}
@@ -2924,7 +2942,9 @@ check_queues_empty(thr_data *selfptr)
*/
static
Uint32
-execute_signals(thr_data *selfptr, thr_job_queue *q, thr_jb_read_state *r,
+execute_signals(thr_data *selfptr,
+ thr_job_queue *q, thr_job_queue_head *h,
+ thr_jb_read_state *r,
Signal *sig, Uint32 max_signals, Uint32 *signalIdCounter)
{
Uint32 num_signals;
@@ -2957,7 +2977,7 @@ execute_signals(thr_data *selfptr, thr_j
read_pos = 0;
read_end = read_buffer->m_len;
/* Update thread-local read state. */
- r->m_read_index = q->m_head->m_read_index = read_index;
+ r->m_read_index = h->m_read_index = read_index;
r->m_read_buffer = read_buffer;
r->m_read_pos = read_pos;
r->m_read_end = read_end;
@@ -3042,22 +3062,25 @@ run_job_buffers(thr_data *selfptr, Signa
rmb();
thr_job_queue *queue = selfptr->m_in_queue;
+ thr_job_queue_head *head = selfptr->m_in_queue_head;
thr_jb_read_state *read_state = selfptr->m_read_states;
for (Uint32 send_thr_no = 0; send_thr_no < thr_count;
- send_thr_no++,queue++,read_state++)
+ send_thr_no++,queue++,read_state++,head++)
{
/* Read the prio A state often, to avoid starvation of prio A. */
bool jba_empty = read_jba_state(selfptr);
if (!jba_empty)
{
static Uint32 max_prioA = thr_job_queue::SIZE * thr_job_buffer::SIZE;
- signal_count += execute_signals(selfptr, &(selfptr->m_jba),
+ signal_count += execute_signals(selfptr,
+ &(selfptr->m_jba),
+ &(selfptr->m_jba_head),
&(selfptr->m_jba_read_state), sig,
max_prioA, signalIdCounter);
}
/* Now execute prio B signals from one thread. */
- signal_count += execute_signals(selfptr, queue, read_state,
+ signal_count += execute_signals(selfptr, queue, head, read_state,
sig, perjb, signalIdCounter);
}
@@ -3098,6 +3121,7 @@ add_thr_map(Uint32 main, Uint32 instance
Uint32 block = numberToBlock(main, instance);
require(thr_no < num_threads);
+ require(block <= MAX_BLOCK_NO && block >= MIN_BLOCK_NO);
struct thr_repository* rep = &g_thr_repository;
thr_data* thr_ptr = rep->m_thread + thr_no;
@@ -3686,13 +3710,14 @@ sendlocal(Uint32 self, const SignalHeade
selfptr->m_stat.m_priob_size += siglen;
thr_job_queue *q = dstptr->m_in_queue + self;
+ thr_job_queue_head *h = dstptr->m_in_queue_head + self;
thr_jb_write_state *w = selfptr->m_write_states + dst;
- if (insert_signal(q, w, false, s, data, secPtr, selfptr->m_next_buffer))
+ if (insert_signal(q, h, w, false, s, data, secPtr, selfptr->m_next_buffer))
{
selfptr->m_next_buffer = seize_buffer(rep, self, false);
}
if (w->m_pending_signals >= MAX_SIGNALS_BEFORE_FLUSH)
- flush_write_state(selfptr, dstptr, q->m_head, w);
+ flush_write_state(selfptr, dstptr, h, w);
}
void
@@ -3714,20 +3739,21 @@ sendprioa(Uint32 self, const SignalHeade
selfptr->m_stat.m_prioa_size += siglen;
thr_job_queue *q = &(dstptr->m_jba);
+ thr_job_queue_head *h = &(dstptr->m_jba_head);
thr_jb_write_state w;
lock(&dstptr->m_jba_write_lock);
- Uint32 index = q->m_head->m_write_index;
+ Uint32 index = h->m_write_index;
w.m_write_index = index;
thr_job_buffer *buffer = q->m_buffers[index];
w.m_write_buffer = buffer;
w.m_write_pos = buffer->m_len;
w.m_pending_signals = 0;
w.m_pending_signals_wakeup = MAX_SIGNALS_BEFORE_WAKEUP;
- bool buf_used = insert_signal(q, &w, true, s, data, secPtr,
+ bool buf_used = insert_signal(q, h, &w, true, s, data, secPtr,
selfptr->m_next_buffer);
- flush_write_state(selfptr, dstptr, q->m_head, &w);
+ flush_write_state(selfptr, dstptr, h, &w);
unlock(&dstptr->m_jba_write_lock);
@@ -3812,20 +3838,21 @@ sendprioa_STOP_FOR_CRASH(const struct th
stopForCrash->flags = 0;
thr_job_queue *q = &(dstptr->m_jba);
+ thr_job_queue_head *h = &(dstptr->m_jba_head);
thr_jb_write_state w;
lock(&dstptr->m_jba_write_lock);
- Uint32 index = q->m_head->m_write_index;
+ Uint32 index = h->m_write_index;
w.m_write_index = index;
thr_job_buffer *buffer = q->m_buffers[index];
w.m_write_buffer = buffer;
w.m_write_pos = buffer->m_len;
w.m_pending_signals = 0;
w.m_pending_signals_wakeup = MAX_SIGNALS_BEFORE_WAKEUP;
- insert_signal(q, &w, true, &signalT.header, signalT.theData, NULL,
+ insert_signal(q, h, &w, true, &signalT.header, signalT.theData, NULL,
&dummy_buffer);
- flush_write_state(selfptr, dstptr, q->m_head, &w);
+ flush_write_state(selfptr, dstptr, h, &w);
unlock(&dstptr->m_jba_write_lock);
}
@@ -3864,7 +3891,6 @@ thr_init(struct thr_repository* rep, str
}
selfptr->m_jba_head.m_read_index = 0;
selfptr->m_jba_head.m_write_index = 0;
- selfptr->m_jba.m_head = &selfptr->m_jba_head;
thr_job_buffer *buffer = seize_buffer(rep, thr_no, true);
selfptr->m_jba.m_buffers[0] = buffer;
selfptr->m_jba_read_state.m_read_index = 0;
@@ -3879,7 +3905,6 @@ thr_init(struct thr_repository* rep, str
{
selfptr->m_in_queue_head[i].m_read_index = 0;
selfptr->m_in_queue_head[i].m_write_index = 0;
- selfptr->m_in_queue[i].m_head = &selfptr->m_in_queue_head[i];
buffer = seize_buffer(rep, thr_no, false);
selfptr->m_in_queue[i].m_buffers[0] = buffer;
selfptr->m_read_states[i].m_read_index = 0;
No bundle (reason: useless for push emails).
| Thread |
|---|
| • bzr push into mysql-5.5-cluster-7.2 branch (mikael.ronstrom:3682 to 3692) | Mikael Ronstrom | 29 Dec |