From: Ole John Aske Date: May 23 2012 6:40am Subject: bzr push into mysql-5.5-cluster-7.2 branch (ole.john.aske:3922 to 3923) List-Archive: http://lists.mysql.com/commits/143914 Message-Id: <20120523064058.D3B24254@fimafeng09.norway.sun.com> MIME-Version: 1.0 Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit 3923 Ole John Aske 2012-05-23 This is the improved 'save_mem.patch' from Mikael R. patch set : 'Patches used in benchmark tree with Intel' The patch will reduce memory footprint by not allocating job_buffers (Used to communicate between the block-threads within a ndbmtd instance) for those block-threads which are not allowed to communicate with each other. modified: storage/ndb/src/kernel/vm/mt.cpp 3922 Ole John Aske 2012-05-22 This is the 'various.patch' being part of Mikael R patch set: 'Patches used in benchmark tree with Intel' modified: storage/ndb/src/kernel/vm/mt.cpp === modified file 'storage/ndb/src/kernel/vm/mt.cpp' --- a/storage/ndb/src/kernel/vm/mt.cpp 2012-05-22 11:19:59 +0000 +++ b/storage/ndb/src/kernel/vm/mt.cpp 2012-05-23 06:40:37 +0000 @@ -637,6 +637,17 @@ struct thr_jb_write_state /* Number of signals inserted since last wakeup */ Uint32 m_pending_signals_wakeup; + + /* + * Is this job buffer open for communication at all? + * Several threads are not expected to communicate, and thus does + * not allocate thr_job_buffer for exchange of signals. + * Don't access any job_buffers without ensuring 'is_open()==true'. + */ + bool is_open() const + { + return (m_write_buffer != NULL); + } }; /* @@ -669,6 +680,17 @@ struct thr_jb_read_state Uint32 m_write_index; // Last available thr_job_buffer. + /* + * Is this job buffer open for communication at all? + * Several threads are not expected to communicate, and thus does + * not allocate thr_job_buffer for exchange of signals. + * Don't access any job_buffers without ensuring 'is_open()==true'. + */ + bool is_open() const + { + return (m_read_buffer != NULL); + } + bool is_empty() const { assert(m_read_index != m_write_index || m_read_pos <= m_read_end); @@ -2949,6 +2971,7 @@ insert_signal(thr_job_queue *q, thr_job_ { Uint32 write_pos = w->m_write_pos; Uint32 datalen = sh->theLength; + assert(w->is_open()); assert(w->m_write_buffer == q->m_buffers[w->m_write_index]); memcpy(w->m_write_buffer->m_data + write_pos, sh, sizeof(*sh)); write_pos += (sizeof(*sh) >> 2); @@ -3021,19 +3044,22 @@ read_jbb_state(thr_data *selfptr, Uint32 const thr_job_queue_head *h = selfptr->m_in_queue_head; for (Uint32 i = 0; i < count; i++,r++) { - Uint32 read_index = r->m_read_index; - - /** - * Optimization: Only reload when possibly empty. - * Avoid cache reload of shared thr_job_queue_head - * Read head directly to avoid unnecessary cache - * load of first cache line of m_in_queue entry. - */ - if (r->m_write_index == read_index) + if (r->is_open()) { - r->m_write_index = h[i].m_write_index; - read_barrier_depends(); - r->m_read_end = q[i].m_buffers[read_index]->m_len; + Uint32 read_index = r->m_read_index; + + /** + * Optimization: Only reload when possibly empty. + * Avoid cache reload of shared thr_job_queue_head + * Read head directly to avoid unnecessary cache + * load of first cache line of m_in_queue entry. + */ + if (r->m_write_index == read_index) + { + r->m_write_index = h[i].m_write_index; + read_barrier_depends(); + r->m_read_end = q[i].m_buffers[read_index]->m_len; + } } } } @@ -4025,6 +4051,75 @@ sendprioa_STOP_FOR_CRASH(const struct th } /** + * Identify type of thread. + * Based on assumption that threads are allocated in the order: + * main, ldm, tc, recv, send + */ +static bool +is_main_thread(unsigned thr_no) +{ + return thr_no < NUM_MAIN_THREADS; +} + +static bool +is_ldm_thread(unsigned thr_no) +{ + return thr_no >= NUM_MAIN_THREADS && + thr_no < NUM_MAIN_THREADS+globalData.ndbMtLqhThreads; +} + +static bool +is_tc_thread(unsigned thr_no) +{ + unsigned tc_base = NUM_MAIN_THREADS+globalData.ndbMtLqhThreads; + return thr_no >= tc_base && + thr_no < tc_base+globalData.ndbMtTcThreads; +} + +static bool +is_recv_thread(unsigned thr_no) +{ + unsigned recv_base = NUM_MAIN_THREADS+globalData.ndbMtLqhThreads+globalData.ndbMtTcThreads; + return thr_no >= recv_base && + thr_no < recv_base+globalData.ndbMtReceiveThreads; +} + +/** + * Implements the rules for which threads are allowed to have + * communication with each other. + * Also see compute_jb_pages() which has similar logic. + */ +static bool +may_communicate(unsigned from, unsigned to) +{ + if (is_main_thread(from)) + { + // Main threads communicates with all other threads + return true; + } + else if (is_ldm_thread(from)) + { + // LQH threads can communicates with TC-, main- and itself + return is_main_thread(to) || + is_tc_thread(to) || + (to == from); + } + else if (is_tc_thread(from)) + { + // TC threads can communicate with LQH-, main- and itself + return is_main_thread(to) || + is_ldm_thread(to) || + (to == from); + } + else + { + assert(is_recv_thread(from)); + // Receive treads communicate with all, except other receivers + return !is_recv_thread(to); + } +} + +/** * init functions */ static @@ -4072,7 +4167,8 @@ thr_init(struct thr_repository* rep, str { selfptr->m_in_queue_head[i].m_read_index = 0; selfptr->m_in_queue_head[i].m_write_index = 0; - buffer = seize_buffer(rep, thr_no, false); + buffer = may_communicate(i,thr_no) + ? seize_buffer(rep, thr_no, false) : NULL; selfptr->m_in_queue[i].m_buffers[0] = buffer; selfptr->m_read_states[i].m_read_index = 0; selfptr->m_read_states[i].m_read_buffer = buffer; @@ -4258,37 +4354,93 @@ mt_get_extra_send_buffer_pages(Uint32 cu Uint32 compute_jb_pages(struct EmulatorData * ed) { - Uint32 cnt = NUM_MAIN_THREADS + - globalData.ndbMtReceiveThreads + globalData.ndbMtTcThreads + globalData.ndbMtLqhThreads + 1; + Uint32 cnt = get_total_number_of_block_threads(); + Uint32 num_receive_threads = globalData.ndbMtReceiveThreads; + Uint32 num_lqh_threads = globalData.ndbMtLqhThreads; + Uint32 num_tc_threads = globalData.ndbMtTcThreads; + Uint32 num_main_threads = NUM_MAIN_THREADS; + + /** + * Number of pages each thread needs to communicate with another + * thread. + */ + Uint32 job_queue_pages_per_thread = thr_job_queue::SIZE; + /** + * In 'perthread' we calculate number of pages required by + * all 'block threads' (excludes 'send-threads'). 'perthread' + * usage is independent of whether this thread will communicate + * with other 'block threads' or not. + */ Uint32 perthread = 0; /** - * Each thread can have thr_job_queue::SIZE pages in out-queues - * to each other thread + * Each threads has its own job_queue for 'prio A' signals */ - perthread += cnt * (1 + thr_job_queue::SIZE); + perthread += job_queue_pages_per_thread; /** - * And thr_job_queue::SIZE prio A signals + * Each thread keeps a available free page in 'm_next_buffer' + * in case it is required by insert_signal() into JBA or JBB. */ - perthread += (1 + thr_job_queue::SIZE); + perthread += 1; /** - * And XXX time-queue signals + * Each thread keeps time-queued signals in 'struct thr_tq' + * thr_tq::PAGES are used to store these. */ - perthread += 32; // Say 1M for now + perthread += thr_tq::PAGES; /** - * Each thread also keeps an own cache with max THR_FREE_BUF_MAX + * Each thread has its own 'm_free_fifo[THR_FREE_BUF_MAX]' cache. + * As it is filled to MAX *before* a page is allocated, which consumes a page, + * it will never cache more than MAX-1 pages. Pages are also returned to global + * allocator as soon as MAX is reached. */ - perthread += THR_FREE_BUF_MAX; + perthread += THR_FREE_BUF_MAX-1; /** - * Multiply by no of threads + * Start by calculating the basic number of pages required for + * our 'cnt' block threads. + * (no inter-thread communication assumed so far) */ Uint32 tot = cnt * perthread; + /** + * We then start adding pages required for inter-thread communications: + * + * Receiver threads will be able to communicate with all other + * threads except other receive threads. + */ + tot += num_receive_threads * + (cnt - num_receive_threads) * + job_queue_pages_per_thread; + + /** + * LQH threads can communicate with TC threads and main threads. + * Cannot communicate with receive threads and other LQH threads, + * but it can communicate with itself. + */ + tot += num_lqh_threads * + (num_tc_threads + num_main_threads + 1) * + job_queue_pages_per_thread; + + /** + * TC threads can communicate with LQH threads and main threads. + * Cannot communicate with receive threads and other TC threads, + * but it can communicate with itself. + */ + tot += num_tc_threads * + (num_lqh_threads + num_main_threads + 1) * + job_queue_pages_per_thread; + + /** + * Main threads can communicate with all other threads + */ + tot += num_main_threads * + cnt * + job_queue_pages_per_thread; + return tot; } @@ -4710,7 +4862,7 @@ FastScheduler::dumpSignalMemory(Uint32 t const thr_job_queue *q = thr_ptr->m_in_queue + thr_no; const thr_jb_read_state *r = thr_ptr->m_read_states + thr_no; Uint32 read_pos = r->m_read_pos; - if (read_pos > 0) + if (r->is_open() && read_pos > 0) { jbs[num_jbs].m_jb = q->m_buffers[r->m_read_index]; jbs[num_jbs].m_pos = 0; No bundle (reason: useless for push emails).