List:Commits« Previous MessageNext Message »
From:Ole John Aske Date:December 11 2008 11:40am
Subject:bzr commit into mysql-5.1 branch (ole.john.aske:3167) Bug#41301
View as plain text  
#At file:///net/vidar01/export/home/tmp/oleja/mysql/mysql-5.1-telco-6.4/ based on revid:jonas@stripped

 3167 Ole John Aske	2008-12-11
      Commit for Bug#41301 after Jonas first review.
modified:
  storage/ndb/src/common/transporter/TransporterRegistry.cpp
  storage/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp
  storage/ndb/src/kernel/vm/Emulator.hpp
  storage/ndb/src/kernel/vm/mt.cpp

=== modified file 'storage/ndb/src/common/transporter/TransporterRegistry.cpp'
--- a/storage/ndb/src/common/transporter/TransporterRegistry.cpp	2008-11-26 10:36:20 +0000
+++ b/storage/ndb/src/common/transporter/TransporterRegistry.cpp	2008-12-11 10:40:24 +0000
@@ -1085,6 +1085,7 @@ TransporterRegistry::get_tcp_data(TCP_Tr
 void
 TransporterRegistry::performReceive()
 {
+  bool hasReceived = false;
 #ifdef NDB_TCP_TRANSPORTER
 #if defined(HAVE_EPOLL_CREATE)
   if (likely(m_epoll_fd != -1))
@@ -1128,7 +1129,9 @@ TransporterRegistry::performReceive()
           
           if (t->hasReceiveData())
           {
-            callbackObj->checkJobBuffer();
+            if (hasReceived)
+              callbackObj->checkJobBuffer();
+            hasReceived = true;
             Uint32 * ptr;
             Uint32 sz = t->getReceiveData(&ptr);
             callbackObj->transporter_recv_from(nodeId);
@@ -1152,7 +1155,9 @@ TransporterRegistry::performReceive()
     {
       if(t->isConnected() && t->checkConnected())
       {
-        callbackObj->checkJobBuffer();
+        if (hasReceived)
+          callbackObj->checkJobBuffer();
+        hasReceived = true;
         Uint32 * readPtr, * eodPtr;
         t->getReceivePtr(&readPtr, &eodPtr);
         callbackObj->transporter_recv_from(nodeId);
@@ -1170,7 +1175,9 @@ TransporterRegistry::performReceive()
     if(is_connected(nodeId)){
       if(t->isConnected() && t->checkConnected())
       {
-        callbackObj->checkJobBuffer();
+        if (hasReceived)
+          callbackObj->checkJobBuffer();
+        hasReceived = true;
         Uint32 * readPtr, * eodPtr;
         t->getReceivePtr(&readPtr, &eodPtr);
         callbackObj->transporter_recv_from(nodeId);

=== modified file 'storage/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp	2008-11-03 08:34:28 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp	2008-12-11 10:40:24 +0000
@@ -479,16 +479,26 @@ Dbtup::readFixedSizeTHTwoWordNotNULL(Uin
   }
 }
 
-static
-inline
+
+static inline
 void
-zero32(Uint8* dstPtr, Uint32 len)
+zero32(Uint8* dstPtr, const Uint32 len)
 {
-  while ((len & 3) != 0) 
+  Uint32 odd = len & 3;
+  if (odd != 0)
   {
-    dstPtr[len++] = 0;
+    Uint32 aligned = len & ~3;
+    Uint8* dst = dstPtr+aligned;
+    switch(odd){     /* odd is: {1..3} */
+    case 1:
+      dst[1] = 0;
+    case 2:
+      dst[2] = 0;
+    default:         /* Known to be odd==3 */
+      dst[3] = 0;
+    }
   }
-}
+} 
 
 bool
 Dbtup::readFixedSizeTHManyWordNotNULL(Uint8* outBuffer,

=== modified file 'storage/ndb/src/kernel/vm/Emulator.hpp'
--- a/storage/ndb/src/kernel/vm/Emulator.hpp	2008-01-01 12:45:11 +0000
+++ b/storage/ndb/src/kernel/vm/Emulator.hpp	2008-12-11 10:40:24 +0000
@@ -31,7 +31,6 @@ extern class  TimeQueue           global
 extern class  FastScheduler       globalScheduler;
 extern class  TransporterRegistry globalTransporterRegistry;
 extern struct GlobalData          globalData;
-extern struct thr_repository      g_thr_repository;
 
 #ifdef VM_TRACE
 extern class SignalLoggerManager globalSignalLoggers;

=== modified file 'storage/ndb/src/kernel/vm/mt.cpp'
--- a/storage/ndb/src/kernel/vm/mt.cpp	2008-11-15 15:43:59 +0000
+++ b/storage/ndb/src/kernel/vm/mt.cpp	2008-12-11 10:40:24 +0000
@@ -131,7 +131,7 @@ struct thr_wait
   volatile unsigned m_futex_state;
   enum {
     FS_RUNNING = 0,
-    FS_SLEEPING = 1,
+    FS_SLEEPING = 1
   };
   thr_wait() { xcng(&m_futex_state, FS_RUNNING);}
   void init () {}
@@ -144,10 +144,10 @@ struct thr_wait
  * if that returns true will it actually sleep, else it will return
  * immediately. This is needed to avoid races with wakeup.
  */
-static
+static inline
 void
 yield(struct thr_wait* wait, const struct timespec *timeout,
-      bool (*check_callback)(void *), void *check_arg)
+      bool (*check_callback)(struct thr_data *), struct thr_data *check_arg)
 {
   volatile unsigned * val = &wait->m_futex_state;
 #ifndef NDEBUG
@@ -173,7 +173,7 @@ yield(struct thr_wait* wait, const struc
   xcng(val, thr_wait::FS_RUNNING);
 }
 
-static
+static inline
 int
 wakeup(struct thr_wait* wait)
 {
@@ -195,12 +195,11 @@ wakeup(struct thr_wait* wait)
 
 struct thr_wait
 {
+  bool m_need_wakeup;
+ 
   NdbMutex *m_mutex;
   NdbCondition *m_cond;
-  thr_wait() {
-    m_mutex = 0;
-    m_cond = 0;
-  }
+  thr_wait() : m_mutex(0), m_cond(0), m_need_wakeup(false) {}
 
   void init() {
     m_mutex = NdbMutex_Create();
@@ -208,29 +207,43 @@ struct thr_wait
   }
 };
 
-static
+static inline
 void
 yield(struct thr_wait* wait, const struct timespec *timeout,
-      bool (*check_callback)(void *), void *check_arg)
+      bool (*check_callback)(struct thr_data *), struct thr_data *check_arg)
 {
   Uint32 msec = 
     (1000 * timeout->tv_sec) + 
     (timeout->tv_nsec / 1000000);
   NdbMutex_Lock(wait->m_mutex);
-  if ((*check_callback)(check_arg))
-    NdbCondition_WaitTimeout(wait->m_cond, wait->m_mutex, msec);
+  while ((*check_callback)(check_arg)) /* May have spurious wakeups: Always recheck condition predicate */
+  {
+    wait->m_need_wakeup = true;
+    if (NdbCondition_WaitTimeout(wait->m_cond, wait->m_mutex, msec) == ETIMEDOUT)
+    {
+      wait->m_need_wakeup = false;
+      break;
+    }
+  }
   NdbMutex_Unlock(wait->m_mutex);
 }
 
-static
+
+static inline
 int
 wakeup(struct thr_wait* wait)
 {
   NdbMutex_Lock(wait->m_mutex);
-  NdbCondition_Signal(wait->m_cond);
+  // We should avoid signaling when not waiting for wakeup
+  if (wait->m_need_wakeup)
+  {
+    wait->m_need_wakeup = false;
+    NdbCondition_Signal(wait->m_cond);
+  }
   NdbMutex_Unlock(wait->m_mutex);
   return 0;
 }
+
 #endif
 
 static inline void
@@ -489,12 +502,27 @@ struct thr_job_buffer // 32k
   Uint32 m_data[SIZE];
 };  
 
+
+/**
+ * thr_job_queue is shared between consumer / producer. 
+ *
+ * The hot-spot of the thr_job_queue are the read/write indexes.
+ * As they are updated and read frequently they have been placed
+ * in its own thr_job_queue_head[] in order to make them fit inside a
+ * single/few cache lines and thereby avoid complete L1-cache replacement
+ * every time the job_queue is scanned.
+ */
+struct thr_job_queue_head
+{
+  unsigned m_read_index;  // Read/written by consumer, read by producer
+  unsigned m_write_index; // Read/written by producer, read by consumer
+};
+
 struct thr_job_queue
 {
   static const unsigned SIZE = 30;
 
-  unsigned m_read_index; // Read/written by consumer, read by producer
-  unsigned m_write_index; // Read/written by producer, read by consumer
+  struct thr_job_queue_head* m_head;
   struct thr_job_buffer* m_buffers[SIZE];
 };
 
@@ -561,8 +589,15 @@ struct thr_jb_read_state
    * execution loop and used to determine when the end of available signals is
    * reached.
    */
-  Uint32 m_write_index;
-  Uint32 m_write_pos;
+  Uint32 m_read_end;    // End within current thr_job_buffer. (*m_read_buffer)
+
+  Uint32 m_write_index; // Last available thr_job_buffer.
+
+  bool is_empty() const
+  {
+//  assert (m_read_index != m_write_index  ||  m_read_pos <= m_read_end);
+    return (m_read_index == m_write_index) && (m_read_pos >= m_read_end);
+  }
 };
 
 /**
@@ -662,6 +697,7 @@ struct thr_data
   struct thr_tq m_tq;
 
   /* Prio A signal incoming queue. */
+  struct thr_job_queue_head m_jba_head;
   struct thr_job_queue m_jba;
   struct thr_spin_lock m_jba_write_lock;
   /*
@@ -692,6 +728,7 @@ struct thr_data
    * These are the thread input queues, where other threads deliver signals
    * into.
    */
+  struct thr_job_queue_head m_in_queue_head[MAX_THREADS];
   struct thr_job_queue m_in_queue[MAX_THREADS];
   /* These are the write states of m_in_queue[self] in each thread. */
   struct thr_jb_write_state m_write_states[MAX_THREADS];
@@ -763,6 +800,7 @@ struct trp_callback : public Transporter
 };
 
 extern trp_callback g_trp_callback;             // Forward declaration
+extern struct thr_repository g_thr_repository;
 
 struct thr_repository
 {
@@ -1033,7 +1071,7 @@ handle_time_wrap(struct thr_data* selfpt
   }
 }
 
-static
+static inline
 void
 scan_time_queues(struct thr_data* selfptr)
 {
@@ -1204,10 +1242,33 @@ senddelay(Uint32 thr_no, const SignalHea
 /*
  * Flush the write state to the job queue, making any new signals available to
  * receiving threads.
+ *
+ * Two versions:
+ *    - The general version flush_write_state_other() which may flush to any thread,
+ *      and possibly signal any waiters.
+ *    - The special version flush_write_state_self() which should only be used
+ *      to flush messages to itself.
+ *
+ * Call to these functions are encapsulated through flush_write_state which decides
+ * which of these functions to call.
  */
 static inline
 void
-flush_write_state(Uint32 dst, thr_job_queue *q, thr_jb_write_state *w)
+flush_write_state_self(thr_job_queue_head *q_head, thr_jb_write_state *w)
+{
+  /* 
+   * Can simplify the flush_write_state when writing to myself:
+   * Simply update write references wo/ mutex, memory barrier and signaling
+   */
+  w->m_write_buffer->m_len = w->m_write_pos;
+  q_head->m_write_index = w->m_write_index;
+  w->m_pending_signals_wakeup = 0;
+  w->m_pending_signals = 0;
+}
+
+static inline
+void
+flush_write_state_other(thr_data *dstptr, thr_job_queue_head *q_head, thr_jb_write_state *w)
 {
   /*
    * Two write memory barriers here, as assigning m_len may make signal data
@@ -1219,20 +1280,37 @@ flush_write_state(Uint32 dst, thr_job_qu
    *
    * But wmb() is a no-op anyway in x86 ...
    */
+
   wmb();
   w->m_write_buffer->m_len = w->m_write_pos;
   wmb();
-  q->m_write_index = w->m_write_index;
+  q_head->m_write_index = w->m_write_index;
+
   w->m_pending_signals_wakeup += w->m_pending_signals;
   w->m_pending_signals = 0;
 
   if (w->m_pending_signals_wakeup >= MAX_SIGNALS_BEFORE_WAKEUP)
   {
     w->m_pending_signals_wakeup = 0;
-    wakeup(&(g_thr_repository.m_thread[dst].m_waiter));
+    wakeup(&(dstptr->m_waiter));
+  }
+}
+
+static inline
+void
+flush_write_state(const thr_data *selfptr, thr_data *dstptr, thr_job_queue_head *q_head, thr_jb_write_state *w)
+{
+  if (dstptr == selfptr)
+  {
+    flush_write_state_self(q_head, w);
+  }
+  else
+  {
+    flush_write_state_other(dstptr, q_head, w);
   }
 }
 
+
 static
 void
 flush_jbb_write_state(thr_data *selfptr)
@@ -1240,14 +1318,15 @@ flush_jbb_write_state(thr_data *selfptr)
   Uint32 thr_count = g_thr_repository.m_thread_count;
   Uint32 self = selfptr->m_thr_no;
 
-  for (Uint32 thr_no = 0; thr_no < thr_count; thr_no ++)
+  thr_jb_write_state *w = selfptr->m_write_states;
+  thr_data *thrptr = g_thr_repository.m_thread;
+  for (Uint32 thr_no = 0; thr_no < thr_count; thr_no++, thrptr++, w++)
   {
-    thr_jb_write_state *w = selfptr->m_write_states + thr_no;
     if (w->m_pending_signals || w->m_pending_signals_wakeup)
     {
       w->m_pending_signals_wakeup = MAX_SIGNALS_BEFORE_WAKEUP;
-      thr_job_queue *q = g_thr_repository.m_thread[thr_no].m_in_queue + self;
-      flush_write_state(thr_no, q, w);
+      thr_job_queue_head *q_head = thrptr->m_in_queue_head + self;
+      flush_write_state(selfptr, thrptr, q_head, w);
     }
   }
 }
@@ -1259,10 +1338,11 @@ flush_jbb_write_state(thr_data *selfptr)
 static int
 check_job_buffers(struct thr_repository* rep)
 {
-  for (unsigned i = 0; i<num_threads; i++)
+  thr_data *thrptr = rep->m_thread;
+  for (unsigned i = 0; i<num_threads; i++, thrptr++)
   {
-    thr_data * thrptr = rep->m_thread+i;
-    for (unsigned j = 0; j<num_threads; j++)
+    const thr_job_queue_head *q_head = thrptr->m_in_queue_head;
+    for (unsigned j = 0; j<num_threads; j++,q_head++)
     {
       /**
        * These values are read wo/ locks...
@@ -1276,8 +1356,8 @@ check_job_buffers(struct thr_repository*
        *   conservative (i.e it can be better than guess, if read-index has
        *   moved but we didnt see it)
        */
-      unsigned ri = thrptr->m_in_queue[j].m_read_index;
-      unsigned wi = thrptr->m_in_queue[j].m_write_index;
+      unsigned ri = q_head->m_read_index;
+      unsigned wi = q_head->m_write_index;
       unsigned busy = (wi >= ri) ? wi - ri : (thr_job_queue::SIZE - ri) + wi;
       if (4*busy >= thr_job_queue::SIZE)
       {
@@ -1386,6 +1466,15 @@ trp_callback::checkJobBuffer()
        * except for QMGR open/close connection, but that is not
        * (i think) sufficient to create a deadlock
        */
+
+      /** FIXME:
+       *  On a CMT chip where #CPU >= #NDB-threads sched_yield() is effectively a NOOP as
+       *  there will normally be an idle CPU available to immediately resume thread execution.
+       *  On a Niagara chip this may severely impact performance as the CPUs are virtualized
+       *  by timemultiplexing the physical core. The thread should really be 'parked' on
+       *  a condition to free its execution resources.
+       */
+//    usleep(a-few-usec);  /* A micro-sleep would likely have been better... */
       sched_yield();
     } while (check_job_buffers(rep));
   }
@@ -1941,7 +2030,7 @@ insert_signal(thr_job_queue *q, thr_jb_w
      *
      * Or alternatively, ndbrequire() ?
      */
-    assert(write_index != q->m_read_index);
+    assert(write_index != q->m_head->m_read_index);
     new_buffer->m_len = 0;
     new_buffer->m_prioa = prioa;
     q->m_buffers[write_index] = new_buffer;
@@ -1958,44 +2047,47 @@ static
 void
 read_jbb_state(thr_data *selfptr, Uint32 count)
 {
-  for (Uint32 i = 0; i < count; i++)
-  {
-    thr_jb_read_state *r = selfptr->m_read_states + i;
-    const thr_job_queue *q = selfptr->m_in_queue +i;
-    Uint32 index = q->m_write_index;
-    r->m_write_index = index;
-    read_barrier_depends();
-    r->m_write_pos = q->m_buffers[index]->m_len;
+
+  thr_jb_read_state *r = selfptr->m_read_states;
+  const thr_job_queue *q = selfptr->m_in_queue;
+  for (Uint32 i = 0; i < count; i++,r++,q++)
+  {
+    Uint32 read_index = r->m_read_index;
+
+    if (r->m_write_index == read_index)  // Optimization: Only reload when possibly empty.
+    {                                    // (Avoid cache reload of shared thr_job_queue_head)
+      r->m_write_index = q->m_head->m_write_index;
+      read_barrier_depends();
+      r->m_read_end = q->m_buffers[read_index]->m_len;
+    }
   }
 }
 
 static
-void
+bool
 read_jba_state(thr_data *selfptr)
 {
-  const thr_job_queue *jba = &(selfptr->m_jba);
-  Uint32 index = jba->m_write_index;
-  selfptr->m_jba_read_state.m_write_index = index;
+  thr_jb_read_state *r = &(selfptr->m_jba_read_state);
+  r->m_write_index = selfptr->m_jba_head.m_write_index;
   read_barrier_depends();
-  selfptr->m_jba_read_state.m_write_pos = jba->m_buffers[index]->m_len;
+  r->m_read_end = selfptr->m_jba.m_buffers[r->m_read_index]->m_len;
+  return r->is_empty();
 }
 
 /* Check all job queues, return true only if all are empty. */
 static bool
-check_queues_empty(void *data)
+check_queues_empty(thr_data *selfptr)
 {
   Uint32 thr_count = g_thr_repository.m_thread_count;
-  thr_data *selfptr = reinterpret_cast<thr_data *>(data);
+  bool empty = read_jba_state(selfptr);
+  if (!empty)
+    return false;
 
   read_jbb_state(selfptr, thr_count);
-  read_jba_state(selfptr);
-  const thr_jb_read_state *r = &(selfptr->m_jba_read_state);
-  if (r->m_read_index < r->m_write_index || r->m_read_pos < r->m_write_pos)
-    return false;
-  for (Uint32 i = 0; i < thr_count; i++)
+  const thr_jb_read_state *r = selfptr->m_read_states;
+  for (Uint32 i = 0; i < thr_count; i++,r++)
   {
-    r = selfptr->m_read_states + i;;
-    if (r->m_read_index < r->m_write_index || r->m_read_pos < r->m_write_pos)
+    if (!r->is_empty())
       return false;
   }
   return true;
@@ -2049,19 +2141,20 @@ execute_signals(thr_data *selfptr, thr_j
                 Signal *sig, Uint32 max_signals,
                 Uint32 *watchDogCounter, Uint32 *signalIdCounter)
 {
-  Uint32 num_signals = 0;
-
+  Uint32 num_signals;
   Uint32 read_index = r->m_read_index;
   Uint32 write_index = r->m_write_index;
   Uint32 read_pos = r->m_read_pos;
-  Uint32 write_pos = (read_index == write_index ?
-                      r->m_write_pos :
-                      q->m_buffers[read_index]->m_len);
+  Uint32 read_end = r->m_read_end;
+
+  if (read_index == write_index && read_pos >= read_end)
+    return 0;          // empty read_state
+
   thr_job_buffer *read_buffer = r->m_read_buffer;
 
-  while (num_signals < max_signals)
+  for (num_signals = 0; num_signals < max_signals; num_signals++)
   {
-    while (read_pos >= write_pos)
+    while (read_pos >= read_end)
     {
       if (read_index == write_index)
       {
@@ -2075,13 +2168,12 @@ execute_signals(thr_data *selfptr, thr_j
         release_buffer(&g_thr_repository, selfptr->m_thr_no, read_buffer);
         read_buffer = q->m_buffers[read_index];
         read_pos = 0;
-        write_pos = (read_index == write_index ?
-                     r->m_write_pos :
-                     q->m_buffers[read_index]->m_len);
+        read_end = read_buffer->m_len;
         /* Update thread-local read state. */
-        r->m_read_index = q->m_read_index = read_index;
+        r->m_read_index = q->m_head->m_read_index = read_index;
         r->m_read_buffer = read_buffer;
         r->m_read_pos = read_pos;
+        r->m_read_end = read_end;
       }
     }
 
@@ -2090,20 +2182,18 @@ execute_signals(thr_data *selfptr, thr_j
      * (Though on Intel Core 2, they do not give much speedup, as apparently
      * the hardware prefetcher is already doing a fairly good job).
      */
-#ifdef __GNUC__
-    __builtin_prefetch (read_buffer->m_data + read_pos + 16, 0, 3);
-    __builtin_prefetch ((Uint32 *)&sig->header + 16, 1, 3);
-#endif
+    PREFETCH_READ (read_buffer->m_data + read_pos + 16);
+    PREFETCH_WRITE ((Uint32 *)&sig->header + 16);
 
     /* Now execute the signal. */
     SignalHeader* s =
       reinterpret_cast<SignalHeader*>(read_buffer->m_data + read_pos);
     Uint32 seccnt = s->m_noOfSections;
     Uint32 siglen = (sizeof(*s)>>2) + s->theLength;
-#ifdef __GNUC__
     if(siglen>16)
-      __builtin_prefetch (read_buffer->m_data + read_pos + 32, 0, 3);
-#endif
+    {
+      PREFETCH_READ (read_buffer->m_data + read_pos + 32);
+    }
     Uint32 bno = blockToMain(s->theReceiversBlockNumber);
     Uint32 ino = map_instance(s);
     SimulatedBlock* block = globalData.getBlock(bno, ino);
@@ -2143,8 +2233,6 @@ execute_signals(thr_data *selfptr, thr_j
 #endif
 
     block->executeFunction(gsn, sig);
-
-    num_signals++;
   }
 
   return num_signals;
@@ -2165,19 +2253,21 @@ run_job_buffers(thr_data *selfptr, Signa
    */
   rmb();
 
-  for (Uint32 send_thr_no = 0; send_thr_no < thr_count; send_thr_no++)
+  thr_job_queue *queue = selfptr->m_in_queue;
+  thr_jb_read_state *read_state = selfptr->m_read_states;
+  for (Uint32 send_thr_no = 0; send_thr_no < thr_count; send_thr_no++,queue++,read_state++)
   {
     /* Read the prio A state often, to avoid starvation of prio A. */
-    read_jba_state(selfptr);
-    static Uint32 max_prioA = thr_job_queue::SIZE * thr_job_buffer::SIZE;
-    signal_count += execute_signals(selfptr, &(selfptr->m_jba),
-                                    &(selfptr->m_jba_read_state), sig,
-                                    max_prioA, watchDogCounter,
-                                    signalIdCounter);
-
+    bool jba_empty = read_jba_state(selfptr);
+    if (!jba_empty)
+    {
+      static Uint32 max_prioA = thr_job_queue::SIZE * thr_job_buffer::SIZE;
+      signal_count += execute_signals(selfptr, &(selfptr->m_jba),
+                                      &(selfptr->m_jba_read_state), sig,
+                                      max_prioA, watchDogCounter,
+                                      signalIdCounter);
+    }
     /* Now execute prio B signals from one thread. */
-    thr_job_queue *queue = selfptr->m_in_queue + send_thr_no;
-    thr_jb_read_state *read_state = selfptr->m_read_states + send_thr_no;
     signal_count += execute_signals(selfptr, queue, read_state,
                                     sig, MAX_SIGNALS_PER_JB,
                                     watchDogCounter, signalIdCounter);
@@ -2430,6 +2520,7 @@ mt_receiver_thread_main(void *thr_arg)
   unsigned thr_no = selfptr->m_thr_no;
   Uint32& watchDogCounter = selfptr->m_watchdog_counter;
   Uint32 thrSignalId = 0;
+  bool has_received = false;
 
   init_thread(selfptr);
   receiverThreadId = thr_no;
@@ -2454,7 +2545,7 @@ mt_receiver_thread_main(void *thr_arg)
     Uint32 sum = run_job_buffers(selfptr, signal,
                                  &watchDogCounter, &thrSignalId);
 
-    if (sum)
+    if (sum || has_received)
     {
       watchDogCounter = 6;
       flush_jbb_write_state(selfptr);
@@ -2464,18 +2555,18 @@ mt_receiver_thread_main(void *thr_arg)
 
     watchDogCounter = 7;
 
+    has_received = false;
     if (globalTransporterRegistry.pollReceive(1))
     {
       if (check_job_buffers(rep) == 0)
       {
 	watchDogCounter = 8;
-	lock(&rep->m_receive_lock);
-	globalTransporterRegistry.performReceive();
-	unlock(&rep->m_receive_lock);
+        lock(&rep->m_receive_lock);
+        globalTransporterRegistry.performReceive();
+        unlock(&rep->m_receive_lock);
+        has_received = true;
       }
     }
-
-    flush_jbb_write_state(selfptr);
   }
 
   globalEmulatorData.theWatchDog->unregisterWatchedThread(thr_no);
@@ -2483,6 +2574,7 @@ mt_receiver_thread_main(void *thr_arg)
 }
 
 static
+inline
 void
 sendpacked(struct thr_data* thr_ptr, Signal* signal)
 {
@@ -2586,20 +2678,20 @@ sendlocal(Uint32 self, const SignalHeade
   Uint32 dst = block2ThreadId(block, instance);
   struct thr_repository* rep = &g_thr_repository;
   struct thr_data * selfptr = rep->m_thread + self;
+  struct thr_data * dstptr = rep->m_thread + dst;
 
   selfptr->m_priob_count++;
   Uint32 siglen = (sizeof(*s) >> 2) + s->theLength + s->m_noOfSections;
   selfptr->m_priob_size += siglen;
 
-  thr_job_queue *q = rep->m_thread[dst].m_in_queue + self;
+  thr_job_queue *q = dstptr->m_in_queue + self;
   thr_jb_write_state *w = selfptr->m_write_states + dst;
   if (insert_signal(q, w, false, s, data, secPtr, selfptr->m_next_buffer))
   {
     selfptr->m_next_buffer = seize_buffer(rep, self, false);
   }
-
   if (w->m_pending_signals >= MAX_SIGNALS_BEFORE_FLUSH)
-    flush_write_state(dst, q, w);
+    flush_write_state(selfptr, dstptr, q->m_head, w);
 }
 
 void
@@ -2611,7 +2703,7 @@ sendprioa(Uint32 self, const SignalHeade
 
   Uint32 dst = block2ThreadId(block, instance);
   struct thr_repository* rep = &g_thr_repository;
-  struct thr_data * selfptr = rep->m_thread + self;
+  struct thr_data *selfptr = rep->m_thread + self;
   struct thr_data *dstptr = rep->m_thread + dst;
 
   selfptr->m_prioa_count++;
@@ -2623,7 +2715,7 @@ sendprioa(Uint32 self, const SignalHeade
 
   lock(&dstptr->m_jba_write_lock);
 
-  Uint32 index = q->m_write_index;
+  Uint32 index = q->m_head->m_write_index;
   w.m_write_index = index;
   thr_job_buffer *buffer = q->m_buffers[index];
   w.m_write_buffer = buffer;
@@ -2632,7 +2724,7 @@ sendprioa(Uint32 self, const SignalHeade
   w.m_pending_signals_wakeup = MAX_SIGNALS_BEFORE_WAKEUP;
   bool buf_used = insert_signal(q, &w, true, s, data, secPtr,
                                 selfptr->m_next_buffer);
-  flush_write_state(dst, q, &w);
+  flush_write_state(selfptr, dstptr, q->m_head, &w);
 
   unlock(&dstptr->m_jba_write_lock);
 
@@ -2689,7 +2781,7 @@ mt_send_remote(Uint32 self, const Signal
  */
 static
 void
-sendprioa_STOP_FOR_CRASH(Uint32 dst)
+sendprioa_STOP_FOR_CRASH(const struct thr_data *selfptr, Uint32 dst)
 {
   SignalT<StopForCrash::SignalLength> signalT;
   struct thr_repository* rep = &g_thr_repository;
@@ -2738,7 +2830,7 @@ sendprioa_STOP_FOR_CRASH(Uint32 dst)
 
   lock(&dstptr->m_jba_write_lock);
 
-  Uint32 index = q->m_write_index;
+  Uint32 index = q->m_head->m_write_index;
   w.m_write_index = index;
   thr_job_buffer *buffer = q->m_buffers[index];
   w.m_write_buffer = buffer;
@@ -2747,7 +2839,7 @@ sendprioa_STOP_FOR_CRASH(Uint32 dst)
   w.m_pending_signals_wakeup = MAX_SIGNALS_BEFORE_WAKEUP;
   insert_signal(q, &w, true, &signalT.header, signalT.theData, NULL,
                 &dummy_buffer);
-  flush_write_state(dst, q, &w);
+  flush_write_state(selfptr, dstptr, q->m_head, &w);
 
   unlock(&dstptr->m_jba_write_lock);
 }
@@ -2777,31 +2869,32 @@ thr_init(struct thr_repository* rep, str
   selfptr->m_first_free = 0;
   selfptr->m_first_unused = 0;
   
-  selfptr->m_jba.m_read_index = 0;
-  selfptr->m_jba.m_write_index = 0;
+  selfptr->m_jba_head.m_read_index = 0;
+  selfptr->m_jba_head.m_write_index = 0;
+  selfptr->m_jba.m_head = &selfptr->m_jba_head;
   thr_job_buffer *buffer = seize_buffer(rep, thr_no, true);
   selfptr->m_jba.m_buffers[0] = buffer;
   selfptr->m_jba_read_state.m_read_index = 0;
   selfptr->m_jba_read_state.m_read_buffer = buffer;
   selfptr->m_jba_read_state.m_read_pos = 0;
+  selfptr->m_jba_read_state.m_read_end = 0;
   selfptr->m_jba_read_state.m_write_index = 0;
-  selfptr->m_jba_read_state.m_write_pos = 0;
   selfptr->m_next_buffer = seize_buffer(rep, thr_no, false);
   selfptr->m_send_buffer_pool.set_pool(&rep->m_free_list);
   
   for (i = 0; i<cnt; i++)
   {
-    selfptr->m_in_queue[i].m_read_index = 0;
-    selfptr->m_in_queue[i].m_write_index = 0;
+    selfptr->m_in_queue_head[i].m_read_index = 0;
+    selfptr->m_in_queue_head[i].m_write_index = 0;
+    selfptr->m_in_queue[i].m_head = &selfptr->m_in_queue_head[i];
     buffer = seize_buffer(rep, thr_no, false);
     selfptr->m_in_queue[i].m_buffers[0] = buffer;
     selfptr->m_read_states[i].m_read_index = 0;
     selfptr->m_read_states[i].m_read_buffer = buffer;
     selfptr->m_read_states[i].m_read_pos = 0;
+    selfptr->m_read_states[i].m_read_end = 0;
     selfptr->m_read_states[i].m_write_index = 0;
-    selfptr->m_read_states[i].m_write_pos = 0;
   }
-
   queue_init(&selfptr->m_tq);
 
   selfptr->m_prioa_count = 0;
@@ -3167,7 +3260,7 @@ FastScheduler::traceDumpPrepare(NdbShutd
       continue;
     }
 
-    sendprioa_STOP_FOR_CRASH(thr_no);
+    sendprioa_STOP_FOR_CRASH(selfptr, thr_no);
 
     waitFor_count++;
   }

Thread
bzr commit into mysql-5.1 branch (ole.john.aske:3167) Bug#41301 - Ole John Aske - 11 Dec