From: Ole John Aske
Date: November 23, 2012 11:41am
Subject: bzr push into mysql-5.5-cluster-7.2 branch (ole.john.aske:4109 to 4113)
 4113 Ole John Aske	2012-11-23
      Fix failing ndb_bushy_joins.test under Valgrind
      
      Extends TimeBetweenEpochs to avoid failure due to
      exceeding MaxBufferedEpochs (> 100).

    modified:
      mysql-test/suite/ndb/t/ndb_bushy_joins.cnf
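
      For illustration only (not part of the patch): under Valgrind the test runs
      slower, so epochs pile up before the subscribers can acknowledge them. A
      minimal standalone sketch of that arithmetic, assuming the stock defaults of
      TimeBetweenEpochs=100 ms and MaxBufferedEpochs=100; the 'lag_ms' value is
      purely illustrative:

      // Roughly lag_ms / TimeBetweenEpochs epochs stay buffered while a
      // subscriber lags behind; the node gives up above MaxBufferedEpochs.
      #include <cstdio>

      int main()
      {
        const unsigned max_buffered_epochs = 100;   // assumed default
        const unsigned lag_ms              = 30000; // e.g. Valgrind slowdown

        const unsigned cnf_values[] = {100u, 1000u}; // old vs. new cnf value
        for (unsigned time_between_epochs : cnf_values)
        {
          const unsigned buffered = lag_ms / time_between_epochs;
          std::printf("TimeBetweenEpochs=%u ms -> ~%u buffered epochs (%s)\n",
                      time_between_epochs, buffered,
                      buffered > max_buffered_epochs ? "exceeds limit" : "ok");
        }
        return 0;
      }
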
 4112 Ole John Aske	2012-11-22
      Fix for bug#15908684: Job threads may 'sleep' without pending messages being sent
      
      This fix corrects the calculation of the 'pending' argument to
      update_sched_config() such that both the previously failed 'pending_send'
      and the currently available unsent 'send_sum' together form the
      number of 'pending' signals.

    modified:
      storage/ndb/src/kernel/vm/mt.cpp
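
      For illustration only (not the NDB code itself), a standalone sketch of the
      'pending' argument: update_sched_config() only forces a send before yielding
      when 'pending' is non-zero (see the mt.cpp hunks below), so it must be told
      about *both* kinds of unsent signals. The values below are made up:

      #include <cstdio>

      static unsigned pending_old(unsigned pending_send, unsigned /*send_sum*/)
      { return pending_send; }                 // pre-fix: 'send_sum' ignored

      static unsigned pending_new(unsigned pending_send, unsigned send_sum)
      { return pending_send + send_sum; }      // fix: both form the pending count

      int main()
      {
        const unsigned pending_send = 0; // previous do_send() left nothing to retry
        const unsigned send_sum     = 7; // but 7 signals are buffered, still unsent
        std::printf("old pending=%u (thread may sleep), new pending=%u\n",
                    pending_old(pending_send, send_sum),
                    pending_new(pending_send, send_sum));
        return 0;
      }
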
 4111 Ole John Aske	2012-11-22
      Fix for Bug#15907515 RECEIVER THREAD COULD BLOCK/BUSY WAIT WHILE HOLDING RECEIVER MUTEX
      
      Note: This fix requires the fix for bug 15907122 as 'baseline'.
      
      This fix removes the waiting for more job buffers to become available
      inside performReceive() (or actually: mt_checkDoJob() called from it).
      
      Instead performReceive() will now return with a 'full' status
      to the receive thread, which will unlock the receive mutex, 
      and start a conditional wait for more job buffers to become available.
      
      Furthermore, this fix also removes the check_job_buffer() *before*
      performReceive() such that we now doReceive() any pending
      TCP data into our local receiveBuffers even if the job buffers are full.

    modified:
      storage/ndb/include/transporter/TransporterRegistry.hpp
      storage/ndb/src/common/transporter/TransporterRegistry.cpp
      storage/ndb/src/kernel/vm/mt.cpp
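
      For illustration only, a generic sketch of the new receive-thread shape
      (std::mutex / std::condition_variable stand in for the NDB receive lock and
      'thr_wait'; the real code in mt_receiver_thread_main() is in the mt.cpp diff
      below):

      #include <chrono>
      #include <condition_variable>
      #include <mutex>

      struct ReceiverSketch
      {
        std::mutex              recv_mutex;  // stands in for m_receive_lock
        std::mutex              wait_mutex;  // protects the wait condition
        std::condition_variable waiter;      // stands in for thr_wait / wakeup()
        bool job_buffers_full = false;       // stands in for check_recv_queue()

        bool perform_receive() { return job_buffers_full; } // placeholder

        void receive_loop_iteration()
        {
          bool full;
          {
            std::lock_guard<std::mutex> g(recv_mutex);
            full = perform_receive();        // returns 'full' status, never blocks
          }                                  // receive mutex released here
          if (full)
          {
            // Conditional wait (capped, here 1 ms) for the consumer to free
            // job buffers, rechecking the condition before actually sleeping.
            std::unique_lock<std::mutex> lk(wait_mutex);
            waiter.wait_for(lk, std::chrono::milliseconds(1),
                            [this] { return !job_buffers_full; });
          }
        }
      };

      int main() { ReceiverSketch r; r.receive_loop_iteration(); return 0; }
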
 4110 Ole John Aske	2012-11-22
      Fix for Bug#15907122 INCORRECT HANDLING OF JOB-BUFFERS ALMOST FULL - 'SLEEPLOOP 10'
      
      This fix introduces a 'thr_wait' (sort of a condition/mutex)
      which is used to wait for job buffers to become available when
      we are in a 'job buffers almost full' situation. This 'thr_wait'
      will be signaled when a job buffer has been consumed & freed.
      
      This fix is a collection of the following 5 sub patches:
      
      ******
      
      Fix mixed-up comments & naming wrt. whether 'm_recv_transporters' and
      'm_has_data_transporters' are bitmasks of NodeIds or Transporters.
      (They *are* NodeId bitmasks.)
      
      ******
      Add a dump of job buffer utilization before we
      crash due to 'job buffer full', and to 'sleeploop 10'
      reports where we have to wait due to almost-full job buffers.
      
      ******
      Change the signature of the yield() function such that it can take a more
      general '*check_callback()' function as argument.
      
      Change the existing functions currently used as 'callbacks' 
      to the new (relaxed) signature.
      ******
      Refactor: Split out compute_free_buffers_in_queue()
      from compute_max_signals_to_execute().
      
      New function contains common code intended for reuse in later patches.
      ******
      update_sched_config() will 'yield' the CPU and wait for more job buffers
      to become available when it is about to run out of job buffers.
      
      The yield() call will wait on a 'thr_wait' object, which may be sent
      a 'wakeup()' when the waiting condition has been resolved (by another thread).

      However, update_sched_config() used the 'thr_wait' object intended for
      waiting for more *incoming signals* - what we actually have to wait for
      are signals to be *consumed* by the destination thread. Luckily, a
      'max wait' of 1ms is also defined, which currently will wake up the thread.

      This patch introduces a dedicated 'thr_wait' object, which is signaled by
      the consumer, and we now wait on this object.

      Furthermore, the patch also avoids a situation where a thread could end up
      waiting for itself.

    modified:
      storage/ndb/include/transporter/TransporterRegistry.hpp
      storage/ndb/src/common/transporter/TransporterRegistry.cpp
      storage/ndb/src/kernel/vm/mt.cpp
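
      For illustration only, a standalone sketch of the circular-queue arithmetic
      that the new compute_free_buffers_in_queue() factors out (SIZE and SAFETY
      taken from the mt.cpp hunks below; the index values are made up):

      #include <cassert>
      #include <cstdio>

      static const unsigned SIZE = 32, SAFETY = 2;  // as in thr_job_queue

      // Free job buffers between write and read index, keeping back one
      // unusable slot plus the SAFETY margin.
      static unsigned free_buffers(unsigned ri, unsigned wi)
      {
        const unsigned free_slots = (wi < ri) ? ri - wi : (SIZE + ri) - wi;
        assert(free_slots <= SIZE);
        return (free_slots <= 1 + SAFETY) ? 0 : free_slots - (1 + SAFETY);
      }

      int main()
      {
        std::printf("free=%u\n", free_buffers(5, 3)); // 2 slots left -> 0 (congested)
        std::printf("free=%u\n", free_buffers(0, 0)); // empty queue  -> 29
        return 0;
      }
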
 4109 Mauritz Sundell	2012-11-22 [merge]
      merge 7.1 -> 7.2

=== modified file 'mysql-test/suite/ndb/t/ndb_bushy_joins.cnf'
--- a/mysql-test/suite/ndb/t/ndb_bushy_joins.cnf	2012-11-13 10:27:06 +0000
+++ b/mysql-test/suite/ndb/t/ndb_bushy_joins.cnf	2012-11-23 11:34:35 +0000
@@ -15,6 +15,7 @@ ThreadConfig=ldm={count=4}
 MaxNoOfConcurrentOperations=250000
 LongMessageBuffer=64M
 TransactionDeadlockDetectionTimeout=30000
+TimeBetweenEpochs=1000
 
 [ENV]
 # Need to always use ndbmtd when we want lots of partitions

=== modified file 'storage/ndb/include/transporter/TransporterRegistry.hpp'
--- a/storage/ndb/include/transporter/TransporterRegistry.hpp	2012-09-29 00:02:40 +0000
+++ b/storage/ndb/include/transporter/TransporterRegistry.hpp	2012-11-22 12:18:17 +0000
@@ -108,17 +108,27 @@ struct TransporterReceiveData
   NodeBitmask m_transporters;
 
   /**
-   * Bitmask of transporters having data awaiting to be received
+   * Bitmask of nodes having data awaiting to be received
    * from its transporter.
    */
   NodeBitmask m_recv_transporters;
 
   /**
-   * Bitmask of transporters that has already received data buffered
+   * Bitmask of nodes that have already received data buffered
    * inside its transporter. Possibly "carried over" from last 
    * performReceive
    */
   NodeBitmask m_has_data_transporters;
+
+  /**
+   * Subset of m_has_data_transporters which we completed handling
+   * of in previous ::performReceive before we were interrupted due
+   * to lack of job buffers. Will skip these when we later retry
+   * ::performReceive in order to avoid starvation of non-handled
+   * transporters.
+   */
+  NodeBitmask m_handled_transporters;
+
 #if defined(HAVE_EPOLL_CREATE)
   int m_epoll_fd;
   struct epoll_event *m_epoll_events;
@@ -579,7 +589,7 @@ public:
    * Receiving
    */
   Uint32 pollReceive(Uint32 timeOutMillis, TransporterReceiveHandle& mask);
-  void performReceive(TransporterReceiveHandle&);
+  Uint32 performReceive(TransporterReceiveHandle&);
   void update_connections(TransporterReceiveHandle&);
 
   inline Uint32 pollReceive(Uint32 timeOutMillis) {
@@ -587,9 +597,9 @@ public:
     return pollReceive(timeOutMillis, * receiveHandle);
   }
 
-  inline void performReceive() {
+  inline Uint32 performReceive() {
     assert(receiveHandle != 0);
-    performReceive(* receiveHandle);
+    return performReceive(* receiveHandle);
   }
 
   inline void update_connections() {

=== modified file 'storage/ndb/src/common/transporter/TransporterRegistry.cpp'
--- a/storage/ndb/src/common/transporter/TransporterRegistry.cpp	2012-09-29 00:02:40 +0000
+++ b/storage/ndb/src/common/transporter/TransporterRegistry.cpp	2012-11-22 12:18:17 +0000
@@ -94,6 +94,7 @@ TransporterReceiveData::TransporterRecei
    */
   m_transporters.set();            // Handle all
   m_transporters.clear(Uint32(0)); // Except wakeup socket...
+  m_handled_transporters.clear();
 
 #if defined(HAVE_EPOLL_CREATE)
   m_epoll_fd = -1;
@@ -1142,13 +1143,13 @@ TransporterRegistry::pollReceive(Uint32 
     {
       for (int i = 0; i < num_socket_events; i++)
       {
-        const Uint32 trpid = recvdata.m_epoll_events[i].data.u32;
+        const Uint32 node_id = recvdata.m_epoll_events[i].data.u32;
         /**
          * check that it's assigned to "us"
          */
-        assert(recvdata.m_transporters.get(trpid));
+        assert(recvdata.m_transporters.get(node_id));
 
-        recvdata.m_recv_transporters.set(trpid);
+        recvdata.m_recv_transporters.set(node_id);
       }
     }
     else if (num_socket_events < 0)
@@ -1319,14 +1320,15 @@ TransporterRegistry::poll_TCP(Uint32 tim
 
 /**
  * In multi-threaded cases, this must be protected by a global receive lock.
+ * Returns 0 when receive succeeded from all Transporters having data,
+ * else 1 in case we were unable to receive due to
+ * job buffers being full.
  */
-void
+Uint32
 TransporterRegistry::performReceive(TransporterReceiveHandle& recvdata)
 {
   assert((receiveHandle == &recvdata) || (receiveHandle == 0));
 
-  bool hasReceived = false;
-
   if (recvdata.m_recv_transporters.get(0))
   {
     assert(recvdata.m_transporters.get(0));
@@ -1380,7 +1382,7 @@ TransporterRegistry::performReceive(Tran
   recvdata.m_recv_transporters.clear();
 
   /**
-   * Handle data either received above or pending from prev rounds.
+   * Unpack data either received above or pending from prev rounds.
    */
   for(Uint32 id = recvdata.m_has_data_transporters.find_first();
       id != BitmaskImpl::NotFound;
@@ -1395,9 +1397,10 @@ TransporterRegistry::performReceive(Tran
     {
       if (t->isConnected())
       {
-        if (hasReceived)
-          recvdata.checkJobBuffer();
-        hasReceived = true;
+        if (unlikely(recvdata.checkJobBuffer()))
+          return 1;     // Full, can't unpack more
+        if (unlikely(recvdata.m_handled_transporters.get(id)))
+          continue;     // Skip now to avoid starvation
         Uint32 * ptr;
         Uint32 sz = t->getReceiveData(&ptr);
         Uint32 szUsed = unpack(recvdata, ptr, sz, id, ioStates[id]);
@@ -1413,6 +1416,7 @@ TransporterRegistry::performReceive(Tran
     }
     // If transporter still have data, make sure that it's remember to next time
     recvdata.m_has_data_transporters.set(id, hasdata);
+    recvdata.m_handled_transporters.set(id, hasdata);
   }
 #endif
   
@@ -1428,9 +1432,11 @@ TransporterRegistry::performReceive(Tran
     {
       if(t->isConnected() && t->checkConnected())
       {
-        if (hasReceived)
-          callbackObj->checkJobBuffer();
-        hasReceived = true;
+        if (unlikely(recvdata.checkJobBuffer()))
+          return 1;      // Full, can't unpack more
+        if (unlikely(recvdata.m_handled_transporters.get(nodeId)))
+          continue;      // Skip now to avoid starvation
+
         Uint32 * readPtr, * eodPtr;
         t->getReceivePtr(&readPtr, &eodPtr);
         callbackObj->transporter_recv_from(nodeId);
@@ -1438,6 +1444,7 @@ TransporterRegistry::performReceive(Tran
         t->updateReceivePtr(newPtr);
       }
     } 
+    recvdata.m_handled_transporters.set(nodeId);
   }
 #endif
 #ifdef NDB_SHM_TRANSPORTER
@@ -1449,9 +1456,11 @@ TransporterRegistry::performReceive(Tran
     if(is_connected(nodeId)){
       if(t->isConnected() && t->checkConnected())
       {
-        if (hasReceived)
-          recvdata.checkJobBuffer();
-        hasReceived = true;
+        if (unlikely(recvdata.checkJobBuffer()))
+          return 1;      // Full, can't unpack more
+        if (unlikely(recvdata.m_handled_transporters.get(nodeId)))
+          continue;      // Skip now to avoid starvation
+
         Uint32 * readPtr, * eodPtr;
         t->getReceivePtr(&readPtr, &eodPtr);
         recvdata.transporter_recv_from(nodeId);
@@ -1460,8 +1469,11 @@ TransporterRegistry::performReceive(Tran
         t->updateReceivePtr(newPtr);
       }
     } 
+    recvdata.m_handled_transporters.set(nodeId);
   }
 #endif
+  recvdata.m_handled_transporters.clear();
+  return 0;
 }
 
 /**
@@ -1776,6 +1788,7 @@ TransporterRegistry::report_disconnect(T
   performStates[node_id] = DISCONNECTED;
   recvdata.m_recv_transporters.clear(node_id);
   recvdata.m_has_data_transporters.clear(node_id);
+  recvdata.m_handled_transporters.clear(node_id);
   recvdata.reportDisconnect(node_id, errnum);
   DBUG_VOID_RETURN;
 }
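
For illustration only, a loose standalone sketch of the m_handled_transporters
bookkeeping above (std::bitset stands in for NodeBitmask; the bookkeeping in the
actual patch differs in detail, e.g. the TCP path only marks transporters that
still have buffered data):

#include <bitset>
#include <cstdio>

static const unsigned MAX_NODES = 8;

struct HandledMaskSketch
{
  std::bitset<MAX_NODES> has_data;  // stands in for m_has_data_transporters
  std::bitset<MAX_NODES> handled;   // stands in for m_handled_transporters

  // Returns 1 if interrupted because the job buffers filled up, else 0.
  unsigned perform_receive(unsigned job_buffer_capacity)
  {
    for (unsigned id = 0; id < MAX_NODES; id++)
    {
      if (!has_data.test(id))
        continue;
      if (job_buffer_capacity == 0)
        return 1;                   // full: keep 'handled' and retry later
      if (handled.test(id))
        continue;                   // already served in a previous round
      std::printf("unpacking node %u\n", id);
      has_data.reset(id);
      handled.set(id);
      job_buffer_capacity--;
    }
    handled.reset();                // full pass completed: start over
    return 0;
  }
};

int main()
{
  HandledMaskSketch r;
  r.has_data.set(1); r.has_data.set(3); r.has_data.set(5);
  while (r.perform_receive(/*job_buffer_capacity=*/2) != 0)
  { /* the real receive thread waits for free job buffers here */ }
  return 0;
}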

=== modified file 'storage/ndb/src/kernel/vm/mt.cpp'
--- a/storage/ndb/src/kernel/vm/mt.cpp	2012-08-14 11:23:36 +0000
+++ b/storage/ndb/src/kernel/vm/mt.cpp	2012-11-22 13:20:37 +0000
@@ -38,6 +38,8 @@
 #include "mt-asm.h"
 #include "mt-lock.hpp"
 
+static void dumpJobQueues(void);
+
 inline
 SimulatedBlock*
 GlobalData::mt_getBlock(BlockNumber blockNo, Uint32 instanceNo)
@@ -155,7 +157,7 @@ struct thr_wait
 static inline
 bool
 yield(struct thr_wait* wait, const Uint32 nsec,
-      bool (*check_callback)(struct thr_data *), struct thr_data *check_arg)
+      bool (*check_callback)(void *), void *check_arg)
 {
   volatile unsigned * val = &wait->m_futex_state;
 #ifndef NDEBUG
@@ -224,7 +226,7 @@ struct thr_wait
 static inline
 bool
 yield(struct thr_wait* wait, const Uint32 nsec,
-      bool (*check_callback)(struct thr_data *), struct thr_data *check_arg)
+      bool (*check_callback)(void *), void *check_arg)
 {
   struct timespec end;
   NdbCondition_ComputeAbsTime(&end, nsec/1000000);
@@ -585,12 +587,20 @@ struct thr_job_queue_head
   unsigned m_read_index;  // Read/written by consumer, read by producer
   unsigned m_write_index; // Read/written by producer, read by consumer
 
+  /**
+   * Waiter object: In case the job queue is full, the producer thread
+   * will 'yield' on this waiter object until the consumer thread
+   * has consumed (at least) a job buffer.
+   */
+  thr_wait m_waiter;
+
   Uint32 used() const;
 };
 
 struct thr_job_queue
 {
   static const unsigned SIZE = 32;
+  static const unsigned SAFETY = 2;
 
   struct thr_job_buffer* m_buffers[SIZE];
 };
@@ -1386,7 +1396,7 @@ thr_send_threads::alert_send_thread(Node
 
 extern "C"
 bool
-check_available_send_data(struct thr_data *not_used)
+check_available_send_data(void *not_used)
 {
   (void)not_used;
   return !g_send_threads->data_available();
@@ -1552,6 +1562,7 @@ void
 job_buffer_full(struct thr_data* selfptr)
 {
   ndbout_c("job buffer full");
+  dumpJobQueues();
   abort();
 }
 
@@ -1560,6 +1571,7 @@ void
 out_of_job_buffer(struct thr_data* selfptr)
 {
   ndbout_c("out of job buffer");
+  dumpJobQueues();
   abort();
 }
 
@@ -2031,40 +2043,81 @@ flush_jbb_write_state(thr_data *selfptr)
   }
 }
 
+static
+void
+dumpJobQueues(void)
+{
+  const struct thr_repository* rep = &g_thr_repository;
+  for (unsigned from = 0; from<num_threads; from++)
+  {
+    for (unsigned to = 0; to<num_threads; to++)
+    {
+      const thr_data_aligned *thr_align_ptr = rep->m_thread + to;
+      const struct thr_data *thrptr = &thr_align_ptr->m_thr_data;
+      const thr_job_queue_head *q_head = thrptr->m_in_queue_head + from;
+
+      unsigned used = q_head->used();
+      if (used > 0)
+      {
+        ndbout << " job buffer " << from << " --> " << to
+               << ", used:" << used
+               << endl;
+      }
+    }
+  }
+}
+
 /**
- * Transporter will receive 1024 signals (MAX_RECEIVED_SIGNALS)
- * before running check_job_buffers
+ * Receive thread will unpack 1024 signals (MAX_RECEIVED_SIGNALS)
+ * from Transporters before running another check_recv_queue
+ *
+ * This function returns true if there is not enough space to unpack
+ * this amount of signals, else false.
  *
- * This function returns 0 if there is space to receive this amount of
- *   signals
- * else 1
+ * Also used as callback function from yield() to recheck
+ * 'full' condition before going to sleep.
  */
-static int
-check_job_buffers(struct thr_repository* rep, Uint32 recv_thread_id)
+static bool
+check_recv_queue(void *arg)
 {
+  thr_job_queue_head *q_head = static_cast<thr_job_queue_head*>(arg);
+
   const Uint32 minfree = (1024 + MIN_SIGNALS_PER_PAGE - 1)/MIN_SIGNALS_PER_PAGE;
-  unsigned thr_no = first_receiver_thread_no + recv_thread_id;
-  const thr_data_aligned *thr_align_ptr = rep->m_thread;
+  /**
+   * NOTE: m_read_index is read wo/ lock (and updated by different thread)
+   *       but since the different thread can only consume
+   *       signals this means that the value returned from this
+   *       function is always conservative (i.e it can be better than
+   *       returned value, if read-index has moved but we didnt see it)
+   */
+  const unsigned ri = q_head->m_read_index;
+  const unsigned wi = q_head->m_write_index;
+  const unsigned busy = (wi >= ri) ? wi - ri : (thr_job_queue::SIZE - ri) + wi;
+  return (1 + minfree + busy >= thr_job_queue::SIZE);
+}
+
+/**
+ * Check if any of the receive queues for the threads being served
+ * by this receive thread, are full.
+ * If full: Return 'Thr_data*' for (one of) the thread(s)
+ *          which we have to wait for. (to consume from queue)
+ */
+static struct thr_data*
+get_congested_recv_queue(struct thr_repository* rep, Uint32 recv_thread_id)
+{
+  const unsigned thr_no = first_receiver_thread_no + recv_thread_id;
+  thr_data_aligned *thr_align_ptr = rep->m_thread;
+
   for (unsigned i = 0; i<num_threads; i++, thr_align_ptr++)
   {
-    /**
-     * NOTE: m_read_index is read wo/ lock (and updated by different thread)
-     *       but since the different thread can only consume
-     *       signals this means that the value returned from this
-     *       function is always conservative (i.e it can be better than
-     *       returned value, if read-index has moved but we didnt see it)
-     */
-    const struct thr_data *thrptr = &thr_align_ptr->m_thr_data;
-    const thr_job_queue_head *q_head = thrptr->m_in_queue_head + thr_no;
-    unsigned ri = q_head->m_read_index;
-    unsigned wi = q_head->m_write_index;
-    unsigned busy = (wi >= ri) ? wi - ri : (thr_job_queue::SIZE - ri) + wi;
-    if (1 + minfree + busy >= thr_job_queue::SIZE)
+    struct thr_data *thrptr = &thr_align_ptr->m_thr_data;
+    thr_job_queue_head *q_head = thrptr->m_in_queue_head + thr_no;
+    if (check_recv_queue(q_head))
     {
-      return 1;
+      return thrptr;
     }
   }
-  return 0;
+  return NULL;
 }
 
 /**
@@ -2086,6 +2139,29 @@ check_job_buffers(struct thr_repository*
  */
 static
 Uint32
+compute_free_buffers_in_queue(const thr_job_queue_head *q_head)
+{
+  /**
+   * NOTE: m_read_index is read wo/ lock (and updated by different thread)
+   *       but since the different thread can only consume
+   *       signals this means that the value returned from this
+   *       function is always conservative (i.e it can be better than
+   *       returned value, if read-index has moved but we didnt see it)
+   */
+  unsigned ri = q_head->m_read_index;
+  unsigned wi = q_head->m_write_index;
+  unsigned free = (wi < ri) ? ri - wi : (thr_job_queue::SIZE + ri) - wi;
+
+  assert(free <= thr_job_queue::SIZE);
+
+  if (free <= (1 + thr_job_queue::SAFETY))
+    return 0;
+  else 
+    return free - (1 + thr_job_queue::SAFETY);
+}
+
+static
+Uint32
 compute_max_signals_to_execute(Uint32 thr_no)
 {
   Uint32 minfree = thr_job_queue::SIZE;
@@ -2094,30 +2170,17 @@ compute_max_signals_to_execute(Uint32 th
 
   for (unsigned i = 0; i<num_threads; i++, thr_align_ptr++)
   {
-    /**
-     * NOTE: m_read_index is read wo/ lock (and updated by different thread)
-     *       but since the different thread can only consume
-     *       signals this means that the value returned from this
-     *       function is always conservative (i.e it can be better than
-     *       returned value, if read-index has moved but we didnt see it)
-     */
     const struct thr_data *thrptr = &thr_align_ptr->m_thr_data;
     const thr_job_queue_head *q_head = thrptr->m_in_queue_head + thr_no;
-    unsigned ri = q_head->m_read_index;
-    unsigned wi = q_head->m_write_index;
-    unsigned free = (wi < ri) ? ri - wi : (thr_job_queue::SIZE + ri) - wi;
-
-    assert(free <= thr_job_queue::SIZE);
+    unsigned free = compute_free_buffers_in_queue(q_head);
 
     if (free < minfree)
       minfree = free;
   }
 
-#define SAFETY 2
-
-  if (minfree >= (1 + SAFETY))
+  if (minfree > 0)
   {
-    return (3 + (minfree - (1 + SAFETY)) * MIN_SIGNALS_PER_PAGE) / 4;
+    return (3 + (minfree) * MIN_SIGNALS_PER_PAGE) / 4;
   }
   else
   {
@@ -2220,41 +2283,22 @@ int
 mt_checkDoJob(Uint32 recv_thread_idx)
 {
   struct thr_repository* rep = &g_thr_repository;
-  if (unlikely(check_job_buffers(rep, recv_thread_idx)))
-  {
-    do 
-    {
-      /**
-       * theoretically (or when we do single threaded by using ndbmtd with
-       * all in same thread) we should execute signals here...to 
-       * prevent dead-lock, but...with current ndbmtd only TRPMAN runs in
-       * this thread, and other thread is waiting for TRPMAN 
-       * except for QMGR open/close connection, but that is not
-       * (i think) sufficient to create a deadlock
-       */
-
-      /** FIXME:
-       *  On a CMT chip where #CPU >= #NDB-threads sched_yield() is
-       *  effectively a NOOP as there will normally be an idle CPU available
-       *  to immediately resume thread execution.
-       *  On a Niagara chip this may severely impact performance as the CPUs
-       *  are virtualized by timemultiplexing the physical core.
-       *  The thread should really be 'parked' on
-       *  a condition to free its execution resources.
-       */
-//    usleep(a-few-usec);  /* A micro-sleep would likely have been better... */
-#if defined HAVE_SCHED_YIELD
-      sched_yield();
-#elif defined _WIN32
-      SwitchToThread();
-#else
-      NdbSleep_MilliSleep(0);
-#endif
-
-    } while (check_job_buffers(rep, recv_thread_idx));
-  }
 
-  return 0;
+  /**
+   * Return '1' if we are not allowed to receive more signals
+   * into the job buffers from this 'recv_thread_idx'.
+   *
+   * NOTE:
+   *   We should not loop-wait for buffers to become available
+   *   here as we currently hold the receiver-lock. Furthermore
+   *   waiting too long here could cause the receiver thread to be
+   *   less responsive wrt. moving incoming (TCP) data from the 
+   *   TCPTransporters into the (local) receiveBuffers.
+   *   The thread could also oversleep on its other tasks, such as
+   *   handling open/close of connections, and catching
+   *   its own shutdown events.
+   */
+  return (get_congested_recv_queue(rep, recv_thread_idx) != NULL);
 }
 
 /**
@@ -3077,8 +3121,9 @@ read_jba_state(thr_data *selfptr)
 
 /* Check all job queues, return true only if all are empty. */
 static bool
-check_queues_empty(thr_data *selfptr)
+check_queues_empty(void *arg)
 {
+  thr_data *selfptr = static_cast<thr_data *>(arg);
   Uint32 thr_count = g_thr_repository.m_thread_count;
   bool empty = read_jba_state(selfptr);
   if (!empty)
@@ -3141,6 +3186,7 @@ execute_signals(thr_data *selfptr,
         r->m_read_buffer = read_buffer;
         r->m_read_pos = read_pos;
         r->m_read_end = read_end;
+        wakeup(&h->m_waiter);
       }
     }
 
@@ -3607,13 +3653,33 @@ mt_receiver_thread_main(void *thr_arg)
     has_received = false;
     if (globalTransporterRegistry.pollReceive(1, recvdata))
     {
-      if (check_job_buffers(rep, recv_thread_idx) == 0)
+      watchDogCounter = 8;
+      lock(&rep->m_receive_lock[recv_thread_idx]);
+      const bool buffersFull = (globalTransporterRegistry.performReceive(recvdata) != 0);
+      unlock(&rep->m_receive_lock[recv_thread_idx]);
+      has_received = true;
+
+      if (buffersFull)       /* Receive queues(s) are full */
       {
-	watchDogCounter = 8;
-        lock(&rep->m_receive_lock[recv_thread_idx]);
-        globalTransporterRegistry.performReceive(recvdata);
-        unlock(&rep->m_receive_lock[recv_thread_idx]);
-        has_received = true;
+        thr_data* waitthr = get_congested_recv_queue(rep, recv_thread_idx);
+        if (waitthr != NULL) /* Will wait for buffers to be freed */
+        {
+          /**
+           * Wait for thread 'waitthr' to consume some of the
+           * pending signals in m_in_queue previously received 
+           * from this receive thread, 'thr_no'.
+           * Will recheck queue status with 'check_recv_queue' after latch
+           * has been set, and *before* going to sleep.
+           */
+          const Uint32 nano_wait = 1000*1000;    /* -> 1 ms */
+          thr_job_queue_head *wait_queue = waitthr->m_in_queue_head + thr_no;
+
+          const bool waited = yield(&wait_queue->m_waiter,
+                                    nano_wait,
+                                    check_recv_queue,
+                                    wait_queue);
+          (void)waited;
+        }
       }
     }
     selfptr->m_stat.m_loop_cnt++;
@@ -3644,28 +3710,46 @@ sendpacked(struct thr_data* thr_ptr, Sig
 }
 
 /**
- * check if out-queues of selfptr is full
- * return true is so
+ * Callback function used by yield() to recheck 
+ * 'job queue full' condition before going to sleep.
+ *
+ * Check if the specified 'thr_job_queue_head' (arg)
+ * is still full, return true if so.
  */
 static bool
-check_job_buffer_full(thr_data *selfptr)
+check_congested_job_queue(void *arg)
 {
-  Uint32 thr_no = selfptr->m_thr_no;
-  Uint32 tmp = compute_max_signals_to_execute(thr_no);
-#if 0
-  Uint32 perjb = tmp / g_thr_repository.m_thread_count;
+  const thr_job_queue_head *waitfor = static_cast<thr_job_queue_head*>(arg);
+  return (compute_free_buffers_in_queue(waitfor) == 0);
+}
+
+/**
+ * Check if any out-queues of selfptr are full.
+ * If full: Return 'Thr_data*' for (one of) the thread(s)
+ *          which we have to wait for. (to consume from queue)
+ */
+static struct thr_data*
+get_congested_job_queue(const thr_data *selfptr)
+{
+  const Uint32 thr_no = selfptr->m_thr_no;
+  struct thr_repository* rep = &g_thr_repository;
+  struct thr_data_aligned *thr_align_ptr = rep->m_thread;
+  struct thr_data *waitfor = NULL;
 
-  if (perjb == 0)
+  for (unsigned i = 0; i<num_threads; i++, thr_align_ptr++)
   {
-    return true;
-  }
+    struct thr_data *thrptr = &thr_align_ptr->m_thr_data;
+    thr_job_queue_head *q_head = thrptr->m_in_queue_head + thr_no;
 
-  return false;
-#else
-  if (tmp < g_thr_repository.m_thread_count)
-    return true;
-  return false;
-#endif
+    if (compute_free_buffers_in_queue(q_head) == 0)
+    {
+      if (thrptr != selfptr)  // Don't wait on myself (yet)
+        return thrptr;
+      else
+        waitfor = thrptr;
+    }
+  }
+  return waitfor;             // Possibly 'thrptr == selfptr'
 }
 
 /**
@@ -3716,6 +3800,7 @@ loop:
        */
       selfptr->m_max_signals_per_jb = 1;
       ndbout_c("%u - sleeploop 10!!", selfptr->m_thr_no);
+      dumpJobQueues();
       return true;
     }
 
@@ -3725,8 +3810,47 @@ loop:
       pending_send = do_send(selfptr, TRUE);
     }
 
-    const Uint32 wait = 1000000;    /* 1 ms */
-    yield(&selfptr->m_waiter, wait, check_job_buffer_full, selfptr);
+    struct thr_data* waitthr = get_congested_job_queue(selfptr);
+    if (waitthr == NULL)                 // Waiters resolved
+    {
+      goto loop;
+    }
+    else if (waitthr == selfptr)         // Avoid self-wait
+    {
+      //ndbout << "update_sched_config"
+      //       << ", thr_no=" << thr_no
+      //       << ", in 'self wait' - proceeding slowly" 
+      //       << endl;
+      selfptr->m_max_signals_per_jb = 1; // Proceed slowly
+      return sleeploop > 0;
+    }
+    else if (false)
+    {
+      ndbout << "update_sched_config"
+             << ", thr_no: " << thr_no << " waitfor: " << waitthr->m_thr_no
+             << endl;
+    }
+    /**
+     * Wait for thread 'waitthr' to consume some of the
+     * pending signals in m_in_queue[].
+     * Will recheck queue status with 'check_recv_queue'
+     * after latch has been set, and *before* going to sleep.
+     */
+    const Uint32 nano_wait = 1000*1000;    /* -> 1 ms */
+    thr_job_queue_head *wait_queue = waitthr->m_in_queue_head + thr_no;
+
+    const bool waited = yield(&wait_queue->m_waiter,
+                              nano_wait,
+                              check_congested_job_queue,
+                              wait_queue);
+    if (waited)
+    {
+      sleeploop++;
+      //ndbout << "update_sched_config"
+      //       << ", thr_no=" << thr_no
+      //       << ", waited due to 'job_buffers_full'" 
+      //       << endl;
+    }
     goto loop;
   }
 
@@ -3807,7 +3931,7 @@ mt_job_thread_main(void *thr_arg)
     else
     {
       /* No signals processed, prepare to sleep to wait for more */
-      if (pending_send || send_sum > 0)
+      if ((pending_send + send_sum) > 0)
       {
         /* About to sleep, _must_ send now. */
         pending_send = do_send(selfptr, TRUE);
@@ -3836,7 +3960,7 @@ mt_job_thread_main(void *thr_arg)
      */
     if (sum >= selfptr->m_max_exec_signals)
     {
-      if (update_sched_config(selfptr, pending_send))
+      if (update_sched_config(selfptr, pending_send + send_sum))
       {
         /* Update current time after sleeping */
         now = NdbTick_CurrentMillisecond();
@@ -4185,6 +4309,7 @@ thr_init(struct thr_repository* rep, str
   {
     selfptr->m_in_queue_head[i].m_read_index = 0;
     selfptr->m_in_queue_head[i].m_write_index = 0;
+    selfptr->m_in_queue_head[i].m_waiter.init();
     buffer = may_communicate(i,thr_no) 
               ? seize_buffer(rep, thr_no, false) : NULL;
     selfptr->m_in_queue[i].m_buffers[0] = buffer;
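
For illustration only, a standalone sketch of the relaxed yield() callback
signature used throughout the mt.cpp changes above: callbacks now take a void*
argument and cast it back to the concrete type themselves (the futex/condition
timing details of the real yield() are omitted):

#include <cstdio>

// Relaxed signature: any state can be passed through the void* argument.
static bool yield_sketch(bool (*check_callback)(void *), void *check_arg)
{
  // The real yield() registers itself as a waiter, then rechecks the
  // condition via the callback and only goes to sleep if it still holds.
  if (!check_callback(check_arg))
    return false;                   // condition already resolved: no wait
  /* ... timed futex / condition wait would happen here ... */
  return true;                      // waited
}

struct QueueHeadSketch { unsigned read_index, write_index; };

// Mirrors the shape of check_recv_queue(void*): cast, then test fullness.
static bool queue_is_full(void *arg)
{
  const QueueHeadSketch *q = static_cast<QueueHeadSketch *>(arg);
  return (q->write_index - q->read_index) >= 31;  // illustrative threshold
}

int main()
{
  QueueHeadSketch q = { 0, 5 };
  std::printf("waited=%d\n", yield_sketch(queue_is_full, &q));
  return 0;
}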

No bundle (reason: useless for push emails).