List:Commits« Previous MessageNext Message »
From:Jonas Oreland Date:October 11 2007 10:14pm
Subject:Re: bk commit into 5.1 tree (knielsen:1.2615)
View as plain text  
great!

easy to read...
does sit fix random crashes? (i only managed to get them using release compiled...)


two minor things:
1) the assert on thr_job_queue full, only assert will cause silent(or maybe not) failure
in release compiled.
2) you need to pad the struct thr_jb_write_state to avoid cache-pollution, right?

and then some style things:
2) i personally strongly dislike these
/*
   comments wo/ the '*' here, which I've only seen at mysql
*/

/**
 * and I think mysql is actually coming around to use this way instead...
 * which atleast I've seen in many places...
 */

what do you think ?

3) same goes for &(array[7]) instead of (array + 7), what do you think?
   (i also *think* that (array + 7) is mysql coding standard but not 100% sure)

4) i'd also prefer not to to use the g_thr_repository so much, but rather pass it along as
argument
   do you think there will be a performance penalty for this, are did you just want to
avoid lots
   of arguments.

/jonas


knielsen@stripped wrote:
> Below is the list of changes that have just been committed into a local
> 5.1 repository of knielsen. When knielsen does a push these changes will
> be propagated to the main repository and, within 24 hours after the
> push, to the public repository.
> For information on how to access the public repository
> see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html
> 
> ChangeSet@stripped, 2007-10-11 17:24:44+02:00, knielsen@ymer.(none) +1 -0
>   Clean up job buffer handling, fix instability.
> 
>   storage/ndb/src/kernel/vm/mt/mt.cpp@stripped, 2007-10-11 17:24:40+02:00,
> knielsen@ymer.(none) +395 -340
>     Clean up job buffer handling, fix instability.
> 
> diff -Nrup a/storage/ndb/src/kernel/vm/mt/mt.cpp
> b/storage/ndb/src/kernel/vm/mt/mt.cpp
> --- a/storage/ndb/src/kernel/vm/mt/mt.cpp	2007-10-10 09:34:49 +02:00
> +++ b/storage/ndb/src/kernel/vm/mt/mt.cpp	2007-10-11 17:24:40 +02:00
> @@ -34,7 +34,7 @@
>  #include <sys/types.h>
>  
>  static const Uint32 NUM_THREADS = 3;
> -#define MAX_THREADS 63
> +#define MAX_THREADS 4
>  
>  static inline
>  int
> @@ -44,6 +44,15 @@ xcng(volatile unsigned * addr, int val)
>    return val;
>  }
>  
> +/* Memory barriers, these definitions are for x64_64. */
> +#define mb() 	asm volatile("mfence":::"memory")
> +/* According to Intel docs, it does not reorder loads. */
> +//#define rmb()	asm volatile("lfence":::"memory")
> +#define rmb()	asm volatile("" ::: "memory")
> +#define wmb()	asm volatile("" ::: "memory")
> +#define read_barrier_depends()	do {} while(0)
> +
> +
>  #ifdef USE_FUTEX
>  
>  static inline
> @@ -185,6 +194,7 @@ void
>  unlock(struct thr_spin_lock* sl)
>  {
>    sl->m_lock = 0;
> +  mb();
>  }
>  
>  static
> @@ -276,16 +286,12 @@ struct thr_jb_write_state
>    */
>    Uint32 m_write_index;
>    Uint32 m_write_pos;
> -  /*
> -    These are the old values of m_write_index and m_write_pos that were last
> -    flushed to the consumer thread, in thr_job_queue::m_write_index and
> -    thr_job_buffer::m_len.
>  
> -    These are used to judge when it is time to flush a new batch of signals to
> -    the consumer thread.
> -  */
> -  Uint32 m_old_write_index;
> -  Uint32 m_old_write_pos;
> +  /* Thread-local copy of thr_job_queue::m_buffers[m_write_index]. */
> +  thr_job_buffer *m_write_buffer;
> +
> +  /* Number of signals inserted since last flush to thr_job_queue. */
> +  Uint32 m_pending_signals;
>  };
>  
>  /*
> @@ -300,15 +306,15 @@ struct thr_jb_read_state
>    */
>    Uint32 m_read_index;
>    /*
> -    Thread local copy of thr_job_queue::m_buffers[m_read_index].
> -   */
> -  Uint32 m_read_buffer;
> -  /*
>      Index into m_read_buffer->m_data[] of the next signal to execute from the
>      current buffer.
>    */
>    Uint32 m_read_pos;
>    /*
> +    Thread local copy of thr_job_queue::m_buffers[m_read_index].
> +   */
> +  thr_job_buffer *m_read_buffer;
> +  /*
>      These are thread-local copies of thr_job_queue::m_write_index and
>      thr_job_buffer::m_len. They are read once at the start of the signal
>      execution loop and used to determine when the end of available signals is
> @@ -361,11 +367,17 @@ struct thr_data
>    struct thr_job_queue m_jba;
>    struct thr_spin_lock m_jba_write_lock;
>    /*
> -    In m_jba_next_buffer we keep a free buffer at all times, so that when
> +    In m_next_buffer we keep a free buffer at all times, so that when
>      we hold the lock and find we need a new buffer, we can use this and this
>      way defer allocation to after releasing the lock.
>    */
> -  struct thr_job_buffer* m_jba_next_buffer;
> +  struct thr_job_buffer* m_next_buffer;
> +  /* Thread-local read state of prio A buffer. */
> +  struct thr_jb_read_state m_jba_read_state;
> +  /*
> +    There is no m_jba_write_state, as we have multiple writers to the prio A
> +    queue, so local state becomes invalid as soon as we release the lock.
> +  */
>  
>    /*
>      We keep a small number of buffers in a thread-local cyclic FIFO, so that
> @@ -378,8 +390,15 @@ struct thr_data
>    /* m_first_unused is the first unused entry in m_free_fifo. */
>    Uint32 m_first_unused;
>  
> -  struct thr_job_buffer* m_out_queue[MAX_THREADS];
> +  /*
> +    These are the thread input queues, where other threads deliver signals
> +    into.
> +  */
>    struct thr_job_queue m_in_queue[MAX_THREADS];
> +  /* These are the write states of m_in_queue[self] in each thread. */
> +  struct thr_jb_write_state m_write_states[MAX_THREADS];
> +  /* These are the read states of all of our own m_in_queue[]. */
> +  struct thr_jb_read_state m_read_states[MAX_THREADS];
>  
>    /* Jam buffers for making trace files at crashes. */
>    EmulatedJamBuffer *m_jam;
> @@ -564,137 +583,6 @@ release_buffer(struct thr_repository* re
>  }
>  
>  static
> -void
> -transfer_buffer(struct thr_repository* rep, unsigned from, unsigned to)
> -{
> -  require(from < NUM_THREADS && to < NUM_THREADS);
> -  unsigned old;
> -  unsigned * writeptr = &(rep->m_thread[to].m_in_queue[from].m_write_index);
> -  volatile unsigned * readptr =
> &(rep->m_thread[to].m_in_queue[from].m_read_index);
> -  unsigned readidx = (*readptr >> 16);
> -  unsigned writeidx = * writeptr;
> -  unsigned nextidx = (writeidx + 1) % thr_job_queue::SIZE;
> -  thr_job_buffer* src = rep->m_thread[from].m_out_queue[to];
> -
> -  if (unlikely(readidx == nextidx))
> -    goto check_full;
> -
> -  if (0)
> -    ndbout_c("transfer from: %d to: %d (read: %d write: %d)",
> -	     from, to, readidx, writeidx);
> -  
> -do_transfer:  
> -  rep->m_thread[to].m_in_queue[from].m_buffers[writeidx] = src;
> -  // need storestore barrier
> -  // but xcng is full barrier
> -  old = xcng(writeptr, nextidx);
> -  assert(old == writeidx);
> -  wakeup(&rep->m_thread[to].m_waiter); // potentially wakeup
> -  
> -  rep->m_thread[from].m_out_queue[to] = seize_buffer(rep, from, false);
> -  return ;
> -check_full:
> -  ndbout_c("sleep in transfer from %u to %u", from, to);
> -  for (unsigned i = 0; i<1000; i++)
> -  {
> -    usleep(1000);
> -    if ((*readptr >> 16) != nextidx)
> -      goto do_transfer;
> -  }
> -  
> -  abort();
> -}
> -
> -static
> -unsigned
> -dojob(struct thr_data* selfptr, struct thr_job_buffer* ptr, struct Signal* sig,
> -      Uint32 *current_pos, Uint32 *watchDogCounter, Uint32 *signalIdCounter)
> -{
> -  unsigned cnt = 0;
> -  unsigned *posptr = ptr->m_data + (*current_pos & 0xffff);
> -  unsigned *endptr = ptr->m_data + ptr->m_len;
> -  SimulatedBlock** blockptr = selfptr->m_blocks - MIN_BLOCK_NO;
> -
> -  while (posptr < endptr)
> -  {
> -    SignalHeader* s = reinterpret_cast<SignalHeader*>(posptr);
> -    unsigned seccnt = s->m_noOfSections;
> -    unsigned siglen = (sizeof(*s)>>2) + s->theLength;
> -    unsigned bno = s->theReceiversBlockNumber;
> -    unsigned gsn = s->theVerId_signalNumber;
> -    SimulatedBlock * block = blockptr[bno];
> -    *watchDogCounter = 1;
> -    s->theSignalId = (*signalIdCounter)++;
> -    memcpy(&sig->header, s, 4*siglen);
> -    sig->m_sectionPtrI[0] = posptr[siglen + 0];
> -    sig->m_sectionPtrI[1] = posptr[siglen + 1];
> -    sig->m_sectionPtrI[2] = posptr[siglen + 2];
> -
> -    /*
> -      Update just before execute so signal dump can know how far we are.
> -    */
> -    assert((*current_pos & 0xffff) + siglen + seccnt < 0x10000);
> -    *current_pos += siglen + seccnt;
> -
> -    block->executeFunction(gsn, sig);
> -    
> -    cnt++;
> -    posptr += siglen + seccnt;
> -  }
> -  
> -  return cnt;
> -}
> -
> -static
> -unsigned
> -dojoba(struct thr_repository * rep, unsigned thr_no,
> -       Signal* signal, unsigned write_index,
> -       Uint32 *watchDogCounter, Uint32 *signalIdCounter)
> -{
> -  unsigned sum = 0;
> -  struct thr_job_buffer* buffer;
> -  struct thr_data* selfptr = rep->m_thread + thr_no;
> -  Uint32 *read_idx_ptr = &(selfptr->m_jba.m_read_index);
> -  unsigned read_index = *read_idx_ptr;
> -  unsigned wi_buf = write_index >> 16;
> -
> -  unsigned ri_buf = read_index >> 16;
> -  while (ri_buf != wi_buf)
> -  {
> -    buffer = selfptr->m_jba.m_buffers[ri_buf];
> -    /* Guard against overflow in lower 16 bit of *read_idx_ptr. */
> -    assert(thr_job_buffer::SIZE < 0x10000);
> -    sum += dojob(selfptr, buffer, signal,
> -                 read_idx_ptr, watchDogCounter, signalIdCounter);
> -    release_buffer(rep, thr_no, buffer);
> -    ri_buf = (ri_buf + 1) % thr_job_queue::SIZE;
> -    /*
> -      Update executed-so-far position, so signal traces will work.
> -
> -      ToDo: memory barrier here, all prior loads to complete before
> -      following store.
> -    */
> -    *read_idx_ptr = (ri_buf << 16);
> -  }
> -
> -  buffer = selfptr->m_jba.m_buffers[ri_buf];
> -  /*
> -    We need to take the write lock here to make sure we do not execute any
> -    partially written signals due to memory reordering.
> -
> -    Actually, it would be enough to just pass a value of
> -    buffer->m_len read under lock before calling dojoba(). And more
> -    actually, a memory barrier in that place would also be sufficient.
> -  */
> -  // ToDo: This deadlocks currently, as the spinlocks are not recursive
> -//  lock(&selfptr->m_jba.m_write_lock);
> -  sum += dojob(selfptr, buffer, signal, read_idx_ptr, watchDogCounter,
> signalIdCounter);
> -//  unlock(&selfptr->m_jba.m_write_lock);
> -
> -  return sum;
> -}
> -
> -static
>  inline
>  Uint32
>  scan_queue(struct thr_data* selfptr, Uint32 cnt, Uint32 end, Uint32* ptr)
> @@ -717,6 +605,12 @@ scan_queue(struct thr_data* selfptr, Uin
>        SignalHeader* s = reinterpret_cast<SignalHeader*>(page + pos);
>        if (0)
>  	ndbout_c("found %p val: %d end: %d", s, val & 0xFFFF, end);
> +      /*
> +        ToDo: Do measurements of the frequency of these prio A timed signals.
> +
> +        If they are frequent, we may want to optimize, as sending one prio A
> +        signal is somewhat expensive compared to sending one prio B.
> +      */
>        sendprioa(thr_no, s->theReceiversBlockNumber, s);
>        * (page + pos) = free;
>        free = idx;
> @@ -808,6 +702,51 @@ scan_time_queues(struct thr_data* selfpt
>    abort();
>  }
>  
> +/*
> +  Flush the write state to the job queue, making any new signals available to
> +  receiving threads.
> +*/
> +static inline
> +void
> +flush_write_state(Uint32 dst, thr_job_queue *q, thr_jb_write_state *w)
> +{
> +  /*
> +    Two write memory barriers here, as assigning m_len may make signal data
> +    available to other threads, and assigning m_write_index may make new
> +    buffers available.
> +
> +    We could optimize this by only doing it as needed, and only doing it
> +    once before setting all m_len, and once before setting all m_write_index.
> +
> +    But wmb() is a no-op anyway in x86 ...
> +  */
> +  wmb();
> +  w->m_write_buffer->m_len = w->m_write_pos;
> +  wmb();
> +  q->m_write_index = w->m_write_index;
> +  w->m_pending_signals = 0;
> +
> +  wakeup(&(g_thr_repository.m_thread[dst].m_waiter));
> +}
> +
> +static
> +void
> +flush_jbb_write_state(thr_data *selfptr)
> +{
> +  Uint32 thr_count = g_thr_repository.m_thread_count;
> +  Uint32 self = selfptr->m_thr_no;
> +
> +  for (Uint32 thr_no = 0; thr_no < thr_count; thr_no ++)
> +  {
> +    thr_jb_write_state *w = &(selfptr->m_write_states[thr_no]);
> +    if (w->m_pending_signals > 0)
> +    {
> +      thr_job_queue *q = &(g_thr_repository.m_thread[thr_no].m_in_queue[self]);
> +      flush_write_state(thr_no, q, w);
> +    }
> +  }
> +}
> +
>  Uint32 senderThreadId;
>  
>  static
> @@ -848,19 +787,7 @@ do_receive(struct thr_repository*rep, st
>    }
>    unlock(&rep->m_receive_lock);
>    
> -  unsigned cnt = rep->m_thread_count;
> -  
> -  /**
> -   * Transfer all out buffers
> -   */
> -  for (unsigned i = 0; i<cnt; i++)
> -  {
> -    thr_job_buffer * jb = selfptr->m_out_queue[i];
> -    if (jb->m_len)
> -    {
> -      transfer_buffer(rep, thr_no, i);
> -    }
> -  }
> +  flush_jbb_write_state(selfptr);
>  }
>  
>  static
> @@ -880,6 +807,214 @@ sendpacked(struct thr_data* selfptr, Sig
>      b_tup->executeFunction(GSN_SEND_PACKED, signal);
>  }
>  
> +/*
> +  Insert a signal in a job queue.
> +
> +  The signal is not visible to consumers yet after return from this function,
> +  only recorded in the thr_jb_write_state. It is necessary to first call
> +  flush_write_state() for this.
> +
> +  The new_buffer is a job buffer to use if the current one gets full. If used,
> +  we return true, indicating that the caller should allocate a new one for
> +  the next call. (This is done to allow to insert under lock, but do the
> +  allocation outside the lock).
> +*/
> +static inline
> +bool
> +insert_signal(thr_job_queue *q, thr_jb_write_state *w, Uint32 prioa,
> +              const struct SignalHeader* s, thr_job_buffer *new_buffer)
> +{
> +  Uint32 write_pos = w->m_write_pos;
> +  unsigned siglen = (sizeof(*s) >> 2) + s->theLength +
> s->m_noOfSections;
> +  assert(w->m_write_buffer == q->m_buffers[w->m_write_index]);
> +  memcpy(&(w->m_write_buffer->m_data[write_pos]), s, 4*siglen);
> +  write_pos += siglen;
> +  w->m_pending_signals++;
> +
> +  /*
> +    We make sure that there is always room for at least one signal in the
> +    current buffer in the queue, so one insert is always possible without
> +    adding a new buffer.
> +  */
> +  if (likely(write_pos + 32 <= thr_job_buffer::SIZE))
> +  {
> +    w->m_write_pos = write_pos;
> +    return false;
> +  }
> +  else
> +  {
> +    /*
> +      Need a write memory barrier here, as this might make signal data visible
> +      to other threads.
> +
> +      ToDo: We actually only need the wmb() here if we already make this
> +      buffer visible to the other thread. So we might optimize it a bit. But
> +      wmb() is a no-op on x86 anyway...
> +    */
> +    wmb();
> +    w->m_write_buffer->m_len = write_pos;
> +    Uint32 write_index = (w->m_write_index + 1) % thr_job_queue::SIZE;
> +    /*
> +      Full job buffer is fatal.
> +
> +      ToDo: should we wait for it to become non-full? There is no guarantee
> +      that this will actually happen...
> +
> +      Or alternatively, ndbrequire() ?
> +    */
> +    assert(write_index != q->m_read_index);
> +    new_buffer->m_len = 0;
> +    new_buffer->m_prioa = prioa;
> +    q->m_buffers[write_index] = new_buffer;
> +    w->m_write_index = write_index;
> +    w->m_write_pos = 0;
> +    w->m_write_buffer = new_buffer;
> +    return true;                // Buffer new_buffer used
> +  }
> +
> +  return false;                 // Buffer new_buffer not used
> +}
> +
> +static
> +void
> +read_jbb_state(thr_data *selfptr, Uint32 count)
> +{
> +  for (Uint32 i = 0; i < count; i++)
> +  {
> +    thr_jb_read_state *r = &(selfptr->m_read_states[i]);
> +    const thr_job_queue *q = &(selfptr->m_in_queue[i]);
> +    Uint32 index = q->m_write_index;
> +    r->m_write_index = index;
> +    read_barrier_depends();
> +    r->m_write_pos = q->m_buffers[index]->m_len;
> +  }
> +}
> +
> +static
> +void
> +read_jba_state(thr_data *selfptr)
> +{
> +  const thr_job_queue *jba = &(selfptr->m_jba);
> +  Uint32 index = jba->m_write_index;
> +  selfptr->m_jba_read_state.m_write_index = index;
> +  read_barrier_depends();
> +  selfptr->m_jba_read_state.m_write_pos = jba->m_buffers[index]->m_len;
> +}
> +
> +/*
> +  Execute at most MAX_SIGNALS signals from one job queue, updating local read
> +  state as appropriate.
> +
> +  Returns number of signals actually executed.
> +*/
> +static
> +Uint32
> +execute_signals(thr_data *selfptr, thr_job_queue *q, thr_jb_read_state *r,
> +                Signal *sig, Uint32 max_signals,
> +                Uint32 *watchDogCounter, Uint32 *signalIdCounter)
> +{
> +  Uint32 num_signals = 0;
> +
> +  Uint32 read_index = r->m_read_index;
> +  Uint32 write_index = r->m_write_index;
> +  Uint32 read_pos = r->m_read_pos;
> +  Uint32 write_pos = (read_index == write_index ?
> +                      r->m_write_pos :
> +                      q->m_buffers[read_index]->m_len);
> +  thr_job_buffer *read_buffer = r->m_read_buffer;
> +  SimulatedBlock** blockptr = selfptr->m_blocks - MIN_BLOCK_NO;
> +
> +  while (num_signals < max_signals)
> +  {
> +    while (read_pos >= write_pos)
> +    {
> +      if (read_index == write_index)
> +      {
> +        /* No more available now. */
> +        return num_signals;
> +      }
> +      else
> +      {
> +        /* Move to next buffer. */
> +        read_index = (read_index + 1) % thr_job_queue::SIZE;
> +        release_buffer(&g_thr_repository, selfptr->m_thr_no, read_buffer);
> +        read_buffer = q->m_buffers[read_index];
> +        read_pos = 0;
> +        write_pos = (read_index == write_index ?
> +                     r->m_write_pos :
> +                     q->m_buffers[read_index]->m_len);
> +        /* Update thread-local read state. */
> +        r->m_read_index = q->m_read_index = read_index;
> +        r->m_read_buffer = read_buffer;
> +        r->m_read_pos = read_pos;
> +      }
> +    }
> +
> +    /* Now execute the signal. */
> +    SignalHeader* s =
> +     
> reinterpret_cast<SignalHeader*>(&(read_buffer->m_data[read_pos]));
> +    Uint32 seccnt = s->m_noOfSections;
> +    Uint32 siglen = (sizeof(*s)>>2) + s->theLength;
> +    Uint32 bno = s->theReceiversBlockNumber;
> +    Uint32 gsn = s->theVerId_signalNumber;
> +    SimulatedBlock * block = blockptr[bno];
> +    *watchDogCounter = 1;
> +    /* Must update original buffer so signal dump will see it. */
> +    s->theSignalId = (*signalIdCounter)++;
> +    memcpy(&sig->header, s, 4*siglen);
> +    sig->m_sectionPtrI[0] = read_buffer->m_data[read_pos + siglen + 0];
> +    sig->m_sectionPtrI[1] = read_buffer->m_data[read_pos + siglen + 1];
> +    sig->m_sectionPtrI[2] = read_buffer->m_data[read_pos + siglen + 2];
> +
> +    read_pos += siglen + seccnt;
> +    /* Update just before execute so signal dump can know how far we are. */
> +    r->m_read_pos = read_pos;
> +
> +    block->executeFunction(gsn, sig);
> +
> +    num_signals++;
> +  }
> +
> +  return num_signals;
> +}
> +
> +static
> +Uint32
> +run_job_buffers(thr_data *selfptr, Signal *sig,
> +                Uint32 *watchDogCounter, Uint32 *signalIdCounter)
> +{
> +  static const Uint32 MAX_SIGNALS_PER_JB = 100;
> +  Uint32 thr_count = g_thr_repository.m_thread_count;
> +  Uint32 signal_count = 0;
> +
> +  read_jbb_state(selfptr, thr_count);
> +  /*
> +    A load memory barrier to ensure that we see any prio A signal sent later
> +    than loaded prio B signals.
> +  */
> +  rmb();
> +
> +  for (Uint32 send_thr_no = 0; send_thr_no < thr_count; send_thr_no++)
> +  {
> +    /* Read the prio A state often, to avoid starvation of prio A. */
> +    read_jba_state(selfptr);
> +    static Uint32 max_prioA = thr_job_queue::SIZE * thr_job_buffer::SIZE;
> +    signal_count += execute_signals(selfptr, &(selfptr->m_jba),
> +                                    &(selfptr->m_jba_read_state), sig,
> +                                    max_prioA, watchDogCounter,
> +                                    signalIdCounter);
> +
> +    /* Now execute prio B signals from one thread. */
> +    thr_job_queue *queue = &(selfptr->m_in_queue[send_thr_no]);
> +    thr_jb_read_state *read_state = &(selfptr->m_read_states[send_thr_no]);
> +    signal_count += execute_signals(selfptr, queue, read_state,
> +                                    sig, MAX_SIGNALS_PER_JB,
> +                                    watchDogCounter, signalIdCounter);
> +  }
> +
> +  return signal_count;
> +}
> +
>  static inline Uint32
>  block2ThreadId(Uint32 block)
>  {
> @@ -945,7 +1080,7 @@ mt_thr_main(void *thr_arg)
>    {
>      cpu_set_t mask;
>      CPU_ZERO(&mask);
> -    CPU_SET((thr_no & 1), &mask);
> +    CPU_SET(((thr_no & 1)*2 + 1), &mask);
>      sched_setaffinity(tid, sizeof(mask), &mask);
>    }
>  
> @@ -968,85 +1103,21 @@ mt_thr_main(void *thr_arg)
>    globalEmulatorData.theWatchDog->registerWatchedThread(&watchDogCounter,
>                                                          thr_no);
>  
> -  /**
> -   * prio A
> -   */
> -  unsigned* a_re_idxptr = &selfptr->m_jba.m_read_index;
> -  volatile unsigned * a_wr_idxptr = &selfptr->m_jba.m_write_index;
> -
>    while (globalData.theRestartFlag != perform_stop)
>    { 
> -    unsigned sum = 0;
> -    unsigned cnt = rep->m_thread_count;
> -
>      watchDogCounter = 2;
>      scan_time_queues(selfptr);
> -    unsigned jba_read_index = * a_re_idxptr;
>  
> -    /**
> -     * Process each inbuffer
> -     */
> -    for (unsigned i = 0; i<cnt; i++)
> -    {
> -      /*
> -        We must load the prio B state _before_ executing prio A signals.
> -        Otherwise a later prio B signal may race to be executed ahead of an
> -        earlier prio A signal.
> -      */
> -      Uint32 *readptr = &(selfptr->m_in_queue[i].m_read_index);
> -      unsigned ri = *readptr >> 16;
> -      unsigned wi = selfptr->m_in_queue[i].m_write_index;
> -
> -      /*
> -        Now, having loaded the state of the prio B job buffer, we must first
> -        execute _all_ prio A signals, to make sure that any A signal is
> -        executed here before any B signal sent later by same sender thread.
> -
> -        We also need to make sure to read the prio A state under the prio A
> -        lock (baring any other memory barrier mechanisms), to be sure that
> -        any prio A activity done by other threads before visible prio B
> -        activity will be visible in this thread.
> -      */
> -      lock(&selfptr->m_jba_write_lock);
> -      Uint32 jba_write_index = * a_wr_idxptr;
> -      unlock(&selfptr->m_jba_write_lock);
> -
> -      if (jba_read_index != jba_write_index)
> -      {
> -	sum += dojoba(rep, thr_no, &signal, jba_write_index,
> -                      &watchDogCounter, &thrSignalId);
> -	jba_read_index = * a_re_idxptr;
> -      }
> -      
> -      thr_job_buffer * jb = selfptr->m_in_queue[i].m_buffers[ri];
> -      if (ri != wi)
> -      {
> -	sum += dojob(selfptr, jb, &signal,
> -                     readptr, &watchDogCounter, &thrSignalId);
> -	ri = (ri + 1) % thr_job_queue::SIZE;
> -	*readptr = ri << 16;
> -	release_buffer(rep, thr_no, jb);
> -      }
> -      jba_write_index = * a_wr_idxptr;
> -    }
> +    Uint32 sum = run_job_buffers(selfptr, &signal,
> +                                 &watchDogCounter, &thrSignalId);
>      
>      watchDogCounter = 1;
>      sendpacked(selfptr, &signal, thr_no);
>      
>      if (sum)
>      {
> -      /**
> -       * Transfer all out buffers
> -       */
>        watchDogCounter = 6;
> -      for (unsigned i = 0; i<cnt; i++)
> -      {
> -	thr_job_buffer * jb = selfptr->m_out_queue[i];
> -	if (jb->m_len)
> -	{
> -	  transfer_buffer(rep, thr_no, i);
> -	}
> -      }
> +      flush_jbb_write_state(selfptr);
>      }
>      
>      if (trylock(&rep->m_send_lock) == 0)
> @@ -1054,9 +1125,13 @@ mt_thr_main(void *thr_arg)
>        do_send(rep, selfptr, &watchDogCounter);
>      }
>      
> -    if (
> -thr_no == 2 &&
> -trylock(&rep->m_receive_lock) == 0)
> +    /*
> +      We only do receive in thread 2, which _only_ does receive.
> +
> +      Otherwise we have a problem waking up a thread that is sleeping in the
> +      transporter
> +    */
> +    if (thr_no == 2 && trylock(&rep->m_receive_lock) == 0)
>      {
>        do_receive(rep, selfptr, sum ? 0 : 1, &watchDogCounter);
>      }
> @@ -1071,126 +1146,58 @@ trylock(&rep->m_receive_lock) == 0)
>  }
>  
>  void
> -thr_transporter_main(struct thr_repository* rep, unsigned thr_no)
> -{
> -}
> -
> -static
> -inline
> -int
> -insert(thr_job_buffer* ptr, const struct SignalHeader* s)
> -{
> -  unsigned len = ptr->m_len;
> -  unsigned* pos = ptr->m_data + len;
> -  unsigned siglen = (sizeof(*s) >> 2) + s->theLength +
> s->m_noOfSections;
> -  ptr->m_len = len + siglen;
> -  memcpy(pos, s, 4*siglen);
> -  return thr_job_buffer::SIZE - (len + siglen + 32); // > 0 not full, <=0
> full
> -}
> -
> -void
>  sendlocal(Uint32 self, Uint32 block, const SignalHeader* s)
>  {
> +  static const Uint32 MAX_SIGNALS_BEFORE_FLUSH = 20;
>    Uint32 dst = block2ThreadId(block);
>    struct thr_repository* rep = &g_thr_repository;
>    struct thr_data * selfptr = rep->m_thread + self;
>  
> -  int res = insert(selfptr->m_out_queue[dst], s);
> -  if (res <= 0)
> +  thr_job_queue *q = &(rep->m_thread[dst].m_in_queue[self]);
> +  thr_jb_write_state *w = &(selfptr->m_write_states[dst]);
> +  if (insert_signal(q, w, false, s, selfptr->m_next_buffer))
>    {
> -    transfer_buffer(rep, self, dst);
> +    selfptr->m_next_buffer = seize_buffer(rep, self, false);
>    }
> -}
>  
> -/* This must be called only while holding the m_jba.m_write_lock mutex. */
> -static
> -inline
> -Uint32
> -insert_prioa(thr_data *dstptr, const SignalHeader *s)
> -{
> -  Uint32 siglen = (sizeof(*s) >> 2) + s->theLength + s->m_noOfSections;
> -  Uint32 wi = dstptr->m_jba.m_write_index;
> -  Uint32 buf = wi >> 16;
> -  Uint32 newwi = wi + siglen;
> -  Uint32 pos = wi & 0xFFFF;
> -  Uint32 newpos = pos + siglen;
> -
> -  struct thr_job_buffer * buffer = * (dstptr->m_jba.m_buffers + buf);
> -  assert(buffer->m_len == pos);
> -  buffer->m_len = newpos;
> -  memcpy(buffer->m_data + pos, s, 4*siglen);
> -  dstptr->m_jba.m_write_index = newwi;
> -
> -  return newpos;
> +  if (w->m_pending_signals >= MAX_SIGNALS_BEFORE_FLUSH)
> +    flush_write_state(dst, q, w);
>  }
> +
>  void
> -sendprioa(Uint32 self, Uint32 block, const SignalHeader* s)
> +sendprioa(Uint32 self, Uint32 block, const SignalHeader *s)
>  {
> -  static const Uint32 EXTRA_SPACE
> -    = (sizeof(SignalHeader) >> 2) + StopForCrash::SignalLength;
>    Uint32 dst = block2ThreadId(block);
>    struct thr_repository* rep = &g_thr_repository;
>    struct thr_data * selfptr = rep->m_thread + self;
> -  struct thr_data * dstptr = rep->m_thread + dst;
> +  struct thr_data *dstptr = &(rep->m_thread[dst]);
>  
> -  struct thr_job_buffer* newbuffer = selfptr->m_jba_next_buffer;
> +  thr_job_queue *q = &(dstptr->m_jba);
> +  thr_jb_write_state w;
>  
>    lock(&dstptr->m_jba_write_lock);
>  
> -  Uint32 newpos = insert_prioa(dstptr, s);
> -
> -  if (0)
> -    ndbout_c("sendprioa %s from %s to %s",
> -	     getSignalName(s->theVerId_signalNumber),
> -	     getBlockName(refToBlock(s->theSendersBlockRef)),
> -	     getBlockName(s->theReceiversBlockNumber));
> -
> -  /*
> -    We reserve extra space for a GSN_STOP_FOR_CRASH signal, so that we can
> -    send this signal during a crash in _any_ thread, not just threads which
> -    have the infrastructure for selfptr->m_jba->m_next_buffer.
> -  */
> -  if (unlikely(newpos + 32 + EXTRA_SPACE >= thr_job_buffer::SIZE))
> -  {
> -    Uint32 newwi = ((dstptr->m_jba.m_write_index >> 16) + 1) %
> thr_job_queue::SIZE;
> -
> -    /*
> -      There seems to be a race (or missing overflow check) here, nothing
> -      seems to prevent senders from overwriting not yet executed
> -      previous signal data.
> -
> -      But that is perhaps unlikely to occur in practise, given the size
> -      of buffers, the low amount of prio A signals sent, and the
> -      frequency of executing prio A.
> +  Uint32 index = q->m_write_index;
> +  w.m_write_index = index;
> +  thr_job_buffer *buffer = q->m_buffers[index];
> +  w.m_write_buffer = buffer;
> +  w.m_write_pos = buffer->m_len;
> +  w.m_pending_signals = 0;
> +  bool buffer_used = insert_signal(q, &w, true, s, selfptr->m_next_buffer);
> +  flush_write_state(dst, q, &w);
>  
> -      For now, I'll add an assert() ...
> -    */
> -    assert(newwi != (dstptr->m_jba.m_read_index >> 16));
> -
> -    dstptr->m_jba.m_buffers[newwi] = newbuffer;
> -    newwi = (newwi << 16);
> -    dstptr->m_jba.m_write_index = newwi;
> -  }
> -  
>    unlock(&dstptr->m_jba_write_lock);
>  
> -  /* ToDo: Hm, shouldn't we possibly wake up the other thread here? */
> -
> -  /**
> -   * Note, do malloc outside of critical region
> -   */
> -  if (newpos + 32 + EXTRA_SPACE >= thr_job_buffer::SIZE)
> -  {
> -    selfptr->m_jba_next_buffer = seize_buffer(rep, self, true);
> -  }
> +  if (buffer_used)
> +    selfptr->m_next_buffer = seize_buffer(rep, self, true);
>  }
>  
>  /*
>    This functions sends a prio A STOP_FOR_CRASH signal to a thread.
>  
>    It works when called from any other thread, not just from job processing
> -  threads. This works by having regular sendprioa() reserve space for one
> -  GSN_STOP_FOR_CRASH signal.
> +  threads. But note that this signal will be the last signal to be executed by
> +  the other thread, as it will exit immediately.
>  */
>  static
>  void
> @@ -1198,7 +1205,11 @@ sendprioa_STOP_FOR_CRASH(Uint32 dst)
>  {
>    SignalT<StopForCrash::SignalLength> signalT;
>    struct thr_repository* rep = &g_thr_repository;
> -  struct thr_data * dstptr = rep->m_thread + dst;
> +  /* As this signal will be the last one executed by the other thread, it does
> +     not matter which buffer we use in case the current buffer is filled up by
> +     the STOP_FOR_CRASH signal; the data in it will never be read.
> +  */
> +  static thr_job_buffer dummy_buffer;
>  
>    /*
>      Currently we have two threads with fixed block assignment.
> @@ -1212,6 +1223,7 @@ sendprioa_STOP_FOR_CRASH(Uint32 dst)
>    else if (dst == 1)
>      bno = DBLQH;
>    assert(block2ThreadId(bno) == dst);
> +  struct thr_data * dstptr = &(rep->m_thread[dst]);
>  
>    memset(&signalT.header, 0, sizeof(SignalHeader));
>    signalT.header.theVerId_signalNumber   = GSN_STOP_FOR_CRASH;
> @@ -1224,8 +1236,20 @@ sendprioa_STOP_FOR_CRASH(Uint32 dst)
>    StopForCrash * const stopForCrash = (StopForCrash *)&signalT.theData[0];
>    stopForCrash->flags = 0;
>  
> +  thr_job_queue *q = &(dstptr->m_jba);
> +  thr_jb_write_state w;
> +
>    lock(&dstptr->m_jba_write_lock);
> -  insert_prioa(dstptr, &(signalT.header));
> +
> +  Uint32 index = q->m_write_index;
> +  w.m_write_index = index;
> +  thr_job_buffer *buffer = q->m_buffers[index];
> +  w.m_write_buffer = buffer;
> +  w.m_write_pos = buffer->m_len;
> +  w.m_pending_signals = 0;
> +  insert_signal(q, &w, true, &(signalT.header), &dummy_buffer);
> +  flush_write_state(dst, q, &w);
> +
>    unlock(&dstptr->m_jba_write_lock);
>  }
>  
> @@ -1372,18 +1396,29 @@ init(struct thr_repository* rep, struct 
>    selfptr->m_thr_no = thr_no;
>    selfptr->m_first_free = 0;
>    selfptr->m_first_unused = 0;
> -  for (i = 0; i<cnt; i++)
> -    selfptr->m_out_queue[i] = seize_buffer(rep, thr_no, false);
>    
>    selfptr->m_jba.m_read_index = 0;
>    selfptr->m_jba.m_write_index = 0;
> -  selfptr->m_jba.m_buffers[0] = seize_buffer(rep, thr_no, true);
> -  selfptr->m_jba_next_buffer = seize_buffer(rep, thr_no, true);
> +  thr_job_buffer *buffer = seize_buffer(rep, thr_no, true);
> +  selfptr->m_jba.m_buffers[0] = buffer;
> +  selfptr->m_jba_read_state.m_read_index = 0;
> +  selfptr->m_jba_read_state.m_read_buffer = buffer;
> +  selfptr->m_jba_read_state.m_read_pos = 0;
> +  selfptr->m_jba_read_state.m_write_index = 0;
> +  selfptr->m_jba_read_state.m_write_pos = 0;
> +  selfptr->m_next_buffer = seize_buffer(rep, thr_no, false);
>    
>    for (i = 0; i<cnt; i++)
>    {
>      selfptr->m_in_queue[i].m_read_index = 0;
>      selfptr->m_in_queue[i].m_write_index = 0;
> +    buffer = seize_buffer(rep, thr_no, false);
> +    selfptr->m_in_queue[i].m_buffers[0] = buffer;
> +    selfptr->m_read_states[i].m_read_index = 0;
> +    selfptr->m_read_states[i].m_read_buffer = buffer;
> +    selfptr->m_read_states[i].m_read_pos = 0;
> +    selfptr->m_read_states[i].m_write_index = 0;
> +    selfptr->m_read_states[i].m_write_pos = 0;
>    }
>  
>    init(&selfptr->m_tq);
> @@ -1391,6 +1426,22 @@ init(struct thr_repository* rep, struct 
>    selfptr->m_jam = NULL;
>  }
>  
> +/* Have to do this after init of all m_in_queues is done. */
> +static
> +void
> +init2(struct thr_repository* rep, struct thr_data *selfptr, unsigned int cnt,
> +      unsigned thr_no)
> +{
> +  for (Uint32 i = 0; i<cnt; i++)
> +  {
> +    selfptr->m_write_states[i].m_write_index = 0;
> +    selfptr->m_write_states[i].m_write_pos = 0;
> +    selfptr->m_write_states[i].m_write_buffer =
> +      rep->m_thread[i].m_in_queue[thr_no].m_buffers[0];
> +    selfptr->m_write_states[i].m_pending_signals = 0;
> +  }    
> +}
> +
>  static
>  void
>  init(struct thr_repository* rep, unsigned int cnt)
> @@ -1400,6 +1451,10 @@ init(struct thr_repository* rep, unsigne
>    {
>      init(rep, rep->m_thread + i, cnt, i);
>    }
> +  for (unsigned int i = 0; i<cnt; i++)
> +  {
> +    init2(rep, rep->m_thread + i, cnt, i);
> +  }
>  
>    rep->stopped_threads = 0;
>    pthread_mutex_init(&rep->stop_for_crash_mutex, NULL);
> @@ -1678,16 +1733,13 @@ FastScheduler::dumpSignalMemory(Uint32 t
>    */
>  
>    /* Scan all available buffers with already executed signals. */
> -  Uint32 jbb_current_thread = 0;
> +  Uint32 jbb_current_thr = 0;
>    Uint32 jbb_start = thr_ptr->m_first_free;
>    Uint32 jb_end = thr_ptr->m_first_unused;
>    const thr_job_buffer *jbb = NULL;
>    Uint32 jbb_pos, jbb_len;
>    bool no_more_priob = false;
>    const thr_job_buffer *jba = NULL;
> -  Uint32 jba_read_index = thr_ptr->m_jba.m_read_index;
> -  Uint32 jba_current = jba_read_index >> 16;
> -  Uint32 jba_execute_pos = jba_read_index & 0xffff;
>    Uint32 jba_start = jbb_start;
>    Uint32 jba_pos, jba_len;
>    bool no_more_prioa = false;
> @@ -1718,8 +1770,10 @@ FastScheduler::dumpSignalMemory(Uint32 t
>            No more released prio A buffers, but there might be some stuff
>            in the current buffer.
>          */
> +        Uint32 jba_execute_pos = thr_ptr->m_jba_read_state.m_read_pos;
>          if (jba_execute_pos > 0)
>          {
> +          Uint32 jba_current = thr_ptr->m_jba_read_state.m_read_index;
>            jba = thr_ptr->m_jba.m_buffers[jba_current];
>            jba_pos = 0;
>            jba_len = jba_execute_pos;
> @@ -1751,15 +1805,16 @@ FastScheduler::dumpSignalMemory(Uint32 t
>            No more released prio B buffers, but there might be some stuff
>            in the buffer(s) currently being processed.
>          */
> -        while (jbb_current_thread < g_thr_repository.m_thread_count)
> +        while (jbb_current_thr < g_thr_repository.m_thread_count)
>          {
> -          const thr_job_queue *q =
> &(thr_ptr->m_in_queue[jbb_current_thread]);
> -          jbb_current_thread++;
> -          Uint32 read_index = q->m_read_index;
> -          Uint32 read_pos = read_index & 0xffff;
> +          const thr_job_queue *q = &(thr_ptr->m_in_queue[jbb_current_thr]);
> +          const thr_jb_read_state *r =
> &(thr_ptr->m_read_states[jbb_current_thr]);
> +          jbb_current_thr++;
> +          Uint32 read_index = r->m_read_index;
> +          Uint32 read_pos = r->m_read_pos;
>            if (read_pos > 0)
>            {
> -            jbb = q->m_buffers[read_index >> 16];
> +            jbb = q->m_buffers[read_index];
>              jbb_pos = 0;
>              jbb_len = read_pos;
>              break;
> 

Thread
bk commit into 5.1 tree (knielsen:1.2615)knielsen11 Oct
  • Re: bk commit into 5.1 tree (knielsen:1.2615)Jonas Oreland11 Oct