List:Commits« Previous MessageNext Message »
From:Ole John Aske Date:December 8 2008 2:16pm
Subject:bzr commit into mysql-5.1 branch (ole.john.aske:3163) Bug#41301
View as plain text  
#At file:///net/vidar01/export/home/tmp/oleja/mysql/mysql-5.1-telco-6.4/ based on revid:jonas@stripped

 3163 Ole John Aske	2008-12-08
      Bug#41301, collection of optimization improvements
modified:
  storage/ndb/include/ndb_global.h.in
  storage/ndb/src/common/transporter/TransporterRegistry.cpp
  storage/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp
  storage/ndb/src/kernel/blocks/dbtux/Dbtux.hpp
  storage/ndb/src/kernel/error/ErrorReporter.hpp
  storage/ndb/src/kernel/vm/Emulator.hpp
  storage/ndb/src/kernel/vm/SafeCounter.hpp
  storage/ndb/src/kernel/vm/SimulatedBlock.hpp
  storage/ndb/src/kernel/vm/mt.cpp

per-file messages:
  storage/ndb/include/ndb_global.h.in
    no-branch hinting to move abort,exit out of common execution path.
=== modified file 'storage/ndb/include/ndb_global.h.in'

=== modified file 'storage/ndb/include/ndb_global.h.in'
--- a/storage/ndb/include/ndb_global.h.in	2008-10-30 10:43:10 +0000
+++ b/storage/ndb/include/ndb_global.h.in	2008-12-08 14:16:30 +0000
@@ -41,6 +41,15 @@
 
 #include <my_global.h>
 
+/**
+ * Provide branch predict hinting for SunStudio compilers:
+ * Considder to move this into my_global.h...
+ */
+#if defined(__SUNPRO_CC) && (__SUNPRO_CC>=0x540)
+#  pragma rarely_called(abort,exit)
+#  pragma does_not_return(abort,exit)
+#endif
+
 #if ! (NDB_SIZEOF_CHAR == SIZEOF_CHAR)
 #error "Invalid define for Uint8"
 #endif

=== modified file 'storage/ndb/src/common/transporter/TransporterRegistry.cpp'
--- a/storage/ndb/src/common/transporter/TransporterRegistry.cpp	2008-11-26 10:36:20 +0000
+++ b/storage/ndb/src/common/transporter/TransporterRegistry.cpp	2008-12-08 14:16:30 +0000
@@ -1085,6 +1085,7 @@
 void
 TransporterRegistry::performReceive()
 {
+  bool hasReceived = false;
 #ifdef NDB_TCP_TRANSPORTER
 #if defined(HAVE_EPOLL_CREATE)
   if (likely(m_epoll_fd != -1))
@@ -1128,7 +1129,8 @@
           
           if (t->hasReceiveData())
           {
-            callbackObj->checkJobBuffer();
+            if (hasReceived) callbackObj->checkJobBuffer();
+            hasReceived = true;
             Uint32 * ptr;
             Uint32 sz = t->getReceiveData(&ptr);
             callbackObj->transporter_recv_from(nodeId);
@@ -1152,7 +1154,8 @@
     {
       if(t->isConnected() && t->checkConnected())
       {
-        callbackObj->checkJobBuffer();
+        if (hasReceived) callbackObj->checkJobBuffer();
+        hasReceived = true;
         Uint32 * readPtr, * eodPtr;
         t->getReceivePtr(&readPtr, &eodPtr);
         callbackObj->transporter_recv_from(nodeId);
@@ -1170,7 +1173,8 @@
     if(is_connected(nodeId)){
       if(t->isConnected() && t->checkConnected())
       {
-        callbackObj->checkJobBuffer();
+        if (hasReceived) callbackObj->checkJobBuffer();
+        hasReceived = true;
         Uint32 * readPtr, * eodPtr;
         t->getReceivePtr(&readPtr, &eodPtr);
         callbackObj->transporter_recv_from(nodeId);

=== modified file 'storage/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp	2008-11-03 08:34:28 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp	2008-12-08 14:16:30 +0000
@@ -479,14 +479,20 @@
   }
 }
 
-static
-inline
+static inline
 void
-zero32(Uint8* dstPtr, Uint32 len)
-{
-  while ((len & 3) != 0) 
-  {
-    dstPtr[len++] = 0;
+zero32(Uint8* dstPtr, const Uint32 len)
+{ Uint32 odd = len & 3;  /* odd: {0..3} */
+
+  if (odd != 0)   /* odd: {1..3} */
+  { Uint8* dst = dstPtr+len;
+    dst[0] = 0;
+    if (odd <= 2) /* odd: {1..2} */ 
+    { dst[1] = 0;
+      if (odd == 1)
+      { dst[2] = 0;
+      }
+    }
   }
 }
 

=== modified file 'storage/ndb/src/kernel/blocks/dbtux/Dbtux.hpp'
--- a/storage/ndb/src/kernel/blocks/dbtux/Dbtux.hpp	2008-10-05 07:14:21 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtux/Dbtux.hpp	2008-12-08 14:16:30 +0000
@@ -541,6 +541,10 @@
     TreeEnt getMinMax(unsigned i);
     // for ndbrequire and ndbassert
     void progError(int line, int cause, const char* file);
+
+#if defined(__SUNPRO_CC) && (__SUNPRO_CC>=0x540)
+#  pragma rarely_called(progError)
+#endif
   };
 
   // methods

=== modified file 'storage/ndb/src/kernel/error/ErrorReporter.hpp'
--- a/storage/ndb/src/kernel/error/ErrorReporter.hpp	2008-01-01 12:45:11 +0000
+++ b/storage/ndb/src/kernel/error/ErrorReporter.hpp	2008-12-08 14:16:30 +0000
@@ -48,7 +48,13 @@
   static int get_trace_no();
   
   static const char* formatTimeStampString();
-  
+
+#if defined(__SUNPRO_CC) && (__SUNPRO_CC>=0x540)
+#  pragma rarely_called(handleAssert)
+#  pragma rarely_called(handleError)
+#  pragma rarely_called(handleWarning)
+#endif
+
 private:
   static enum NdbShutdownType s_errorHandlerShutdownType;
 };

=== modified file 'storage/ndb/src/kernel/vm/Emulator.hpp'
--- a/storage/ndb/src/kernel/vm/Emulator.hpp	2008-01-01 12:45:11 +0000
+++ b/storage/ndb/src/kernel/vm/Emulator.hpp	2008-12-08 14:16:30 +0000
@@ -31,7 +31,6 @@
 extern class  FastScheduler       globalScheduler;
 extern class  TransporterRegistry globalTransporterRegistry;
 extern struct GlobalData          globalData;
-extern struct thr_repository      g_thr_repository;
 
 #ifdef VM_TRACE
 extern class SignalLoggerManager globalSignalLoggers;

=== modified file 'storage/ndb/src/kernel/vm/SafeCounter.hpp'
--- a/storage/ndb/src/kernel/vm/SafeCounter.hpp	2008-02-20 09:04:29 +0000
+++ b/storage/ndb/src/kernel/vm/SafeCounter.hpp	2008-12-08 14:16:30 +0000
@@ -102,6 +102,10 @@
 
   BlockReference reference() const;
   void progError(int line, int err_code, const char* extra = 0);
+
+#if defined(__SUNPRO_CC) && (__SUNPRO_CC>=0x540)
+#  pragma rarely_called(progError)
+#endif
 };
 
 

=== modified file 'storage/ndb/src/kernel/vm/SimulatedBlock.hpp'
--- a/storage/ndb/src/kernel/vm/SimulatedBlock.hpp	2008-12-01 18:05:11 +0000
+++ b/storage/ndb/src/kernel/vm/SimulatedBlock.hpp	2008-12-08 14:16:30 +0000
@@ -307,6 +307,11 @@
   void handle_lingering_sections_after_execute(Signal*) const;
   void handle_lingering_sections_after_execute(SectionHandle*) const;
 
+#if defined(__SUNPRO_CC) && (__SUNPRO_CC>=0x540)
+#  pragma rarely_called(handle_invalid_sections_in_send_signal)
+#  pragma rarely_called(handle_lingering_sections_after_execute)
+#endif
+
   /**********************************************************
    * Fragmented signals
    */
@@ -487,6 +492,11 @@
    * errormessage describing the problem
    */
   void progError(int line, int err_code, const char* extradata=NULL) const ;
+
+#if defined(__SUNPRO_CC) && (__SUNPRO_CC>=0x540)
+#  pragma rarely_called(progError)
+#endif
+
 private:
   void  signal_error(Uint32, Uint32, Uint32, const char*, int) const ;
   const NodeId         theNodeId;
@@ -693,6 +703,10 @@
     
     BlockReference reference() const;
     void progError(int line, int err_code, const char* extra = 0);
+
+#if defined(__SUNPRO_CC) && (__SUNPRO_CC>=0x540)
+#  pragma rarely_called(progError)
+#endif
   };
   
   friend class MutexManager;

=== modified file 'storage/ndb/src/kernel/vm/mt.cpp'
--- a/storage/ndb/src/kernel/vm/mt.cpp	2008-11-15 15:43:59 +0000
+++ b/storage/ndb/src/kernel/vm/mt.cpp	2008-12-08 14:16:30 +0000
@@ -131,7 +131,7 @@
   volatile unsigned m_futex_state;
   enum {
     FS_RUNNING = 0,
-    FS_SLEEPING = 1,
+    FS_SLEEPING = 1
   };
   thr_wait() { xcng(&m_futex_state, FS_RUNNING);}
   void init () {}
@@ -144,10 +144,10 @@
  * if that returns true will it actually sleep, else it will return
  * immediately. This is needed to avoid races with wakeup.
  */
-static
+static inline
 void
 yield(struct thr_wait* wait, const struct timespec *timeout,
-      bool (*check_callback)(void *), void *check_arg)
+      bool (*check_callback)(struct thr_data *), struct thr_data *check_arg)
 {
   volatile unsigned * val = &wait->m_futex_state;
 #ifndef NDEBUG
@@ -173,7 +173,7 @@
   xcng(val, thr_wait::FS_RUNNING);
 }
 
-static
+static inline
 int
 wakeup(struct thr_wait* wait)
 {
@@ -195,12 +195,11 @@
 
 struct thr_wait
 {
+  bool m_need_wakeup;
+ 
   NdbMutex *m_mutex;
   NdbCondition *m_cond;
-  thr_wait() {
-    m_mutex = 0;
-    m_cond = 0;
-  }
+  thr_wait() : m_mutex(0), m_cond(0), m_need_wakeup(false) {}
 
   void init() {
     m_mutex = NdbMutex_Create();
@@ -208,29 +207,43 @@
   }
 };
 
-static
+static inline
 void
 yield(struct thr_wait* wait, const struct timespec *timeout,
-      bool (*check_callback)(void *), void *check_arg)
+      bool (*check_callback)(struct thr_data *), struct thr_data *check_arg)
 {
   Uint32 msec = 
     (1000 * timeout->tv_sec) + 
     (timeout->tv_nsec / 1000000);
   NdbMutex_Lock(wait->m_mutex);
-  if ((*check_callback)(check_arg))
-    NdbCondition_WaitTimeout(wait->m_cond, wait->m_mutex, msec);
+  while ((*check_callback)(check_arg)) /* May have spurious wakeups: Always recheck condition predicate */
+  {
+    wait->m_need_wakeup = true;
+    if (NdbCondition_WaitTimeout(wait->m_cond, wait->m_mutex, msec) == ETIMEDOUT)
+    {
+      wait->m_need_wakeup = false;
+      break;
+    }
+  }
   NdbMutex_Unlock(wait->m_mutex);
 }
 
-static
+
+static inline
 int
 wakeup(struct thr_wait* wait)
 {
   NdbMutex_Lock(wait->m_mutex);
-  NdbCondition_Signal(wait->m_cond);
+  // We should avoid signaling when not waiting for wakeup
+  if (wait->m_need_wakeup)
+  {
+    wait->m_need_wakeup = false;
+    NdbCondition_Signal(wait->m_cond);
+  }
   NdbMutex_Unlock(wait->m_mutex);
   return 0;
 }
+
 #endif
 
 static inline void
@@ -246,6 +259,7 @@
 #define assert(x) require(x)
 
 #ifdef NDB_HAVE_XCNG
+
 struct thr_spin_lock
 {
   thr_spin_lock(const char * name = 0)
@@ -489,12 +503,27 @@
   Uint32 m_data[SIZE];
 };  
 
+
+/**
+ * thr_job_queue is shared between consumer / producer. 
+ *
+ * The hot-spot of the thr_job_queue are the read/write indexes.
+ * As they are updated and read frequently they have been placed
+ * in its own thr_job_queue_head[] in order to make them fit inside a
+ * single/few cache lines and thereby avoid complete L1-cache replacement
+ * every time the job_queue is scanned.
+ */
+struct thr_job_queue_head
+{
+  unsigned m_read_index;  // Read/written by consumer, read by producer
+  unsigned m_write_index; // Read/written by producer, read by consumer
+};
+
 struct thr_job_queue
 {
   static const unsigned SIZE = 30;
 
-  unsigned m_read_index; // Read/written by consumer, read by producer
-  unsigned m_write_index; // Read/written by producer, read by consumer
+  struct thr_job_queue_head* head;
   struct thr_job_buffer* m_buffers[SIZE];
 };
 
@@ -561,8 +590,12 @@
    * execution loop and used to determine when the end of available signals is
    * reached.
    */
-  Uint32 m_write_index;
-  Uint32 m_write_pos;
+  Uint32 m_read_end;    // End within current thr_job_buffer. (*m_read_buffer)
+
+  Uint32 m_write_index; // Last available thr_job_buffer.
+
+  bool is_empty() const
+  { return (m_read_index == m_write_index) && (m_read_pos >= m_read_end); }
 };
 
 /**
@@ -662,6 +695,7 @@
   struct thr_tq m_tq;
 
   /* Prio A signal incoming queue. */
+  struct thr_job_queue_head m_jba_head;
   struct thr_job_queue m_jba;
   struct thr_spin_lock m_jba_write_lock;
   /*
@@ -692,6 +726,7 @@
    * These are the thread input queues, where other threads deliver signals
    * into.
    */
+  struct thr_job_queue_head m_in_queue_head[MAX_THREADS];
   struct thr_job_queue m_in_queue[MAX_THREADS];
   /* These are the write states of m_in_queue[self] in each thread. */
   struct thr_jb_write_state m_write_states[MAX_THREADS];
@@ -763,6 +798,7 @@
 };
 
 extern trp_callback g_trp_callback;             // Forward declaration
+extern struct thr_repository g_thr_repository;
 
 struct thr_repository
 {
@@ -1033,7 +1069,7 @@
   }
 }
 
-static
+static inline
 void
 scan_time_queues(struct thr_data* selfptr)
 {
@@ -1204,10 +1240,29 @@
 /*
  * Flush the write state to the job queue, making any new signals available to
  * receiving threads.
+ *
+ * Two versions:
+ *    - The general version flush_write_state() which may flush to any thread,
+ *    - The special version flush_write_state_self() which should only be used
+ *      to flush messages to itself.
  */
 static inline
 void
-flush_write_state(Uint32 dst, thr_job_queue *q, thr_jb_write_state *w)
+flush_write_state_self(thr_data *selfptr, thr_job_queue_head *q_head, thr_jb_write_state *w)
+{
+  /* 
+   * Could simplify the flush_write_state when writing to myself:
+   * Simply update write references wo/ mutex, memory barrier and signaling
+   */
+  w->m_write_buffer->m_len = w->m_write_pos;
+  q_head->m_write_index = w->m_write_index;
+  w->m_pending_signals_wakeup = 0;
+  w->m_pending_signals = 0;
+}
+
+static inline
+void
+flush_write_state(thr_data *dstptr, thr_job_queue_head *q_head, thr_jb_write_state *w)
 {
   /*
    * Two write memory barriers here, as assigning m_len may make signal data
@@ -1219,17 +1274,19 @@
    *
    * But wmb() is a no-op anyway in x86 ...
    */
+
   wmb();
   w->m_write_buffer->m_len = w->m_write_pos;
   wmb();
-  q->m_write_index = w->m_write_index;
+  q_head->m_write_index = w->m_write_index;
+
   w->m_pending_signals_wakeup += w->m_pending_signals;
   w->m_pending_signals = 0;
 
   if (w->m_pending_signals_wakeup >= MAX_SIGNALS_BEFORE_WAKEUP)
   {
     w->m_pending_signals_wakeup = 0;
-    wakeup(&(g_thr_repository.m_thread[dst].m_waiter));
+    wakeup(&(dstptr->m_waiter));
   }
 }
 
@@ -1240,14 +1297,20 @@
   Uint32 thr_count = g_thr_repository.m_thread_count;
   Uint32 self = selfptr->m_thr_no;
 
-  for (Uint32 thr_no = 0; thr_no < thr_count; thr_no ++)
+  thr_jb_write_state *w = selfptr->m_write_states;
+  thr_data *thrptr = g_thr_repository.m_thread;
+  for (Uint32 thr_no = 0; thr_no < thr_count; thr_no++, thrptr++, w++)
   {
-    thr_jb_write_state *w = selfptr->m_write_states + thr_no;
     if (w->m_pending_signals || w->m_pending_signals_wakeup)
     {
       w->m_pending_signals_wakeup = MAX_SIGNALS_BEFORE_WAKEUP;
-      thr_job_queue *q = g_thr_repository.m_thread[thr_no].m_in_queue + self;
-      flush_write_state(thr_no, q, w);
+      thr_job_queue_head *q_head = thrptr->m_in_queue_head + self;
+      if (thrptr == selfptr)
+      { flush_write_state_self(thrptr, q_head, w);
+      }
+      else
+      { flush_write_state(thrptr, q_head, w);
+      }
     }
   }
 }
@@ -1259,10 +1322,11 @@
 static int
 check_job_buffers(struct thr_repository* rep)
 {
-  for (unsigned i = 0; i<num_threads; i++)
+  thr_data *thrptr = rep->m_thread;
+  for (unsigned i = 0; i<num_threads; i++, thrptr++)
   {
-    thr_data * thrptr = rep->m_thread+i;
-    for (unsigned j = 0; j<num_threads; j++)
+    const thr_job_queue_head *q_head = thrptr->m_in_queue_head;
+    for (unsigned j = 0; j<num_threads; j++,q_head++)
     {
       /**
        * These values are read wo/ locks...
@@ -1276,8 +1340,8 @@
        *   conservative (i.e it can be better than guess, if read-index has
        *   moved but we didnt see it)
        */
-      unsigned ri = thrptr->m_in_queue[j].m_read_index;
-      unsigned wi = thrptr->m_in_queue[j].m_write_index;
+      unsigned ri = q_head->m_read_index;
+      unsigned wi = q_head->m_write_index;
       unsigned busy = (wi >= ri) ? wi - ri : (thr_job_queue::SIZE - ri) + wi;
       if (4*busy >= thr_job_queue::SIZE)
       {
@@ -1386,6 +1450,15 @@
        * except for QMGR open/close connection, but that is not
        * (i think) sufficient to create a deadlock
        */
+
+      /** FIXME:
+       *  On a CMT chip where #CPU >= #NDB-threads sched_yield() is effectively a NOOP as
+       *  there will normally be an idle CPU available to immediately resume thread execution.
+       *  On a Niagara chip this may severely impact performance as the CPUs are virtualized
+       *  by timemultiplexing the physical core. The thread should really be 'parked' on
+       *  a condition to free its execution resources.
+       */
+//    usleep(a-few-usec);  /* A micro-sleep would likely have been better... */
       sched_yield();
     } while (check_job_buffers(rep));
   }
@@ -1941,7 +2014,7 @@
      *
      * Or alternatively, ndbrequire() ?
      */
-    assert(write_index != q->m_read_index);
+    assert(write_index != q->head->m_read_index);
     new_buffer->m_len = 0;
     new_buffer->m_prioa = prioa;
     q->m_buffers[write_index] = new_buffer;
@@ -1955,50 +2028,50 @@
 }
 
 static
-void
+bool
 read_jbb_state(thr_data *selfptr, Uint32 count)
 {
-  for (Uint32 i = 0; i < count; i++)
+  bool empty   = true;
+
+  thr_jb_read_state *r = selfptr->m_read_states;
+  const thr_job_queue *q = selfptr->m_in_queue;
+  for (Uint32 i = 0; i < count; i++,r++,q++)
   {
-    thr_jb_read_state *r = selfptr->m_read_states + i;
-    const thr_job_queue *q = selfptr->m_in_queue +i;
-    Uint32 index = q->m_write_index;
-    r->m_write_index = index;
-    read_barrier_depends();
-    r->m_write_pos = q->m_buffers[index]->m_len;
+    Uint32 read_index = r->m_read_index;
+
+    if (r->m_write_index != read_index)  // Optimization: Avoid reload when known non-empty.
+    { empty = false;                     // (To prevent extensive cache reload of (invalidated)
+    }                                    //  shared thr_job_queue_head)
+    else
+    { r->m_write_index = q->head->m_write_index;
+      read_barrier_depends();
+      r->m_read_end = q->m_buffers[read_index]->m_len;
+      empty &= r->is_empty();
+    }
   }
+  return empty;
 }
 
 static
-void
+bool
 read_jba_state(thr_data *selfptr)
 {
-  const thr_job_queue *jba = &(selfptr->m_jba);
-  Uint32 index = jba->m_write_index;
-  selfptr->m_jba_read_state.m_write_index = index;
+  thr_jb_read_state *r = &(selfptr->m_jba_read_state);
+  r->m_write_index = selfptr->m_jba_head.m_write_index;
   read_barrier_depends();
-  selfptr->m_jba_read_state.m_write_pos = jba->m_buffers[index]->m_len;
+  r->m_read_end = selfptr->m_jba.m_buffers[r->m_read_index]->m_len;
+  return r->is_empty();
 }
 
 /* Check all job queues, return true only if all are empty. */
 static bool
-check_queues_empty(void *data)
+check_queues_empty(thr_data *selfptr)
 {
-  Uint32 thr_count = g_thr_repository.m_thread_count;
-  thr_data *selfptr = reinterpret_cast<thr_data *>(data);
+  bool empty = read_jba_state(selfptr);
+  if (!empty) return false;
 
-  read_jbb_state(selfptr, thr_count);
-  read_jba_state(selfptr);
-  const thr_jb_read_state *r = &(selfptr->m_jba_read_state);
-  if (r->m_read_index < r->m_write_index || r->m_read_pos < r->m_write_pos)
-    return false;
-  for (Uint32 i = 0; i < thr_count; i++)
-  {
-    r = selfptr->m_read_states + i;;
-    if (r->m_read_index < r->m_write_index || r->m_read_pos < r->m_write_pos)
-      return false;
-  }
-  return true;
+  empty = read_jbb_state(selfptr, g_thr_repository.m_thread_count);
+  return empty;
 }
 
 /*
@@ -2049,19 +2122,20 @@
                 Signal *sig, Uint32 max_signals,
                 Uint32 *watchDogCounter, Uint32 *signalIdCounter)
 {
-  Uint32 num_signals = 0;
-
+  Uint32 num_signals;
   Uint32 read_index = r->m_read_index;
   Uint32 write_index = r->m_write_index;
   Uint32 read_pos = r->m_read_pos;
-  Uint32 write_pos = (read_index == write_index ?
-                      r->m_write_pos :
-                      q->m_buffers[read_index]->m_len);
+  Uint32 read_end = r->m_read_end;
+
+  if (read_index == write_index && read_pos >= read_end)
+    return 0;          // empty read_state
+
   thr_job_buffer *read_buffer = r->m_read_buffer;
 
-  while (num_signals < max_signals)
+  for (num_signals = 0; num_signals < max_signals; num_signals++)
   {
-    while (read_pos >= write_pos)
+    while (read_pos >= read_end)
     {
       if (read_index == write_index)
       {
@@ -2075,13 +2149,12 @@
         release_buffer(&g_thr_repository, selfptr->m_thr_no, read_buffer);
         read_buffer = q->m_buffers[read_index];
         read_pos = 0;
-        write_pos = (read_index == write_index ?
-                     r->m_write_pos :
-                     q->m_buffers[read_index]->m_len);
+        read_end = read_buffer->m_len;
         /* Update thread-local read state. */
-        r->m_read_index = q->m_read_index = read_index;
+        r->m_read_index = q->head->m_read_index = read_index;
         r->m_read_buffer = read_buffer;
         r->m_read_pos = read_pos;
+        r->m_read_end = read_end;
       }
     }
 
@@ -2090,20 +2163,17 @@
      * (Though on Intel Core 2, they do not give much speedup, as apparently
      * the hardware prefetcher is already doing a fairly good job).
      */
-#ifdef __GNUC__
-    __builtin_prefetch (read_buffer->m_data + read_pos + 16, 0, 3);
-    __builtin_prefetch ((Uint32 *)&sig->header + 16, 1, 3);
-#endif
+    PREFETCH_READ (read_buffer->m_data + read_pos + 16);
+    PREFETCH_WRITE ((Uint32 *)&sig->header + 16);
 
     /* Now execute the signal. */
     SignalHeader* s =
       reinterpret_cast<SignalHeader*>(read_buffer->m_data + read_pos);
     Uint32 seccnt = s->m_noOfSections;
     Uint32 siglen = (sizeof(*s)>>2) + s->theLength;
-#ifdef __GNUC__
-    if(siglen>16)
-      __builtin_prefetch (read_buffer->m_data + read_pos + 32, 0, 3);
-#endif
+    if(siglen>16) {
+      PREFETCH_READ (read_buffer->m_data + read_pos + 32);
+    }
     Uint32 bno = blockToMain(s->theReceiversBlockNumber);
     Uint32 ino = map_instance(s);
     SimulatedBlock* block = globalData.getBlock(bno, ino);
@@ -2143,8 +2213,6 @@
 #endif
 
     block->executeFunction(gsn, sig);
-
-    num_signals++;
   }
 
   return num_signals;
@@ -2158,26 +2226,40 @@
   Uint32 thr_count = g_thr_repository.m_thread_count;
   Uint32 signal_count = 0;
 
-  read_jbb_state(selfptr, thr_count);
+  bool jbb_empty = read_jbb_state(selfptr, thr_count);
   /*
    * A load memory barrier to ensure that we see any prio A signal sent later
    * than loaded prio B signals.
    */
   rmb();
+  if (jbb_empty)
+  {
+    /* Prio B's are empty, check and possibly execute prio A signals */
+    bool jba_empty = read_jba_state(selfptr);
+    if (jba_empty)
+      return 0;
 
-  for (Uint32 send_thr_no = 0; send_thr_no < thr_count; send_thr_no++)
-  {
-    /* Read the prio A state often, to avoid starvation of prio A. */
-    read_jba_state(selfptr);
     static Uint32 max_prioA = thr_job_queue::SIZE * thr_job_buffer::SIZE;
-    signal_count += execute_signals(selfptr, &(selfptr->m_jba),
+    return execute_signals(selfptr, &(selfptr->m_jba),
                                     &(selfptr->m_jba_read_state), sig,
                                     max_prioA, watchDogCounter,
                                     signalIdCounter);
+  }
 
+  thr_job_queue *queue = selfptr->m_in_queue;
+  thr_jb_read_state *read_state = selfptr->m_read_states;
+  for (Uint32 send_thr_no = 0; send_thr_no < thr_count; send_thr_no++,queue++,read_state++)
+  {
+    /* Read the prio A state often, to avoid starvation of prio A. */
+    bool jba_empty = read_jba_state(selfptr);
+    if (!jba_empty)
+    { static Uint32 max_prioA = thr_job_queue::SIZE * thr_job_buffer::SIZE;
+      signal_count += execute_signals(selfptr, &(selfptr->m_jba),
+                                      &(selfptr->m_jba_read_state), sig,
+                                      max_prioA, watchDogCounter,
+                                      signalIdCounter);
+    }
     /* Now execute prio B signals from one thread. */
-    thr_job_queue *queue = selfptr->m_in_queue + send_thr_no;
-    thr_jb_read_state *read_state = selfptr->m_read_states + send_thr_no;
     signal_count += execute_signals(selfptr, queue, read_state,
                                     sig, MAX_SIGNALS_PER_JB,
                                     watchDogCounter, signalIdCounter);
@@ -2430,6 +2512,7 @@
   unsigned thr_no = selfptr->m_thr_no;
   Uint32& watchDogCounter = selfptr->m_watchdog_counter;
   Uint32 thrSignalId = 0;
+  bool has_received = false;
 
   init_thread(selfptr);
   receiverThreadId = thr_no;
@@ -2454,7 +2537,7 @@
     Uint32 sum = run_job_buffers(selfptr, signal,
                                  &watchDogCounter, &thrSignalId);
 
-    if (sum)
+    if (sum || has_received)
     {
       watchDogCounter = 6;
       flush_jbb_write_state(selfptr);
@@ -2464,18 +2547,18 @@
 
     watchDogCounter = 7;
 
+    has_received = false;
     if (globalTransporterRegistry.pollReceive(1))
     {
       if (check_job_buffers(rep) == 0)
       {
 	watchDogCounter = 8;
-	lock(&rep->m_receive_lock);
-	globalTransporterRegistry.performReceive();
-	unlock(&rep->m_receive_lock);
+        lock(&rep->m_receive_lock);
+        globalTransporterRegistry.performReceive();
+        unlock(&rep->m_receive_lock);
+        has_received = true;
       }
     }
-
-    flush_jbb_write_state(selfptr);
   }
 
   globalEmulatorData.theWatchDog->unregisterWatchedThread(thr_no);
@@ -2483,6 +2566,7 @@
 }
 
 static
+inline
 void
 sendpacked(struct thr_data* thr_ptr, Signal* signal)
 {
@@ -2558,7 +2642,7 @@
     }
     
     if (sum == 0)
-    {
+    { 
       yield(&selfptr->m_waiter, &nowait, check_queues_empty, selfptr);
     }
   }
@@ -2586,20 +2670,24 @@
   Uint32 dst = block2ThreadId(block, instance);
   struct thr_repository* rep = &g_thr_repository;
   struct thr_data * selfptr = rep->m_thread + self;
+  struct thr_data * dstptr = rep->m_thread + dst;
 
   selfptr->m_priob_count++;
   Uint32 siglen = (sizeof(*s) >> 2) + s->theLength + s->m_noOfSections;
   selfptr->m_priob_size += siglen;
 
-  thr_job_queue *q = rep->m_thread[dst].m_in_queue + self;
+  thr_job_queue *q = dstptr->m_in_queue + self;
   thr_jb_write_state *w = selfptr->m_write_states + dst;
   if (insert_signal(q, w, false, s, data, secPtr, selfptr->m_next_buffer))
   {
     selfptr->m_next_buffer = seize_buffer(rep, self, false);
   }
-
   if (w->m_pending_signals >= MAX_SIGNALS_BEFORE_FLUSH)
-    flush_write_state(dst, q, w);
+  { if (dstptr == selfptr)
+      flush_write_state_self(dstptr, q->head, w);
+    else 
+      flush_write_state(dstptr, q->head, w);
+  }
 }
 
 void
@@ -2623,7 +2711,7 @@
 
   lock(&dstptr->m_jba_write_lock);
 
-  Uint32 index = q->m_write_index;
+  Uint32 index = q->head->m_write_index;
   w.m_write_index = index;
   thr_job_buffer *buffer = q->m_buffers[index];
   w.m_write_buffer = buffer;
@@ -2632,7 +2720,8 @@
   w.m_pending_signals_wakeup = MAX_SIGNALS_BEFORE_WAKEUP;
   bool buf_used = insert_signal(q, &w, true, s, data, secPtr,
                                 selfptr->m_next_buffer);
-  flush_write_state(dst, q, &w);
+
+  flush_write_state(dstptr, q->head, &w);
 
   unlock(&dstptr->m_jba_write_lock);
 
@@ -2738,7 +2827,7 @@
 
   lock(&dstptr->m_jba_write_lock);
 
-  Uint32 index = q->m_write_index;
+  Uint32 index = q->head->m_write_index;
   w.m_write_index = index;
   thr_job_buffer *buffer = q->m_buffers[index];
   w.m_write_buffer = buffer;
@@ -2747,7 +2836,7 @@
   w.m_pending_signals_wakeup = MAX_SIGNALS_BEFORE_WAKEUP;
   insert_signal(q, &w, true, &signalT.header, signalT.theData, NULL,
                 &dummy_buffer);
-  flush_write_state(dst, q, &w);
+  flush_write_state(dstptr, q->head, &w);
 
   unlock(&dstptr->m_jba_write_lock);
 }
@@ -2777,31 +2866,32 @@
   selfptr->m_first_free = 0;
   selfptr->m_first_unused = 0;
   
-  selfptr->m_jba.m_read_index = 0;
-  selfptr->m_jba.m_write_index = 0;
+  selfptr->m_jba_head.m_read_index = 0;
+  selfptr->m_jba_head.m_write_index = 0;
+  selfptr->m_jba.head = &selfptr->m_jba_head;
   thr_job_buffer *buffer = seize_buffer(rep, thr_no, true);
   selfptr->m_jba.m_buffers[0] = buffer;
   selfptr->m_jba_read_state.m_read_index = 0;
   selfptr->m_jba_read_state.m_read_buffer = buffer;
   selfptr->m_jba_read_state.m_read_pos = 0;
+  selfptr->m_jba_read_state.m_read_end = 0;
   selfptr->m_jba_read_state.m_write_index = 0;
-  selfptr->m_jba_read_state.m_write_pos = 0;
   selfptr->m_next_buffer = seize_buffer(rep, thr_no, false);
   selfptr->m_send_buffer_pool.set_pool(&rep->m_free_list);
   
   for (i = 0; i<cnt; i++)
   {
-    selfptr->m_in_queue[i].m_read_index = 0;
-    selfptr->m_in_queue[i].m_write_index = 0;
+    selfptr->m_in_queue_head[i].m_read_index = 0;
+    selfptr->m_in_queue_head[i].m_write_index = 0;
+    selfptr->m_in_queue[i].head = &selfptr->m_in_queue_head[i];
     buffer = seize_buffer(rep, thr_no, false);
     selfptr->m_in_queue[i].m_buffers[0] = buffer;
     selfptr->m_read_states[i].m_read_index = 0;
     selfptr->m_read_states[i].m_read_buffer = buffer;
     selfptr->m_read_states[i].m_read_pos = 0;
+    selfptr->m_read_states[i].m_read_end = 0;
     selfptr->m_read_states[i].m_write_index = 0;
-    selfptr->m_read_states[i].m_write_pos = 0;
   }
-
   queue_init(&selfptr->m_tq);
 
   selfptr->m_prioa_count = 0;

Thread
bzr commit into mysql-5.1 branch (ole.john.aske:3163) Bug#41301Ole John Aske8 Dec