From: Mikael Ronstrom Date: December 29 2011 2:30pm Subject: bzr push into mysql-5.5-cluster-7.2 branch (mikael.ronstrom:3682 to 3692) List-Archive: http://lists.mysql.com/commits/142264 Message-Id: <201112291431.pBTEV4Fl015019@dator6> MIME-Version: 1.0 Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit 3692 Mikael Ronstrom 2011-12-29 Fixed another place where number of receive threads is bigger than 1 modified: storage/ndb/src/kernel/vm/SimulatedBlock.hpp 3691 Mikael Ronstrom 2011-12-29 Add a new check modified: storage/ndb/src/kernel/vm/mt.cpp 3690 Mikael Ronstrom 2011-12-29 Also receiver threads are block threads and must be accounted for in MAX_BLOCK_THREADS modified: storage/ndb/src/kernel/vm/mt.cpp 3689 Mikael Ronstrom 2011-12-29 Don't update bitmask from every thread, only update when it actually needs to toggle a bit modified: storage/ndb/include/transporter/TransporterRegistry.hpp storage/ndb/src/common/transporter/TransporterRegistry.cpp storage/ndb/src/kernel/vm/mt.cpp 3688 Mikael Ronstrom 2011-12-29 Added debug info modified: storage/ndb/src/common/transporter/TransporterRegistry.cpp storage/ndb/src/kernel/vm/mt.cpp 3687 Mikael Ronstrom 2011-12-29 One more protection against CPU cache misses modified: storage/ndb/src/kernel/vm/mt.cpp 3686 Mikael Ronstrom 2011-12-29 Avoid unnecessary protection variables modified: storage/ndb/src/kernel/vm/mt.cpp 3685 Mikael Ronstrom 2011-12-29 More protection against false CPU cacheline sharing modified: storage/ndb/src/kernel/vm/mt.cpp 3684 Mikael Ronstrom 2011-12-29 minor fix modified: storage/ndb/src/kernel/vm/mt.cpp 3683 Mikael Ronstrom 2011-12-29 Avoid false CPU cacheline sharing and move some CPU cache reloads to CPU calculations instead modified: storage/ndb/src/kernel/vm/mt.cpp 3682 Mikael Ronstrom 2011-12-27 Fix for platforms with JDK installed but not Java modified: storage/ndb/CMakeLists.txt === modified file 'storage/ndb/include/transporter/TransporterRegistry.hpp' --- a/storage/ndb/include/transporter/TransporterRegistry.hpp revid:mikael.ronstrom@stripped +++ b/storage/ndb/include/transporter/TransporterRegistry.hpp revid:mikael.ronstrom@stripped @@ -568,7 +568,8 @@ inline void TransporterRegistry::set_status_overloaded(Uint32 nodeId, bool val) { assert(nodeId < MAX_NODES); - m_status_overloaded.set(nodeId, val); + if (val != m_status_overloaded.get(nodeId)) + m_status_overloaded.set(nodeId, val); } inline const NodeBitmask& === modified file 'storage/ndb/src/kernel/vm/SimulatedBlock.hpp' --- a/storage/ndb/src/kernel/vm/SimulatedBlock.hpp revid:mikael.ronstrom@stripped +++ b/storage/ndb/src/kernel/vm/SimulatedBlock.hpp revid:mikael.ronstrom@stripped @@ -635,7 +635,10 @@ private: * are real LQHs run by multiple threads. */ protected: - enum { MaxInstances = 3 + MAX_NDBMT_TC_THREADS + MAX_NDBMT_LQH_WORKERS + 1 }; + enum { MaxInstances = 3 + + MAX_NDBMT_TC_THREADS + + MAX_NDBMT_LQH_WORKERS + + MAX_NDBMT_RECEIVE_THREADS }; private: SimulatedBlock** theInstanceList; // set in main, indexed by instance SimulatedBlock* theMainInstance; // set in all === modified file 'storage/ndb/src/kernel/vm/mt.cpp' --- a/storage/ndb/src/kernel/vm/mt.cpp revid:mikael.ronstrom@stripped +++ b/storage/ndb/src/kernel/vm/mt.cpp revid:mikael.ronstrom@stripped @@ -82,7 +82,8 @@ executes thus no signals. */ #define MAX_BLOCK_THREADS (NUM_MAIN_THREADS + \ MAX_NDBMT_LQH_THREADS + \ - MAX_NDBMT_TC_THREADS + 1) + MAX_NDBMT_TC_THREADS + \ + MAX_NDBMT_RECEIVE_THREADS) #define MAX_BLOCK_INSTANCES (MAX_BLOCK_THREADS+1) /* If this is too small it crashes before first signal. */ @@ -692,9 +693,8 @@ struct thr_job_queue_head struct thr_job_queue { - static const unsigned SIZE = 31; + static const unsigned SIZE = 32; - struct thr_job_queue_head* m_head; struct thr_job_buffer* m_buffers[SIZE]; }; @@ -885,19 +885,6 @@ struct thr_data Uint64 m_time; struct thr_tq m_tq; - /* Prio A signal incoming queue. */ - struct thr_spin_lock<64> m_jba_write_lock; - struct thr_job_queue m_jba; - - struct thr_job_queue_head m_jba_head; - - /* Thread-local read state of prio A buffer. */ - struct thr_jb_read_state m_jba_read_state; - /* - * There is no m_jba_write_state, as we have multiple writers to the prio A - * queue, so local state becomes invalid as soon as we release the lock. - */ - /* * In m_next_buffer we keep a free buffer at all times, so that when * we hold the lock and find we need a new buffer, we can use this and this @@ -917,10 +904,35 @@ struct thr_data Uint32 m_first_unused; /* + * Prio A signal incoming queue. This area is used from many threads + * protected by the spin lock. Thus it is also important to protect + * surrounding thread-local variables from CPU cache line sharing + * with this part. + */ + char unused_protection1[NDB_CL]; + struct thr_spin_lock<64> m_jba_write_lock; + struct thr_job_queue m_jba; + + struct thr_job_queue_head m_jba_head; + + /* Thread-local read state of prio A buffer. */ + struct thr_jb_read_state m_jba_read_state; + /* + * There is no m_jba_write_state, as we have multiple writers to the prio A + * queue, so local state becomes invalid as soon as we release the lock. + */ + + /* * These are the thread input queues, where other threads deliver signals * into. + * Protect the m_in_queue_head by empty cache line to ensure that we don't + * get false CPU cacheline sharing. These cache lines are going to be + * updated by many different CPU's all the time whereas other neighbour + * variables are thread-local variables. */ + char unused_protection2[NDB_CL]; struct thr_job_queue_head m_in_queue_head[MAX_BLOCK_THREADS]; + char unused_protection3[NDB_CL]; struct thr_job_queue m_in_queue[MAX_BLOCK_THREADS]; /* These are the write states of m_in_queue[self] in each thread. */ struct thr_jb_write_state m_write_states[MAX_BLOCK_THREADS]; @@ -1020,6 +1032,8 @@ struct thr_repository struct thr_safe_pool m_sb_pool; Ndbd_mem_manager * m_mm; unsigned m_thread_count; + /* Protect m_mm and m_thread_count from CPU cache misses */ + char protection_unused[NDB_CL]; struct thr_data m_thread[MAX_BLOCK_THREADS]; /** @@ -2793,7 +2807,8 @@ mt_send_handle::updateWritePtr(NodeId no */ static inline bool -insert_signal(thr_job_queue *q, thr_jb_write_state *w, Uint32 prioa, +insert_signal(thr_job_queue *q, thr_job_queue_head *h, + thr_jb_write_state *w, Uint32 prioa, const SignalHeader* sh, const Uint32 *data, const Uint32 secPtr[3], thr_job_buffer *new_buffer) { @@ -2846,7 +2861,7 @@ insert_signal(thr_job_queue *q, thr_jb_w * * Or alternatively, ndbrequire() ? */ - if (unlikely(write_index == q->m_head->m_read_index)) + if (unlikely(write_index == h->m_read_index)) { job_buffer_full(0); } @@ -2869,19 +2884,22 @@ read_jbb_state(thr_data *selfptr, Uint32 thr_jb_read_state *r = selfptr->m_read_states; const thr_job_queue *q = selfptr->m_in_queue; - for (Uint32 i = 0; i < count; i++,r++,q++) + const thr_job_queue_head *h = selfptr->m_in_queue_head; + for (Uint32 i = 0; i < count; i++,r++) { Uint32 read_index = r->m_read_index; /** * Optimization: Only reload when possibly empty. * Avoid cache reload of shared thr_job_queue_head + * Read head directly to avoid unnecessary cache + * load of first cache line of m_in_queue entry. */ if (r->m_write_index == read_index) { - r->m_write_index = q->m_head->m_write_index; + r->m_write_index = h[i].m_write_index; read_barrier_depends(); - r->m_read_end = q->m_buffers[read_index]->m_len; + r->m_read_end = q[i].m_buffers[read_index]->m_len; } } } @@ -2924,7 +2942,9 @@ check_queues_empty(thr_data *selfptr) */ static Uint32 -execute_signals(thr_data *selfptr, thr_job_queue *q, thr_jb_read_state *r, +execute_signals(thr_data *selfptr, + thr_job_queue *q, thr_job_queue_head *h, + thr_jb_read_state *r, Signal *sig, Uint32 max_signals, Uint32 *signalIdCounter) { Uint32 num_signals; @@ -2957,7 +2977,7 @@ execute_signals(thr_data *selfptr, thr_j read_pos = 0; read_end = read_buffer->m_len; /* Update thread-local read state. */ - r->m_read_index = q->m_head->m_read_index = read_index; + r->m_read_index = h->m_read_index = read_index; r->m_read_buffer = read_buffer; r->m_read_pos = read_pos; r->m_read_end = read_end; @@ -3042,22 +3062,25 @@ run_job_buffers(thr_data *selfptr, Signa rmb(); thr_job_queue *queue = selfptr->m_in_queue; + thr_job_queue_head *head = selfptr->m_in_queue_head; thr_jb_read_state *read_state = selfptr->m_read_states; for (Uint32 send_thr_no = 0; send_thr_no < thr_count; - send_thr_no++,queue++,read_state++) + send_thr_no++,queue++,read_state++,head++) { /* Read the prio A state often, to avoid starvation of prio A. */ bool jba_empty = read_jba_state(selfptr); if (!jba_empty) { static Uint32 max_prioA = thr_job_queue::SIZE * thr_job_buffer::SIZE; - signal_count += execute_signals(selfptr, &(selfptr->m_jba), + signal_count += execute_signals(selfptr, + &(selfptr->m_jba), + &(selfptr->m_jba_head), &(selfptr->m_jba_read_state), sig, max_prioA, signalIdCounter); } /* Now execute prio B signals from one thread. */ - signal_count += execute_signals(selfptr, queue, read_state, + signal_count += execute_signals(selfptr, queue, head, read_state, sig, perjb, signalIdCounter); } @@ -3098,6 +3121,7 @@ add_thr_map(Uint32 main, Uint32 instance Uint32 block = numberToBlock(main, instance); require(thr_no < num_threads); + require(block <= MAX_BLOCK_NO && block >= MIN_BLOCK_NO); struct thr_repository* rep = &g_thr_repository; thr_data* thr_ptr = rep->m_thread + thr_no; @@ -3686,13 +3710,14 @@ sendlocal(Uint32 self, const SignalHeade selfptr->m_stat.m_priob_size += siglen; thr_job_queue *q = dstptr->m_in_queue + self; + thr_job_queue_head *h = dstptr->m_in_queue_head + self; thr_jb_write_state *w = selfptr->m_write_states + dst; - if (insert_signal(q, w, false, s, data, secPtr, selfptr->m_next_buffer)) + if (insert_signal(q, h, w, false, s, data, secPtr, selfptr->m_next_buffer)) { selfptr->m_next_buffer = seize_buffer(rep, self, false); } if (w->m_pending_signals >= MAX_SIGNALS_BEFORE_FLUSH) - flush_write_state(selfptr, dstptr, q->m_head, w); + flush_write_state(selfptr, dstptr, h, w); } void @@ -3714,20 +3739,21 @@ sendprioa(Uint32 self, const SignalHeade selfptr->m_stat.m_prioa_size += siglen; thr_job_queue *q = &(dstptr->m_jba); + thr_job_queue_head *h = &(dstptr->m_jba_head); thr_jb_write_state w; lock(&dstptr->m_jba_write_lock); - Uint32 index = q->m_head->m_write_index; + Uint32 index = h->m_write_index; w.m_write_index = index; thr_job_buffer *buffer = q->m_buffers[index]; w.m_write_buffer = buffer; w.m_write_pos = buffer->m_len; w.m_pending_signals = 0; w.m_pending_signals_wakeup = MAX_SIGNALS_BEFORE_WAKEUP; - bool buf_used = insert_signal(q, &w, true, s, data, secPtr, + bool buf_used = insert_signal(q, h, &w, true, s, data, secPtr, selfptr->m_next_buffer); - flush_write_state(selfptr, dstptr, q->m_head, &w); + flush_write_state(selfptr, dstptr, h, &w); unlock(&dstptr->m_jba_write_lock); @@ -3812,20 +3838,21 @@ sendprioa_STOP_FOR_CRASH(const struct th stopForCrash->flags = 0; thr_job_queue *q = &(dstptr->m_jba); + thr_job_queue_head *h = &(dstptr->m_jba_head); thr_jb_write_state w; lock(&dstptr->m_jba_write_lock); - Uint32 index = q->m_head->m_write_index; + Uint32 index = h->m_write_index; w.m_write_index = index; thr_job_buffer *buffer = q->m_buffers[index]; w.m_write_buffer = buffer; w.m_write_pos = buffer->m_len; w.m_pending_signals = 0; w.m_pending_signals_wakeup = MAX_SIGNALS_BEFORE_WAKEUP; - insert_signal(q, &w, true, &signalT.header, signalT.theData, NULL, + insert_signal(q, h, &w, true, &signalT.header, signalT.theData, NULL, &dummy_buffer); - flush_write_state(selfptr, dstptr, q->m_head, &w); + flush_write_state(selfptr, dstptr, h, &w); unlock(&dstptr->m_jba_write_lock); } @@ -3864,7 +3891,6 @@ thr_init(struct thr_repository* rep, str } selfptr->m_jba_head.m_read_index = 0; selfptr->m_jba_head.m_write_index = 0; - selfptr->m_jba.m_head = &selfptr->m_jba_head; thr_job_buffer *buffer = seize_buffer(rep, thr_no, true); selfptr->m_jba.m_buffers[0] = buffer; selfptr->m_jba_read_state.m_read_index = 0; @@ -3879,7 +3905,6 @@ thr_init(struct thr_repository* rep, str { selfptr->m_in_queue_head[i].m_read_index = 0; selfptr->m_in_queue_head[i].m_write_index = 0; - selfptr->m_in_queue[i].m_head = &selfptr->m_in_queue_head[i]; buffer = seize_buffer(rep, thr_no, false); selfptr->m_in_queue[i].m_buffers[0] = buffer; selfptr->m_read_states[i].m_read_index = 0; No bundle (reason: useless for push emails).