List: Commits    « Previous Message | Next Message »
From: knielsen    Date: October 11 2007 3:25pm
Subject: bk commit into 5.1 tree (knielsen:1.2615)
View as plain text  
Below is the list of changes that have just been committed into a local
5.1 repository of knielsen. When knielsen does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html

ChangeSet@stripped, 2007-10-11 17:24:44+02:00, knielsen@ymer.(none) +1 -0
  Clean up job buffer handling, fix instability.

  storage/ndb/src/kernel/vm/mt/mt.cpp@stripped, 2007-10-11 17:24:40+02:00, knielsen@ymer.(none) +395 -340
    Clean up job buffer handling, fix instability.

diff -Nrup a/storage/ndb/src/kernel/vm/mt/mt.cpp b/storage/ndb/src/kernel/vm/mt/mt.cpp
--- a/storage/ndb/src/kernel/vm/mt/mt.cpp	2007-10-10 09:34:49 +02:00
+++ b/storage/ndb/src/kernel/vm/mt/mt.cpp	2007-10-11 17:24:40 +02:00
@@ -34,7 +34,7 @@
 #include <sys/types.h>
 
 static const Uint32 NUM_THREADS = 3;
-#define MAX_THREADS 63
+#define MAX_THREADS 4
 
 static inline
 int
@@ -44,6 +44,15 @@ xcng(volatile unsigned * addr, int val)
   return val;
 }
 
+/* Memory barriers, these definitions are for x86_64. */
+#define mb() 	asm volatile("mfence":::"memory")
+/* According to Intel docs, x86 does not reorder loads with other loads. */
+//#define rmb()	asm volatile("lfence":::"memory")
+#define rmb()	asm volatile("" ::: "memory")
+#define wmb()	asm volatile("" ::: "memory")
+#define read_barrier_depends()	do {} while(0)
+
+
 #ifdef USE_FUTEX
 
 static inline
@@ -185,6 +194,7 @@ void
 unlock(struct thr_spin_lock* sl)
 {
   sl->m_lock = 0;
+  mb();
 }
 
 static
@@ -276,16 +286,12 @@ struct thr_jb_write_state
   */
   Uint32 m_write_index;
   Uint32 m_write_pos;
-  /*
-    These are the old values of m_write_index and m_write_pos that were last
-    flushed to the consumer thread, in thr_job_queue::m_write_index and
-    thr_job_buffer::m_len.
 
-    These are used to judge when it is time to flush a new batch of signals to
-    the consumer thread.
-  */
-  Uint32 m_old_write_index;
-  Uint32 m_old_write_pos;
+  /* Thread-local copy of thr_job_queue::m_buffers[m_write_index]. */
+  thr_job_buffer *m_write_buffer;
+
+  /* Number of signals inserted since last flush to thr_job_queue. */
+  Uint32 m_pending_signals;
 };
 
 /*
@@ -300,15 +306,15 @@ struct thr_jb_read_state
   */
   Uint32 m_read_index;
   /*
-    Thread local copy of thr_job_queue::m_buffers[m_read_index].
-   */
-  Uint32 m_read_buffer;
-  /*
     Index into m_read_buffer->m_data[] of the next signal to execute from the
     current buffer.
   */
   Uint32 m_read_pos;
   /*
+    Thread local copy of thr_job_queue::m_buffers[m_read_index].
+   */
+  thr_job_buffer *m_read_buffer;
+  /*
     These are thread-local copies of thr_job_queue::m_write_index and
     thr_job_buffer::m_len. They are read once at the start of the signal
     execution loop and used to determine when the end of available signals is
@@ -361,11 +367,17 @@ struct thr_data
   struct thr_job_queue m_jba;
   struct thr_spin_lock m_jba_write_lock;
   /*
-    In m_jba_next_buffer we keep a free buffer at all times, so that when
+    In m_next_buffer we keep a free buffer at all times, so that when
     we hold the lock and find we need a new buffer, we can use this and this
     way defer allocation to after releasing the lock.
   */
-  struct thr_job_buffer* m_jba_next_buffer;
+  struct thr_job_buffer* m_next_buffer;
+  /* Thread-local read state of prio A buffer. */
+  struct thr_jb_read_state m_jba_read_state;
+  /*
+    There is no m_jba_write_state, as we have multiple writers to the prio A
+    queue, so local state becomes invalid as soon as we release the lock.
+  */
 
   /*
     We keep a small number of buffers in a thread-local cyclic FIFO, so that
@@ -378,8 +390,15 @@ struct thr_data
   /* m_first_unused is the first unused entry in m_free_fifo. */
   Uint32 m_first_unused;
 
-  struct thr_job_buffer* m_out_queue[MAX_THREADS];
+  /*
+    These are the thread input queues, into which other threads deliver
+    signals.
+  */
   struct thr_job_queue m_in_queue[MAX_THREADS];
+  /* These are the write states of m_in_queue[self] in each thread. */
+  struct thr_jb_write_state m_write_states[MAX_THREADS];
+  /* These are the read states of all of our own m_in_queue[]. */
+  struct thr_jb_read_state m_read_states[MAX_THREADS];
 
   /* Jam buffers for making trace files at crashes. */
   EmulatedJamBuffer *m_jam;
@@ -564,137 +583,6 @@ release_buffer(struct thr_repository* re
 }
 
 static
-void
-transfer_buffer(struct thr_repository* rep, unsigned from, unsigned to)
-{
-  require(from < NUM_THREADS && to < NUM_THREADS);
-  unsigned old;
-  unsigned * writeptr = &(rep->m_thread[to].m_in_queue[from].m_write_index);
-  volatile unsigned * readptr = &(rep->m_thread[to].m_in_queue[from].m_read_index);
-  unsigned readidx = (*readptr >> 16);
-  unsigned writeidx = * writeptr;
-  unsigned nextidx = (writeidx + 1) % thr_job_queue::SIZE;
-  thr_job_buffer* src = rep->m_thread[from].m_out_queue[to];
-
-  if (unlikely(readidx == nextidx))
-    goto check_full;
-
-  if (0)
-    ndbout_c("transfer from: %d to: %d (read: %d write: %d)",
-	     from, to, readidx, writeidx);
-  
-do_transfer:  
-  rep->m_thread[to].m_in_queue[from].m_buffers[writeidx] = src;
-  // need storestore barrier
-  // but xcng is full barrier
-  old = xcng(writeptr, nextidx);
-  assert(old == writeidx);
-  wakeup(&rep->m_thread[to].m_waiter); // potentially wakeup
-  
-  rep->m_thread[from].m_out_queue[to] = seize_buffer(rep, from, false);
-  return ;
-check_full:
-  ndbout_c("sleep in transfer from %u to %u", from, to);
-  for (unsigned i = 0; i<1000; i++)
-  {
-    usleep(1000);
-    if ((*readptr >> 16) != nextidx)
-      goto do_transfer;
-  }
-  
-  abort();
-}
-
-static
-unsigned
-dojob(struct thr_data* selfptr, struct thr_job_buffer* ptr, struct Signal* sig,
-      Uint32 *current_pos, Uint32 *watchDogCounter, Uint32 *signalIdCounter)
-{
-  unsigned cnt = 0;
-  unsigned *posptr = ptr->m_data + (*current_pos & 0xffff);
-  unsigned *endptr = ptr->m_data + ptr->m_len;
-  SimulatedBlock** blockptr = selfptr->m_blocks - MIN_BLOCK_NO;
-
-  while (posptr < endptr)
-  {
-    SignalHeader* s = reinterpret_cast<SignalHeader*>(posptr);
-    unsigned seccnt = s->m_noOfSections;
-    unsigned siglen = (sizeof(*s)>>2) + s->theLength;
-    unsigned bno = s->theReceiversBlockNumber;
-    unsigned gsn = s->theVerId_signalNumber;
-    SimulatedBlock * block = blockptr[bno];
-    *watchDogCounter = 1;
-    s->theSignalId = (*signalIdCounter)++;
-    memcpy(&sig->header, s, 4*siglen);
-    sig->m_sectionPtrI[0] = posptr[siglen + 0];
-    sig->m_sectionPtrI[1] = posptr[siglen + 1];
-    sig->m_sectionPtrI[2] = posptr[siglen + 2];
-
-    /*
-      Update just before execute so signal dump can know how far we are.
-    */
-    assert((*current_pos & 0xffff) + siglen + seccnt < 0x10000);
-    *current_pos += siglen + seccnt;
-
-    block->executeFunction(gsn, sig);
-    
-    cnt++;
-    posptr += siglen + seccnt;
-  }
-  
-  return cnt;
-}
-
-static
-unsigned
-dojoba(struct thr_repository * rep, unsigned thr_no,
-       Signal* signal, unsigned write_index,
-       Uint32 *watchDogCounter, Uint32 *signalIdCounter)
-{
-  unsigned sum = 0;
-  struct thr_job_buffer* buffer;
-  struct thr_data* selfptr = rep->m_thread + thr_no;
-  Uint32 *read_idx_ptr = &(selfptr->m_jba.m_read_index);
-  unsigned read_index = *read_idx_ptr;
-  unsigned wi_buf = write_index >> 16;
-
-  unsigned ri_buf = read_index >> 16;
-  while (ri_buf != wi_buf)
-  {
-    buffer = selfptr->m_jba.m_buffers[ri_buf];
-    /* Guard against overflow in lower 16 bit of *read_idx_ptr. */
-    assert(thr_job_buffer::SIZE < 0x10000);
-    sum += dojob(selfptr, buffer, signal,
-                 read_idx_ptr, watchDogCounter, signalIdCounter);
-    release_buffer(rep, thr_no, buffer);
-    ri_buf = (ri_buf + 1) % thr_job_queue::SIZE;
-    /*
-      Update executed-so-far position, so signal traces will work.
-
-      ToDo: memory barrier here, all prior loads to complete before
-      following store.
-    */
-    *read_idx_ptr = (ri_buf << 16);
-  }
-
-  buffer = selfptr->m_jba.m_buffers[ri_buf];
-  /*
-    We need to take the write lock here to make sure we do not execute any
-    partially written signals due to memory reordering.
-
-    Actually, it would be enough to just pass a value of
-    buffer->m_len read under lock before calling dojoba(). And more
-    actually, a memory barrier in that place would also be sufficient.
-  */
-  // ToDo: This deadlocks currently, as the spinlocks are not recursive
-//  lock(&selfptr->m_jba.m_write_lock);
-  sum += dojob(selfptr, buffer, signal, read_idx_ptr, watchDogCounter, signalIdCounter);
-//  unlock(&selfptr->m_jba.m_write_lock);
-
-  return sum;
-}
-
-static
 inline
 Uint32
 scan_queue(struct thr_data* selfptr, Uint32 cnt, Uint32 end, Uint32* ptr)
@@ -717,6 +605,12 @@ scan_queue(struct thr_data* selfptr, Uin
       SignalHeader* s = reinterpret_cast<SignalHeader*>(page + pos);
       if (0)
 	ndbout_c("found %p val: %d end: %d", s, val & 0xFFFF, end);
+      /*
+        ToDo: Do measurements of the frequency of these prio A timed signals.
+
+        If they are frequent, we may want to optimize, as sending one prio A
+        signal is somewhat expensive compared to sending one prio B.
+      */
       sendprioa(thr_no, s->theReceiversBlockNumber, s);
       * (page + pos) = free;
       free = idx;
@@ -808,6 +702,51 @@ scan_time_queues(struct thr_data* selfpt
   abort();
 }
 
+/*
+  Flush the write state to the job queue, making any new signals available to
+  receiving threads.
+*/
+static inline
+void
+flush_write_state(Uint32 dst, thr_job_queue *q, thr_jb_write_state *w)
+{
+  /*
+    Two write memory barriers here, as assigning m_len may make signal data
+    available to other threads, and assigning m_write_index may make new
+    buffers available.
+
+    We could optimize this by only doing it as needed, and only doing it
+    once before setting all m_len, and once before setting all m_write_index.
+
+    But wmb() is a no-op anyway in x86 ...
+  */
+  wmb();
+  w->m_write_buffer->m_len = w->m_write_pos;
+  wmb();
+  q->m_write_index = w->m_write_index;
+  w->m_pending_signals = 0;
+
+  wakeup(&(g_thr_repository.m_thread[dst].m_waiter));
+}
+
+static
+void
+flush_jbb_write_state(thr_data *selfptr)
+{
+  Uint32 thr_count = g_thr_repository.m_thread_count;
+  Uint32 self = selfptr->m_thr_no;
+
+  for (Uint32 thr_no = 0; thr_no < thr_count; thr_no ++)
+  {
+    thr_jb_write_state *w = &(selfptr->m_write_states[thr_no]);
+    if (w->m_pending_signals > 0)
+    {
+      thr_job_queue *q = &(g_thr_repository.m_thread[thr_no].m_in_queue[self]);
+      flush_write_state(thr_no, q, w);
+    }
+  }
+}
+
 Uint32 senderThreadId;
 
 static
@@ -848,19 +787,7 @@ do_receive(struct thr_repository*rep, st
   }
   unlock(&rep->m_receive_lock);
   
-  unsigned cnt = rep->m_thread_count;
-  
-  /**
-   * Transfer all out buffers
-   */
-  for (unsigned i = 0; i<cnt; i++)
-  {
-    thr_job_buffer * jb = selfptr->m_out_queue[i];
-    if (jb->m_len)
-    {
-      transfer_buffer(rep, thr_no, i);
-    }
-  }
+  flush_jbb_write_state(selfptr);
 }
 
 static
@@ -880,6 +807,214 @@ sendpacked(struct thr_data* selfptr, Sig
     b_tup->executeFunction(GSN_SEND_PACKED, signal);
 }
 
+/*
+  Insert a signal in a job queue.
+
+  The signal is not visible to consumers yet after return from this function,
+  only recorded in the thr_jb_write_state. It is necessary to first call
+  flush_write_state() for this.
+
+  The new_buffer is a job buffer to use if the current one gets full. If used,
+  we return true, indicating that the caller should allocate a new one for
+  the next call. (This allows the insert to be done under lock, while the
+  allocation is done outside the lock.)
+*/
+static inline
+bool
+insert_signal(thr_job_queue *q, thr_jb_write_state *w, Uint32 prioa,
+              const struct SignalHeader* s, thr_job_buffer *new_buffer)
+{
+  Uint32 write_pos = w->m_write_pos;
+  unsigned siglen = (sizeof(*s) >> 2) + s->theLength + s->m_noOfSections;
+  assert(w->m_write_buffer == q->m_buffers[w->m_write_index]);
+  memcpy(&(w->m_write_buffer->m_data[write_pos]), s, 4*siglen);
+  write_pos += siglen;
+  w->m_pending_signals++;
+
+  /*
+    We make sure that there is always room for at least one signal in the
+    current buffer in the queue, so one insert is always possible without
+    adding a new buffer.
+  */
+  if (likely(write_pos + 32 <= thr_job_buffer::SIZE))
+  {
+    w->m_write_pos = write_pos;
+    return false;
+  }
+  else
+  {
+    /*
+      Need a write memory barrier here, as this might make signal data visible
+      to other threads.
+
+      ToDo: We actually only need the wmb() here if we have already made this
+      buffer visible to the other thread. So we might optimize it a bit. But
+      wmb() is a no-op on x86 anyway...
+    */
+    wmb();
+    w->m_write_buffer->m_len = write_pos;
+    Uint32 write_index = (w->m_write_index + 1) % thr_job_queue::SIZE;
+    /*
+      Full job buffer is fatal.
+
+      ToDo: should we wait for it to become non-full? There is no guarantee
+      that this will actually happen...
+
+      Or alternatively, ndbrequire() ?
+    */
+    assert(write_index != q->m_read_index);
+    new_buffer->m_len = 0;
+    new_buffer->m_prioa = prioa;
+    q->m_buffers[write_index] = new_buffer;
+    w->m_write_index = write_index;
+    w->m_write_pos = 0;
+    w->m_write_buffer = new_buffer;
+    return true;                // Buffer new_buffer used
+  }
+
+  return false;                 // Buffer new_buffer not used
+}
+
+static
+void
+read_jbb_state(thr_data *selfptr, Uint32 count)
+{
+  for (Uint32 i = 0; i < count; i++)
+  {
+    thr_jb_read_state *r = &(selfptr->m_read_states[i]);
+    const thr_job_queue *q = &(selfptr->m_in_queue[i]);
+    Uint32 index = q->m_write_index;
+    r->m_write_index = index;
+    read_barrier_depends();
+    r->m_write_pos = q->m_buffers[index]->m_len;
+  }
+}
+
+static
+void
+read_jba_state(thr_data *selfptr)
+{
+  const thr_job_queue *jba = &(selfptr->m_jba);
+  Uint32 index = jba->m_write_index;
+  selfptr->m_jba_read_state.m_write_index = index;
+  read_barrier_depends();
+  selfptr->m_jba_read_state.m_write_pos = jba->m_buffers[index]->m_len;
+}
+
+/*
+  Execute at most max_signals signals from one job queue, updating local read
+  state as appropriate.
+
+  Returns number of signals actually executed.
+*/
+static
+Uint32
+execute_signals(thr_data *selfptr, thr_job_queue *q, thr_jb_read_state *r,
+                Signal *sig, Uint32 max_signals,
+                Uint32 *watchDogCounter, Uint32 *signalIdCounter)
+{
+  Uint32 num_signals = 0;
+
+  Uint32 read_index = r->m_read_index;
+  Uint32 write_index = r->m_write_index;
+  Uint32 read_pos = r->m_read_pos;
+  Uint32 write_pos = (read_index == write_index ?
+                      r->m_write_pos :
+                      q->m_buffers[read_index]->m_len);
+  thr_job_buffer *read_buffer = r->m_read_buffer;
+  SimulatedBlock** blockptr = selfptr->m_blocks - MIN_BLOCK_NO;
+
+  while (num_signals < max_signals)
+  {
+    while (read_pos >= write_pos)
+    {
+      if (read_index == write_index)
+      {
+        /* No more available now. */
+        return num_signals;
+      }
+      else
+      {
+        /* Move to next buffer. */
+        read_index = (read_index + 1) % thr_job_queue::SIZE;
+        release_buffer(&g_thr_repository, selfptr->m_thr_no, read_buffer);
+        read_buffer = q->m_buffers[read_index];
+        read_pos = 0;
+        write_pos = (read_index == write_index ?
+                     r->m_write_pos :
+                     q->m_buffers[read_index]->m_len);
+        /* Update thread-local read state. */
+        r->m_read_index = q->m_read_index = read_index;
+        r->m_read_buffer = read_buffer;
+        r->m_read_pos = read_pos;
+      }
+    }
+
+    /* Now execute the signal. */
+    SignalHeader* s =
+      reinterpret_cast<SignalHeader*>(&(read_buffer->m_data[read_pos]));
+    Uint32 seccnt = s->m_noOfSections;
+    Uint32 siglen = (sizeof(*s)>>2) + s->theLength;
+    Uint32 bno = s->theReceiversBlockNumber;
+    Uint32 gsn = s->theVerId_signalNumber;
+    SimulatedBlock * block = blockptr[bno];
+    *watchDogCounter = 1;
+    /* Must update original buffer so signal dump will see it. */
+    s->theSignalId = (*signalIdCounter)++;
+    memcpy(&sig->header, s, 4*siglen);
+    sig->m_sectionPtrI[0] = read_buffer->m_data[read_pos + siglen + 0];
+    sig->m_sectionPtrI[1] = read_buffer->m_data[read_pos + siglen + 1];
+    sig->m_sectionPtrI[2] = read_buffer->m_data[read_pos + siglen + 2];
+
+    read_pos += siglen + seccnt;
+    /* Update just before execute so signal dump can know how far we are. */
+    r->m_read_pos = read_pos;
+
+    block->executeFunction(gsn, sig);
+
+    num_signals++;
+  }
+
+  return num_signals;
+}
+
+static
+Uint32
+run_job_buffers(thr_data *selfptr, Signal *sig,
+                Uint32 *watchDogCounter, Uint32 *signalIdCounter)
+{
+  static const Uint32 MAX_SIGNALS_PER_JB = 100;
+  Uint32 thr_count = g_thr_repository.m_thread_count;
+  Uint32 signal_count = 0;
+
+  read_jbb_state(selfptr, thr_count);
+  /*
+    A load memory barrier to ensure that we see any prio A signal sent later
+    than loaded prio B signals.
+  */
+  rmb();
+
+  for (Uint32 send_thr_no = 0; send_thr_no < thr_count; send_thr_no++)
+  {
+    /* Read the prio A state often, to avoid starvation of prio A. */
+    read_jba_state(selfptr);
+    static Uint32 max_prioA = thr_job_queue::SIZE * thr_job_buffer::SIZE;
+    signal_count += execute_signals(selfptr, &(selfptr->m_jba),
+                                    &(selfptr->m_jba_read_state), sig,
+                                    max_prioA, watchDogCounter,
+                                    signalIdCounter);
+
+    /* Now execute prio B signals from one thread. */
+    thr_job_queue *queue = &(selfptr->m_in_queue[send_thr_no]);
+    thr_jb_read_state *read_state = &(selfptr->m_read_states[send_thr_no]);
+    signal_count += execute_signals(selfptr, queue, read_state,
+                                    sig, MAX_SIGNALS_PER_JB,
+                                    watchDogCounter, signalIdCounter);
+  }
+
+  return signal_count;
+}
+
 static inline Uint32
 block2ThreadId(Uint32 block)
 {
@@ -945,7 +1080,7 @@ mt_thr_main(void *thr_arg)
   {
     cpu_set_t mask;
     CPU_ZERO(&mask);
-    CPU_SET((thr_no & 1), &mask);
+    CPU_SET(((thr_no & 1)*2 + 1), &mask);
     sched_setaffinity(tid, sizeof(mask), &mask);
   }
 
@@ -968,85 +1103,21 @@ mt_thr_main(void *thr_arg)
   globalEmulatorData.theWatchDog->registerWatchedThread(&watchDogCounter,
                                                         thr_no);
 
-  /**
-   * prio A
-   */
-  unsigned* a_re_idxptr = &selfptr->m_jba.m_read_index;
-  volatile unsigned * a_wr_idxptr = &selfptr->m_jba.m_write_index;
-
   while (globalData.theRestartFlag != perform_stop)
   { 
-    unsigned sum = 0;
-    unsigned cnt = rep->m_thread_count;
-
     watchDogCounter = 2;
     scan_time_queues(selfptr);
-    unsigned jba_read_index = * a_re_idxptr;
 
-    /**
-     * Process each inbuffer
-     */
-    for (unsigned i = 0; i<cnt; i++)
-    {
-      /*
-        We must load the prio B state _before_ executing prio A signals.
-        Otherwise a later prio B signal may race to be executed ahead of an
-        earlier prio A signal.
-      */
-      Uint32 *readptr = &(selfptr->m_in_queue[i].m_read_index);
-      unsigned ri = *readptr >> 16;
-      unsigned wi = selfptr->m_in_queue[i].m_write_index;
-
-      /*
-        Now, having loaded the state of the prio B job buffer, we must first
-        execute _all_ prio A signals, to make sure that any A signal is
-        executed here before any B signal sent later by same sender thread.
-
-        We also need to make sure to read the prio A state under the prio A
-        lock (baring any other memory barrier mechanisms), to be sure that
-        any prio A activity done by other threads before visible prio B
-        activity will be visible in this thread.
-      */
-      lock(&selfptr->m_jba_write_lock);
-      Uint32 jba_write_index = * a_wr_idxptr;
-      unlock(&selfptr->m_jba_write_lock);
-
-      if (jba_read_index != jba_write_index)
-      {
-	sum += dojoba(rep, thr_no, &signal, jba_write_index,
-                      &watchDogCounter, &thrSignalId);
-	jba_read_index = * a_re_idxptr;
-      }
-      
-      thr_job_buffer * jb = selfptr->m_in_queue[i].m_buffers[ri];
-      if (ri != wi)
-      {
-	sum += dojob(selfptr, jb, &signal,
-                     readptr, &watchDogCounter, &thrSignalId);
-	ri = (ri + 1) % thr_job_queue::SIZE;
-	*readptr = ri << 16;
-	release_buffer(rep, thr_no, jb);
-      }
-      jba_write_index = * a_wr_idxptr;
-    }
+    Uint32 sum = run_job_buffers(selfptr, &signal,
+                                 &watchDogCounter, &thrSignalId);
     
     watchDogCounter = 1;
     sendpacked(selfptr, &signal, thr_no);
     
     if (sum)
     {
-      /**
-       * Transfer all out buffers
-       */
       watchDogCounter = 6;
-      for (unsigned i = 0; i<cnt; i++)
-      {
-	thr_job_buffer * jb = selfptr->m_out_queue[i];
-	if (jb->m_len)
-	{
-	  transfer_buffer(rep, thr_no, i);
-	}
-      }
+      flush_jbb_write_state(selfptr);
     }
     
     if (trylock(&rep->m_send_lock) == 0)
@@ -1054,9 +1125,13 @@ mt_thr_main(void *thr_arg)
       do_send(rep, selfptr, &watchDogCounter);
     }
     
-    if (
-thr_no == 2 &&
-trylock(&rep->m_receive_lock) == 0)
+    /*
+      We only do receive in thread 2, which _only_ does receive.
+
+      Otherwise we have a problem waking up a thread that is sleeping in the
+      transporter.
+    */
+    if (thr_no == 2 && trylock(&rep->m_receive_lock) == 0)
     {
       do_receive(rep, selfptr, sum ? 0 : 1, &watchDogCounter);
     }
@@ -1071,126 +1146,58 @@ trylock(&rep->m_receive_lock) == 0)
 }
 
 void
-thr_transporter_main(struct thr_repository* rep, unsigned thr_no)
-{
-}
-
-static
-inline
-int
-insert(thr_job_buffer* ptr, const struct SignalHeader* s)
-{
-  unsigned len = ptr->m_len;
-  unsigned* pos = ptr->m_data + len;
-  unsigned siglen = (sizeof(*s) >> 2) + s->theLength + s->m_noOfSections;
-  ptr->m_len = len + siglen;
-  memcpy(pos, s, 4*siglen);
-  return thr_job_buffer::SIZE - (len + siglen + 32); // > 0 not full, <=0 full
-}
-
-void
 sendlocal(Uint32 self, Uint32 block, const SignalHeader* s)
 {
+  static const Uint32 MAX_SIGNALS_BEFORE_FLUSH = 20;
   Uint32 dst = block2ThreadId(block);
   struct thr_repository* rep = &g_thr_repository;
   struct thr_data * selfptr = rep->m_thread + self;
 
-  int res = insert(selfptr->m_out_queue[dst], s);
-  if (res <= 0)
+  thr_job_queue *q = &(rep->m_thread[dst].m_in_queue[self]);
+  thr_jb_write_state *w = &(selfptr->m_write_states[dst]);
+  if (insert_signal(q, w, false, s, selfptr->m_next_buffer))
   {
-    transfer_buffer(rep, self, dst);
+    selfptr->m_next_buffer = seize_buffer(rep, self, false);
   }
-}
 
-/* This must be called only while holding the m_jba.m_write_lock mutex. */
-static
-inline
-Uint32
-insert_prioa(thr_data *dstptr, const SignalHeader *s)
-{
-  Uint32 siglen = (sizeof(*s) >> 2) + s->theLength + s->m_noOfSections;
-  Uint32 wi = dstptr->m_jba.m_write_index;
-  Uint32 buf = wi >> 16;
-  Uint32 newwi = wi + siglen;
-  Uint32 pos = wi & 0xFFFF;
-  Uint32 newpos = pos + siglen;
-
-  struct thr_job_buffer * buffer = * (dstptr->m_jba.m_buffers + buf);
-  assert(buffer->m_len == pos);
-  buffer->m_len = newpos;
-  memcpy(buffer->m_data + pos, s, 4*siglen);
-  dstptr->m_jba.m_write_index = newwi;
-
-  return newpos;
+  if (w->m_pending_signals >= MAX_SIGNALS_BEFORE_FLUSH)
+    flush_write_state(dst, q, w);
 }
+
 void
-sendprioa(Uint32 self, Uint32 block, const SignalHeader* s)
+sendprioa(Uint32 self, Uint32 block, const SignalHeader *s)
 {
-  static const Uint32 EXTRA_SPACE
-    = (sizeof(SignalHeader) >> 2) + StopForCrash::SignalLength;
   Uint32 dst = block2ThreadId(block);
   struct thr_repository* rep = &g_thr_repository;
   struct thr_data * selfptr = rep->m_thread + self;
-  struct thr_data * dstptr = rep->m_thread + dst;
+  struct thr_data *dstptr = &(rep->m_thread[dst]);
 
-  struct thr_job_buffer* newbuffer = selfptr->m_jba_next_buffer;
+  thr_job_queue *q = &(dstptr->m_jba);
+  thr_jb_write_state w;
 
   lock(&dstptr->m_jba_write_lock);
 
-  Uint32 newpos = insert_prioa(dstptr, s);
-
-  if (0)
-    ndbout_c("sendprioa %s from %s to %s",
-	     getSignalName(s->theVerId_signalNumber),
-	     getBlockName(refToBlock(s->theSendersBlockRef)),
-	     getBlockName(s->theReceiversBlockNumber));
-
-  /*
-    We reserve extra space for a GSN_STOP_FOR_CRASH signal, so that we can
-    send this signal during a crash in _any_ thread, not just threads which
-    have the infrastructure for selfptr->m_jba->m_next_buffer.
-  */
-  if (unlikely(newpos + 32 + EXTRA_SPACE >= thr_job_buffer::SIZE))
-  {
-    Uint32 newwi = ((dstptr->m_jba.m_write_index >> 16) + 1) % thr_job_queue::SIZE;
-
-    /*
-      There seems to be a race (or missing overflow check) here, nothing
-      seems to prevent senders from overwriting not yet executed
-      previous signal data.
-
-      But that is perhaps unlikely to occur in practise, given the size
-      of buffers, the low amount of prio A signals sent, and the
-      frequency of executing prio A.
+  Uint32 index = q->m_write_index;
+  w.m_write_index = index;
+  thr_job_buffer *buffer = q->m_buffers[index];
+  w.m_write_buffer = buffer;
+  w.m_write_pos = buffer->m_len;
+  w.m_pending_signals = 0;
+  bool buffer_used = insert_signal(q, &w, true, s, selfptr->m_next_buffer);
+  flush_write_state(dst, q, &w);
 
-      For now, I'll add an assert() ...
-    */
-    assert(newwi != (dstptr->m_jba.m_read_index >> 16));
-
-    dstptr->m_jba.m_buffers[newwi] = newbuffer;
-    newwi = (newwi << 16);
-    dstptr->m_jba.m_write_index = newwi;
-  }
-  
   unlock(&dstptr->m_jba_write_lock);
 
-  /* ToDo: Hm, shouldn't we possibly wake up the other thread here? */
-
-  /**
-   * Note, do malloc outside of critical region
-   */
-  if (newpos + 32 + EXTRA_SPACE >= thr_job_buffer::SIZE)
-  {
-    selfptr->m_jba_next_buffer = seize_buffer(rep, self, true);
-  }
+  if (buffer_used)
+    selfptr->m_next_buffer = seize_buffer(rep, self, true);
 }
 
 /*
   This functions sends a prio A STOP_FOR_CRASH signal to a thread.
 
   It works when called from any other thread, not just from job processing
-  threads. This works by having regular sendprioa() reserve space for one
-  GSN_STOP_FOR_CRASH signal.
+  threads. But note that this signal will be the last signal to be executed by
+  the other thread, as it will exit immediately.
 */
 static
 void
@@ -1198,7 +1205,11 @@ sendprioa_STOP_FOR_CRASH(Uint32 dst)
 {
   SignalT<StopForCrash::SignalLength> signalT;
   struct thr_repository* rep = &g_thr_repository;
-  struct thr_data * dstptr = rep->m_thread + dst;
+  /* As this signal will be the last one executed by the other thread, it does
+     not matter which buffer we use in case the current buffer is filled up by
+     the STOP_FOR_CRASH signal; the data in it will never be read.
+  */
+  static thr_job_buffer dummy_buffer;
 
   /*
     Currently we have two threads with fixed block assignment.
@@ -1212,6 +1223,7 @@ sendprioa_STOP_FOR_CRASH(Uint32 dst)
   else if (dst == 1)
     bno = DBLQH;
   assert(block2ThreadId(bno) == dst);
+  struct thr_data * dstptr = &(rep->m_thread[dst]);
 
   memset(&signalT.header, 0, sizeof(SignalHeader));
   signalT.header.theVerId_signalNumber   = GSN_STOP_FOR_CRASH;
@@ -1224,8 +1236,20 @@ sendprioa_STOP_FOR_CRASH(Uint32 dst)
   StopForCrash * const stopForCrash = (StopForCrash *)&signalT.theData[0];
   stopForCrash->flags = 0;
 
+  thr_job_queue *q = &(dstptr->m_jba);
+  thr_jb_write_state w;
+
   lock(&dstptr->m_jba_write_lock);
-  insert_prioa(dstptr, &(signalT.header));
+
+  Uint32 index = q->m_write_index;
+  w.m_write_index = index;
+  thr_job_buffer *buffer = q->m_buffers[index];
+  w.m_write_buffer = buffer;
+  w.m_write_pos = buffer->m_len;
+  w.m_pending_signals = 0;
+  insert_signal(q, &w, true, &(signalT.header), &dummy_buffer);
+  flush_write_state(dst, q, &w);
+
   unlock(&dstptr->m_jba_write_lock);
 }
 
@@ -1372,18 +1396,29 @@ init(struct thr_repository* rep, struct 
   selfptr->m_thr_no = thr_no;
   selfptr->m_first_free = 0;
   selfptr->m_first_unused = 0;
-  for (i = 0; i<cnt; i++)
-    selfptr->m_out_queue[i] = seize_buffer(rep, thr_no, false);
   
   selfptr->m_jba.m_read_index = 0;
   selfptr->m_jba.m_write_index = 0;
-  selfptr->m_jba.m_buffers[0] = seize_buffer(rep, thr_no, true);
-  selfptr->m_jba_next_buffer = seize_buffer(rep, thr_no, true);
+  thr_job_buffer *buffer = seize_buffer(rep, thr_no, true);
+  selfptr->m_jba.m_buffers[0] = buffer;
+  selfptr->m_jba_read_state.m_read_index = 0;
+  selfptr->m_jba_read_state.m_read_buffer = buffer;
+  selfptr->m_jba_read_state.m_read_pos = 0;
+  selfptr->m_jba_read_state.m_write_index = 0;
+  selfptr->m_jba_read_state.m_write_pos = 0;
+  selfptr->m_next_buffer = seize_buffer(rep, thr_no, false);
   
   for (i = 0; i<cnt; i++)
   {
     selfptr->m_in_queue[i].m_read_index = 0;
     selfptr->m_in_queue[i].m_write_index = 0;
+    buffer = seize_buffer(rep, thr_no, false);
+    selfptr->m_in_queue[i].m_buffers[0] = buffer;
+    selfptr->m_read_states[i].m_read_index = 0;
+    selfptr->m_read_states[i].m_read_buffer = buffer;
+    selfptr->m_read_states[i].m_read_pos = 0;
+    selfptr->m_read_states[i].m_write_index = 0;
+    selfptr->m_read_states[i].m_write_pos = 0;
   }
 
   init(&selfptr->m_tq);
@@ -1391,6 +1426,22 @@ init(struct thr_repository* rep, struct 
   selfptr->m_jam = NULL;
 }
 
+/* Have to do this after init of all m_in_queues is done. */
+static
+void
+init2(struct thr_repository* rep, struct thr_data *selfptr, unsigned int cnt,
+      unsigned thr_no)
+{
+  for (Uint32 i = 0; i<cnt; i++)
+  {
+    selfptr->m_write_states[i].m_write_index = 0;
+    selfptr->m_write_states[i].m_write_pos = 0;
+    selfptr->m_write_states[i].m_write_buffer =
+      rep->m_thread[i].m_in_queue[thr_no].m_buffers[0];
+    selfptr->m_write_states[i].m_pending_signals = 0;
+  }    
+}
+
 static
 void
 init(struct thr_repository* rep, unsigned int cnt)
@@ -1400,6 +1451,10 @@ init(struct thr_repository* rep, unsigne
   {
     init(rep, rep->m_thread + i, cnt, i);
   }
+  for (unsigned int i = 0; i<cnt; i++)
+  {
+    init2(rep, rep->m_thread + i, cnt, i);
+  }
 
   rep->stopped_threads = 0;
   pthread_mutex_init(&rep->stop_for_crash_mutex, NULL);
@@ -1678,16 +1733,13 @@ FastScheduler::dumpSignalMemory(Uint32 t
   */
 
   /* Scan all available buffers with already executed signals. */
-  Uint32 jbb_current_thread = 0;
+  Uint32 jbb_current_thr = 0;
   Uint32 jbb_start = thr_ptr->m_first_free;
   Uint32 jb_end = thr_ptr->m_first_unused;
   const thr_job_buffer *jbb = NULL;
   Uint32 jbb_pos, jbb_len;
   bool no_more_priob = false;
   const thr_job_buffer *jba = NULL;
-  Uint32 jba_read_index = thr_ptr->m_jba.m_read_index;
-  Uint32 jba_current = jba_read_index >> 16;
-  Uint32 jba_execute_pos = jba_read_index & 0xffff;
   Uint32 jba_start = jbb_start;
   Uint32 jba_pos, jba_len;
   bool no_more_prioa = false;
@@ -1718,8 +1770,10 @@ FastScheduler::dumpSignalMemory(Uint32 t
           No more released prio A buffers, but there might be some stuff
           in the current buffer.
         */
+        Uint32 jba_execute_pos = thr_ptr->m_jba_read_state.m_read_pos;
         if (jba_execute_pos > 0)
         {
+          Uint32 jba_current = thr_ptr->m_jba_read_state.m_read_index;
           jba = thr_ptr->m_jba.m_buffers[jba_current];
           jba_pos = 0;
           jba_len = jba_execute_pos;
@@ -1751,15 +1805,16 @@ FastScheduler::dumpSignalMemory(Uint32 t
           No more released prio B buffers, but there might be some stuff
           in the buffer(s) currently being processed.
         */
-        while (jbb_current_thread < g_thr_repository.m_thread_count)
+        while (jbb_current_thr < g_thr_repository.m_thread_count)
         {
-          const thr_job_queue *q = &(thr_ptr->m_in_queue[jbb_current_thread]);
-          jbb_current_thread++;
-          Uint32 read_index = q->m_read_index;
-          Uint32 read_pos = read_index & 0xffff;
+          const thr_job_queue *q = &(thr_ptr->m_in_queue[jbb_current_thr]);
+          const thr_jb_read_state *r = &(thr_ptr->m_read_states[jbb_current_thr]);
+          jbb_current_thr++;
+          Uint32 read_index = r->m_read_index;
+          Uint32 read_pos = r->m_read_pos;
           if (read_pos > 0)
           {
-            jbb = q->m_buffers[read_index >> 16];
+            jbb = q->m_buffers[read_index];
             jbb_pos = 0;
             jbb_len = read_pos;
             break;
Thread
bk commit into 5.1 tree (knielsen:1.2615) - knielsen - 11 Oct
  • Re: bk commit into 5.1 tree (knielsen:1.2615) - Jonas Oreland - 11 Oct