List: Commits
From: Ole John Aske  Date: May 23 2012 6:40am
Subject: bzr push into mysql-5.5-cluster-7.2 branch (ole.john.aske:3922 to 3923)
 3923 Ole John Aske	2012-05-23
      This is the improved 'save_mem.patch' from Mikael R's patch set:
      
      'Patches used in benchmark tree with Intel'
      
      The patch reduces the memory footprint by not allocating
      job_buffers (used for communication between the block-threads
      within an ndbmtd instance) for those pairs of block-threads
      which are not allowed to communicate with each other.
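
      As a rough illustration of the saving (the thread counts here are
      hypothetical, not taken from this patch): with 2 main, 4 ldm, 2 tc
      and 1 recv block-thread there are 9 block-threads in total, and a
      full mesh would need 9 * 9 = 81 in-queues. Under the communication
      rules in this patch only

        2*9 + 4*(2+2+1) + 2*(2+4+1) + 1*8 = 18 + 20 + 14 + 8 = 60

      queues remain, and each queue that is never opened saves the
      thr_job_queue::SIZE pages of job_buffer it would otherwise have
      allocated.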

    modified:
      storage/ndb/src/kernel/vm/mt.cpp
 3922 Ole John Aske	2012-05-22
      This is the 'various.patch' from Mikael R's patch set:
      
      'Patches used in benchmark tree with Intel'

    modified:
      storage/ndb/src/kernel/vm/mt.cpp
=== modified file 'storage/ndb/src/kernel/vm/mt.cpp'
--- a/storage/ndb/src/kernel/vm/mt.cpp	2012-05-22 11:19:59 +0000
+++ b/storage/ndb/src/kernel/vm/mt.cpp	2012-05-23 06:40:37 +0000
@@ -637,6 +637,17 @@ struct thr_jb_write_state
 
   /* Number of signals inserted since last wakeup */
   Uint32 m_pending_signals_wakeup;
+
+  /*
+   * Is this job buffer open for communication at all?
+   * Several threads are not expected to communicate, and thus do
+   * not allocate a thr_job_buffer for the exchange of signals.
+   * Don't access any job_buffers without ensuring 'is_open()==true'.
+   */
+  bool is_open() const
+  {
+    return (m_write_buffer != NULL);
+  }
 };
 
 /*
@@ -669,6 +680,17 @@ struct thr_jb_read_state
 
   Uint32 m_write_index; // Last available thr_job_buffer.
 
+  /*
+   * Is this job buffer open for communication at all?
+   * Several threads are not expected to communicate, and thus do
+   * not allocate a thr_job_buffer for the exchange of signals.
+   * Don't access any job_buffers without ensuring 'is_open()==true'.
+   */
+  bool is_open() const
+  {
+    return (m_read_buffer != NULL);
+  }
+
   bool is_empty() const
   {
     assert(m_read_index != m_write_index  ||  m_read_pos <= m_read_end);
@@ -2949,6 +2971,7 @@ insert_signal(thr_job_queue *q, thr_job_
 {
   Uint32 write_pos = w->m_write_pos;
   Uint32 datalen = sh->theLength;
+  assert(w->is_open());
   assert(w->m_write_buffer == q->m_buffers[w->m_write_index]);
   memcpy(w->m_write_buffer->m_data + write_pos, sh, sizeof(*sh));
   write_pos += (sizeof(*sh) >> 2);
@@ -3021,19 +3044,22 @@ read_jbb_state(thr_data *selfptr, Uint32
   const thr_job_queue_head *h = selfptr->m_in_queue_head;
   for (Uint32 i = 0; i < count; i++,r++)
   {
-    Uint32 read_index = r->m_read_index;
-
-    /**
-     * Optimization: Only reload when possibly empty.
-     * Avoid cache reload of shared thr_job_queue_head
-     * Read head directly to avoid unnecessary cache
-     * load of first cache line of m_in_queue entry.
-     */
-    if (r->m_write_index == read_index)
+    if (r->is_open())
     {
-      r->m_write_index = h[i].m_write_index;
-      read_barrier_depends();
-      r->m_read_end = q[i].m_buffers[read_index]->m_len;
+      Uint32 read_index = r->m_read_index;
+
+      /**
+       * Optimization: Only reload when possibly empty.
+       * Avoid cache reload of shared thr_job_queue_head
+       * Read head directly to avoid unnecessary cache
+       * load of first cache line of m_in_queue entry.
+       */
+      if (r->m_write_index == read_index)
+      {
+        r->m_write_index = h[i].m_write_index;
+        read_barrier_depends();
+        r->m_read_end = q[i].m_buffers[read_index]->m_len;
+      }
     }
   }
 }
@@ -4025,6 +4051,75 @@ sendprioa_STOP_FOR_CRASH(const struct th
 }
 
 /**
+ * Identify the type of a thread.
+ * Based on the assumption that threads are allocated in the order:
+ *  main, ldm, tc, recv, send
+ */
+static bool
+is_main_thread(unsigned thr_no)
+{
+  return thr_no < NUM_MAIN_THREADS;
+}
+
+static bool
+is_ldm_thread(unsigned thr_no)
+{
+  return thr_no >= NUM_MAIN_THREADS && 
+         thr_no <  NUM_MAIN_THREADS+globalData.ndbMtLqhThreads;
+}
+
+static bool
+is_tc_thread(unsigned thr_no)
+{
+  unsigned tc_base = NUM_MAIN_THREADS+globalData.ndbMtLqhThreads;
+  return thr_no >= tc_base && 
+         thr_no <  tc_base+globalData.ndbMtTcThreads;
+}
+
+static bool
+is_recv_thread(unsigned thr_no)
+{
+  unsigned recv_base = NUM_MAIN_THREADS+globalData.ndbMtLqhThreads+globalData.ndbMtTcThreads;
+  return thr_no >= recv_base &&
+         thr_no <  recv_base+globalData.ndbMtReceiveThreads;
+}
+
+/**
+ * Implements the rules for which threads are allowed to
+ * communicate with each other.
+ * Also see compute_jb_pages() which has similar logic.
+ */
+static bool
+may_communicate(unsigned from, unsigned to)
+{
+  if (is_main_thread(from))
+  {
+    // Main threads communicate with all other threads
+    return true;
+  }
+  else if (is_ldm_thread(from))
+  {
+    // LQH threads can communicate with TC-, main-threads and themselves
+    return is_main_thread(to) ||
+           is_tc_thread(to)   ||
+           (to == from);
+  }
+  else if (is_tc_thread(from))
+  {
+    // TC threads can communicate with LQH-, main-threads and themselves
+    return is_main_thread(to) ||
+           is_ldm_thread(to)  ||
+           (to == from);
+  }
+  else
+  {
+    assert(is_recv_thread(from));
+    // Receive threads communicate with all, except other receivers
+    return !is_recv_thread(to);
+  }
+}
+
+/**
  * init functions
  */
 static
@@ -4072,7 +4167,8 @@ thr_init(struct thr_repository* rep, str
   {
     selfptr->m_in_queue_head[i].m_read_index = 0;
     selfptr->m_in_queue_head[i].m_write_index = 0;
-    buffer = seize_buffer(rep, thr_no, false);
+    buffer = may_communicate(i,thr_no) 
+              ? seize_buffer(rep, thr_no, false) : NULL;
     selfptr->m_in_queue[i].m_buffers[0] = buffer;
     selfptr->m_read_states[i].m_read_index = 0;
     selfptr->m_read_states[i].m_read_buffer = buffer;
@@ -4258,37 +4354,93 @@ mt_get_extra_send_buffer_pages(Uint32 cu
 Uint32
 compute_jb_pages(struct EmulatorData * ed)
 {
-  Uint32 cnt = NUM_MAIN_THREADS +
-    globalData.ndbMtReceiveThreads + globalData.ndbMtTcThreads + globalData.ndbMtLqhThreads + 1;
+  Uint32 cnt = get_total_number_of_block_threads();
+  Uint32 num_receive_threads = globalData.ndbMtReceiveThreads;
+  Uint32 num_lqh_threads = globalData.ndbMtLqhThreads;
+  Uint32 num_tc_threads = globalData.ndbMtTcThreads;
+  Uint32 num_main_threads = NUM_MAIN_THREADS;
+
+  /**
+   * Number of pages each thread needs to communicate with another
+   * thread.
+   */
+  Uint32 job_queue_pages_per_thread = thr_job_queue::SIZE;
 
+  /**
+   * In 'perthread' we calculate the number of pages required by
+   * each 'block thread' ('send-threads' excluded). The 'perthread'
+   * usage is independent of whether the thread will communicate
+   * with other 'block threads' or not.
+   */
   Uint32 perthread = 0;
 
   /**
-   * Each thread can have thr_job_queue::SIZE pages in out-queues
-   *   to each other thread
+   * Each thread has its own job_queue for 'prio A' signals
    */
-  perthread += cnt * (1 + thr_job_queue::SIZE);
+  perthread += job_queue_pages_per_thread;
 
   /**
-   * And thr_job_queue::SIZE prio A signals
+   * Each thread keeps an available free page in 'm_next_buffer'
+   * in case it is required by insert_signal() into JBA or JBB.
    */
-  perthread += (1 + thr_job_queue::SIZE);
+  perthread += 1;
 
   /**
-   * And XXX time-queue signals
+   * Each thread keeps time-queued signals in 'struct thr_tq';
+   * thr_tq::PAGES pages are used to store these.
    */
-  perthread += 32; // Say 1M for now
+  perthread += thr_tq::PAGES;
 
   /**
-   * Each thread also keeps an own cache with max THR_FREE_BUF_MAX
+   * Each thread has its own 'm_free_fifo[THR_FREE_BUF_MAX]' cache.
+   * As the fifo is filled to MAX *before* a page is allocated from it
+   * (which consumes a page), it will never cache more than MAX-1 pages.
+   * Pages are also returned to the global allocator as soon as MAX is reached.
    */
-  perthread += THR_FREE_BUF_MAX;
+  perthread += THR_FREE_BUF_MAX-1;
 
   /**
-   * Multiply by no of threads
+   * Start by calculating the basic number of pages required for
+   * our 'cnt' block threads.
+   * (no inter-thread communication assumed so far)
    */
   Uint32 tot = cnt * perthread;
 
+  /**
+   * We then start adding the pages required for inter-thread communication:
+   * 
+   * Receiver threads will be able to communicate with all other
+   * threads except other receive threads.
+   */
+  tot += num_receive_threads *
+         (cnt - num_receive_threads) *
+         job_queue_pages_per_thread;
+
+  /**
+   * LQH threads can communicate with TC threads and main threads.
+   * They cannot communicate with receive threads or other LQH threads,
+   * but each can communicate with itself.
+   */
+  tot += num_lqh_threads *
+         (num_tc_threads + num_main_threads + 1) *
+         job_queue_pages_per_thread;
+
+  /**
+   * TC threads can communicate with LQH threads and main threads.
+   * They cannot communicate with receive threads or other TC threads,
+   * but each can communicate with itself.
+   */
+  tot += num_tc_threads *
+         (num_lqh_threads + num_main_threads + 1) *
+         job_queue_pages_per_thread;
+
+  /**
+   * Main threads can communicate with all other threads
+   */
+  tot += num_main_threads *
+         cnt *
+         job_queue_pages_per_thread;
+
   return tot;
 }
 
@@ -4710,7 +4862,7 @@ FastScheduler::dumpSignalMemory(Uint32 t
     const thr_job_queue *q = thr_ptr->m_in_queue + thr_no;
     const thr_jb_read_state *r = thr_ptr->m_read_states + thr_no;
     Uint32 read_pos = r->m_read_pos;
-    if (read_pos > 0)
+    if (r->is_open() && read_pos > 0)
     {
       jbs[num_jbs].m_jb = q->m_buffers[r->m_read_index];
       jbs[num_jbs].m_pos = 0;

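The communication rules introduced by may_communicate() can be sanity-checked
with a small standalone program. This is only a sketch: the thread counts are
hardcoded stand-ins for the real globalData.ndbMt* configuration values, and
the helpers mirror, but are not, the functions in mt.cpp.

  #include <cstdio>

  // Hypothetical thread counts (stand-ins for globalData.ndbMt*).
  static const unsigned MAIN = 2, LDM = 4, TC = 2, RECV = 1;
  static const unsigned CNT  = MAIN + LDM + TC + RECV;

  static bool is_main(unsigned t) { return t < MAIN; }
  static bool is_ldm(unsigned t)  { return t >= MAIN && t < MAIN + LDM; }
  static bool is_tc(unsigned t)   { return t >= MAIN + LDM && t < MAIN + LDM + TC; }
  static bool is_recv(unsigned t) { return t >= MAIN + LDM + TC && t < CNT; }

  // Same rules as may_communicate() in the patch above.
  static bool may_communicate(unsigned from, unsigned to)
  {
    if (is_main(from)) return true;                      // main talks to all
    if (is_ldm(from))  return is_main(to) || is_tc(to) || to == from;
    if (is_tc(from))   return is_main(to) || is_ldm(to) || to == from;
    return !is_recv(to);                                 // recv talks to all but recv
  }

  int main()
  {
    unsigned open = 0;
    for (unsigned f = 0; f < CNT; f++)
      for (unsigned t = 0; t < CNT; t++)
        open += may_communicate(f, t);
    // With 2+4+2+1 = 9 block threads this prints "60 of 81":
    // 21 of the 81 possible in-queues are never allocated.
    printf("%u of %u queues allocated\n", open, CNT * CNT);
    return 0;
  }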
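The page accounting in the new compute_jb_pages() can be checked the same way.
Reusing the thread counts from the sketch above, and with placeholder values
for the constants (the real thr_job_queue::SIZE, thr_tq::PAGES and
THR_FREE_BUF_MAX are defined in mt.cpp and are build dependent):

  static const unsigned JQ_SIZE      = 32;  // placeholder for thr_job_queue::SIZE
  static const unsigned TQ_PAGES     = 32;  // placeholder for thr_tq::PAGES
  static const unsigned FREE_BUF_MAX = 32;  // placeholder for THR_FREE_BUF_MAX

  static unsigned compute_jb_pages_sketch()
  {
    const unsigned perthread = JQ_SIZE              // own prio A job_queue
                             + 1                    // spare page in m_next_buffer
                             + TQ_PAGES             // time-queued signals
                             + (FREE_BUF_MAX - 1);  // local free-page cache
    unsigned tot = CNT * perthread;
    tot += RECV * (CNT - RECV)     * JQ_SIZE;  // recv -> all but recv
    tot += LDM  * (TC + MAIN + 1)  * JQ_SIZE;  // ldm  -> tc, main, self
    tot += TC   * (LDM + MAIN + 1) * JQ_SIZE;  // tc   -> ldm, main, self
    tot += MAIN * CNT              * JQ_SIZE;  // main -> all
    return tot;
  }

The four communication terms sum to 60 * JQ_SIZE, matching the queue count from
the previous sketch, whereas the old code unconditionally reserved cnt in-queues
per thread, i.e. 81 queues in this configuration.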
No bundle (reason: useless for push emails).