List: Commits
From: Jonas Oreland  Date: January 21 2009 9:55am
Subject: bzr push into mysql-5.1-bugteam branch (jonas:3222 to 3223) Bug#42052
 3223 Jonas Oreland	2009-01-21
      ndbmtd - 
        1) OJ optimizations developed for cmt/bw
        2) pessimistic scheduling (update_sched_config), bug#42052
           only execute signals if space exists to send to
           other threads in the system
        3) new NdbCondition_ComputeAbsTime/NdbCondition_WaitTimeoutAbs
modified:
  storage/ndb/include/portlib/NdbCondition.h
  storage/ndb/src/common/portlib/NdbCondition.c
  storage/ndb/src/kernel/vm/mt.cpp

 3222 Pekka Nousiainen	2009-01-20
      bug#41905 02.diff
      In alter table, move invalidation of the old version to commit.
modified:
  storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp
  storage/ndb/src/ndbapi/NdbDictionaryImpl.hpp

=== modified file 'storage/ndb/include/portlib/NdbCondition.h'
--- a/storage/ndb/include/portlib/NdbCondition.h	2006-12-23 19:20:40 +0000
+++ b/storage/ndb/include/portlib/NdbCondition.h	2009-01-21 09:55:01 +0000
@@ -57,7 +57,22 @@ int
 NdbCondition_WaitTimeout(struct NdbCondition* p_cond,
 			 NdbMutex* p_mutex,
 			 int msec);
-  
+/*
+ * same as NdbCondition_WaitTimeout only that
+ * endtime is an absolute time computed using
+ * NdbCondition_ComputeAbsTime
+ */
+int
+NdbCondition_WaitTimeoutAbs(struct NdbCondition* p_cond,
+			 NdbMutex* p_mutex,
+			 const struct timespec * endtime);
+
+/**
+ * compute an absolute time suitable for use with NdbCondition_WaitTimeoutAbs
+ * and store it in <em>dst</em>. <em>ms</em> specifies milliseconds from now
+ */
+void
+NdbCondition_ComputeAbsTime(struct timespec * dst, unsigned ms);
 
 /**
  * Signal a condition

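For context, a minimal usage sketch of the new pair (illustration only, not
part of the patch; it assumes the usual portlib helpers NdbMutex_Lock and
NdbMutex_Unlock): one absolute deadline is computed up front and re-used
across spurious wakeups, which is how the condition-variable yield() in
mt.cpp below uses it.

  #include "NdbCondition.h"
  #include "NdbMutex.h"
  #include <errno.h>

  /* Wait up to 'msec' milliseconds for predicate(arg) to become false.
   * One absolute deadline is computed up front, so spurious wakeups and
   * predicate re-checks never extend the total wait. */
  static int
  wait_until(struct NdbCondition* cond, NdbMutex* mutex,
             bool (*predicate)(void*), void* arg, unsigned msec)
  {
    struct timespec end;
    NdbCondition_ComputeAbsTime(&end, msec);    /* deadline = now + msec */

    NdbMutex_Lock(mutex);
    while (predicate(arg))
    {
      /* Re-waiting with the same 'end' keeps the total wait bounded. */
      if (NdbCondition_WaitTimeoutAbs(cond, mutex, &end) == ETIMEDOUT)
        break;
    }
    NdbMutex_Unlock(mutex);
    return 0;
  }
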
=== modified file 'storage/ndb/src/common/portlib/NdbCondition.c'
--- a/storage/ndb/src/common/portlib/NdbCondition.c	2008-08-21 05:08:15 +0000
+++ b/storage/ndb/src/common/portlib/NdbCondition.c	2009-01-21 09:55:01 +0000
@@ -130,22 +130,27 @@ NdbCondition_Wait(struct NdbCondition* p
 int 
 NdbCondition_WaitTimeout(struct NdbCondition* p_cond,
                          NdbMutex* p_mutex,
-                         int msecs){
-  int result;
+                         int msecs)
+{
   struct timespec abstime; 
+
+  NdbCondition_ComputeAbsTime(&abstime, msecs);
+  return NdbCondition_WaitTimeoutAbs(p_cond, p_mutex, &abstime);
+}
+
+void
+NdbCondition_ComputeAbsTime(struct timespec * abstime, unsigned msecs)
+{
   int secs = 0;
-  
-  if (p_cond == NULL || p_mutex == NULL)
-    return 1;
 #ifndef NDB_WIN
 #ifdef HAVE_CLOCK_GETTIME
-  clock_gettime(clock_id, &abstime);
+  clock_gettime(clock_id, abstime);
 #else
   {
     struct timeval tick_time;
     gettimeofday(&tick_time, 0);
-    abstime.tv_sec  = tick_time.tv_sec;
-    abstime.tv_nsec = tick_time.tv_usec * 1000;
+    abstime->tv_sec  = tick_time.tv_sec;
+    abstime->tv_nsec = tick_time.tv_usec * 1000;
   }
 #endif
 
@@ -154,18 +159,26 @@ NdbCondition_WaitTimeout(struct NdbCondi
     msecs = msecs % 1000;
   }
 
-  abstime.tv_sec  += secs;
-  abstime.tv_nsec += msecs * 1000000;
-  if (abstime.tv_nsec >= 1000000000) {
-    abstime.tv_sec  += 1;
-    abstime.tv_nsec -= 1000000000;
+  abstime->tv_sec  += secs;
+  abstime->tv_nsec += msecs * 1000000;
+  if (abstime->tv_nsec >= 1000000000) {
+    abstime->tv_sec  += 1;
+    abstime->tv_nsec -= 1000000000;
   }
 #else
-  set_timespec_nsec(abstime,msecs*1000000ULL);
+  set_timespec_nsec(*abstime,msecs*1000000ULL);
 #endif
-  result = pthread_cond_timedwait(&p_cond->cond, p_mutex, &abstime);
-  
-  return result;
+}
+
+int
+NdbCondition_WaitTimeoutAbs(struct NdbCondition* p_cond,
+                            NdbMutex* p_mutex,
+                            const struct timespec * abstime)
+{
+  if (p_cond == NULL || p_mutex == NULL)
+    return 1;
+
+  return pthread_cond_timedwait(&p_cond->cond, p_mutex, abstime);
 }
 
 int 

=== modified file 'storage/ndb/src/kernel/vm/mt.cpp'
--- a/storage/ndb/src/kernel/vm/mt.cpp	2008-12-11 10:40:24 +0000
+++ b/storage/ndb/src/kernel/vm/mt.cpp	2009-01-21 09:55:01 +0000
@@ -82,6 +82,9 @@ static Uint32 receiver_thread_no = 0;
 
 #define NO_SEND_THREAD (MAX_THREADS + 1)
 
+/* max signal is 32 words, 7 for signal header and 25 datawords */
+#define MIN_SIGNALS_PER_PAGE (thr_job_buffer::SIZE / 32)
+
 struct mt_lock_stat
 {
   const void * m_ptr;
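
A back-of-the-envelope illustration of the new constant (thr_job_buffer::SIZE
is defined elsewhere in mt.cpp and not shown in this patch, so the page size
below is an assumed value):

  #include <cstdio>

  int main()
  {
    const unsigned page_words = 8188;     /* assumed thr_job_buffer::SIZE   */
    const unsigned signal_words = 32;     /* 7 header words + 25 data words */

    /* Worst case: every buffered signal occupies the full 32 words, so one
     * page is guaranteed to hold at least this many signals. */
    printf("MIN_SIGNALS_PER_PAGE = %u\n", page_words / signal_words); /* 255 */
    return 0;
  }
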
@@ -143,9 +146,11 @@ struct thr_wait
  * Will call check_callback(check_arg) after proper synchronisation, and only
  * if that returns true will it actually sleep, else it will return
  * immediately. This is needed to avoid races with wakeup.
+ *
+ * Returns 'true' if it actually did sleep.
  */
 static inline
-void
+bool
 yield(struct thr_wait* wait, const struct timespec *timeout,
       bool (*check_callback)(struct thr_data *), struct thr_data *check_arg)
 {
@@ -167,10 +172,11 @@ yield(struct thr_wait* wait, const struc
    * Also need a memory barrier to ensure this extra check is race-free.
    *   but that is already provided by xcng
    */
-
-  if ((*check_callback)(check_arg))
+  bool waited = (*check_callback)(check_arg);
+  if (waited)
     futex_wait(val, thr_wait::FS_SLEEPING, timeout);
   xcng(val, thr_wait::FS_RUNNING);
+  return waited;
 }
 
 static inline
@@ -196,10 +202,9 @@ wakeup(struct thr_wait* wait)
 struct thr_wait
 {
   bool m_need_wakeup;
- 
   NdbMutex *m_mutex;
   NdbCondition *m_cond;
-  thr_wait() : m_mutex(0), m_cond(0), m_need_wakeup(false) {}
+  thr_wait() : m_need_wakeup(false), m_mutex(0), m_cond(0) {}
 
   void init() {
     m_mutex = NdbMutex_Create();
@@ -208,24 +213,32 @@ struct thr_wait
 };
 
 static inline
-void
+bool
 yield(struct thr_wait* wait, const struct timespec *timeout,
       bool (*check_callback)(struct thr_data *), struct thr_data *check_arg)
 {
   Uint32 msec = 
     (1000 * timeout->tv_sec) + 
     (timeout->tv_nsec / 1000000);
+  struct timespec end;
+  NdbCondition_ComputeAbsTime(&end, msec);
   NdbMutex_Lock(wait->m_mutex);
-  while ((*check_callback)(check_arg)) /* May have spurious wakeups: Always recheck condition predicate */
+
+  Uint32 waits = 0;
+  /* May have spurious wakeups: Always recheck condition predicate */
+  while ((*check_callback)(check_arg))
   {
     wait->m_need_wakeup = true;
-    if (NdbCondition_WaitTimeout(wait->m_cond, wait->m_mutex, msec) == ETIMEDOUT)
+    waits++;
+    if (NdbCondition_WaitTimeoutAbs(wait->m_cond,
+                                    wait->m_mutex, &end) == ETIMEDOUT)
     {
       wait->m_need_wakeup = false;
       break;
     }
   }
   NdbMutex_Unlock(wait->m_mutex);
+  return (waits > 0);
 }
 
 
@@ -246,18 +259,14 @@ wakeup(struct thr_wait* wait)
 
 #endif
 
-static inline void
+static inline
+void
 require(bool x)
 {
   if (unlikely(!(x)))
     abort();
 }
 
-#ifdef assert
-#undef assert
-#endif
-#define assert(x) require(x)
-
 #ifdef NDB_HAVE_XCNG
 struct thr_spin_lock
 {
@@ -435,7 +444,7 @@ public:
     if (tmp)
     {
       m_freelist = tmp->m_next;
-      assert (m_free > 0);
+      assert(m_free > 0);
       m_free--;
     }
     else
@@ -520,7 +529,7 @@ struct thr_job_queue_head
 
 struct thr_job_queue
 {
-  static const unsigned SIZE = 30;
+  static const unsigned SIZE = 31;
 
   struct thr_job_queue_head* m_head;
   struct thr_job_buffer* m_buffers[SIZE];
@@ -595,7 +604,7 @@ struct thr_jb_read_state
 
   bool is_empty() const
   {
-//  assert (m_read_index != m_write_index  ||  m_read_pos <= m_read_end);
+    assert(m_read_index != m_write_index  ||  m_read_pos <= m_read_end);
     return (m_read_index == m_write_index) && (m_read_pos >= m_read_end);
   }
 };
@@ -693,6 +702,16 @@ struct thr_data
   thr_wait m_waiter;
   unsigned m_thr_no;
 
+  /**
+   * max signals to execute per JBB buffer
+   */
+  unsigned m_max_signals_per_jb;
+
+  /**
+   * max signals to execute before recomputing m_max_signals_per_jb
+   */
+  unsigned m_max_exec_signals;
+
   Uint64 m_time;
   struct thr_tq m_tq;
 
@@ -1071,50 +1090,44 @@ handle_time_wrap(struct thr_data* selfpt
   }
 }
 
-static inline
+static
 void
-scan_time_queues(struct thr_data* selfptr)
+scan_time_queues_impl(struct thr_data* selfptr, NDB_TICKS now)
 {
   struct thr_tq * tq = &selfptr->m_tq;
-  NDB_TICKS now = NdbTick_CurrentMillisecond();
   NDB_TICKS last = selfptr->m_time;
 
   Uint32 curr = tq->m_current_time;
   Uint32 cnt0 = tq->m_cnt[0];
   Uint32 cnt1 = tq->m_cnt[1];
 
+  assert(now > last);
   Uint64 diff = now - last;
-  if (diff == 0)
-  {
-    return;
-  }
-  else if (diff > 0)
-  {
-    Uint32 step = (Uint32)((diff > 20) ? 20 : diff);
-    Uint32 end = (curr + step);
-    if (end >= 32767)
-    {
-      handle_time_wrap(selfptr);
-      cnt0 = tq->m_cnt[0];
-      cnt1 = tq->m_cnt[1];
-      end -= 32767;
-    }
-    
-    Uint32 tmp0 = scan_queue(selfptr, cnt0, end, tq->m_short_queue);
-    Uint32 tmp1 = scan_queue(selfptr, cnt1, end, tq->m_long_queue);
+  Uint32 step = (Uint32)((diff > 20) ? 20 : diff);
+  Uint32 end = (curr + step);
+  if (end >= 32767)
+  {
+    handle_time_wrap(selfptr);
+    cnt0 = tq->m_cnt[0];
+    cnt1 = tq->m_cnt[1];
+    end -= 32767;
+  }
+
+  Uint32 tmp0 = scan_queue(selfptr, cnt0, end, tq->m_short_queue);
+  Uint32 tmp1 = scan_queue(selfptr, cnt1, end, tq->m_long_queue);
+
+  tq->m_current_time = end;
+  tq->m_cnt[0] = cnt0 - tmp0;
+  tq->m_cnt[1] = cnt1 - tmp1;
+  selfptr->m_time = last + step;
+}
 
-    tq->m_current_time = end;
-    tq->m_cnt[0] = cnt0 - tmp0;
-    tq->m_cnt[1] = cnt1 - tmp1;
-    selfptr->m_time = last + step;
-    
-    return;
-  }
-  else if (diff == 0)
-  {
-    return;
-  }
-  abort();
+static inline
+void
+scan_time_queues(struct thr_data* selfptr, NDB_TICKS now)
+{
+  if (selfptr->m_time != now)
+    scan_time_queues_impl(selfptr, now);
 }
 
 static
@@ -1244,13 +1257,13 @@ senddelay(Uint32 thr_no, const SignalHea
  * receiving threads.
  *
  * Two versions:
- *    - The general version flush_write_state_other() which may flush to any thread,
- *      and possibly signal any waiters.
+ *    - The general version flush_write_state_other() which may flush to
+ *      any thread, and possibly signal any waiters.
  *    - The special version flush_write_state_self() which should only be used
  *      to flush messages to itself.
  *
- * Call to these functions are encapsulated through flush_write_state which decides
- * which of these functions to call.
+ * Call to these functions are encapsulated through flush_write_state
+ * which decides which of these functions to call.
  */
 static inline
 void
@@ -1268,7 +1281,8 @@ flush_write_state_self(thr_job_queue_hea
 
 static inline
 void
-flush_write_state_other(thr_data *dstptr, thr_job_queue_head *q_head, thr_jb_write_state *w)
+flush_write_state_other(thr_data *dstptr, thr_job_queue_head *q_head,
+                        thr_jb_write_state *w)
 {
   /*
    * Two write memory barriers here, as assigning m_len may make signal data
@@ -1280,7 +1294,6 @@ flush_write_state_other(thr_data *dstptr
    *
    * But wmb() is a no-op anyway in x86 ...
    */
-
   wmb();
   w->m_write_buffer->m_len = w->m_write_pos;
   wmb();
@@ -1298,7 +1311,8 @@ flush_write_state_other(thr_data *dstptr
 
 static inline
 void
-flush_write_state(const thr_data *selfptr, thr_data *dstptr, thr_job_queue_head *q_head, thr_jb_write_state *w)
+flush_write_state(const thr_data *selfptr, thr_data *dstptr,
+                  thr_job_queue_head *q_head, thr_jb_write_state *w)
 {
   if (dstptr == selfptr)
   {
@@ -1332,43 +1346,98 @@ flush_jbb_write_state(thr_data *selfptr)
 }
 
 /**
- * return 1 if any threads in-queue is more than 25% full
- *   else 0
+ * Transporter will receive 1024 signals (MAX_RECEIVED_SIGNALS)
+ * before running check_job_buffers
+ *
+ * This function returns 0 if there is space to receive that many
+ *   signals, else 1.
  */
 static int
 check_job_buffers(struct thr_repository* rep)
 {
-  thr_data *thrptr = rep->m_thread;
+  const Uint32 minfree = (1024 + MIN_SIGNALS_PER_PAGE - 1)/MIN_SIGNALS_PER_PAGE;
+  unsigned thr_no = receiver_thread_no;
+  const thr_data *thrptr = rep->m_thread;
   for (unsigned i = 0; i<num_threads; i++, thrptr++)
   {
-    const thr_job_queue_head *q_head = thrptr->m_in_queue_head;
-    for (unsigned j = 0; j<num_threads; j++,q_head++)
+    /**
+     * NOTE: m_read_index is read wo/ lock (and updated by a different
+     *       thread), but since that thread can only consume signals,
+     *       the estimate computed here is always conservative (i.e. the
+     *       real situation can be better than what we computed, if the
+     *       read-index has moved but we didn't see it yet)
+     */
+    const thr_job_queue_head *q_head = thrptr->m_in_queue_head + thr_no;
+    unsigned ri = q_head->m_read_index;
+    unsigned wi = q_head->m_write_index;
+    unsigned busy = (wi >= ri) ? wi - ri : (thr_job_queue::SIZE - ri) + wi;
+    if (1 + minfree + busy >= thr_job_queue::SIZE)
     {
-      /**
-       * These values are read wo/ locks...
-       *   and they are written by different threads wo/ syncronization
-       *   i.e they are not 100% accurate
-       *
-       * A noticable exception is the values related to the receiver thread
-       *   (which calls this method)
-       * It's write-index is correct (since it's written by itself)
-       *   and this means that the estimate for this thread is only
-       *   conservative (i.e it can be better than guess, if read-index has
-       *   moved but we didnt see it)
-       */
-      unsigned ri = q_head->m_read_index;
-      unsigned wi = q_head->m_write_index;
-      unsigned busy = (wi >= ri) ? wi - ri : (thr_job_queue::SIZE - ri) + wi;
-      if (4*busy >= thr_job_queue::SIZE)
-      {
-        return 1;
-      }
+      return 1;
     }
   }
 
   return 0;
 }
 
+/**
+ * Compute max signals that thr_no can execute wo/ risking
+ *   job-buffer-full
+ *
+ *  see-also update_sched_config
+ *
+ *
+ * 1) compute free-slots in ring-buffer from self to each thread in system
+ * 2) pick smallest value
+ * 3) compute how many signals this corresponds to
+ * 4) compute how many signals self can execute if all of them were sent
+ *    to the thread with the fullest ring-buffer (i.e. the worst case)
+ *
+ *   Assumption: each signal may send *at most* 4 signals
+ *     - this assumption is made the same in ndbd and ndbmtd and is
+ *       mostly followed by block-code, although not in all places :-(
+ */
+static
+Uint32
+compute_max_signals_to_execute(Uint32 thr_no)
+{
+  Uint32 minfree = thr_job_queue::SIZE;
+  const struct thr_repository* rep = &g_thr_repository;
+  const thr_data *thrptr = rep->m_thread;
+
+  for (unsigned i = 0; i<num_threads; i++, thrptr++)
+  {
+    /**
+     * NOTE: m_read_index is read wo/ lock (and updated by a different
+     *       thread), but since that thread can only consume signals,
+     *       the estimate computed here is always conservative (i.e. the
+     *       real situation can be better than what we computed, if the
+     *       read-index has moved but we didn't see it yet)
+     */
+    const thr_job_queue_head *q_head = thrptr->m_in_queue_head + thr_no;
+    unsigned ri = q_head->m_read_index;
+    unsigned wi = q_head->m_write_index;
+    unsigned free = (wi < ri) ? ri - wi : (thr_job_queue::SIZE + ri) - wi;
+
+    assert(free <= thr_job_queue::SIZE);
+
+    if (free < minfree)
+      minfree = free;
+  }
+
+#define SAFETY 2
+
+  if (minfree >= (1 + SAFETY))
+  {
+    return (3 + (minfree - (1 + SAFETY)) * MIN_SIGNALS_PER_PAGE) / 4;
+  }
+  else
+  {
+    return 0;
+  }
+}
+
 //#define NDBMT_RAND_YIELD
 #ifdef NDBMT_RAND_YIELD
 static Uint32 g_rand_yield = 0;
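
A standalone walk-through of the two functions above with made-up queue
indexes (SIZE = 31 and SAFETY = 2 come from this patch; MIN_SIGNALS_PER_PAGE
= 255, four threads and the indexes are assumptions for the example):

  #include <cstdio>

  int main()
  {
    const unsigned SIZE = 31;                  /* thr_job_queue::SIZE (patch) */
    const unsigned SAFETY = 2;
    const unsigned MIN_SIGNALS_PER_PAGE = 255; /* assumed, see define above   */

    /* Ring-buffer occupancy with wrap-around, as in check_job_buffers(). */
    unsigned ri = 28, wi = 3;                  /* writer has wrapped around   */
    unsigned busy = (wi >= ri) ? wi - ri : (SIZE - ri) + wi;           /* 6  */
    unsigned minfree_recv =
      (1024 + MIN_SIGNALS_PER_PAGE - 1) / MIN_SIGNALS_PER_PAGE;        /* 5  */
    printf("busy=%u, receiver would%s back off\n", busy,
           (1 + minfree_recv + busy >= SIZE) ? "" : " not");

    /* Execution quota, as in compute_max_signals_to_execute(): keep
     * 1 + SAFETY pages in reserve and assume a fan-out of 4 signals. */
    unsigned free_slots = (wi < ri) ? ri - wi : (SIZE + ri) - wi;      /* 25 */
    unsigned quota = 0;
    if (free_slots >= 1 + SAFETY)
      quota = (3 + (free_slots - (1 + SAFETY)) * MIN_SIGNALS_PER_PAGE) / 4;
    printf("free=%u -> may execute up to %u signals\n", free_slots, quota);
    return 0;
  }
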
@@ -1468,10 +1537,12 @@ trp_callback::checkJobBuffer()
        */
 
       /** FIXME:
-       *  On a CMT chip where #CPU >= #NDB-threads sched_yield() is effectively a NOOP as
-       *  there will normally be an idle CPU available to immediately resume thread execution.
-       *  On a Niagara chip this may severely impact performance as the CPUs are virtualized
-       *  by timemultiplexing the physical core. The thread should really be 'parked' on
+       *  On a CMT chip where #CPU >= #NDB-threads sched_yield() is
+       *  effectively a NOOP as there will normally be an idle CPU available
+       *  to immediately resume thread execution.
+       *  On a Niagara chip this may severely impact performance as the CPUs
+       *  are virtualized by timemultiplexing the physical core.
+       *  The thread should really be 'parked' on
        *  a condition to free its execution resources.
        */
 //    usleep(a-few-usec);  /* A micro-sleep would likely have been better... */
@@ -1685,6 +1756,17 @@ bool
 trp_callback::has_data_to_send(NodeId node)
 {
   return true;
+
+  thr_repository::send_buffer * sb = g_thr_repository.m_send_buffers + node;
+  Uint32 thr_no = sb->m_send_thread;
+  assert(thr_no != NO_SEND_THREAD);
+  assert((sb->m_bytes > 0) == (sb->m_buffer.m_first_page != 0));
+  if (sb->m_bytes > 0 || sb->m_force_send)
+    return true;
+
+  thr_send_queue * dst = g_thr_repository.m_thread_send_buffers[node]+thr_no;
+
+  return sb->m_read_index[thr_no] != dst->m_write_index;
 }
 
 void
@@ -1820,6 +1902,28 @@ try_send(thr_data * selfptr, Uint32 node
 }
 
 /**
+ * Flush send buffers and append them to the dst. node's send queue
+ *
+ * Flushed buffer contents are piggybacked when another thread calls
+ * do_send() to the same dst. node. This makes it possible to have
+ * more data included in each message, and thereby reduces total
+ * #messages handled by the OS which really impacts performance!
+ */
+static
+void
+do_flush(struct thr_data* selfptr)
+{
+  Uint32 i;
+  Uint32 count = selfptr->m_pending_send_count;
+  Uint8 *nodes = selfptr->m_pending_send_nodes;
+
+  for (i = 0; i < count; i++)
+  {
+    flush_send_buffer(selfptr, nodes[i]);
+  }
+}
+
+/**
  * Send any pending data to remote nodes.
  *
  * If MUST_SEND is false, will only try to lock the send lock, but if it would
@@ -1835,8 +1939,7 @@ try_send(thr_data * selfptr, Uint32 node
  */
 static
 Uint32
-do_send(struct thr_data* selfptr,
-        Uint32 *watchDogCounter, bool must_send)
+do_send(struct thr_data* selfptr, bool must_send)
 {
   Uint32 i;
   Uint32 count = selfptr->m_pending_send_count;
@@ -1855,7 +1958,7 @@ do_send(struct thr_data* selfptr,
   for (i = 0; i < count; i++)
   {
     Uint32 node = nodes[i];
-    *watchDogCounter = 6;
+    selfptr->m_watchdog_counter = 6;
 
     flush_send_buffer(selfptr, node);
 
@@ -1964,6 +2067,14 @@ mt_send_handle::updateWritePtr(NodeId no
   return p->m_bytes;
 }
 
+static
+void
+job_buffer_full()
+{
+  ndbout_c("job buffer full");
+  abort();
+}
+
 /*
  * Insert a signal in a job queue.
  *
@@ -2022,7 +2133,8 @@ insert_signal(thr_job_queue *q, thr_jb_w
     wmb();
     w->m_write_buffer->m_len = write_pos;
     Uint32 write_index = (w->m_write_index + 1) % thr_job_queue::SIZE;
-    /*
+
+    /**
      * Full job buffer is fatal.
      *
      * ToDo: should we wait for it to become non-full? There is no guarantee
@@ -2030,7 +2142,10 @@ insert_signal(thr_job_queue *q, thr_jb_w
      *
      * Or alternatively, ndbrequire() ?
      */
-    assert(write_index != q->m_head->m_read_index);
+    if (unlikely(write_index == q->m_head->m_read_index))
+    {
+      job_buffer_full();
+    }
     new_buffer->m_len = 0;
     new_buffer->m_prioa = prioa;
     q->m_buffers[write_index] = new_buffer;
@@ -2054,8 +2169,12 @@ read_jbb_state(thr_data *selfptr, Uint32
   {
     Uint32 read_index = r->m_read_index;
 
-    if (r->m_write_index == read_index)  // Optimization: Only reload when possibly empty.
-    {                                    // (Avoid cache reload of shared thr_job_queue_head)
+    /**
+     * Optimization: Only reload when possibly empty.
+     * Avoid cache reload of shared thr_job_queue_head
+     */
+    if (r->m_write_index == read_index)
+    {
       r->m_write_index = q->m_head->m_write_index;
       read_barrier_depends();
       r->m_read_end = q->m_buffers[read_index]->m_len;
@@ -2110,7 +2229,7 @@ map_instance_init()
     if (!globalData.isNdbMtLqh) {
       g_map_instance[ino] = 0;
     } else {
-      assert(num_lqh_workers != 0);
+      require(num_lqh_workers != 0);
       if (ino <= MAX_NDBMT_LQH_WORKERS) {
         g_map_instance[ino] = 1 + (ino - 1) % num_lqh_workers;
       } else {
@@ -2138,14 +2257,14 @@ map_instance(const SignalHeader *s)
 static
 Uint32
 execute_signals(thr_data *selfptr, thr_job_queue *q, thr_jb_read_state *r,
-                Signal *sig, Uint32 max_signals,
-                Uint32 *watchDogCounter, Uint32 *signalIdCounter)
+                Signal *sig, Uint32 max_signals, Uint32 *signalIdCounter)
 {
   Uint32 num_signals;
   Uint32 read_index = r->m_read_index;
   Uint32 write_index = r->m_write_index;
   Uint32 read_pos = r->m_read_pos;
   Uint32 read_end = r->m_read_end;
+  Uint32 *watchDogCounter = &selfptr->m_watchdog_counter;
 
   if (read_index == write_index && read_pos >= read_end)
     return 0;          // empty read_state
@@ -2240,11 +2359,11 @@ execute_signals(thr_data *selfptr, thr_j
 
 static
 Uint32
-run_job_buffers(thr_data *selfptr, Signal *sig,
-                Uint32 *watchDogCounter, Uint32 *signalIdCounter)
+run_job_buffers(thr_data *selfptr, Signal *sig, Uint32 *signalIdCounter)
 {
   Uint32 thr_count = g_thr_repository.m_thread_count;
   Uint32 signal_count = 0;
+  Uint32 perjb = selfptr->m_max_signals_per_jb;
 
   read_jbb_state(selfptr, thr_count);
   /*
@@ -2255,7 +2374,8 @@ run_job_buffers(thr_data *selfptr, Signa
 
   thr_job_queue *queue = selfptr->m_in_queue;
   thr_jb_read_state *read_state = selfptr->m_read_states;
-  for (Uint32 send_thr_no = 0; send_thr_no < thr_count; send_thr_no++,queue++,read_state++)
+  for (Uint32 send_thr_no = 0; send_thr_no < thr_count;
+       send_thr_no++,queue++,read_state++)
   {
     /* Read the prio A state often, to avoid starvation of prio A. */
     bool jba_empty = read_jba_state(selfptr);
@@ -2264,13 +2384,12 @@ run_job_buffers(thr_data *selfptr, Signa
       static Uint32 max_prioA = thr_job_queue::SIZE * thr_job_buffer::SIZE;
       signal_count += execute_signals(selfptr, &(selfptr->m_jba),
                                       &(selfptr->m_jba_read_state), sig,
-                                      max_prioA, watchDogCounter,
-                                      signalIdCounter);
+                                      max_prioA, signalIdCounter);
     }
+
     /* Now execute prio B signals from one thread. */
     signal_count += execute_signals(selfptr, queue, read_state,
-                                    sig, MAX_SIGNALS_PER_JB,
-                                    watchDogCounter, signalIdCounter);
+                                    sig, perjb, signalIdCounter);
   }
 
   return signal_count;
@@ -2304,12 +2423,12 @@ add_thr_map(Uint32 main, Uint32 instance
   assert(instance < MAX_BLOCK_INSTANCES);
 
   SimulatedBlock* b = globalData.getBlock(main, instance);
-  assert(b != 0);
+  require(b != 0);
 
   /* Block number including instance. */
   Uint32 block = numberToBlock(main, instance);
 
-  assert(thr_no < num_threads);
+  require(thr_no < num_threads);
   struct thr_repository* rep = &g_thr_repository;
   thr_data* thr_ptr = rep->m_thread + thr_no;
 
@@ -2317,9 +2436,9 @@ add_thr_map(Uint32 main, Uint32 instance
   {
     Uint32 i;
     for (i = 0; i < thr_ptr->m_instance_count; i++)
-      assert(thr_ptr->m_instance_list[i] != block);
+      require(thr_ptr->m_instance_list[i] != block);
   }
-  assert(thr_ptr->m_instance_count < MAX_INSTANCES_PER_THREAD);
+  require(thr_ptr->m_instance_count < MAX_INSTANCES_PER_THREAD);
   thr_ptr->m_instance_list[thr_ptr->m_instance_count++] = block;
 
   SimulatedBlock::ThreadContext ctx;
@@ -2331,7 +2450,7 @@ add_thr_map(Uint32 main, Uint32 instance
 
   /* Create entry mapping block to thread. */
   thr_map_entry& entry = thr_map[index][instance];
-  assert(entry.thr_no == thr_map_entry::NULL_THR_NO);
+  require(entry.thr_no == thr_map_entry::NULL_THR_NO);
   entry.thr_no = thr_no;
 }
 
@@ -2370,7 +2489,7 @@ add_main_thr_map()
 void
 add_lqh_worker_thr_map(Uint32 block, Uint32 instance)
 {
-  assert(instance != 0);
+  require(instance != 0);
   Uint32 i = instance - 1;
   Uint32 thr_no = NUM_MAIN_THREADS + i % num_lqh_threads;
   add_thr_map(block, instance, thr_no);
@@ -2380,7 +2499,7 @@ add_lqh_worker_thr_map(Uint32 block, Uin
 void
 add_extra_worker_thr_map(Uint32 block, Uint32 instance)
 {
-  assert(instance != 0);
+  require(instance != 0);
   Uint32 thr_no = block2ThreadId(block, 0);
   add_thr_map(block, instance, thr_no);
 }
@@ -2436,8 +2555,8 @@ init_thread(thr_data *selfptr)
   NdbThread_SetTlsKey(NDB_THREAD_TLS_THREAD, selfptr);
 
   unsigned thr_no = selfptr->m_thr_no;
-  globalEmulatorData.theWatchDog->registerWatchedThread(&selfptr->m_watchdog_counter,
-                                                        thr_no);
+  globalEmulatorData.theWatchDog->
+    registerWatchedThread(&selfptr->m_watchdog_counter, thr_no);
   {
     while(selfptr->m_thread == 0)
       NdbSleep_MilliSleep(30);
@@ -2540,10 +2659,11 @@ mt_receiver_thread_main(void *thr_arg)
     cnt = (cnt + 1) & 15;
 
     watchDogCounter = 2;
-    scan_time_queues(selfptr);
 
-    Uint32 sum = run_job_buffers(selfptr, signal,
-                                 &watchDogCounter, &thrSignalId);
+    NDB_TICKS now = NdbTick_CurrentMillisecond();
+    scan_time_queues(selfptr, now);
+
+    Uint32 sum = run_job_buffers(selfptr, signal, &thrSignalId);
 
     if (sum || has_received)
     {
@@ -2551,7 +2671,7 @@ mt_receiver_thread_main(void *thr_arg)
       flush_jbb_write_state(selfptr);
     }
 
-    do_send(selfptr, &watchDogCounter, TRUE);
+    do_send(selfptr, TRUE);
 
     watchDogCounter = 7;
 
@@ -2591,6 +2711,98 @@ sendpacked(struct thr_data* thr_ptr, Sig
   }
 }
 
+/**
+ * check if out-queues of selfptr is full
+ * return true is so
+ */
+static bool
+check_job_buffer_full(thr_data *selfptr)
+{
+  Uint32 thr_no = selfptr->m_thr_no;
+  Uint32 tmp = compute_max_signals_to_execute(thr_no);
+#if 0
+  Uint32 perjb = tmp / g_thr_repository.m_thread_count;
+
+  if (perjb == 0)
+  {
+    return true;
+  }
+
+  return false;
+#else
+  if (tmp < g_thr_repository.m_thread_count)
+    return true;
+  return false;
+#endif
+}
+
+/**
+ * update_sched_config
+ *
+ *   In order to prevent "job-buffer-full", i.e.
+ *     that one thread (T1) produces so many signals to another thread (T2)
+ *     that the ring-buffer from T1 to T2 gets full,
+ *     the main loop has 2 "config" variables
+ *   - m_max_exec_signals
+ *     This is the *total* no of signals T1 can execute before calling
+ *     this method again
+ *   - m_max_signals_per_jb
+ *     This is the max no of signals T1 can execute from each other thread
+ *     in the system
+ *
+ *   Assumption: each signal may send *at most* 4 signals
+ *     - this assumption is made the same in ndbd and ndbmtd and is
+ *       mostly followed by block-code, although not in all places :-(
+ *
+ *   This function returns true if it slept
+ *     (i.e. it concluded that it could not execute *any* signals wo/
+ *      risking job-buffer-full)
+ */
+static
+bool
+update_sched_config(struct thr_data* selfptr, Uint32 pending_send)
+{
+  Uint32 sleeploop = 0;
+  Uint32 thr_no = selfptr->m_thr_no;
+loop:
+  Uint32 tmp = compute_max_signals_to_execute(thr_no);
+  Uint32 perjb = tmp / g_thr_repository.m_thread_count;
+
+  if (perjb > MAX_SIGNALS_PER_JB)
+    perjb = MAX_SIGNALS_PER_JB;
+
+  selfptr->m_max_exec_signals = tmp;
+  selfptr->m_max_signals_per_jb = perjb;
+
+  if (unlikely(perjb == 0))
+  {
+    sleeploop++;
+    if (sleeploop == 10)
+    {
+      /**
+       * we've slept for 10ms...try running anyway
+       */
+      selfptr->m_max_signals_per_jb = 1;
+      ndbout_c("%u - sleeploop 10!!", selfptr->m_thr_no);
+      return true;
+    }
+
+    if (pending_send)
+    {
+      /* About to sleep, _must_ send now. */
+      pending_send = do_send(selfptr, TRUE);
+    }
+
+    struct timespec wait;
+    wait.tv_sec = 0;
+    wait.tv_nsec = 1 * 1000000;    /* 1 ms */
+    yield(&selfptr->m_waiter, &wait, check_job_buffer_full, selfptr);
+    goto loop;
+  }
+
+  return sleeploop > 0;
+}
+
 extern "C"
 void *
 mt_job_thread_main(void *thr_arg)
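
A tiny standalone sketch of the perjb derivation above (MAX_SIGNALS_PER_JB =
100 and the thread count are assumed values for the example; the real
constant is defined elsewhere in mt.cpp):

  #include <cstdio>

  int main()
  {
    const unsigned MAX_SIGNALS_PER_JB = 100;   /* assumed static limit    */
    const unsigned num_threads = 4;            /* assumed thread count    */

    /* 'budget' plays the role of compute_max_signals_to_execute(). */
    const unsigned budgets[] = { 0, 3, 1403 };
    for (unsigned i = 0; i < sizeof(budgets) / sizeof(budgets[0]); i++)
    {
      unsigned perjb = budgets[i] / num_threads;
      if (perjb > MAX_SIGNALS_PER_JB)
        perjb = MAX_SIGNALS_PER_JB;            /* never exceed the limit  */

      /* perjb == 0 means not even one signal per source thread is safe:
       * the thread sleeps ~1 ms (yield on check_job_buffer_full) and
       * recomputes, as in update_sched_config(). */
      printf("total budget %4u -> m_max_signals_per_jb = %3u%s\n",
             budgets[i], perjb, perjb == 0 ? "  (would sleep)" : "");
    }
    return 0;
  }
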
@@ -2599,7 +2811,7 @@ mt_job_thread_main(void *thr_arg)
   Signal *signal;
   struct timespec nowait;
   nowait.tv_sec = 0;
-  nowait.tv_nsec = 10 * 1000000;
+  nowait.tv_nsec = 10 * 1000000;    /* 10 ms */
   Uint32 thrSignalId = 0;
 
   struct thr_data* selfptr = (struct thr_data *)thr_arg;
@@ -2612,46 +2824,101 @@ mt_job_thread_main(void *thr_arg)
   /* Avoid false watchdog alarms caused by race condition. */
   watchDogCounter = 1;
 
+  Uint32 pending_send = 0;
   Uint32 send_sum = 0;
+  int loops = 0;
+  int maxloops = 10;/* Loops before reading clock, fuzzy adapted to 1ms freq. */
+  NDB_TICKS now = selfptr->m_time;
+
   while (globalData.theRestartFlag != perform_stop)
   { 
+    loops++;
     update_sched_stats(selfptr);
 
     watchDogCounter = 2;
-    scan_time_queues(selfptr);
+    scan_time_queues(selfptr, now);
 
-    Uint32 sum = run_job_buffers(selfptr, signal,
-                                 &watchDogCounter, &thrSignalId);
+    Uint32 sum = run_job_buffers(selfptr, signal, &thrSignalId);
     
     watchDogCounter = 1;
     signal->header.m_noOfSections = 0; /* valgrind */
     sendpacked(selfptr, signal);
-    
+
     if (sum)
     {
       watchDogCounter = 6;
       flush_jbb_write_state(selfptr);
-    }
-
-    send_sum += sum;
+      send_sum += sum;
 
-    if (send_sum > 0)
+      if (send_sum > MAX_SIGNALS_BEFORE_SEND)
+      {
+        /* Try to send, but skip for now in case of lock contention. */
+        pending_send = do_send(selfptr, FALSE);
+        send_sum = 0;
+      }
+      else
+      {
+        /* Send buffers append to send queues to dst. nodes. */
+        do_flush(selfptr);
+      }
+    }
+    else
     {
-      if (sum == 0)
+      /* No signals processed, prepare to sleep to wait for more */
+      if (pending_send || send_sum > 0)
       {
         /* About to sleep, _must_ send now. */
-        sum = send_sum = do_send(selfptr, &watchDogCounter, TRUE);
+        pending_send = do_send(selfptr, TRUE);
+        send_sum = 0;
       }
-      else if (send_sum > MAX_SIGNALS_BEFORE_SEND)
+
+      if (pending_send == 0)
       {
-        /* Try to send, but skip for now in case of lock contention. */
-        send_sum = do_send(selfptr, &watchDogCounter, FALSE);
+        bool waited = yield(&selfptr->m_waiter, &nowait, check_queues_empty,
+                            selfptr);
+        if (waited)
+        {
+          /* Update current time after sleeping */
+          now = NdbTick_CurrentMillisecond();
+          loops = 0;
+        }
       }
     }
-    
-    if (sum == 0)
+
+    /**
+     * Check if we executed enough signals,
+     *   and if so recompute how many signals to execute
+     */
+    if (sum >= selfptr->m_max_exec_signals)
     {
-      yield(&selfptr->m_waiter, &nowait, check_queues_empty, selfptr);
+      if (update_sched_config(selfptr, pending_send))
+      {
+        /* Update current time after sleeping */
+        now = NdbTick_CurrentMillisecond();
+        loops = 0;
+      }
+    }
+    else
+    {
+      selfptr->m_max_exec_signals -= sum;
+    }
+
+    /**
+     * Adaptive reading freq. of system time every time 1ms
+     * is likely to have passed
+     */
+    if (loops > maxloops)
+    {
+      now = NdbTick_CurrentMillisecond();
+      Uint64 diff = now - selfptr->m_time;
+
+      /* Adjust 'maxloops' to achieve clock reading frequency of 1ms */
+      if (diff < 1)
+        maxloops += ((maxloops/10) + 1); /* No change: less frequent reading */
+      else if (diff > 1 && maxloops > 1)
+        maxloops -= ((maxloops/10) + 1); /* Overslept: Need more frequent read*/
+
+      loops = 0;
     }
   }
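
The adaptive clock-read adjustment above can be illustrated with a small
standalone simulation (the per-iteration cost is an assumed number; only the
adjustment rule mirrors the patch):

  #include <cstdio>

  int main()
  {
    const double ms_per_loop = 0.12;   /* assumed cost of one main-loop pass */
    int maxloops = 10, loops = 0;
    double now = 0.0, last = 0.0;

    for (int i = 0; i < 2000; i++)
    {
      now += ms_per_loop;
      if (++loops > maxloops)
      {
        double diff = now - last;            /* ms since the last clock read */
        if (diff < 1.0)
          maxloops += (maxloops / 10) + 1;   /* clock barely moved: read less */
        else if (diff > 1.0 && maxloops > 1)
          maxloops -= (maxloops / 10) + 1;   /* overslept: read more often    */
        last = now;
        loops = 0;
      }
    }
    /* Settles near 1ms / 0.12ms, i.e. roughly 8 loops between clock reads. */
    printf("steady-state maxloops = %d\n", maxloops);
    return 0;
  }
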
 
@@ -2809,9 +3076,9 @@ sendprioa_STOP_FOR_CRASH(const struct th
   else if (dst == receiver_thread_no)
     main = CMVMI;
   else
-    assert(false);
+    require(false);
   Uint32 bno = numberToBlock(main, instance);
-  assert(block2ThreadId(main, instance) == dst);
+  require(block2ThreadId(main, instance) == dst);
   struct thr_data * dstptr = rep->m_thread + dst;
 
   memset(&signalT.header, 0, sizeof(SignalHeader));
@@ -2866,6 +3133,8 @@ thr_init(struct thr_repository* rep, str
   Uint32 i;
 
   selfptr->m_thr_no = thr_no;
+  selfptr->m_max_signals_per_jb = MAX_SIGNALS_PER_JB;
+  selfptr->m_max_exec_signals = 0;
   selfptr->m_first_free = 0;
   selfptr->m_first_unused = 0;
   
@@ -2918,8 +3187,8 @@ thr_init(struct thr_repository* rep, str
 /* Have to do this after init of all m_in_queues is done. */
 static
 void
-thr_init2(struct thr_repository* rep, struct thr_data *selfptr, unsigned int cnt,
-          unsigned thr_no)
+thr_init2(struct thr_repository* rep, struct thr_data *selfptr,
+          unsigned int cnt, unsigned thr_no)
 {
   for (Uint32 i = 0; i<cnt; i++)
   {
@@ -3000,7 +3269,7 @@ ThreadConfig::init(EmulatorData *emulato
   num_lqh_workers = globalData.ndbMtLqhWorkers;
   num_lqh_threads = globalData.ndbMtLqhThreads;
   num_threads = NUM_MAIN_THREADS + num_lqh_threads + 1;
-  assert(num_threads <= MAX_THREADS);
+  require(num_threads <= MAX_THREADS);
   receiver_thread_no = num_threads - 1;
 
   ndbout << "NDBMT: num_threads=" << num_threads << endl;
@@ -3045,7 +3314,7 @@ setcpuaffinity(struct thr_repository* re
     if (cnt > globalData.ndbMtLqhThreads)
     {
       /**
+       * let each LQH have its own CPU and the rest share...
+       * let each LQH have it's own CPU and rest share...
        */
       // LQH threads start with 2
       unsigned cpu = mask.find(0);
@@ -3067,7 +3336,8 @@ setcpuaffinity(struct thr_repository* re
     }
     else
     {
-      // put receiver, tc, backup/suma in 1 thread, and round robin LQH for rest
+      // put receiver, tc, backup/suma in 1 thread,
+      // and round robin LQH for rest
       unsigned cpu = mask.find(0);
       rep->m_thread[0].m_cpu = cpu; // TC
       rep->m_thread[1].m_cpu = cpu; // backup/suma
@@ -3090,7 +3360,7 @@ setcpuaffinity(struct thr_repository* re
     /**
      * mt-classic and cnt > 1
      */
-    assert(num_threads == 3);
+    require(num_threads == 3);
     unsigned cpu = mask.find(0);
     rep->m_thread[1].m_cpu = cpu; // LQH
     cpu = mask.find(cpu + 1);
@@ -3131,7 +3401,7 @@ ThreadConfig::ipControlLoop(NdbThread* p
                        1024*1024,
                        "execute thread", //ToDo add number
                        NDB_THREAD_PRIO_MEAN);
-    assert(rep->m_thread[thr_no].m_thread != NULL);
+    require(rep->m_thread[thr_no].m_thread != NULL);
   }
 
   /* Now run the main loop for thread 0 directly. */
@@ -3294,7 +3564,7 @@ void mt_execSTOP_FOR_CRASH()
 {
   void *value= NdbThread_GetTlsKey(NDB_THREAD_TLS_THREAD);
   const thr_data *selfptr = reinterpret_cast<const thr_data *>(value);
-  assert(selfptr != NULL);
+  require(selfptr != NULL);
 
   pthread_mutex_lock(&g_thr_repository.stop_for_crash_mutex);
   g_thr_repository.stopped_threads++;
