List: Commits
From: Ole John Aske
Date: November 23 2012 11:47am
Subject: bzr push into mysql-5.5-cluster-7.2 branch (ole.john.aske:4113 to 4114)
 4114 Ole John Aske	2012-11-23
      Revert of previous unintended pushes

    modified:
      mysql-test/suite/ndb/t/ndb_bushy_joins.cnf
      storage/ndb/include/transporter/TransporterRegistry.hpp
      storage/ndb/src/common/transporter/TransporterRegistry.cpp
      storage/ndb/src/kernel/vm/mt.cpp
 4113 Ole John Aske	2012-11-23
      Fix failing ndb_bushy_joins.test under Valgrind
      
      Extends TimeBetweenEpochs to avoid failure due to
      exceeding MaxBufferedEpochs (> 100).

    modified:
      mysql-test/suite/ndb/t/ndb_bushy_joins.cnf
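For context on the 4113 fix that 4114 reverts: a minimal sketch of how the Valgrind workaround sat in the test's cluster configuration. Only the parameter lines visible in the diff below are taken from the source; the section name and the defaults mentioned in comments are assumptions.

  [cluster_config]                          # section name assumed
  MaxNoOfConcurrentOperations=250000
  LongMessageBuffer=64M
  TransactionDeadlockDetectionTimeout=30000
  # Slow the epoch rate so a Valgrind-slowed data node does not queue more
  # epochs than MaxBufferedEpochs allows (the ~100 limit named in 4113).
  TimeBetweenEpochs=1000                    # milliseconds; default is lower (100)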
=== modified file 'mysql-test/suite/ndb/t/ndb_bushy_joins.cnf'
--- a/mysql-test/suite/ndb/t/ndb_bushy_joins.cnf	2012-11-23 11:34:35 +0000
+++ b/mysql-test/suite/ndb/t/ndb_bushy_joins.cnf	2012-11-23 11:46:55 +0000
@@ -15,7 +15,6 @@ ThreadConfig=ldm={count=4}
 MaxNoOfConcurrentOperations=250000
 LongMessageBuffer=64M
 TransactionDeadlockDetectionTimeout=30000
-TimeBetweenEpochs=1000
 
 [ENV]
 # Need to always use ndbmtd when we want lots of partitions

=== modified file 'storage/ndb/include/transporter/TransporterRegistry.hpp'
--- a/storage/ndb/include/transporter/TransporterRegistry.hpp	2012-11-22 12:18:17 +0000
+++ b/storage/ndb/include/transporter/TransporterRegistry.hpp	2012-11-23 11:46:55 +0000
@@ -108,27 +108,17 @@ struct TransporterReceiveData
   NodeBitmask m_transporters;
 
   /**
-   * Bitmask of nodes having data awaiting to be received
+   * Bitmask of transporters having data awaiting to be received
    * from its transporter.
    */
   NodeBitmask m_recv_transporters;
 
   /**
-   * Bitmask of nodes that has already received data buffered
+   * Bitmask of transporters that has already received data buffered
    * inside its transporter. Possibly "carried over" from last 
    * performReceive
    */
   NodeBitmask m_has_data_transporters;
-
-  /**
-   * Subset of m_has_data_transporters which we completed handling
-   * of in previous ::performReceive before we was interrupted due
-   * to lack of job buffers. Will skip these when we later retry 
-   * ::performReceive in order to avoid startvation of non-handled
-   * transporters.
-   */
-  NodeBitmask m_handled_transporters;
-
 #if defined(HAVE_EPOLL_CREATE)
   int m_epoll_fd;
   struct epoll_event *m_epoll_events;
@@ -589,7 +579,7 @@ public:
    * Receiving
    */
   Uint32 pollReceive(Uint32 timeOutMillis, TransporterReceiveHandle& mask);
-  Uint32 performReceive(TransporterReceiveHandle&);
+  void performReceive(TransporterReceiveHandle&);
   void update_connections(TransporterReceiveHandle&);
 
   inline Uint32 pollReceive(Uint32 timeOutMillis) {
@@ -597,9 +587,9 @@ public:
     return pollReceive(timeOutMillis, * receiveHandle);
   }
 
-  inline Uint32 performReceive() {
+  inline void performReceive() {
     assert(receiveHandle != 0);
-    return performReceive(* receiveHandle);
+    performReceive(* receiveHandle);
   }
 
   inline void update_connections() {

=== modified file 'storage/ndb/src/common/transporter/TransporterRegistry.cpp'
--- a/storage/ndb/src/common/transporter/TransporterRegistry.cpp	2012-11-22 12:18:17 +0000
+++ b/storage/ndb/src/common/transporter/TransporterRegistry.cpp	2012-11-23 11:46:55 +0000
@@ -94,7 +94,6 @@ TransporterReceiveData::TransporterRecei
    */
   m_transporters.set();            // Handle all
   m_transporters.clear(Uint32(0)); // Except wakeup socket...
-  m_handled_transporters.clear();
 
 #if defined(HAVE_EPOLL_CREATE)
   m_epoll_fd = -1;
@@ -1143,13 +1142,13 @@ TransporterRegistry::pollReceive(Uint32 
     {
       for (int i = 0; i < num_socket_events; i++)
       {
-        const Uint32 node_id = recvdata.m_epoll_events[i].data.u32;
+        const Uint32 trpid = recvdata.m_epoll_events[i].data.u32;
         /**
          * check that it's assigned to "us"
          */
-        assert(recvdata.m_transporters.get(node_id));
+        assert(recvdata.m_transporters.get(trpid));
 
-        recvdata.m_recv_transporters.set(node_id);
+        recvdata.m_recv_transporters.set(trpid);
       }
     }
     else if (num_socket_events < 0)
@@ -1320,15 +1319,14 @@ TransporterRegistry::poll_TCP(Uint32 tim
 
 /**
  * In multi-threaded cases, this must be protected by a global receive lock.
- * In case we were unable to received due to job buffers being full.
- * Returns 0 when receive succeeded from all Transporters having data,
- * else 1.
  */
-Uint32
+void
 TransporterRegistry::performReceive(TransporterReceiveHandle& recvdata)
 {
   assert((receiveHandle == &recvdata) || (receiveHandle == 0));
 
+  bool hasReceived = false;
+
   if (recvdata.m_recv_transporters.get(0))
   {
     assert(recvdata.m_transporters.get(0));
@@ -1382,7 +1380,7 @@ TransporterRegistry::performReceive(Tran
   recvdata.m_recv_transporters.clear();
 
   /**
-   * Unpack data either received above or pending from prev rounds.
+   * Handle data either received above or pending from prev rounds.
    */
   for(Uint32 id = recvdata.m_has_data_transporters.find_first();
       id != BitmaskImpl::NotFound;
@@ -1397,10 +1395,9 @@ TransporterRegistry::performReceive(Tran
     {
       if (t->isConnected())
       {
-        if (unlikely(recvdata.checkJobBuffer()))
-          return 1;     // Full, can't unpack more
-        if (unlikely(recvdata.m_handled_transporters.get(id)))
-          continue;     // Skip now to avoid startvation
+        if (hasReceived)
+          recvdata.checkJobBuffer();
+        hasReceived = true;
         Uint32 * ptr;
         Uint32 sz = t->getReceiveData(&ptr);
         Uint32 szUsed = unpack(recvdata, ptr, sz, id, ioStates[id]);
@@ -1416,7 +1413,6 @@ TransporterRegistry::performReceive(Tran
     }
     // If transporter still have data, make sure that it's remember to next time
     recvdata.m_has_data_transporters.set(id, hasdata);
-    recvdata.m_handled_transporters.set(id, hasdata);
   }
 #endif
   
@@ -1432,11 +1428,9 @@ TransporterRegistry::performReceive(Tran
     {
       if(t->isConnected() && t->checkConnected())
       {
-        if (unlikely(recvdata.checkJobBuffer()))
-          return 1;      // Full, can't unpack more
-        if (unlikely(recvdata.m_handled_transporters.get(nodeId)))
-          continue;      // Skip now to avoid startvation
-
+        if (hasReceived)
+          callbackObj->checkJobBuffer();
+        hasReceived = true;
         Uint32 * readPtr, * eodPtr;
         t->getReceivePtr(&readPtr, &eodPtr);
         callbackObj->transporter_recv_from(nodeId);
@@ -1444,7 +1438,6 @@ TransporterRegistry::performReceive(Tran
         t->updateReceivePtr(newPtr);
       }
     } 
-    recvdata.m_handled_transporters.set(nodeId);
   }
 #endif
 #ifdef NDB_SHM_TRANSPORTER
@@ -1456,11 +1449,9 @@ TransporterRegistry::performReceive(Tran
     if(is_connected(nodeId)){
       if(t->isConnected() && t->checkConnected())
       {
-        if (unlikely(recvdata.checkJobBuffer()))
-          return 1;      // Full, can't unpack more
-        if (unlikely(recvdata.m_handled_transporters.get(nodeId)))
-          continue;      // Skip now to avoid startvation
-
+        if (hasReceived)
+          recvdata.checkJobBuffer();
+        hasReceived = true;
         Uint32 * readPtr, * eodPtr;
         t->getReceivePtr(&readPtr, &eodPtr);
         recvdata.transporter_recv_from(nodeId);
@@ -1469,11 +1460,8 @@ TransporterRegistry::performReceive(Tran
         t->updateReceivePtr(newPtr);
       }
     } 
-    recvdata.m_handled_transporters.set(nodeId);
   }
 #endif
-  recvdata.m_handled_transporters.clear();
-  return 0;
 }
 
 /**
@@ -1788,7 +1776,6 @@ TransporterRegistry::report_disconnect(T
   performStates[node_id] = DISCONNECTED;
   recvdata.m_recv_transporters.clear(node_id);
   recvdata.m_has_data_transporters.clear(node_id);
-  recvdata.m_handled_transporters.clear(node_id);
   recvdata.reportDisconnect(node_id, errnum);
   DBUG_VOID_RETURN;
 }

=== modified file 'storage/ndb/src/kernel/vm/mt.cpp'
--- a/storage/ndb/src/kernel/vm/mt.cpp	2012-11-22 13:20:37 +0000
+++ b/storage/ndb/src/kernel/vm/mt.cpp	2012-11-23 11:46:55 +0000
@@ -38,8 +38,6 @@
 #include "mt-asm.h"
 #include "mt-lock.hpp"
 
-static void dumpJobQueues(void);
-
 inline
 SimulatedBlock*
 GlobalData::mt_getBlock(BlockNumber blockNo, Uint32 instanceNo)
@@ -157,7 +155,7 @@ struct thr_wait
 static inline
 bool
 yield(struct thr_wait* wait, const Uint32 nsec,
-      bool (*check_callback)(void *), void *check_arg)
+      bool (*check_callback)(struct thr_data *), struct thr_data *check_arg)
 {
   volatile unsigned * val = &wait->m_futex_state;
 #ifndef NDEBUG
@@ -226,7 +224,7 @@ struct thr_wait
 static inline
 bool
 yield(struct thr_wait* wait, const Uint32 nsec,
-      bool (*check_callback)(void *), void *check_arg)
+      bool (*check_callback)(struct thr_data *), struct thr_data *check_arg)
 {
   struct timespec end;
   NdbCondition_ComputeAbsTime(&end, nsec/1000000);
@@ -587,20 +585,12 @@ struct thr_job_queue_head
   unsigned m_read_index;  // Read/written by consumer, read by producer
   unsigned m_write_index; // Read/written by producer, read by consumer
 
-  /**
-   * Waiter object: In case job queue is full, the produced thread
-   * will 'yield' on this waiter object until the consumer thread
-   * has consumed (at least) a job buffer.
-   */
-  thr_wait m_waiter;
-
   Uint32 used() const;
 };
 
 struct thr_job_queue
 {
   static const unsigned SIZE = 32;
-  static const unsigned SAFETY = 2;
 
   struct thr_job_buffer* m_buffers[SIZE];
 };
@@ -1396,7 +1386,7 @@ thr_send_threads::alert_send_thread(Node
 
 extern "C"
 bool
-check_available_send_data(void *not_used)
+check_available_send_data(struct thr_data *not_used)
 {
   (void)not_used;
   return !g_send_threads->data_available();
@@ -1562,7 +1552,6 @@ void
 job_buffer_full(struct thr_data* selfptr)
 {
   ndbout_c("job buffer full");
-  dumpJobQueues();
   abort();
 }
 
@@ -1571,7 +1560,6 @@ void
 out_of_job_buffer(struct thr_data* selfptr)
 {
   ndbout_c("out of job buffer");
-  dumpJobQueues();
   abort();
 }
 
@@ -2043,81 +2031,40 @@ flush_jbb_write_state(thr_data *selfptr)
   }
 }
 
-static
-void
-dumpJobQueues(void)
-{
-  const struct thr_repository* rep = &g_thr_repository;
-  for (unsigned from = 0; from<num_threads; from++)
-  {
-    for (unsigned to = 0; to<num_threads; to++)
-    {
-      const thr_data_aligned *thr_align_ptr = rep->m_thread + to;
-      const struct thr_data *thrptr = &thr_align_ptr->m_thr_data;
-      const thr_job_queue_head *q_head = thrptr->m_in_queue_head + from;
-
-      unsigned used = q_head->used();
-      if (used > 0)
-      {
-        ndbout << " job buffer " << from << " --> " << to
-               << ", used:" << used
-               << endl;
-      }
-    }
-  }
-}
-
 /**
- * Receive thread will unpack 1024 signals (MAX_RECEIVED_SIGNALS)
- * from Transporters before running another check_recv_queue
- *
- * This function returns true if there is not space to unpack
- * this amount of signals, else false.
+ * Transporter will receive 1024 signals (MAX_RECEIVED_SIGNALS)
+ * before running check_job_buffers
  *
- * Also used as callback function from yield() to recheck
- * 'full' condition before going to sleep.
+ * This function returns 0 if there is space to receive this amount of
+ *   signals
+ * else 1
  */
-static bool
-check_recv_queue(void *arg)
+static int
+check_job_buffers(struct thr_repository* rep, Uint32 recv_thread_id)
 {
-  thr_job_queue_head *q_head = static_cast<thr_job_queue_head*>(arg);
-
   const Uint32 minfree = (1024 + MIN_SIGNALS_PER_PAGE - 1)/MIN_SIGNALS_PER_PAGE;
-  /**
-   * NOTE: m_read_index is read wo/ lock (and updated by different thread)
-   *       but since the different thread can only consume
-   *       signals this means that the value returned from this
-   *       function is always conservative (i.e it can be better than
-   *       returned value, if read-index has moved but we didnt see it)
-   */
-  const unsigned ri = q_head->m_read_index;
-  const unsigned wi = q_head->m_write_index;
-  const unsigned busy = (wi >= ri) ? wi - ri : (thr_job_queue::SIZE - ri) + wi;
-  return (1 + minfree + busy >= thr_job_queue::SIZE);
-}
-
-/**
- * Check if any of the receive queues for the threads being served
- * by this receive thread, are full.
- * If full: Return 'Thr_data*' for (one of) the thread(s)
- *          which we have to wait for. (to consume from queue)
- */
-static struct thr_data*
-get_congested_recv_queue(struct thr_repository* rep, Uint32 recv_thread_id)
-{
-  const unsigned thr_no = first_receiver_thread_no + recv_thread_id;
-  thr_data_aligned *thr_align_ptr = rep->m_thread;
-
+  unsigned thr_no = first_receiver_thread_no + recv_thread_id;
+  const thr_data_aligned *thr_align_ptr = rep->m_thread;
   for (unsigned i = 0; i<num_threads; i++, thr_align_ptr++)
   {
-    struct thr_data *thrptr = &thr_align_ptr->m_thr_data;
-    thr_job_queue_head *q_head = thrptr->m_in_queue_head + thr_no;
-    if (check_recv_queue(q_head))
+    /**
+     * NOTE: m_read_index is read wo/ lock (and updated by different thread)
+     *       but since the different thread can only consume
+     *       signals this means that the value returned from this
+     *       function is always conservative (i.e it can be better than
+     *       returned value, if read-index has moved but we didnt see it)
+     */
+    const struct thr_data *thrptr = &thr_align_ptr->m_thr_data;
+    const thr_job_queue_head *q_head = thrptr->m_in_queue_head + thr_no;
+    unsigned ri = q_head->m_read_index;
+    unsigned wi = q_head->m_write_index;
+    unsigned busy = (wi >= ri) ? wi - ri : (thr_job_queue::SIZE - ri) + wi;
+    if (1 + minfree + busy >= thr_job_queue::SIZE)
     {
-      return thrptr;
+      return 1;
     }
   }
-  return NULL;
+  return 0;
 }
 
 /**
@@ -2139,29 +2086,6 @@ get_congested_recv_queue(struct thr_repo
  */
 static
 Uint32
-compute_free_buffers_in_queue(const thr_job_queue_head *q_head)
-{
-  /**
-   * NOTE: m_read_index is read wo/ lock (and updated by different thread)
-   *       but since the different thread can only consume
-   *       signals this means that the value returned from this
-   *       function is always conservative (i.e it can be better than
-   *       returned value, if read-index has moved but we didnt see it)
-   */
-  unsigned ri = q_head->m_read_index;
-  unsigned wi = q_head->m_write_index;
-  unsigned free = (wi < ri) ? ri - wi : (thr_job_queue::SIZE + ri) - wi;
-
-  assert(free <= thr_job_queue::SIZE);
-
-  if (free <= (1 + thr_job_queue::SAFETY))
-    return 0;
-  else 
-    return free - (1 + thr_job_queue::SAFETY);
-}
-
-static
-Uint32
 compute_max_signals_to_execute(Uint32 thr_no)
 {
   Uint32 minfree = thr_job_queue::SIZE;
@@ -2170,17 +2094,30 @@ compute_max_signals_to_execute(Uint32 th
 
   for (unsigned i = 0; i<num_threads; i++, thr_align_ptr++)
   {
+    /**
+     * NOTE: m_read_index is read wo/ lock (and updated by different thread)
+     *       but since the different thread can only consume
+     *       signals this means that the value returned from this
+     *       function is always conservative (i.e it can be better than
+     *       returned value, if read-index has moved but we didnt see it)
+     */
     const struct thr_data *thrptr = &thr_align_ptr->m_thr_data;
     const thr_job_queue_head *q_head = thrptr->m_in_queue_head + thr_no;
-    unsigned free = compute_free_buffers_in_queue(q_head);
+    unsigned ri = q_head->m_read_index;
+    unsigned wi = q_head->m_write_index;
+    unsigned free = (wi < ri) ? ri - wi : (thr_job_queue::SIZE + ri) - wi;
+
+    assert(free <= thr_job_queue::SIZE);
 
     if (free < minfree)
       minfree = free;
   }
 
-  if (minfree > 0)
+#define SAFETY 2
+
+  if (minfree >= (1 + SAFETY))
   {
-    return (3 + (minfree) * MIN_SIGNALS_PER_PAGE) / 4;
+    return (3 + (minfree - (1 + SAFETY)) * MIN_SIGNALS_PER_PAGE) / 4;
   }
   else
   {
@@ -2283,22 +2220,41 @@ int
 mt_checkDoJob(Uint32 recv_thread_idx)
 {
   struct thr_repository* rep = &g_thr_repository;
+  if (unlikely(check_job_buffers(rep, recv_thread_idx)))
+  {
+    do 
+    {
+      /**
+       * theoretically (or when we do single threaded by using ndbmtd with
+       * all in same thread) we should execute signals here...to 
+       * prevent dead-lock, but...with current ndbmtd only TRPMAN runs in
+       * this thread, and other thread is waiting for TRPMAN 
+       * except for QMGR open/close connection, but that is not
+       * (i think) sufficient to create a deadlock
+       */
 
-  /**
-   * Return '1' if we are not allowed to receive more signals
-   * into the job buffers from this 'recv_thread_idx'.
-   *
-   * NOTE:
-   *   We should not loop-wait for buffers to become available
-   *   here as we currently hold the receiver-lock. Furthermore
-   *   waiting too long here could cause the receiver thread to be
-   *   less responsive wrt. moving incoming (TCP) data from the 
-   *   TCPTransporters into the (local) receiveBuffers.
-   *   The thread could also oversleep on its other tasks as 
-   *   handling open/close of connections, and catching 
-   *   its own shutdown events
-   */
-  return (get_congested_recv_queue(rep, recv_thread_idx) != NULL);
+      /** FIXME:
+       *  On a CMT chip where #CPU >= #NDB-threads sched_yield() is
+       *  effectively a NOOP as there will normally be an idle CPU available
+       *  to immediately resume thread execution.
+       *  On a Niagara chip this may severely impact performance as the CPUs
+       *  are virtualized by timemultiplexing the physical core.
+       *  The thread should really be 'parked' on
+       *  a condition to free its execution resources.
+       */
+//    usleep(a-few-usec);  /* A micro-sleep would likely have been better... */
+#if defined HAVE_SCHED_YIELD
+      sched_yield();
+#elif defined _WIN32
+      SwitchToThread();
+#else
+      NdbSleep_MilliSleep(0);
+#endif
+
+    } while (check_job_buffers(rep, recv_thread_idx));
+  }
+
+  return 0;
 }
 
 /**
@@ -3121,9 +3077,8 @@ read_jba_state(thr_data *selfptr)
 
 /* Check all job queues, return true only if all are empty. */
 static bool
-check_queues_empty(void *arg)
+check_queues_empty(thr_data *selfptr)
 {
-  thr_data *selfptr = static_cast<thr_data *>(arg);
   Uint32 thr_count = g_thr_repository.m_thread_count;
   bool empty = read_jba_state(selfptr);
   if (!empty)
@@ -3186,7 +3141,6 @@ execute_signals(thr_data *selfptr,
         r->m_read_buffer = read_buffer;
         r->m_read_pos = read_pos;
         r->m_read_end = read_end;
-        wakeup(&h->m_waiter);
       }
     }
 
@@ -3653,33 +3607,13 @@ mt_receiver_thread_main(void *thr_arg)
     has_received = false;
     if (globalTransporterRegistry.pollReceive(1, recvdata))
     {
-      watchDogCounter = 8;
-      lock(&rep->m_receive_lock[recv_thread_idx]);
-      const bool buffersFull = (globalTransporterRegistry.performReceive(recvdata) != 0);
-      unlock(&rep->m_receive_lock[recv_thread_idx]);
-      has_received = true;
-
-      if (buffersFull)       /* Receive queues(s) are full */
+      if (check_job_buffers(rep, recv_thread_idx) == 0)
       {
-        thr_data* waitthr = get_congested_recv_queue(rep, recv_thread_idx);
-        if (waitthr != NULL) /* Will wait for buffers to be freed */
-        {
-          /**
-           * Wait for thread 'waitthr' to consume some of the
-           * pending signals in m_in_queue previously received 
-           * from this receive thread, 'thr_no'.
-           * Will recheck queue status with 'check_recv_queue' after latch
-           * has been set, and *before* going to sleep.
-           */
-          const Uint32 nano_wait = 1000*1000;    /* -> 1 ms */
-          thr_job_queue_head *wait_queue = waitthr->m_in_queue_head + thr_no;
-
-          const bool waited = yield(&wait_queue->m_waiter,
-                                    nano_wait,
-                                    check_recv_queue,
-                                    wait_queue);
-          (void)waited;
-        }
+	watchDogCounter = 8;
+        lock(&rep->m_receive_lock[recv_thread_idx]);
+        globalTransporterRegistry.performReceive(recvdata);
+        unlock(&rep->m_receive_lock[recv_thread_idx]);
+        has_received = true;
       }
     }
     selfptr->m_stat.m_loop_cnt++;
@@ -3710,46 +3644,28 @@ sendpacked(struct thr_data* thr_ptr, Sig
 }
 
 /**
- * Callback function used by yield() to recheck 
- * 'job queue full' condition before going to sleep.
- *
- * Check if the specified 'thr_job_queue_head' (arg)
- * is still full, return true if so.
+ * check if out-queues of selfptr is full
+ * return true is so
  */
 static bool
-check_congested_job_queue(void *arg)
+check_job_buffer_full(thr_data *selfptr)
 {
-  const thr_job_queue_head *waitfor = static_cast<thr_job_queue_head*>(arg);
-  return (compute_free_buffers_in_queue(waitfor) == 0);
-}
-
-/**
- * Check if any out-queues of selfptr is full.
- * If full: Return 'Thr_data*' for (one of) the thread(s)
- *          which we have to wait for. (to consume from queue)
- */
-static struct thr_data*
-get_congested_job_queue(const thr_data *selfptr)
-{
-  const Uint32 thr_no = selfptr->m_thr_no;
-  struct thr_repository* rep = &g_thr_repository;
-  struct thr_data_aligned *thr_align_ptr = rep->m_thread;
-  struct thr_data *waitfor = NULL;
+  Uint32 thr_no = selfptr->m_thr_no;
+  Uint32 tmp = compute_max_signals_to_execute(thr_no);
+#if 0
+  Uint32 perjb = tmp / g_thr_repository.m_thread_count;
 
-  for (unsigned i = 0; i<num_threads; i++, thr_align_ptr++)
+  if (perjb == 0)
   {
-    struct thr_data *thrptr = &thr_align_ptr->m_thr_data;
-    thr_job_queue_head *q_head = thrptr->m_in_queue_head + thr_no;
-
-    if (compute_free_buffers_in_queue(q_head) == 0)
-    {
-      if (thrptr != selfptr)  // Don't wait on myself (yet)
-        return thrptr;
-      else
-        waitfor = thrptr;
-    }
+    return true;
   }
-  return waitfor;             // Possibly 'thrptr == selfptr'
+
+  return false;
+#else
+  if (tmp < g_thr_repository.m_thread_count)
+    return true;
+  return false;
+#endif
 }
 
 /**
@@ -3800,7 +3716,6 @@ loop:
        */
       selfptr->m_max_signals_per_jb = 1;
       ndbout_c("%u - sleeploop 10!!", selfptr->m_thr_no);
-      dumpJobQueues();
       return true;
     }
 
@@ -3810,47 +3725,8 @@ loop:
       pending_send = do_send(selfptr, TRUE);
     }
 
-    struct thr_data* waitthr = get_congested_job_queue(selfptr);
-    if (waitthr == NULL)                 // Waiters resolved
-    {
-      goto loop;
-    }
-    else if (waitthr == selfptr)         // Avoid self-wait
-    {
-      //ndbout << "update_sched_config"
-      //       << ", thr_no=" << thr_no
-      //       << ", in 'self wait' - proceeding slowly" 
-      //       << endl;
-      selfptr->m_max_signals_per_jb = 1; // Proceed slowly
-      return sleeploop > 0;
-    }
-    else if (false)
-    {
-      ndbout << "update_sched_config"
-             << ", thr_no: " << thr_no << " waitfor: " << waitthr->m_thr_no
-             << endl;
-    }
-    /**
-     * Wait for thread 'waitthr' to consume some of the
-     * pending signals in m_in_queue[].
-     * Will recheck queue status with 'check_recv_queue'
-     * after latch has been set, and *before* going to sleep.
-     */
-    const Uint32 nano_wait = 1000*1000;    /* -> 1 ms */
-    thr_job_queue_head *wait_queue = waitthr->m_in_queue_head + thr_no;
-
-    const bool waited = yield(&wait_queue->m_waiter,
-                              nano_wait,
-                              check_congested_job_queue,
-                              wait_queue);
-    if (waited)
-    {
-      sleeploop++;
-      //ndbout << "update_sched_config"
-      //       << ", thr_no=" << thr_no
-      //       << ", waited due to 'job_buffers_full'" 
-      //       << endl;
-    }
+    const Uint32 wait = 1000000;    /* 1 ms */
+    yield(&selfptr->m_waiter, wait, check_job_buffer_full, selfptr);
     goto loop;
   }
 
@@ -3931,7 +3807,7 @@ mt_job_thread_main(void *thr_arg)
     else
     {
       /* No signals processed, prepare to sleep to wait for more */
-      if ((pending_send + send_sum) > 0)
+      if (pending_send || send_sum > 0)
       {
         /* About to sleep, _must_ send now. */
         pending_send = do_send(selfptr, TRUE);
@@ -3960,7 +3836,7 @@ mt_job_thread_main(void *thr_arg)
      */
     if (sum >= selfptr->m_max_exec_signals)
     {
-      if (update_sched_config(selfptr, pending_send + send_sum))
+      if (update_sched_config(selfptr, pending_send))
       {
         /* Update current time after sleeping */
         now = NdbTick_CurrentMillisecond();
@@ -4309,7 +4185,6 @@ thr_init(struct thr_repository* rep, str
   {
     selfptr->m_in_queue_head[i].m_read_index = 0;
     selfptr->m_in_queue_head[i].m_write_index = 0;
-    selfptr->m_in_queue_head[i].m_waiter.init();
     buffer = may_communicate(i,thr_no) 
               ? seize_buffer(rep, thr_no, false) : NULL;
     selfptr->m_in_queue[i].m_buffers[0] = buffer;

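Aside, for readers following the job-buffer arithmetic restored in the mt.cpp hunks above (check_job_buffers, compute_max_signals_to_execute): a standalone sketch of the circular-index occupancy test. The type and function names below are illustrative only, not from the tree; just the index arithmetic mirrors the diff.

  // Illustrative sketch only -- names are not from the NDB source tree.
  #include <iostream>

  struct SketchQueueHead {
    enum { SIZE = 32 };        // ring size, like thr_job_queue::SIZE
    unsigned m_read_index;     // advanced only by the consumer thread
    unsigned m_write_index;    // advanced only by the producer thread
  };

  // True when unpacking another 'minfree' pages worth of signals could
  // overrun the queue. Reading m_read_index without a lock is safe here
  // because a stale value only makes the answer more conservative.
  static bool queue_nearly_full(const SketchQueueHead &q, unsigned minfree)
  {
    const unsigned ri = q.m_read_index;
    const unsigned wi = q.m_write_index;
    const unsigned busy =
        (wi >= ri) ? wi - ri : (SketchQueueHead::SIZE - ri) + wi;
    return (1 + minfree + busy >= SketchQueueHead::SIZE);
  }

  int main()
  {
    SketchQueueHead q = { 30, 5 };  // producer wrapped: busy = (32-30)+5 = 7
    std::cout << queue_nearly_full(q, 26) << std::endl;  // 1+26+7 >= 32 -> true
    return 0;
  }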
No bundle (reason: useless for push emails).