List: Commits
From: Mikael Ronstrom
Date: December 29 2011 2:30pm
Subject: bzr push into mysql-5.5-cluster-7.2 branch (mikael.ronstrom:3682 to 3692)
 3692 Mikael Ronstrom	2011-12-29
      Fixed another place where the number of receive threads can be bigger than 1

    modified:
      storage/ndb/src/kernel/vm/SimulatedBlock.hpp
 3691 Mikael Ronstrom	2011-12-29
      Add a new check

    modified:
      storage/ndb/src/kernel/vm/mt.cpp
 3690 Mikael Ronstrom	2011-12-29
      Receive threads are also block threads and must be accounted for in MAX_BLOCK_THREADS

    modified:
      storage/ndb/src/kernel/vm/mt.cpp
 3689 Mikael Ronstrom	2011-12-29
      Don't update the bitmask from every thread; only update it when a bit actually needs to toggle

    modified:
      storage/ndb/include/transporter/TransporterRegistry.hpp
      storage/ndb/src/common/transporter/TransporterRegistry.cpp
      storage/ndb/src/kernel/vm/mt.cpp
 3688 Mikael Ronstrom	2011-12-29
      Added debug info

    modified:
      storage/ndb/src/common/transporter/TransporterRegistry.cpp
      storage/ndb/src/kernel/vm/mt.cpp
 3687 Mikael Ronstrom	2011-12-29
      One more protection against CPU cache misses

    modified:
      storage/ndb/src/kernel/vm/mt.cpp
 3686 Mikael Ronstrom	2011-12-29
      Avoid unnecessary protection variables

    modified:
      storage/ndb/src/kernel/vm/mt.cpp
 3685 Mikael Ronstrom	2011-12-29
      More protection against false CPU cacheline sharing

    modified:
      storage/ndb/src/kernel/vm/mt.cpp
 3684 Mikael Ronstrom	2011-12-29
      Minor fix

    modified:
      storage/ndb/src/kernel/vm/mt.cpp
 3683 Mikael Ronstrom	2011-12-29
      Avoid false CPU cacheline sharing and replace some CPU cache reloads with recomputation

    modified:
      storage/ndb/src/kernel/vm/mt.cpp
 3682 Mikael Ronstrom	2011-12-27
      Fix for platforms with JDK installed but not Java

    modified:
      storage/ndb/CMakeLists.txt
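
Commits 3683, 3685 and 3687 all apply the same recipe: state that many
threads write is padded out to its own CPU cache line so that it can
never share a line with a thread's private variables. Below is a
minimal sketch of the technique, assuming 64-byte cache lines; the
names (CL_SIZE, ThreadData) are illustrative, while the real patch
inserts char arrays sized by NDB_CL between members, as the mt.cpp
diff further down shows.

#include <cstddef>

static const std::size_t CL_SIZE = 64; /* assumed cache line size */

struct ThreadData
{
  unsigned m_local_counter;         /* touched only by the owning thread */

  /* Pad before the shared field so that stores to m_write_index from
   * other CPUs never invalidate the line holding m_local_counter
   * (assuming the struct itself is cache-line aligned). */
  char m_pad_before[CL_SIZE];
  volatile unsigned m_write_index;  /* written by many producer threads */
  char m_pad_after[CL_SIZE - sizeof(unsigned)];

  unsigned m_local_state;           /* thread-local again */
};

Without the padding, every remote store to m_write_index would force
the owning thread to reload the cache line that also holds its private
fields, which is the "false CPU cacheline sharing" the commit messages
refer to.
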
=== modified file 'storage/ndb/include/transporter/TransporterRegistry.hpp'
--- a/storage/ndb/include/transporter/TransporterRegistry.hpp	revid:mikael.ronstrom@stripped
+++ b/storage/ndb/include/transporter/TransporterRegistry.hpp	revid:mikael.ronstrom@stripped
@@ -568,7 +568,8 @@ inline void
 TransporterRegistry::set_status_overloaded(Uint32 nodeId, bool val)
 {
   assert(nodeId < MAX_NODES);
-  m_status_overloaded.set(nodeId, val);
+  if (val != m_status_overloaded.get(nodeId))
+    m_status_overloaded.set(nodeId, val);
 }
 
 inline const NodeBitmask&

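The hunk above is commit 3689: the store into the shared NodeBitmask is
skipped when the bit already has the requested value, so redundant
set_status_overloaded() calls no longer dirty a cache line that every
other thread polls. A sketch of this test-before-store idiom, with a
hypothetical SharedBitmask in place of the real NodeBitmask API and
assuming a single writer per bit:

struct SharedBitmask
{
  volatile unsigned m_bits[8];  /* 256 flags, polled by many readers */

  bool get(unsigned i) const
  {
    return (m_bits[i >> 5] >> (i & 31)) & 1;
  }

  void set(unsigned i, bool val)
  {
    if (get(i) == val)
      return;  /* skip the redundant store: the line stays shared */
    unsigned mask = 1u << (i & 31);
    if (val)
      m_bits[i >> 5] = m_bits[i >> 5] | mask;
    else
      m_bits[i >> 5] = m_bits[i >> 5] & ~mask;
  }
};

The read in get() is cheap as long as the line sits in every reader's
cache, whereas any store, even one that writes back the same value,
invalidates the line on all other cores.
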
=== modified file 'storage/ndb/src/kernel/vm/SimulatedBlock.hpp'
--- a/storage/ndb/src/kernel/vm/SimulatedBlock.hpp	revid:mikael.ronstrom@stripped
+++ b/storage/ndb/src/kernel/vm/SimulatedBlock.hpp	revid:mikael.ronstrom@stripped
@@ -635,7 +635,10 @@ private:
    * are real LQHs run by multiple threads.
    */
 protected:
-  enum { MaxInstances = 3 + MAX_NDBMT_TC_THREADS + MAX_NDBMT_LQH_WORKERS + 1 };
+  enum { MaxInstances = 3 +
+                        MAX_NDBMT_TC_THREADS +
+                        MAX_NDBMT_LQH_WORKERS +
+                        MAX_NDBMT_RECEIVE_THREADS };
 private:
   SimulatedBlock** theInstanceList; // set in main, indexed by instance
   SimulatedBlock* theMainInstance;  // set in all

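This is the SimulatedBlock side of commits 3690-3692: the old "+ 1"
assumed exactly one receiver, and arrays indexed by instance number
are sized by MaxInstances, so configurations with several receive
threads need the wider bound. A hedged sketch of the failure mode the
new sum guards against, with hypothetical values for the maxima:

#include <cassert>

enum { MAX_NDBMT_TC_THREADS = 2,       /* illustrative values only */
       MAX_NDBMT_LQH_WORKERS = 4,
       MAX_NDBMT_RECEIVE_THREADS = 2 };

enum { MaxInstances = 3 + MAX_NDBMT_TC_THREADS +
                          MAX_NDBMT_LQH_WORKERS +
                          MAX_NDBMT_RECEIVE_THREADS };

struct Block
{
  void* m_instances[MaxInstances];   /* indexed by instance number */

  void register_instance(unsigned instance_no, void* blk)
  {
    /* With the old bound (... + 1) and two receive threads, the
     * last receiver's instance could index one past the end. */
    assert(instance_no < MaxInstances);
    m_instances[instance_no] = blk;
  }
};
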
=== modified file 'storage/ndb/src/kernel/vm/mt.cpp'
--- a/storage/ndb/src/kernel/vm/mt.cpp	revid:mikael.ronstrom@stripped
+++ b/storage/ndb/src/kernel/vm/mt.cpp	revid:mikael.ronstrom@stripped
@@ -82,7 +82,8 @@ executes thus no signals.
 */
 #define MAX_BLOCK_THREADS (NUM_MAIN_THREADS +       \
                          MAX_NDBMT_LQH_THREADS +  \
-                         MAX_NDBMT_TC_THREADS + 1)
+                         MAX_NDBMT_TC_THREADS + \
+                         MAX_NDBMT_RECEIVE_THREADS)
 #define MAX_BLOCK_INSTANCES (MAX_BLOCK_THREADS+1)
 
 /* If this is too small it crashes before first signal. */
@@ -692,9 +693,8 @@ struct thr_job_queue_head
 
 struct thr_job_queue
 {
-  static const unsigned SIZE = 31;
+  static const unsigned SIZE = 32;
 
-  struct thr_job_queue_head* m_head;
   struct thr_job_buffer* m_buffers[SIZE];
 };
 
@@ -885,19 +885,6 @@ struct thr_data
   Uint64 m_time;
   struct thr_tq m_tq;
 
-  /* Prio A signal incoming queue. */
-  struct thr_spin_lock<64> m_jba_write_lock;
-  struct thr_job_queue m_jba;
-
-  struct thr_job_queue_head m_jba_head;
-
-  /* Thread-local read state of prio A buffer. */
-  struct thr_jb_read_state m_jba_read_state;
-  /*
-   * There is no m_jba_write_state, as we have multiple writers to the prio A
-   * queue, so local state becomes invalid as soon as we release the lock.
-   */
-
   /*
    * In m_next_buffer we keep a free buffer at all times, so that when
    * we hold the lock and find we need a new buffer, we can use this and this
@@ -917,10 +904,35 @@ struct thr_data
   Uint32 m_first_unused;
 
   /*
+   * Prio A signal incoming queue. This area is used by many threads,
+   * protected by the spin lock. Thus it is also important to protect
+   * the surrounding thread-local variables from CPU cache line sharing
+   * with this part.
+   */
+  char unused_protection1[NDB_CL];
+  struct thr_spin_lock<64> m_jba_write_lock;
+  struct thr_job_queue m_jba;
+
+  struct thr_job_queue_head m_jba_head;
+
+  /* Thread-local read state of prio A buffer. */
+  struct thr_jb_read_state m_jba_read_state;
+  /*
+   * There is no m_jba_write_state, as we have multiple writers to the prio A
+   * queue, so local state becomes invalid as soon as we release the lock.
+   */
+
+  /*
    * These are the thread input queues, where other threads deliver signals
    * into.
+   * Protect m_in_queue_head with an empty cache line to ensure that we
+   * don't get false CPU cacheline sharing. These cache lines are updated
+   * by many different CPUs all the time, whereas the neighbouring
+   * variables are thread-local.
    */
+  char unused_protection2[NDB_CL];
   struct thr_job_queue_head m_in_queue_head[MAX_BLOCK_THREADS];
+  char unused_protection3[NDB_CL];
   struct thr_job_queue m_in_queue[MAX_BLOCK_THREADS];
   /* These are the write states of m_in_queue[self] in each thread. */
   struct thr_jb_write_state m_write_states[MAX_BLOCK_THREADS];
@@ -1020,6 +1032,8 @@ struct thr_repository
   struct thr_safe_pool<thr_send_page> m_sb_pool;
   Ndbd_mem_manager * m_mm;
   unsigned m_thread_count;
+  /* Protect m_mm and m_thread_count from CPU cache misses */
+  char protection_unused[NDB_CL];
   struct thr_data m_thread[MAX_BLOCK_THREADS];
 
   /**
@@ -2793,7 +2807,8 @@ mt_send_handle::updateWritePtr(NodeId no
  */
 static inline
 bool
-insert_signal(thr_job_queue *q, thr_jb_write_state *w, Uint32 prioa,
+insert_signal(thr_job_queue *q, thr_job_queue_head *h,
+              thr_jb_write_state *w, Uint32 prioa,
               const SignalHeader* sh, const Uint32 *data,
               const Uint32 secPtr[3], thr_job_buffer *new_buffer)
 {
@@ -2846,7 +2861,7 @@ insert_signal(thr_job_queue *q, thr_jb_w
      *
      * Or alternatively, ndbrequire() ?
      */
-    if (unlikely(write_index == q->m_head->m_read_index))
+    if (unlikely(write_index == h->m_read_index))
     {
       job_buffer_full(0);
     }
@@ -2869,19 +2884,22 @@ read_jbb_state(thr_data *selfptr, Uint32
 
   thr_jb_read_state *r = selfptr->m_read_states;
   const thr_job_queue *q = selfptr->m_in_queue;
-  for (Uint32 i = 0; i < count; i++,r++,q++)
+  const thr_job_queue_head *h = selfptr->m_in_queue_head;
+  for (Uint32 i = 0; i < count; i++,r++)
   {
     Uint32 read_index = r->m_read_index;
 
     /**
      * Optimization: Only reload when possibly empty.
      * Avoid cache reload of shared thr_job_queue_head
+     * Read the head directly to avoid an unnecessary cache
+     * load of the first cache line of the m_in_queue entry.
      */
     if (r->m_write_index == read_index)
     {
-      r->m_write_index = q->m_head->m_write_index;
+      r->m_write_index = h[i].m_write_index;
       read_barrier_depends();
-      r->m_read_end = q->m_buffers[read_index]->m_len;
+      r->m_read_end = q[i].m_buffers[read_index]->m_len;
     }
   }
 }
@@ -2924,7 +2942,9 @@ check_queues_empty(thr_data *selfptr)
  */
 static
 Uint32
-execute_signals(thr_data *selfptr, thr_job_queue *q, thr_jb_read_state *r,
+execute_signals(thr_data *selfptr,
+                thr_job_queue *q, thr_job_queue_head *h,
+                thr_jb_read_state *r,
                 Signal *sig, Uint32 max_signals, Uint32 *signalIdCounter)
 {
   Uint32 num_signals;
@@ -2957,7 +2977,7 @@ execute_signals(thr_data *selfptr, thr_j
         read_pos = 0;
         read_end = read_buffer->m_len;
         /* Update thread-local read state. */
-        r->m_read_index = q->m_head->m_read_index = read_index;
+        r->m_read_index = h->m_read_index = read_index;
         r->m_read_buffer = read_buffer;
         r->m_read_pos = read_pos;
         r->m_read_end = read_end;
@@ -3042,22 +3062,25 @@ run_job_buffers(thr_data *selfptr, Signa
   rmb();
 
   thr_job_queue *queue = selfptr->m_in_queue;
+  thr_job_queue_head *head = selfptr->m_in_queue_head;
   thr_jb_read_state *read_state = selfptr->m_read_states;
   for (Uint32 send_thr_no = 0; send_thr_no < thr_count;
-       send_thr_no++,queue++,read_state++)
+       send_thr_no++,queue++,read_state++,head++)
   {
     /* Read the prio A state often, to avoid starvation of prio A. */
     bool jba_empty = read_jba_state(selfptr);
     if (!jba_empty)
     {
       static Uint32 max_prioA = thr_job_queue::SIZE * thr_job_buffer::SIZE;
-      signal_count += execute_signals(selfptr, &(selfptr->m_jba),
+      signal_count += execute_signals(selfptr,
+                                      &(selfptr->m_jba),
+                                      &(selfptr->m_jba_head),
                                       &(selfptr->m_jba_read_state), sig,
                                       max_prioA, signalIdCounter);
     }
 
     /* Now execute prio B signals from one thread. */
-    signal_count += execute_signals(selfptr, queue, read_state,
+    signal_count += execute_signals(selfptr, queue, head, read_state,
                                     sig, perjb, signalIdCounter);
   }
 
@@ -3098,6 +3121,7 @@ add_thr_map(Uint32 main, Uint32 instance
   Uint32 block = numberToBlock(main, instance);
 
   require(thr_no < num_threads);
+  require(block <= MAX_BLOCK_NO && block >= MIN_BLOCK_NO);
   struct thr_repository* rep = &g_thr_repository;
   thr_data* thr_ptr = rep->m_thread + thr_no;
 
@@ -3686,13 +3710,14 @@ sendlocal(Uint32 self, const SignalHeade
   selfptr->m_stat.m_priob_size += siglen;
 
   thr_job_queue *q = dstptr->m_in_queue + self;
+  thr_job_queue_head *h = dstptr->m_in_queue_head + self;
   thr_jb_write_state *w = selfptr->m_write_states + dst;
-  if (insert_signal(q, w, false, s, data, secPtr, selfptr->m_next_buffer))
+  if (insert_signal(q, h, w, false, s, data, secPtr, selfptr->m_next_buffer))
   {
     selfptr->m_next_buffer = seize_buffer(rep, self, false);
   }
   if (w->m_pending_signals >= MAX_SIGNALS_BEFORE_FLUSH)
-    flush_write_state(selfptr, dstptr, q->m_head, w);
+    flush_write_state(selfptr, dstptr, h, w);
 }
 
 void
@@ -3714,20 +3739,21 @@ sendprioa(Uint32 self, const SignalHeade
   selfptr->m_stat.m_prioa_size += siglen;
 
   thr_job_queue *q = &(dstptr->m_jba);
+  thr_job_queue_head *h = &(dstptr->m_jba_head);
   thr_jb_write_state w;
 
   lock(&dstptr->m_jba_write_lock);
 
-  Uint32 index = q->m_head->m_write_index;
+  Uint32 index = h->m_write_index;
   w.m_write_index = index;
   thr_job_buffer *buffer = q->m_buffers[index];
   w.m_write_buffer = buffer;
   w.m_write_pos = buffer->m_len;
   w.m_pending_signals = 0;
   w.m_pending_signals_wakeup = MAX_SIGNALS_BEFORE_WAKEUP;
-  bool buf_used = insert_signal(q, &w, true, s, data, secPtr,
+  bool buf_used = insert_signal(q, h, &w, true, s, data, secPtr,
                                 selfptr->m_next_buffer);
-  flush_write_state(selfptr, dstptr, q->m_head, &w);
+  flush_write_state(selfptr, dstptr, h, &w);
 
   unlock(&dstptr->m_jba_write_lock);
 
@@ -3812,20 +3838,21 @@ sendprioa_STOP_FOR_CRASH(const struct th
   stopForCrash->flags = 0;
 
   thr_job_queue *q = &(dstptr->m_jba);
+  thr_job_queue_head *h = &(dstptr->m_jba_head);
   thr_jb_write_state w;
 
   lock(&dstptr->m_jba_write_lock);
 
-  Uint32 index = q->m_head->m_write_index;
+  Uint32 index = h->m_write_index;
   w.m_write_index = index;
   thr_job_buffer *buffer = q->m_buffers[index];
   w.m_write_buffer = buffer;
   w.m_write_pos = buffer->m_len;
   w.m_pending_signals = 0;
   w.m_pending_signals_wakeup = MAX_SIGNALS_BEFORE_WAKEUP;
-  insert_signal(q, &w, true, &signalT.header, signalT.theData, NULL,
+  insert_signal(q, h, &w, true, &signalT.header, signalT.theData, NULL,
                 &dummy_buffer);
-  flush_write_state(selfptr, dstptr, q->m_head, &w);
+  flush_write_state(selfptr, dstptr, h, &w);
 
   unlock(&dstptr->m_jba_write_lock);
 }
@@ -3864,7 +3891,6 @@ thr_init(struct thr_repository* rep, str
   }
   selfptr->m_jba_head.m_read_index = 0;
   selfptr->m_jba_head.m_write_index = 0;
-  selfptr->m_jba.m_head = &selfptr->m_jba_head;
   thr_job_buffer *buffer = seize_buffer(rep, thr_no, true);
   selfptr->m_jba.m_buffers[0] = buffer;
   selfptr->m_jba_read_state.m_read_index = 0;
@@ -3879,7 +3905,6 @@ thr_init(struct thr_repository* rep, str
   {
     selfptr->m_in_queue_head[i].m_read_index = 0;
     selfptr->m_in_queue_head[i].m_write_index = 0;
-    selfptr->m_in_queue[i].m_head = &selfptr->m_in_queue_head[i];
     buffer = seize_buffer(rep, thr_no, false);
     selfptr->m_in_queue[i].m_buffers[0] = buffer;
     selfptr->m_read_states[i].m_read_index = 0;

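Taken together, the mt.cpp hunks implement the other half of commit
3689: thr_job_queue_head no longer lives behind a pointer inside
thr_job_queue but is passed around as its own object, so code that only
walks the buffer pointers never drags the heavily written head line
into its cache (and dropping the m_head pointer presumably freed the
slot that let SIZE grow from 31 to 32 buffers without changing the
struct's footprint). A minimal sketch of this hot/cold split and of the
lazy head reload in read_jbb_state(), with simplified names:

/* Hot: read and written by many threads; kept apart from the queue. */
struct QueueHead
{
  unsigned m_read_index;
  unsigned m_write_index;
};

/* Cold: buffer pointers change far less often than the indexes. */
struct Queue
{
  static const unsigned SIZE = 32;
  void* m_buffers[SIZE];
};

/* Consumer-local mirror of the head, as in thr_jb_read_state. */
struct ReadState
{
  unsigned m_read_index;   /* private to the consumer */
  unsigned m_write_index;  /* cached copy of QueueHead::m_write_index */
};

/* The shared head is reloaded only when the cached copy says the
 * queue may be empty, so a busy consumer runs on its private cache
 * lines almost all of the time. */
void refresh(ReadState* r, const QueueHead* h)
{
  if (r->m_write_index == r->m_read_index)
    r->m_write_index = h->m_write_index;
}
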
No bundle (reason: useless for push emails).