3923 Ole John Aske 2012-05-23
This is the improved 'save_mem.patch' from Mikael R. patch set :
'Patches used in benchmark tree with Intel'
The patch will reduce memory footprint by not allocating
job_buffers (Used to communicate between the block-threads
within a ndbmtd instance) for those block-threads which
are not allowed to communicate with each other.
modified:
storage/ndb/src/kernel/vm/mt.cpp
3922 Ole John Aske 2012-05-22
This is the 'various.patch' being part of Mikael R patch set:
'Patches used in benchmark tree with Intel'
modified:
storage/ndb/src/kernel/vm/mt.cpp
=== modified file 'storage/ndb/src/kernel/vm/mt.cpp'
--- a/storage/ndb/src/kernel/vm/mt.cpp 2012-05-22 11:19:59 +0000
+++ b/storage/ndb/src/kernel/vm/mt.cpp 2012-05-23 06:40:37 +0000
@@ -637,6 +637,17 @@ struct thr_jb_write_state
/* Number of signals inserted since last wakeup */
Uint32 m_pending_signals_wakeup;
+
+ /*
+ * Is this job buffer open for communication at all?
+ * Several threads are not expected to communicate, and thus does
+ * not allocate thr_job_buffer for exchange of signals.
+ * Don't access any job_buffers without ensuring 'is_open()==true'.
+ */
+ bool is_open() const
+ {
+ return (m_write_buffer != NULL);
+ }
};
/*
@@ -669,6 +680,17 @@ struct thr_jb_read_state
Uint32 m_write_index; // Last available thr_job_buffer.
+ /*
+ * Is this job buffer open for communication at all?
+ * Several threads are not expected to communicate, and thus does
+ * not allocate thr_job_buffer for exchange of signals.
+ * Don't access any job_buffers without ensuring 'is_open()==true'.
+ */
+ bool is_open() const
+ {
+ return (m_read_buffer != NULL);
+ }
+
bool is_empty() const
{
assert(m_read_index != m_write_index || m_read_pos <= m_read_end);
@@ -2949,6 +2971,7 @@ insert_signal(thr_job_queue *q, thr_job_
{
Uint32 write_pos = w->m_write_pos;
Uint32 datalen = sh->theLength;
+ assert(w->is_open());
assert(w->m_write_buffer == q->m_buffers[w->m_write_index]);
memcpy(w->m_write_buffer->m_data + write_pos, sh, sizeof(*sh));
write_pos += (sizeof(*sh) >> 2);
@@ -3021,19 +3044,22 @@ read_jbb_state(thr_data *selfptr, Uint32
const thr_job_queue_head *h = selfptr->m_in_queue_head;
for (Uint32 i = 0; i < count; i++,r++)
{
- Uint32 read_index = r->m_read_index;
-
- /**
- * Optimization: Only reload when possibly empty.
- * Avoid cache reload of shared thr_job_queue_head
- * Read head directly to avoid unnecessary cache
- * load of first cache line of m_in_queue entry.
- */
- if (r->m_write_index == read_index)
+ if (r->is_open())
{
- r->m_write_index = h[i].m_write_index;
- read_barrier_depends();
- r->m_read_end = q[i].m_buffers[read_index]->m_len;
+ Uint32 read_index = r->m_read_index;
+
+ /**
+ * Optimization: Only reload when possibly empty.
+ * Avoid cache reload of shared thr_job_queue_head
+ * Read head directly to avoid unnecessary cache
+ * load of first cache line of m_in_queue entry.
+ */
+ if (r->m_write_index == read_index)
+ {
+ r->m_write_index = h[i].m_write_index;
+ read_barrier_depends();
+ r->m_read_end = q[i].m_buffers[read_index]->m_len;
+ }
}
}
}
@@ -4025,6 +4051,75 @@ sendprioa_STOP_FOR_CRASH(const struct th
}
/**
+ * Identify type of thread.
+ * Based on assumption that threads are allocated in the order:
+ * main, ldm, tc, recv, send
+ */
+static bool
+is_main_thread(unsigned thr_no)
+{
+ return thr_no < NUM_MAIN_THREADS;
+}
+
+static bool
+is_ldm_thread(unsigned thr_no)
+{
+ return thr_no >= NUM_MAIN_THREADS &&
+ thr_no < NUM_MAIN_THREADS+globalData.ndbMtLqhThreads;
+}
+
+static bool
+is_tc_thread(unsigned thr_no)
+{
+ unsigned tc_base = NUM_MAIN_THREADS+globalData.ndbMtLqhThreads;
+ return thr_no >= tc_base &&
+ thr_no < tc_base+globalData.ndbMtTcThreads;
+}
+
+static bool
+is_recv_thread(unsigned thr_no)
+{
+ unsigned recv_base = NUM_MAIN_THREADS+globalData.ndbMtLqhThreads+globalData.ndbMtTcThreads;
+ return thr_no >= recv_base &&
+ thr_no < recv_base+globalData.ndbMtReceiveThreads;
+}
+
+/**
+ * Implements the rules for which threads are allowed to have
+ * communication with each other.
+ * Also see compute_jb_pages() which has similar logic.
+ */
+static bool
+may_communicate(unsigned from, unsigned to)
+{
+ if (is_main_thread(from))
+ {
+ // Main threads communicates with all other threads
+ return true;
+ }
+ else if (is_ldm_thread(from))
+ {
+ // LQH threads can communicates with TC-, main- and itself
+ return is_main_thread(to) ||
+ is_tc_thread(to) ||
+ (to == from);
+ }
+ else if (is_tc_thread(from))
+ {
+ // TC threads can communicate with LQH-, main- and itself
+ return is_main_thread(to) ||
+ is_ldm_thread(to) ||
+ (to == from);
+ }
+ else
+ {
+ assert(is_recv_thread(from));
+ // Receive treads communicate with all, except other receivers
+ return !is_recv_thread(to);
+ }
+}
+
+/**
* init functions
*/
static
@@ -4072,7 +4167,8 @@ thr_init(struct thr_repository* rep, str
{
selfptr->m_in_queue_head[i].m_read_index = 0;
selfptr->m_in_queue_head[i].m_write_index = 0;
- buffer = seize_buffer(rep, thr_no, false);
+ buffer = may_communicate(i,thr_no)
+ ? seize_buffer(rep, thr_no, false) : NULL;
selfptr->m_in_queue[i].m_buffers[0] = buffer;
selfptr->m_read_states[i].m_read_index = 0;
selfptr->m_read_states[i].m_read_buffer = buffer;
@@ -4258,37 +4354,93 @@ mt_get_extra_send_buffer_pages(Uint32 cu
Uint32
compute_jb_pages(struct EmulatorData * ed)
{
- Uint32 cnt = NUM_MAIN_THREADS +
- globalData.ndbMtReceiveThreads + globalData.ndbMtTcThreads + globalData.ndbMtLqhThreads + 1;
+ Uint32 cnt = get_total_number_of_block_threads();
+ Uint32 num_receive_threads = globalData.ndbMtReceiveThreads;
+ Uint32 num_lqh_threads = globalData.ndbMtLqhThreads;
+ Uint32 num_tc_threads = globalData.ndbMtTcThreads;
+ Uint32 num_main_threads = NUM_MAIN_THREADS;
+
+ /**
+ * Number of pages each thread needs to communicate with another
+ * thread.
+ */
+ Uint32 job_queue_pages_per_thread = thr_job_queue::SIZE;
+ /**
+ * In 'perthread' we calculate number of pages required by
+ * all 'block threads' (excludes 'send-threads'). 'perthread'
+ * usage is independent of whether this thread will communicate
+ * with other 'block threads' or not.
+ */
Uint32 perthread = 0;
/**
- * Each thread can have thr_job_queue::SIZE pages in out-queues
- * to each other thread
+ * Each threads has its own job_queue for 'prio A' signals
*/
- perthread += cnt * (1 + thr_job_queue::SIZE);
+ perthread += job_queue_pages_per_thread;
/**
- * And thr_job_queue::SIZE prio A signals
+ * Each thread keeps a available free page in 'm_next_buffer'
+ * in case it is required by insert_signal() into JBA or JBB.
*/
- perthread += (1 + thr_job_queue::SIZE);
+ perthread += 1;
/**
- * And XXX time-queue signals
+ * Each thread keeps time-queued signals in 'struct thr_tq'
+ * thr_tq::PAGES are used to store these.
*/
- perthread += 32; // Say 1M for now
+ perthread += thr_tq::PAGES;
/**
- * Each thread also keeps an own cache with max THR_FREE_BUF_MAX
+ * Each thread has its own 'm_free_fifo[THR_FREE_BUF_MAX]' cache.
+ * As it is filled to MAX *before* a page is allocated, which consumes a page,
+ * it will never cache more than MAX-1 pages. Pages are also returned to global
+ * allocator as soon as MAX is reached.
*/
- perthread += THR_FREE_BUF_MAX;
+ perthread += THR_FREE_BUF_MAX-1;
/**
- * Multiply by no of threads
+ * Start by calculating the basic number of pages required for
+ * our 'cnt' block threads.
+ * (no inter-thread communication assumed so far)
*/
Uint32 tot = cnt * perthread;
+ /**
+ * We then start adding pages required for inter-thread communications:
+ *
+ * Receiver threads will be able to communicate with all other
+ * threads except other receive threads.
+ */
+ tot += num_receive_threads *
+ (cnt - num_receive_threads) *
+ job_queue_pages_per_thread;
+
+ /**
+ * LQH threads can communicate with TC threads and main threads.
+ * Cannot communicate with receive threads and other LQH threads,
+ * but it can communicate with itself.
+ */
+ tot += num_lqh_threads *
+ (num_tc_threads + num_main_threads + 1) *
+ job_queue_pages_per_thread;
+
+ /**
+ * TC threads can communicate with LQH threads and main threads.
+ * Cannot communicate with receive threads and other TC threads,
+ * but it can communicate with itself.
+ */
+ tot += num_tc_threads *
+ (num_lqh_threads + num_main_threads + 1) *
+ job_queue_pages_per_thread;
+
+ /**
+ * Main threads can communicate with all other threads
+ */
+ tot += num_main_threads *
+ cnt *
+ job_queue_pages_per_thread;
+
return tot;
}
@@ -4710,7 +4862,7 @@ FastScheduler::dumpSignalMemory(Uint32 t
const thr_job_queue *q = thr_ptr->m_in_queue + thr_no;
const thr_jb_read_state *r = thr_ptr->m_read_states + thr_no;
Uint32 read_pos = r->m_read_pos;
- if (read_pos > 0)
+ if (r->is_open() && read_pos > 0)
{
jbs[num_jbs].m_jb = q->m_buffers[r->m_read_index];
jbs[num_jbs].m_pos = 0;
No bundle (reason: useless for push emails).
| Thread |
|---|
| • bzr push into mysql-5.5-cluster-7.2 branch (ole.john.aske:3922 to 3923) | Ole John Aske | 23 May |