List: Commits
From: jonas oreland  Date: January 31 2012 6:29pm
Subject: bzr push into mysql-5.5-cluster-7.2 branch (jonas.oreland:3801 to 3802)
 3802 jonas oreland	2012-01-31 [merge]
      ndb - merge 71 to 72

    modified:
      storage/ndb/src/kernel/vm/mt.cpp
 3801 Frazer Clement	2012-01-31 [merge]
      Merge 7.1->7.2

=== modified file 'storage/ndb/src/kernel/vm/mt.cpp'
--- a/storage/ndb/src/kernel/vm/mt.cpp	2012-01-31 08:51:18 +0000
+++ b/storage/ndb/src/kernel/vm/mt.cpp	2012-01-31 18:28:03 +0000
@@ -144,6 +144,7 @@ futex_wake(volatile unsigned * addr)
 struct thr_wait
 {
   volatile unsigned m_futex_state;
+  char padding[NDB_CL_PADSZ(sizeof(unsigned))];
   enum {
     FS_RUNNING = 0,
     FS_SLEEPING = 1
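
For context: NDB_CL_PADSZ computes how many filler bytes are needed to round a set of struct members up to a full cache line, so the futex word above gets a line to itself. A minimal sketch of such a macro, assuming a 64-byte line (the real NDB definition lives elsewhere in the tree and may differ):

/* Sketch only: how a cache-line padding macro in the spirit of
 * NDB_CL_PADSZ is typically defined.  Assumes a 64-byte cache line. */
#define CL_SIZE 64
#define CL_PADSZ(x) (CL_SIZE - ((x) % CL_SIZE))

struct padded_flag
{
  volatile unsigned m_state;                 /* 4 bytes              */
  char padding[CL_PADSZ(sizeof(unsigned))];  /* 60 bytes of filler   */
};
/* sizeof(padded_flag) == 64: the flag owns a full cache line, so
 * spinning on it cannot thrash neighbouring data. */
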
@@ -218,9 +219,10 @@ wakeup(struct thr_wait* wait)
 
 struct thr_wait
 {
-  bool m_need_wakeup;
   NdbMutex *m_mutex;
   NdbCondition *m_cond;
+  bool m_need_wakeup;
+  char padding[NDB_CL_PADSZ(sizeof(bool) + (2*sizeof(void*)))];
   thr_wait() : m_need_wakeup(false), m_mutex(0), m_cond(0) {}
 
   void init() {
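
Note why the members are reordered: with the bool first, alignment padding inserted before the two pointers would make the members occupy more bytes than sizeof(bool) + 2*sizeof(void*), and the padding array would no longer fill out a whole line. With the pointers first, the three members are contiguous and the arithmetic matches. A sketch with stand-in types, using a C++11 static_assert the 2012 code could not rely on:

#define CL_SIZE 64
#define CL_PADSZ(x) (CL_SIZE - ((x) % CL_SIZE))

struct NdbMutex;       /* opaque stand-ins for the real NDB types */
struct NdbCondition;

struct thr_wait_sketch
{
  NdbMutex *m_mutex;
  NdbCondition *m_cond;
  bool m_need_wakeup;
  char padding[CL_PADSZ(sizeof(bool) + (2 * sizeof(void*)))];
};

static_assert(sizeof(thr_wait_sketch) % CL_SIZE == 0,
              "thr_wait must occupy a whole number of cache lines");
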
@@ -891,7 +893,44 @@ struct thr_data
                                   THR_SEND_BUFFER_MAX_FREE,
                                   THR_SEND_BUFFER_ALLOC_SIZE) {}
 
-  thr_wait m_waiter;
+  /**
+   * We start with the data structures that are globally shared, to
+   * ensure that they get the proper padding.
+   */
+  thr_wait m_waiter; /* Cacheline aligned */
+
+  /*
+   * Prio A signal incoming queue. This area is used by many threads,
+   * protected by the spin lock. Thus it is also important to protect
+   * the surrounding thread-local variables from CPU cache line sharing
+   * with this part.
+   */
+  struct thr_spin_lock<64> m_jba_write_lock;
+
+  struct thr_job_queue m_jba; /* aligned */
+  struct thr_job_queue_head m_jba_head;
+  char unused_protection1[NDB_CL_PADSZ(sizeof(struct thr_job_queue_head))];
+
+  /*
+   * These are the thread input queues, into which other threads deliver
+   * signals.
+   * Protect m_in_queue_head with an empty cache line to ensure that we
+   * don't get false CPU cache line sharing. These cache lines are going
+   * to be updated by many different CPUs all the time, whereas the
+   * neighbouring variables are thread-local.
+   */
+  struct thr_job_queue_head m_in_queue_head[MAX_BLOCK_THREADS];
+  char unused_protection2[
+    NDB_CL_PADSZ(MAX_BLOCK_THREADS*sizeof(struct thr_job_queue_head))];
+  struct thr_job_queue m_in_queue[MAX_BLOCK_THREADS];
+
+  /**
+   * The remainder of the variables in thr_data are thread-local,
+   * meaning that they are always updated by the thread that owns those
+   * data structures and thus those variables aren't shared with other
+   * CPUs.
+   */
+
   unsigned m_thr_no;
 
   /**
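
The reshuffled layout above puts everything that other threads write at the front of thr_data and fences it off with padding, so hot shared cache lines never overlap the owner thread's private state. The cost of getting this wrong is what the comments call false sharing; a self-contained toy (not NDB code; assumes C++11, a 64-byte line, and 4-byte atomics for the pad size) that makes the effect measurable:

#include <atomic>
#include <chrono>
#include <cstdio>
#include <thread>

/* Two counters on the same 64-byte line vs. on separate lines. */
struct Unpadded { std::atomic<unsigned> a; std::atomic<unsigned> b; };
struct Padded   { std::atomic<unsigned> a; char pad[60];
                  std::atomic<unsigned> b; };

template <typename T>
long long run_ms()
{
  T s;
  s.a = 0;
  s.b = 0;
  const auto start = std::chrono::steady_clock::now();
  std::thread t1([&s] {
    for (unsigned i = 0; i < 50000000u; i++)
      s.a.fetch_add(1, std::memory_order_relaxed);
  });
  std::thread t2([&s] {
    for (unsigned i = 0; i < 50000000u; i++)
      s.b.fetch_add(1, std::memory_order_relaxed);
  });
  t1.join();
  t2.join();
  return std::chrono::duration_cast<std::chrono::milliseconds>(
      std::chrono::steady_clock::now() - start).count();
}

int main()
{
  /* The unpadded run is typically several times slower on a multi-core
   * machine: each increment invalidates the line in the other core. */
  std::printf("same line:      %lld ms\n", run_ms<Unpadded>());
  std::printf("separate lines: %lld ms\n", run_ms<Padded>());
  return 0;
}
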
@@ -907,19 +946,6 @@ struct thr_data
   Uint64 m_time;
   struct thr_tq m_tq;
 
-  /* Prio A signal incoming queue. */
-  struct thr_spin_lock<64> m_jba_write_lock;
-  struct thr_job_queue m_jba;
-
-  struct thr_job_queue_head m_jba_head;
-
-  /* Thread-local read state of prio A buffer. */
-  struct thr_jb_read_state m_jba_read_state;
-  /*
-   * There is no m_jba_write_state, as we have multiple writers to the prio A
-   * queue, so local state becomes invalid as soon as we release the lock.
-   */
-
   /*
    * In m_next_buffer we keep a free buffer at all times, so that when
    * we hold the lock and find we need a new buffer, we can use this and this
@@ -938,12 +964,15 @@ struct thr_data
   /* m_first_unused is the first unused entry in m_free_fifo. */
   Uint32 m_first_unused;
 
+
+  /* Thread-local read state of prio A buffer. */
+  struct thr_jb_read_state m_jba_read_state;
+
   /*
-   * These are the thread input queues, where other threads deliver signals
-   * into.
+   * There is no m_jba_write_state, as we have multiple writers to the prio A
+   * queue, so local state becomes invalid as soon as we release the lock.
    */
-  struct thr_job_queue_head m_in_queue_head[MAX_BLOCK_THREADS];
-  struct thr_job_queue m_in_queue[MAX_BLOCK_THREADS];
+
   /* These are the write states of m_in_queue[self] in each thread. */
   struct thr_jb_write_state m_write_states[MAX_BLOCK_THREADS];
   /* These are the read states of all of our own m_in_queue[]. */
@@ -994,6 +1023,13 @@ struct thr_data
   NdbThread* m_thread;
 };
 
+struct thr_data_aligned
+{
+  struct thr_data m_thr_data;
+  /* Ensure that the thr_data is aligned on a cacheline boundary */
+  char unused_protection[NDB_CL_PADSZ(sizeof(struct thr_data))];
+};
+
 struct mt_send_handle  : public TransporterSendBufferHandle
 {
   struct thr_data * m_selfptr;
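
thr_data_aligned pads each per-thread block out to a whole number of cache lines, so no two threads' thr_data can share a line, provided the array itself starts on a line boundary. C++11's alignas, which this 2012 code predates, expresses the same intent directly; a sketch with a hypothetical payload:

struct alignas(64) thr_data_like
{
  char body[200];   /* hypothetical payload, standing in for thr_data */
};

/* alignas rounds sizeof up to a multiple of the alignment, so the
 * elements of an array land on successive cache-line boundaries. */
static_assert(sizeof(thr_data_like) % 64 == 0, "whole cache lines");
static_assert(alignof(thr_data_like) == 64, "line-aligned elements");
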
@@ -1034,18 +1070,29 @@ struct thr_repository
       m_sb_pool("sendbufferpool")
     {}
 
+  /**
+   * m_receive_lock, m_section_lock, m_mem_manager_lock, m_jb_pool
+   * and m_sb_pool are all globally shared among the threads
+   * and also heavily updated.
+   */
   struct thr_spin_lock<64> m_receive_lock[MAX_NDBMT_RECEIVE_THREADS];
   struct thr_spin_lock<64> m_section_lock;
   struct thr_spin_lock<64> m_mem_manager_lock;
+  /* thr_safe_pool is also padded to be 64 bytes in size */
   struct thr_safe_pool<thr_job_buffer> m_jb_pool;
   struct thr_safe_pool<thr_send_page> m_sb_pool;
+  /* m_mm and m_thread_count are globally shared, read-only variables */
   Ndbd_mem_manager * m_mm;
   unsigned m_thread_count;
-  struct thr_data m_thread[MAX_BLOCK_THREADS];
-
   /**
-   * send buffer handling
+   * Protect m_mm and m_thread_count from CPU cache misses; the first
+   * part of m_thread (struct thr_data) consists of globally shared
+   * variables, so letting these read-only variables share a cache
+   * line with them isn't a good idea.
    */
+  char protection_unused[NDB_CL_PADSZ(sizeof(void*) + sizeof(unsigned))];
+
+  struct thr_data_aligned m_thread[MAX_BLOCK_THREADS];
 
   /* The buffers that are to be sent */
   struct send_buffer
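
The repository is now laid out in three bands: heavily updated shared locks and pools (each padded to one line), read-only fields padded away from what follows, and finally the per-thread array. A hypothetical mirror of that layout showing the kind of compile-time check one could add (field sizes and names are illustrative only):

#include <cstddef>

#define CL_SIZE 64
#define CL_PADSZ(x) (CL_SIZE - ((x) % CL_SIZE))

struct repo_sketch
{
  char hot_shared[5 * CL_SIZE];   /* locks and pools, one line each */
  void *m_mm;                     /* read-only after startup        */
  unsigned m_thread_count;        /* read-only after startup        */
  char pad[CL_PADSZ(sizeof(void*) + sizeof(unsigned))];
  char per_thread[2 * CL_SIZE];   /* stands in for m_thread[]       */
};

static_assert(offsetof(repo_sketch, per_thread) % CL_SIZE == 0,
              "per-thread data must start on a cache-line boundary");
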
@@ -1602,7 +1649,7 @@ thr_job_buffer*
 seize_buffer(struct thr_repository* rep, int thr_no, bool prioa)
 {
   thr_job_buffer* jb;
-  thr_data* selfptr = rep->m_thread + thr_no;
+  struct thr_data* selfptr = &rep->m_thread[thr_no].m_thr_data;
   Uint32 first_free = selfptr->m_first_free;
   Uint32 first_unused = selfptr->m_first_unused;
 
@@ -1664,7 +1711,7 @@ static
 void
 release_buffer(struct thr_repository* rep, int thr_no, thr_job_buffer* jb)
 {
-  struct thr_data* selfptr = rep->m_thread + thr_no;
+  struct thr_data* selfptr = &rep->m_thread[thr_no].m_thr_data;
   Uint32 first_free = selfptr->m_first_free;
   Uint32 first_unused = selfptr->m_first_unused;
 
@@ -1900,7 +1947,7 @@ void
 senddelay(Uint32 thr_no, const SignalHeader* s, Uint32 delay)
 {
   struct thr_repository* rep = &g_thr_repository;
-  struct thr_data * selfptr = rep->m_thread + thr_no;
+  struct thr_data* selfptr = &rep->m_thread[thr_no].m_thr_data;
   assert(pthread_equal(selfptr->m_thr_id, pthread_self()));
   unsigned siglen = (sizeof(*s) >> 2) + s->theLength + s->m_noOfSections;
 
@@ -2052,11 +2099,12 @@ flush_jbb_write_state(thr_data *selfptr)
   Uint32 self = selfptr->m_thr_no;
 
   thr_jb_write_state *w = selfptr->m_write_states;
-  thr_data *thrptr = g_thr_repository.m_thread;
-  for (Uint32 thr_no = 0; thr_no < thr_count; thr_no++, thrptr++, w++)
+  thr_data_aligned *thr_align_ptr = g_thr_repository.m_thread;
+  for (Uint32 thr_no = 0; thr_no < thr_count; thr_no++, thr_align_ptr++, w++)
   {
     if (w->m_pending_signals || w->m_pending_signals_wakeup)
     {
+      struct thr_data *thrptr = &thr_align_ptr->m_thr_data;
       w->m_pending_signals_wakeup = MAX_SIGNALS_BEFORE_WAKEUP;
       thr_job_queue_head *q_head = thrptr->m_in_queue_head + self;
       flush_write_state(selfptr, thrptr, q_head, w);
@@ -2077,8 +2125,8 @@ check_job_buffers(struct thr_repository*
 {
   const Uint32 minfree = (1024 + MIN_SIGNALS_PER_PAGE - 1)/MIN_SIGNALS_PER_PAGE;
   unsigned thr_no = first_receiver_thread_no + recv_thread_id;
-  const thr_data *thrptr = rep->m_thread;
-  for (unsigned i = 0; i<num_threads; i++, thrptr++)
+  const thr_data_aligned *thr_align_ptr = rep->m_thread;
+  for (unsigned i = 0; i<num_threads; i++, thr_align_ptr++)
   {
     /**
      * NOTE: m_read_index is read wo/ lock (and updated by different thread)
@@ -2087,6 +2135,7 @@ check_job_buffers(struct thr_repository*
      *       function is always conservative (i.e it can be better than
      *       returned value, if read-index has moved but we didnt see it)
      */
+    const struct thr_data *thrptr = &thr_align_ptr->m_thr_data;
     const thr_job_queue_head *q_head = thrptr->m_in_queue_head + thr_no;
     unsigned ri = q_head->m_read_index;
     unsigned wi = q_head->m_write_index;
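
The unlocked read of m_read_index noted above is safe because a stale snapshot can only make the reader look further behind than it really is, so the estimate errs on the side of less free space. A sketch of that conservative computation for a fixed-size circular queue (hypothetical helper, not code from this patch):

/* Free slots in a ring of `size` entries, one slot kept empty to
 * distinguish full from empty.  If the snapshot of read_index is
 * stale, the result only under-estimates the true free space. */
static unsigned
ring_free_slots(unsigned read_index, unsigned write_index, unsigned size)
{
  return (read_index > write_index)
    ? read_index - write_index - 1
    : size - (write_index - read_index) - 1;
}
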
@@ -2122,9 +2171,9 @@ compute_max_signals_to_execute(Uint32 th
 {
   Uint32 minfree = thr_job_queue::SIZE;
   const struct thr_repository* rep = &g_thr_repository;
-  const thr_data *thrptr = rep->m_thread;
+  const struct thr_data_aligned *thr_align_ptr = rep->m_thread;
 
-  for (unsigned i = 0; i<num_threads; i++, thrptr++)
+  for (unsigned i = 0; i<num_threads; i++, thr_align_ptr++)
   {
     /**
      * NOTE: m_read_index is read wo/ lock (and updated by different thread)
@@ -2133,6 +2182,7 @@ compute_max_signals_to_execute(Uint32 th
      *       function is always conservative (i.e it can be better than
      *       returned value, if read-index has moved but we didnt see it)
      */
+    const struct thr_data *thrptr = &thr_align_ptr->m_thr_data;
     const thr_job_queue_head *q_head = thrptr->m_in_queue_head + thr_no;
     unsigned ri = q_head->m_read_index;
     unsigned wi = q_head->m_write_index;
@@ -2537,7 +2587,8 @@ trp_callback::bytes_sent(NodeId node, Ui
   assert(thr_no != NO_SEND_THREAD);
   if (!is_send_thread(thr_no))
   {
-    return ::bytes_sent(&g_thr_repository.m_thread[thr_no].m_send_buffer_pool,
+    thr_data * thrptr = &g_thr_repository.m_thread[thr_no].m_thr_data;
+    return ::bytes_sent(&thrptr->m_send_buffer_pool,
                         sb, bytes);
   }
   else
@@ -3220,7 +3271,7 @@ add_thr_map(Uint32 main, Uint32 instance
 
   require(thr_no < num_threads);
   struct thr_repository* rep = &g_thr_repository;
-  thr_data* thr_ptr = rep->m_thread + thr_no;
+  struct thr_data* thr_ptr = &rep->m_thread[thr_no].m_thr_data;
 
   /* Add to list. */
   {
@@ -3818,9 +3869,9 @@ sendlocal(Uint32 self, const SignalHeade
 
   Uint32 dst = block2ThreadId(block, instance);
   struct thr_repository* rep = &g_thr_repository;
-  struct thr_data * selfptr = rep->m_thread + self;
+  struct thr_data *selfptr = &rep->m_thread[self].m_thr_data;
   assert(pthread_equal(selfptr->m_thr_id, pthread_self()));
-  struct thr_data * dstptr = rep->m_thread + dst;
+  struct thr_data *dstptr = &rep->m_thread[dst].m_thr_data;
 
   selfptr->m_stat.m_priob_count++;
   Uint32 siglen = (sizeof(*s) >> 2) + s->theLength + s->m_noOfSections;
@@ -3846,10 +3897,10 @@ sendprioa(Uint32 self, const SignalHeade
 
   Uint32 dst = block2ThreadId(block, instance);
   struct thr_repository* rep = &g_thr_repository;
-  struct thr_data *selfptr = rep->m_thread + self;
+  struct thr_data *selfptr = &rep->m_thread[self].m_thr_data;
   assert(s->theVerId_signalNumber == GSN_START_ORD ||
          pthread_equal(selfptr->m_thr_id, pthread_self()));
-  struct thr_data *dstptr = rep->m_thread + dst;
+  struct thr_data *dstptr = &rep->m_thread[dst].m_thr_data;
 
   selfptr->m_stat.m_prioa_count++;
   Uint32 siglen = (sizeof(*s) >> 2) + s->theLength + s->m_noOfSections;
@@ -3889,7 +3940,7 @@ mt_send_remote(Uint32 self, const Signal
                const LinearSectionPtr ptr[3])
 {
   thr_repository *rep = &g_thr_repository;
-  thr_data *selfptr = rep->m_thread + self;
+  struct thr_data *selfptr = &rep->m_thread[self].m_thr_data;
   SendStatus ss;
 
   mt_send_handle handle(selfptr);
@@ -3907,7 +3958,7 @@ mt_send_remote(Uint32 self, const Signal
                const SegmentedSectionPtr ptr[3])
 {
   thr_repository *rep = &g_thr_repository;
-  thr_data *selfptr = rep->m_thread + self;
+  struct thr_data *selfptr = &rep->m_thread[self].m_thr_data;
   SendStatus ss;
 
   mt_send_handle handle(selfptr);
@@ -3940,7 +3991,7 @@ sendprioa_STOP_FOR_CRASH(const struct th
   /**
    * Pick any instance running in this thread
    */
-  struct thr_data * dstptr = rep->m_thread + dst;
+  struct thr_data *dstptr = &rep->m_thread[dst].m_thr_data;
   Uint32 bno = dstptr->m_instance_list[0];
 
   memset(&signalT.header, 0, sizeof(SignalHeader));
@@ -4058,7 +4109,7 @@ thr_init2(struct thr_repository* rep, st
     selfptr->m_write_states[i].m_write_index = 0;
     selfptr->m_write_states[i].m_write_pos = 0;
     selfptr->m_write_states[i].m_write_buffer =
-      rep->m_thread[i].m_in_queue[thr_no].m_buffers[0];
+      rep->m_thread[i].m_thr_data.m_in_queue[thr_no].m_buffers[0];
     selfptr->m_write_states[i].m_pending_signals = 0;
     selfptr->m_write_states[i].m_pending_signals_wakeup = 0;
   }    
@@ -4097,11 +4148,11 @@ rep_init(struct thr_repository* rep, uns
   rep->m_thread_count = cnt;
   for (unsigned int i = 0; i<cnt; i++)
   {
-    thr_init(rep, rep->m_thread + i, cnt, i);
+    thr_init(rep, &rep->m_thread[i].m_thr_data, cnt, i);
   }
   for (unsigned int i = 0; i<cnt; i++)
   {
-    thr_init2(rep, rep->m_thread + i, cnt, i);
+    thr_init2(rep, &rep->m_thread[i].m_thr_data, cnt, i);
   }
 
   rep->stopped_threads = 0;
@@ -4356,7 +4407,7 @@ ThreadConfig::ipControlLoop(NdbThread* p
    */
   for (thr_no = 0; thr_no < num_threads; thr_no++)
   {
-    rep->m_thread[thr_no].m_time = NdbTick_CurrentMillisecond();
+    rep->m_thread[thr_no].m_thr_data.m_time = NdbTick_CurrentMillisecond();
 
     if (thr_no == first_receiver_thread_no)
       continue;                 // Will run in the main thread.
@@ -4368,30 +4419,30 @@ ThreadConfig::ipControlLoop(NdbThread* p
     if (thr_no < first_receiver_thread_no)
     {
       /* Start block threads */
-      rep->m_thread[thr_no].m_thread =
+      rep->m_thread[thr_no].m_thr_data.m_thread =
         NdbThread_Create(mt_job_thread_main,
                          (void **)(rep->m_thread + thr_no),
                          1024*1024,
                          "execute thread", //ToDo add number
                          NDB_THREAD_PRIO_MEAN);
-      require(rep->m_thread[thr_no].m_thread != NULL);
+      require(rep->m_thread[thr_no].m_thr_data.m_thread != NULL);
     }
     else
     {
       /* Start a receiver thread, also block thread for TRPMAN */
-      rep->m_thread[thr_no].m_thread =
+      rep->m_thread[thr_no].m_thr_data.m_thread =
         NdbThread_Create(mt_receiver_thread_main,
-                         (void **)(rep->m_thread + thr_no),
+                         (void **)(&rep->m_thread[thr_no].m_thr_data),
                          1024*1024,
                          "receive thread", //ToDo add number
                          NDB_THREAD_PRIO_MEAN);
-      require(rep->m_thread[thr_no].m_thread != NULL);
+      require(rep->m_thread[thr_no].m_thr_data.m_thread != NULL);
     }
   }
 
   /* Now run the main loop for first receiver thread directly. */
-  rep->m_thread[first_receiver_thread_no].m_thread = pThis;
-  mt_receiver_thread_main(&(rep->m_thread[first_receiver_thread_no]));
+  rep->m_thread[first_receiver_thread_no].m_thr_data.m_thread = pThis;
+  mt_receiver_thread_main(&(rep->m_thread[first_receiver_thread_no].m_thr_data));
 
   /* Wait for all threads to shutdown. */
   for (thr_no = 0; thr_no < num_threads; thr_no++)
@@ -4399,8 +4450,9 @@ ThreadConfig::ipControlLoop(NdbThread* p
     if (thr_no == first_receiver_thread_no)
       continue;
     void *dummy_return_status;
-    NdbThread_WaitFor(rep->m_thread[thr_no].m_thread, &dummy_return_status);
-    NdbThread_Destroy(&(rep->m_thread[thr_no].m_thread));
+    NdbThread_WaitFor(rep->m_thread[thr_no].m_thr_data.m_thread,
+                      &dummy_return_status);
+    NdbThread_Destroy(&(rep->m_thread[thr_no].m_thr_data.m_thread));
   }
 
   /* Delete send threads, includes waiting for threads to shutdown */
@@ -4477,7 +4529,8 @@ FastScheduler::traceDumpGetJam(Uint32 th
   thrdTheEmulatedJam = NULL;
   thrdTheEmulatedJamIndex = 0;
 #else
-  const EmulatedJamBuffer *jamBuffer = &g_thr_repository.m_thread[thr_no].m_jam;
+  const EmulatedJamBuffer *jamBuffer =
+    &g_thr_repository.m_thread[thr_no].m_thr_data.m_jam;
   thrdTheEmulatedJam = jamBuffer->theEmulatedJam;
   thrdTheEmulatedJamIndex = jamBuffer->theEmulatedJamIndex;
   jamBlockNumber = jamBuffer->theEmulatedJamBlockNumber;
@@ -4611,7 +4664,7 @@ FastScheduler::dumpSignalMemory(Uint32 t
   Uint32 seq_start = 0;
   Uint32 seq_end = 0;
 
-  const thr_data *thr_ptr = &rep->m_thread[thr_no];
+  const struct thr_data *thr_ptr = &rep->m_thread[thr_no].m_thr_data;
   if (watchDogCounter)
     *watchDogCounter = 4;
 
@@ -4905,7 +4958,7 @@ void
 mt_wakeup(class SimulatedBlock* block)
 {
   Uint32 thr_no = block->getThreadId();
-  thr_data *thrptr = g_thr_repository.m_thread + thr_no;
+  struct thr_data *thrptr = &g_thr_repository.m_thread[thr_no].m_thr_data;
   wakeup(&thrptr->m_waiter);
 }
 
@@ -4914,7 +4967,7 @@ void
 mt_assert_own_thread(SimulatedBlock* block)
 {
   Uint32 thr_no = block->getThreadId();
-  thr_data *thrptr = g_thr_repository.m_thread + thr_no;
+  struct thr_data *thrptr = &g_thr_repository.m_thread[thr_no].m_thr_data;
 
   if (unlikely(pthread_equal(thrptr->m_thr_id, pthread_self()) == 0))
   {
@@ -4930,7 +4983,7 @@ Uint32
 mt_get_blocklist(SimulatedBlock * block, Uint32 arr[], Uint32 len)
 {
   Uint32 thr_no = block->getThreadId();
-  thr_data *thr_ptr = g_thr_repository.m_thread + thr_no;
+  struct thr_data *thr_ptr = &g_thr_repository.m_thread[thr_no].m_thr_data;
 
   for (Uint32 i = 0; i < thr_ptr->m_instance_count; i++)
   {
@@ -4945,7 +4998,7 @@ mt_get_thr_stat(class SimulatedBlock * b
 {
   bzero(dst, sizeof(* dst));
   Uint32 thr_no = block->getThreadId();
-  thr_data *selfptr = g_thr_repository.m_thread + thr_no;
+  struct thr_data *selfptr = &g_thr_repository.m_thread[thr_no].m_thr_data;
 
   THRConfigApplier & conf = globalEmulatorData.theConfiguration->m_thr_config;
   dst->thr_no = thr_no;

No bundle (reason: useless for push emails).