great!
easy to read...
does it fix the random crashes? (i only managed to get them using a release-compiled build...)
two minor things:
1) the thr_job_queue-full check is only an assert, so in a release-compiled build it will
cause a silent (or maybe not so silent) failure.
2) you need to pad the struct thr_jb_write_state to avoid cache-pollution, right?
and then some style things:
2) i personally strongly dislike these
/*
comments without the '*' here, which I've only seen at MySQL
*/
/**
* and I think MySQL is actually coming around to using this style instead...
* which at least I've seen in many places...
*/
what do you think ?
3) same goes for &(array[7]) instead of (array + 7), what do you think?
(i also *think* that (array + 7) is mysql coding standard but not 100% sure)
4) i'd also prefer not to use g_thr_repository so much, but rather pass it along as an
argument.
do you think there would be a performance penalty for this, or did you just want to
avoid lots of arguments?
/jonas
knielsen@stripped wrote:
> Below is the list of changes that have just been committed into a local
> 5.1 repository of knielsen. When knielsen does a push these changes will
> be propagated to the main repository and, within 24 hours after the
> push, to the public repository.
> For information on how to access the public repository
> see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html
>
> ChangeSet@stripped, 2007-10-11 17:24:44+02:00, knielsen@ymer.(none) +1 -0
> Clean up job buffer handling, fix instability.
>
> storage/ndb/src/kernel/vm/mt/mt.cpp@stripped, 2007-10-11 17:24:40+02:00,
> knielsen@ymer.(none) +395 -340
> Clean up job buffer handling, fix instability.
>
> diff -Nrup a/storage/ndb/src/kernel/vm/mt/mt.cpp
> b/storage/ndb/src/kernel/vm/mt/mt.cpp
> --- a/storage/ndb/src/kernel/vm/mt/mt.cpp 2007-10-10 09:34:49 +02:00
> +++ b/storage/ndb/src/kernel/vm/mt/mt.cpp 2007-10-11 17:24:40 +02:00
> @@ -34,7 +34,7 @@
> #include <sys/types.h>
>
> static const Uint32 NUM_THREADS = 3;
> -#define MAX_THREADS 63
> +#define MAX_THREADS 4
>
> static inline
> int
> @@ -44,6 +44,15 @@ xcng(volatile unsigned * addr, int val)
> return val;
> }
>
> +/* Memory barriers, these definitions are for x64_64. */
> +#define mb() asm volatile("mfence":::"memory")
> +/* According to Intel docs, it does not reorder loads. */
> +//#define rmb() asm volatile("lfence":::"memory")
> +#define rmb() asm volatile("" ::: "memory")
> +#define wmb() asm volatile("" ::: "memory")
> +#define read_barrier_depends() do {} while(0)
> +
> +
> #ifdef USE_FUTEX
>
> static inline
> @@ -185,6 +194,7 @@ void
> unlock(struct thr_spin_lock* sl)
> {
> sl->m_lock = 0;
> + mb();
> }
>
> static
> @@ -276,16 +286,12 @@ struct thr_jb_write_state
> */
> Uint32 m_write_index;
> Uint32 m_write_pos;
> - /*
> - These are the old values of m_write_index and m_write_pos that were last
> - flushed to the consumer thread, in thr_job_queue::m_write_index and
> - thr_job_buffer::m_len.
>
> - These are used to judge when it is time to flush a new batch of signals to
> - the consumer thread.
> - */
> - Uint32 m_old_write_index;
> - Uint32 m_old_write_pos;
> + /* Thread-local copy of thr_job_queue::m_buffers[m_write_index]. */
> + thr_job_buffer *m_write_buffer;
> +
> + /* Number of signals inserted since last flush to thr_job_queue. */
> + Uint32 m_pending_signals;
> };
>
> /*
> @@ -300,15 +306,15 @@ struct thr_jb_read_state
> */
> Uint32 m_read_index;
> /*
> - Thread local copy of thr_job_queue::m_buffers[m_read_index].
> - */
> - Uint32 m_read_buffer;
> - /*
> Index into m_read_buffer->m_data[] of the next signal to execute from the
> current buffer.
> */
> Uint32 m_read_pos;
> /*
> + Thread local copy of thr_job_queue::m_buffers[m_read_index].
> + */
> + thr_job_buffer *m_read_buffer;
> + /*
> These are thread-local copies of thr_job_queue::m_write_index and
> thr_job_buffer::m_len. They are read once at the start of the signal
> execution loop and used to determine when the end of available signals is
> @@ -361,11 +367,17 @@ struct thr_data
> struct thr_job_queue m_jba;
> struct thr_spin_lock m_jba_write_lock;
> /*
> - In m_jba_next_buffer we keep a free buffer at all times, so that when
> + In m_next_buffer we keep a free buffer at all times, so that when
> we hold the lock and find we need a new buffer, we can use this and this
> way defer allocation to after releasing the lock.
> */
> - struct thr_job_buffer* m_jba_next_buffer;
> + struct thr_job_buffer* m_next_buffer;
> + /* Thread-local read state of prio A buffer. */
> + struct thr_jb_read_state m_jba_read_state;
> + /*
> + There is no m_jba_write_state, as we have multiple writers to the prio A
> + queue, so local state becomes invalid as soon as we release the lock.
> + */
>
> /*
> We keep a small number of buffers in a thread-local cyclic FIFO, so that
> @@ -378,8 +390,15 @@ struct thr_data
> /* m_first_unused is the first unused entry in m_free_fifo. */
> Uint32 m_first_unused;
>
> - struct thr_job_buffer* m_out_queue[MAX_THREADS];
> + /*
> + These are the thread input queues, where other threads deliver signals
> + into.
> + */
> struct thr_job_queue m_in_queue[MAX_THREADS];
> + /* These are the write states of m_in_queue[self] in each thread. */
> + struct thr_jb_write_state m_write_states[MAX_THREADS];
> + /* These are the read states of all of our own m_in_queue[]. */
> + struct thr_jb_read_state m_read_states[MAX_THREADS];
>
> /* Jam buffers for making trace files at crashes. */
> EmulatedJamBuffer *m_jam;
> @@ -564,137 +583,6 @@ release_buffer(struct thr_repository* re
> }
>
> static
> -void
> -transfer_buffer(struct thr_repository* rep, unsigned from, unsigned to)
> -{
> - require(from < NUM_THREADS && to < NUM_THREADS);
> - unsigned old;
> - unsigned * writeptr = &(rep->m_thread[to].m_in_queue[from].m_write_index);
> - volatile unsigned * readptr =
> &(rep->m_thread[to].m_in_queue[from].m_read_index);
> - unsigned readidx = (*readptr >> 16);
> - unsigned writeidx = * writeptr;
> - unsigned nextidx = (writeidx + 1) % thr_job_queue::SIZE;
> - thr_job_buffer* src = rep->m_thread[from].m_out_queue[to];
> -
> - if (unlikely(readidx == nextidx))
> - goto check_full;
> -
> - if (0)
> - ndbout_c("transfer from: %d to: %d (read: %d write: %d)",
> - from, to, readidx, writeidx);
> -
> -do_transfer:
> - rep->m_thread[to].m_in_queue[from].m_buffers[writeidx] = src;
> - // need storestore barrier
> - // but xcng is full barrier
> - old = xcng(writeptr, nextidx);
> - assert(old == writeidx);
> - wakeup(&rep->m_thread[to].m_waiter); // potentially wakeup
> -
> - rep->m_thread[from].m_out_queue[to] = seize_buffer(rep, from, false);
> - return ;
> -check_full:
> - ndbout_c("sleep in transfer from %u to %u", from, to);
> - for (unsigned i = 0; i<1000; i++)
> - {
> - usleep(1000);
> - if ((*readptr >> 16) != nextidx)
> - goto do_transfer;
> - }
> -
> - abort();
> -}
> -
> -static
> -unsigned
> -dojob(struct thr_data* selfptr, struct thr_job_buffer* ptr, struct Signal* sig,
> - Uint32 *current_pos, Uint32 *watchDogCounter, Uint32 *signalIdCounter)
> -{
> - unsigned cnt = 0;
> - unsigned *posptr = ptr->m_data + (*current_pos & 0xffff);
> - unsigned *endptr = ptr->m_data + ptr->m_len;
> - SimulatedBlock** blockptr = selfptr->m_blocks - MIN_BLOCK_NO;
> -
> - while (posptr < endptr)
> - {
> - SignalHeader* s = reinterpret_cast<SignalHeader*>(posptr);
> - unsigned seccnt = s->m_noOfSections;
> - unsigned siglen = (sizeof(*s)>>2) + s->theLength;
> - unsigned bno = s->theReceiversBlockNumber;
> - unsigned gsn = s->theVerId_signalNumber;
> - SimulatedBlock * block = blockptr[bno];
> - *watchDogCounter = 1;
> - s->theSignalId = (*signalIdCounter)++;
> - memcpy(&sig->header, s, 4*siglen);
> - sig->m_sectionPtrI[0] = posptr[siglen + 0];
> - sig->m_sectionPtrI[1] = posptr[siglen + 1];
> - sig->m_sectionPtrI[2] = posptr[siglen + 2];
> -
> - /*
> - Update just before execute so signal dump can know how far we are.
> - */
> - assert((*current_pos & 0xffff) + siglen + seccnt < 0x10000);
> - *current_pos += siglen + seccnt;
> -
> - block->executeFunction(gsn, sig);
> -
> - cnt++;
> - posptr += siglen + seccnt;
> - }
> -
> - return cnt;
> -}
> -
> -static
> -unsigned
> -dojoba(struct thr_repository * rep, unsigned thr_no,
> - Signal* signal, unsigned write_index,
> - Uint32 *watchDogCounter, Uint32 *signalIdCounter)
> -{
> - unsigned sum = 0;
> - struct thr_job_buffer* buffer;
> - struct thr_data* selfptr = rep->m_thread + thr_no;
> - Uint32 *read_idx_ptr = &(selfptr->m_jba.m_read_index);
> - unsigned read_index = *read_idx_ptr;
> - unsigned wi_buf = write_index >> 16;
> -
> - unsigned ri_buf = read_index >> 16;
> - while (ri_buf != wi_buf)
> - {
> - buffer = selfptr->m_jba.m_buffers[ri_buf];
> - /* Guard against overflow in lower 16 bit of *read_idx_ptr. */
> - assert(thr_job_buffer::SIZE < 0x10000);
> - sum += dojob(selfptr, buffer, signal,
> - read_idx_ptr, watchDogCounter, signalIdCounter);
> - release_buffer(rep, thr_no, buffer);
> - ri_buf = (ri_buf + 1) % thr_job_queue::SIZE;
> - /*
> - Update executed-so-far position, so signal traces will work.
> -
> - ToDo: memory barrier here, all prior loads to complete before
> - following store.
> - */
> - *read_idx_ptr = (ri_buf << 16);
> - }
> -
> - buffer = selfptr->m_jba.m_buffers[ri_buf];
> - /*
> - We need to take the write lock here to make sure we do not execute any
> - partially written signals due to memory reordering.
> -
> - Actually, it would be enough to just pass a value of
> - buffer->m_len read under lock before calling dojoba(). And more
> - actually, a memory barrier in that place would also be sufficient.
> - */
> - // ToDo: This deadlocks currently, as the spinlocks are not recursive
> -// lock(&selfptr->m_jba.m_write_lock);
> - sum += dojob(selfptr, buffer, signal, read_idx_ptr, watchDogCounter,
> signalIdCounter);
> -// unlock(&selfptr->m_jba.m_write_lock);
> -
> - return sum;
> -}
> -
> -static
> inline
> Uint32
> scan_queue(struct thr_data* selfptr, Uint32 cnt, Uint32 end, Uint32* ptr)
> @@ -717,6 +605,12 @@ scan_queue(struct thr_data* selfptr, Uin
> SignalHeader* s = reinterpret_cast<SignalHeader*>(page + pos);
> if (0)
> ndbout_c("found %p val: %d end: %d", s, val & 0xFFFF, end);
> + /*
> + ToDo: Do measurements of the frequency of these prio A timed signals.
> +
> + If they are frequent, we may want to optimize, as sending one prio A
> + signal is somewhat expensive compared to sending one prio B.
> + */
> sendprioa(thr_no, s->theReceiversBlockNumber, s);
> * (page + pos) = free;
> free = idx;
> @@ -808,6 +702,51 @@ scan_time_queues(struct thr_data* selfpt
> abort();
> }
>
> +/*
> + Flush the write state to the job queue, making any new signals available to
> + receiving threads.
> +*/
> +static inline
> +void
> +flush_write_state(Uint32 dst, thr_job_queue *q, thr_jb_write_state *w)
> +{
> + /*
> + Two write memory barriers here, as assigning m_len may make signal data
> + available to other threads, and assigning m_write_index may make new
> + buffers available.
> +
> + We could optimize this by only doing it as needed, and only doing it
> + once before setting all m_len, and once before setting all m_write_index.
> +
> + But wmb() is a no-op anyway in x86 ...
> + */
> + wmb();
> + w->m_write_buffer->m_len = w->m_write_pos;
> + wmb();
> + q->m_write_index = w->m_write_index;
> + w->m_pending_signals = 0;
> +
> + wakeup(&(g_thr_repository.m_thread[dst].m_waiter));
> +}
> +
> +static
> +void
> +flush_jbb_write_state(thr_data *selfptr)
> +{
> + Uint32 thr_count = g_thr_repository.m_thread_count;
> + Uint32 self = selfptr->m_thr_no;
> +
> + for (Uint32 thr_no = 0; thr_no < thr_count; thr_no ++)
> + {
> + thr_jb_write_state *w = &(selfptr->m_write_states[thr_no]);
> + if (w->m_pending_signals > 0)
> + {
> + thr_job_queue *q = &(g_thr_repository.m_thread[thr_no].m_in_queue[self]);
> + flush_write_state(thr_no, q, w);
> + }
> + }
> +}
> +
> Uint32 senderThreadId;
>
> static
> @@ -848,19 +787,7 @@ do_receive(struct thr_repository*rep, st
> }
> unlock(&rep->m_receive_lock);
>
> - unsigned cnt = rep->m_thread_count;
> -
> - /**
> - * Transfer all out buffers
> - */
> - for (unsigned i = 0; i<cnt; i++)
> - {
> - thr_job_buffer * jb = selfptr->m_out_queue[i];
> - if (jb->m_len)
> - {
> - transfer_buffer(rep, thr_no, i);
> - }
> - }
> + flush_jbb_write_state(selfptr);
> }
>
> static
> @@ -880,6 +807,214 @@ sendpacked(struct thr_data* selfptr, Sig
> b_tup->executeFunction(GSN_SEND_PACKED, signal);
> }
>
> +/*
> + Insert a signal in a job queue.
> +
> + The signal is not visible to consumers yet after return from this function,
> + only recorded in the thr_jb_write_state. It is necessary to first call
> + flush_write_state() for this.
> +
> + The new_buffer is a job buffer to use if the current one gets full. If used,
> + we return true, indicating that the caller should allocate a new one for
> + the next call. (This is done to allow to insert under lock, but do the
> + allocation outside the lock).
> +*/
> +static inline
> +bool
> +insert_signal(thr_job_queue *q, thr_jb_write_state *w, Uint32 prioa,
> + const struct SignalHeader* s, thr_job_buffer *new_buffer)
> +{
> + Uint32 write_pos = w->m_write_pos;
> + unsigned siglen = (sizeof(*s) >> 2) + s->theLength +
> s->m_noOfSections;
> + assert(w->m_write_buffer == q->m_buffers[w->m_write_index]);
> + memcpy(&(w->m_write_buffer->m_data[write_pos]), s, 4*siglen);
> + write_pos += siglen;
> + w->m_pending_signals++;
> +
> + /*
> + We make sure that there is always room for at least one signal in the
> + current buffer in the queue, so one insert is always possible without
> + adding a new buffer.
> + */
> + if (likely(write_pos + 32 <= thr_job_buffer::SIZE))
> + {
> + w->m_write_pos = write_pos;
> + return false;
> + }
> + else
> + {
> + /*
> + Need a write memory barrier here, as this might make signal data visible
> + to other threads.
> +
> + ToDo: We actually only need the wmb() here if we already make this
> + buffer visible to the other thread. So we might optimize it a bit. But
> + wmb() is a no-op on x86 anyway...
> + */
> + wmb();
> + w->m_write_buffer->m_len = write_pos;
> + Uint32 write_index = (w->m_write_index + 1) % thr_job_queue::SIZE;
> + /*
> + Full job buffer is fatal.
> +
> + ToDo: should we wait for it to become non-full? There is no guarantee
> + that this will actually happen...
> +
> + Or alternatively, ndbrequire() ?
> + */
> + assert(write_index != q->m_read_index);
> + new_buffer->m_len = 0;
> + new_buffer->m_prioa = prioa;
> + q->m_buffers[write_index] = new_buffer;
> + w->m_write_index = write_index;
> + w->m_write_pos = 0;
> + w->m_write_buffer = new_buffer;
> + return true; // Buffer new_buffer used
> + }
> +
> + return false; // Buffer new_buffer not used
> +}
> +
> +static
> +void
> +read_jbb_state(thr_data *selfptr, Uint32 count)
> +{
> + for (Uint32 i = 0; i < count; i++)
> + {
> + thr_jb_read_state *r = &(selfptr->m_read_states[i]);
> + const thr_job_queue *q = &(selfptr->m_in_queue[i]);
> + Uint32 index = q->m_write_index;
> + r->m_write_index = index;
> + read_barrier_depends();
> + r->m_write_pos = q->m_buffers[index]->m_len;
> + }
> +}
> +
> +static
> +void
> +read_jba_state(thr_data *selfptr)
> +{
> + const thr_job_queue *jba = &(selfptr->m_jba);
> + Uint32 index = jba->m_write_index;
> + selfptr->m_jba_read_state.m_write_index = index;
> + read_barrier_depends();
> + selfptr->m_jba_read_state.m_write_pos = jba->m_buffers[index]->m_len;
> +}
> +
> +/*
> + Execute at most MAX_SIGNALS signals from one job queue, updating local read
> + state as appropriate.
> +
> + Returns number of signals actually executed.
> +*/
> +static
> +Uint32
> +execute_signals(thr_data *selfptr, thr_job_queue *q, thr_jb_read_state *r,
> + Signal *sig, Uint32 max_signals,
> + Uint32 *watchDogCounter, Uint32 *signalIdCounter)
> +{
> + Uint32 num_signals = 0;
> +
> + Uint32 read_index = r->m_read_index;
> + Uint32 write_index = r->m_write_index;
> + Uint32 read_pos = r->m_read_pos;
> + Uint32 write_pos = (read_index == write_index ?
> + r->m_write_pos :
> + q->m_buffers[read_index]->m_len);
> + thr_job_buffer *read_buffer = r->m_read_buffer;
> + SimulatedBlock** blockptr = selfptr->m_blocks - MIN_BLOCK_NO;
> +
> + while (num_signals < max_signals)
> + {
> + while (read_pos >= write_pos)
> + {
> + if (read_index == write_index)
> + {
> + /* No more available now. */
> + return num_signals;
> + }
> + else
> + {
> + /* Move to next buffer. */
> + read_index = (read_index + 1) % thr_job_queue::SIZE;
> + release_buffer(&g_thr_repository, selfptr->m_thr_no, read_buffer);
> + read_buffer = q->m_buffers[read_index];
> + read_pos = 0;
> + write_pos = (read_index == write_index ?
> + r->m_write_pos :
> + q->m_buffers[read_index]->m_len);
> + /* Update thread-local read state. */
> + r->m_read_index = q->m_read_index = read_index;
> + r->m_read_buffer = read_buffer;
> + r->m_read_pos = read_pos;
> + }
> + }
> +
> + /* Now execute the signal. */
> + SignalHeader* s =
> +
> reinterpret_cast<SignalHeader*>(&(read_buffer->m_data[read_pos]));
> + Uint32 seccnt = s->m_noOfSections;
> + Uint32 siglen = (sizeof(*s)>>2) + s->theLength;
> + Uint32 bno = s->theReceiversBlockNumber;
> + Uint32 gsn = s->theVerId_signalNumber;
> + SimulatedBlock * block = blockptr[bno];
> + *watchDogCounter = 1;
> + /* Must update original buffer so signal dump will see it. */
> + s->theSignalId = (*signalIdCounter)++;
> + memcpy(&sig->header, s, 4*siglen);
> + sig->m_sectionPtrI[0] = read_buffer->m_data[read_pos + siglen + 0];
> + sig->m_sectionPtrI[1] = read_buffer->m_data[read_pos + siglen + 1];
> + sig->m_sectionPtrI[2] = read_buffer->m_data[read_pos + siglen + 2];
> +
> + read_pos += siglen + seccnt;
> + /* Update just before execute so signal dump can know how far we are. */
> + r->m_read_pos = read_pos;
> +
> + block->executeFunction(gsn, sig);
> +
> + num_signals++;
> + }
> +
> + return num_signals;
> +}
> +
> +static
> +Uint32
> +run_job_buffers(thr_data *selfptr, Signal *sig,
> + Uint32 *watchDogCounter, Uint32 *signalIdCounter)
> +{
> + static const Uint32 MAX_SIGNALS_PER_JB = 100;
> + Uint32 thr_count = g_thr_repository.m_thread_count;
> + Uint32 signal_count = 0;
> +
> + read_jbb_state(selfptr, thr_count);
> + /*
> + A load memory barrier to ensure that we see any prio A signal sent later
> + than loaded prio B signals.
> + */
> + rmb();
> +
> + for (Uint32 send_thr_no = 0; send_thr_no < thr_count; send_thr_no++)
> + {
> + /* Read the prio A state often, to avoid starvation of prio A. */
> + read_jba_state(selfptr);
> + static Uint32 max_prioA = thr_job_queue::SIZE * thr_job_buffer::SIZE;
> + signal_count += execute_signals(selfptr, &(selfptr->m_jba),
> + &(selfptr->m_jba_read_state), sig,
> + max_prioA, watchDogCounter,
> + signalIdCounter);
> +
> + /* Now execute prio B signals from one thread. */
> + thr_job_queue *queue = &(selfptr->m_in_queue[send_thr_no]);
> + thr_jb_read_state *read_state = &(selfptr->m_read_states[send_thr_no]);
> + signal_count += execute_signals(selfptr, queue, read_state,
> + sig, MAX_SIGNALS_PER_JB,
> + watchDogCounter, signalIdCounter);
> + }
> +
> + return signal_count;
> +}
> +
> static inline Uint32
> block2ThreadId(Uint32 block)
> {
> @@ -945,7 +1080,7 @@ mt_thr_main(void *thr_arg)
> {
> cpu_set_t mask;
> CPU_ZERO(&mask);
> - CPU_SET((thr_no & 1), &mask);
> + CPU_SET(((thr_no & 1)*2 + 1), &mask);
> sched_setaffinity(tid, sizeof(mask), &mask);
> }
>
> @@ -968,85 +1103,21 @@ mt_thr_main(void *thr_arg)
> globalEmulatorData.theWatchDog->registerWatchedThread(&watchDogCounter,
> thr_no);
>
> - /**
> - * prio A
> - */
> - unsigned* a_re_idxptr = &selfptr->m_jba.m_read_index;
> - volatile unsigned * a_wr_idxptr = &selfptr->m_jba.m_write_index;
> -
> while (globalData.theRestartFlag != perform_stop)
> {
> - unsigned sum = 0;
> - unsigned cnt = rep->m_thread_count;
> -
> watchDogCounter = 2;
> scan_time_queues(selfptr);
> - unsigned jba_read_index = * a_re_idxptr;
>
> - /**
> - * Process each inbuffer
> - */
> - for (unsigned i = 0; i<cnt; i++)
> - {
> - /*
> - We must load the prio B state _before_ executing prio A signals.
> - Otherwise a later prio B signal may race to be executed ahead of an
> - earlier prio A signal.
> - */
> - Uint32 *readptr = &(selfptr->m_in_queue[i].m_read_index);
> - unsigned ri = *readptr >> 16;
> - unsigned wi = selfptr->m_in_queue[i].m_write_index;
> -
> - /*
> - Now, having loaded the state of the prio B job buffer, we must first
> - execute _all_ prio A signals, to make sure that any A signal is
> - executed here before any B signal sent later by same sender thread.
> -
> - We also need to make sure to read the prio A state under the prio A
> - lock (baring any other memory barrier mechanisms), to be sure that
> - any prio A activity done by other threads before visible prio B
> - activity will be visible in this thread.
> - */
> - lock(&selfptr->m_jba_write_lock);
> - Uint32 jba_write_index = * a_wr_idxptr;
> - unlock(&selfptr->m_jba_write_lock);
> -
> - if (jba_read_index != jba_write_index)
> - {
> - sum += dojoba(rep, thr_no, &signal, jba_write_index,
> - &watchDogCounter, &thrSignalId);
> - jba_read_index = * a_re_idxptr;
> - }
> -
> - thr_job_buffer * jb = selfptr->m_in_queue[i].m_buffers[ri];
> - if (ri != wi)
> - {
> - sum += dojob(selfptr, jb, &signal,
> - readptr, &watchDogCounter, &thrSignalId);
> - ri = (ri + 1) % thr_job_queue::SIZE;
> - *readptr = ri << 16;
> - release_buffer(rep, thr_no, jb);
> - }
> - jba_write_index = * a_wr_idxptr;
> - }
> + Uint32 sum = run_job_buffers(selfptr, &signal,
> + &watchDogCounter, &thrSignalId);
>
> watchDogCounter = 1;
> sendpacked(selfptr, &signal, thr_no);
>
> if (sum)
> {
> - /**
> - * Transfer all out buffers
> - */
> watchDogCounter = 6;
> - for (unsigned i = 0; i<cnt; i++)
> - {
> - thr_job_buffer * jb = selfptr->m_out_queue[i];
> - if (jb->m_len)
> - {
> - transfer_buffer(rep, thr_no, i);
> - }
> - }
> + flush_jbb_write_state(selfptr);
> }
>
> if (trylock(&rep->m_send_lock) == 0)
> @@ -1054,9 +1125,13 @@ mt_thr_main(void *thr_arg)
> do_send(rep, selfptr, &watchDogCounter);
> }
>
> - if (
> -thr_no == 2 &&
> -trylock(&rep->m_receive_lock) == 0)
> + /*
> + We only do receive in thread 2, which _only_ does receive.
> +
> + Otherwise we have a problem waking up a thread that is sleeping in the
> + transporter
> + */
> + if (thr_no == 2 && trylock(&rep->m_receive_lock) == 0)
> {
> do_receive(rep, selfptr, sum ? 0 : 1, &watchDogCounter);
> }
> @@ -1071,126 +1146,58 @@ trylock(&rep->m_receive_lock) == 0)
> }
>
> void
> -thr_transporter_main(struct thr_repository* rep, unsigned thr_no)
> -{
> -}
> -
> -static
> -inline
> -int
> -insert(thr_job_buffer* ptr, const struct SignalHeader* s)
> -{
> - unsigned len = ptr->m_len;
> - unsigned* pos = ptr->m_data + len;
> - unsigned siglen = (sizeof(*s) >> 2) + s->theLength +
> s->m_noOfSections;
> - ptr->m_len = len + siglen;
> - memcpy(pos, s, 4*siglen);
> - return thr_job_buffer::SIZE - (len + siglen + 32); // > 0 not full, <=0
> full
> -}
> -
> -void
> sendlocal(Uint32 self, Uint32 block, const SignalHeader* s)
> {
> + static const Uint32 MAX_SIGNALS_BEFORE_FLUSH = 20;
> Uint32 dst = block2ThreadId(block);
> struct thr_repository* rep = &g_thr_repository;
> struct thr_data * selfptr = rep->m_thread + self;
>
> - int res = insert(selfptr->m_out_queue[dst], s);
> - if (res <= 0)
> + thr_job_queue *q = &(rep->m_thread[dst].m_in_queue[self]);
> + thr_jb_write_state *w = &(selfptr->m_write_states[dst]);
> + if (insert_signal(q, w, false, s, selfptr->m_next_buffer))
> {
> - transfer_buffer(rep, self, dst);
> + selfptr->m_next_buffer = seize_buffer(rep, self, false);
> }
> -}
>
> -/* This must be called only while holding the m_jba.m_write_lock mutex. */
> -static
> -inline
> -Uint32
> -insert_prioa(thr_data *dstptr, const SignalHeader *s)
> -{
> - Uint32 siglen = (sizeof(*s) >> 2) + s->theLength + s->m_noOfSections;
> - Uint32 wi = dstptr->m_jba.m_write_index;
> - Uint32 buf = wi >> 16;
> - Uint32 newwi = wi + siglen;
> - Uint32 pos = wi & 0xFFFF;
> - Uint32 newpos = pos + siglen;
> -
> - struct thr_job_buffer * buffer = * (dstptr->m_jba.m_buffers + buf);
> - assert(buffer->m_len == pos);
> - buffer->m_len = newpos;
> - memcpy(buffer->m_data + pos, s, 4*siglen);
> - dstptr->m_jba.m_write_index = newwi;
> -
> - return newpos;
> + if (w->m_pending_signals >= MAX_SIGNALS_BEFORE_FLUSH)
> + flush_write_state(dst, q, w);
> }
> +
> void
> -sendprioa(Uint32 self, Uint32 block, const SignalHeader* s)
> +sendprioa(Uint32 self, Uint32 block, const SignalHeader *s)
> {
> - static const Uint32 EXTRA_SPACE
> - = (sizeof(SignalHeader) >> 2) + StopForCrash::SignalLength;
> Uint32 dst = block2ThreadId(block);
> struct thr_repository* rep = &g_thr_repository;
> struct thr_data * selfptr = rep->m_thread + self;
> - struct thr_data * dstptr = rep->m_thread + dst;
> + struct thr_data *dstptr = &(rep->m_thread[dst]);
>
> - struct thr_job_buffer* newbuffer = selfptr->m_jba_next_buffer;
> + thr_job_queue *q = &(dstptr->m_jba);
> + thr_jb_write_state w;
>
> lock(&dstptr->m_jba_write_lock);
>
> - Uint32 newpos = insert_prioa(dstptr, s);
> -
> - if (0)
> - ndbout_c("sendprioa %s from %s to %s",
> - getSignalName(s->theVerId_signalNumber),
> - getBlockName(refToBlock(s->theSendersBlockRef)),
> - getBlockName(s->theReceiversBlockNumber));
> -
> - /*
> - We reserve extra space for a GSN_STOP_FOR_CRASH signal, so that we can
> - send this signal during a crash in _any_ thread, not just threads which
> - have the infrastructure for selfptr->m_jba->m_next_buffer.
> - */
> - if (unlikely(newpos + 32 + EXTRA_SPACE >= thr_job_buffer::SIZE))
> - {
> - Uint32 newwi = ((dstptr->m_jba.m_write_index >> 16) + 1) %
> thr_job_queue::SIZE;
> -
> - /*
> - There seems to be a race (or missing overflow check) here, nothing
> - seems to prevent senders from overwriting not yet executed
> - previous signal data.
> -
> - But that is perhaps unlikely to occur in practise, given the size
> - of buffers, the low amount of prio A signals sent, and the
> - frequency of executing prio A.
> + Uint32 index = q->m_write_index;
> + w.m_write_index = index;
> + thr_job_buffer *buffer = q->m_buffers[index];
> + w.m_write_buffer = buffer;
> + w.m_write_pos = buffer->m_len;
> + w.m_pending_signals = 0;
> + bool buffer_used = insert_signal(q, &w, true, s, selfptr->m_next_buffer);
> + flush_write_state(dst, q, &w);
>
> - For now, I'll add an assert() ...
> - */
> - assert(newwi != (dstptr->m_jba.m_read_index >> 16));
> -
> - dstptr->m_jba.m_buffers[newwi] = newbuffer;
> - newwi = (newwi << 16);
> - dstptr->m_jba.m_write_index = newwi;
> - }
> -
> unlock(&dstptr->m_jba_write_lock);
>
> - /* ToDo: Hm, shouldn't we possibly wake up the other thread here? */
> -
> - /**
> - * Note, do malloc outside of critical region
> - */
> - if (newpos + 32 + EXTRA_SPACE >= thr_job_buffer::SIZE)
> - {
> - selfptr->m_jba_next_buffer = seize_buffer(rep, self, true);
> - }
> + if (buffer_used)
> + selfptr->m_next_buffer = seize_buffer(rep, self, true);
> }
>
> /*
> This functions sends a prio A STOP_FOR_CRASH signal to a thread.
>
> It works when called from any other thread, not just from job processing
> - threads. This works by having regular sendprioa() reserve space for one
> - GSN_STOP_FOR_CRASH signal.
> + threads. But note that this signal will be the last signal to be executed by
> + the other thread, as it will exit immediately.
> */
> static
> void
> @@ -1198,7 +1205,11 @@ sendprioa_STOP_FOR_CRASH(Uint32 dst)
> {
> SignalT<StopForCrash::SignalLength> signalT;
> struct thr_repository* rep = &g_thr_repository;
> - struct thr_data * dstptr = rep->m_thread + dst;
> + /* As this signal will be the last one executed by the other thread, it does
> + not matter which buffer we use in case the current buffer is filled up by
> + the STOP_FOR_CRASH signal; the data in it will never be read.
> + */
> + static thr_job_buffer dummy_buffer;
>
> /*
> Currently we have two threads with fixed block assignment.
> @@ -1212,6 +1223,7 @@ sendprioa_STOP_FOR_CRASH(Uint32 dst)
> else if (dst == 1)
> bno = DBLQH;
> assert(block2ThreadId(bno) == dst);
> + struct thr_data * dstptr = &(rep->m_thread[dst]);
>
> memset(&signalT.header, 0, sizeof(SignalHeader));
> signalT.header.theVerId_signalNumber = GSN_STOP_FOR_CRASH;
> @@ -1224,8 +1236,20 @@ sendprioa_STOP_FOR_CRASH(Uint32 dst)
> StopForCrash * const stopForCrash = (StopForCrash *)&signalT.theData[0];
> stopForCrash->flags = 0;
>
> + thr_job_queue *q = &(dstptr->m_jba);
> + thr_jb_write_state w;
> +
> lock(&dstptr->m_jba_write_lock);
> - insert_prioa(dstptr, &(signalT.header));
> +
> + Uint32 index = q->m_write_index;
> + w.m_write_index = index;
> + thr_job_buffer *buffer = q->m_buffers[index];
> + w.m_write_buffer = buffer;
> + w.m_write_pos = buffer->m_len;
> + w.m_pending_signals = 0;
> + insert_signal(q, &w, true, &(signalT.header), &dummy_buffer);
> + flush_write_state(dst, q, &w);
> +
> unlock(&dstptr->m_jba_write_lock);
> }
>
> @@ -1372,18 +1396,29 @@ init(struct thr_repository* rep, struct
> selfptr->m_thr_no = thr_no;
> selfptr->m_first_free = 0;
> selfptr->m_first_unused = 0;
> - for (i = 0; i<cnt; i++)
> - selfptr->m_out_queue[i] = seize_buffer(rep, thr_no, false);
>
> selfptr->m_jba.m_read_index = 0;
> selfptr->m_jba.m_write_index = 0;
> - selfptr->m_jba.m_buffers[0] = seize_buffer(rep, thr_no, true);
> - selfptr->m_jba_next_buffer = seize_buffer(rep, thr_no, true);
> + thr_job_buffer *buffer = seize_buffer(rep, thr_no, true);
> + selfptr->m_jba.m_buffers[0] = buffer;
> + selfptr->m_jba_read_state.m_read_index = 0;
> + selfptr->m_jba_read_state.m_read_buffer = buffer;
> + selfptr->m_jba_read_state.m_read_pos = 0;
> + selfptr->m_jba_read_state.m_write_index = 0;
> + selfptr->m_jba_read_state.m_write_pos = 0;
> + selfptr->m_next_buffer = seize_buffer(rep, thr_no, false);
>
> for (i = 0; i<cnt; i++)
> {
> selfptr->m_in_queue[i].m_read_index = 0;
> selfptr->m_in_queue[i].m_write_index = 0;
> + buffer = seize_buffer(rep, thr_no, false);
> + selfptr->m_in_queue[i].m_buffers[0] = buffer;
> + selfptr->m_read_states[i].m_read_index = 0;
> + selfptr->m_read_states[i].m_read_buffer = buffer;
> + selfptr->m_read_states[i].m_read_pos = 0;
> + selfptr->m_read_states[i].m_write_index = 0;
> + selfptr->m_read_states[i].m_write_pos = 0;
> }
>
> init(&selfptr->m_tq);
> @@ -1391,6 +1426,22 @@ init(struct thr_repository* rep, struct
> selfptr->m_jam = NULL;
> }
>
> +/* Have to do this after init of all m_in_queues is done. */
> +static
> +void
> +init2(struct thr_repository* rep, struct thr_data *selfptr, unsigned int cnt,
> + unsigned thr_no)
> +{
> + for (Uint32 i = 0; i<cnt; i++)
> + {
> + selfptr->m_write_states[i].m_write_index = 0;
> + selfptr->m_write_states[i].m_write_pos = 0;
> + selfptr->m_write_states[i].m_write_buffer =
> + rep->m_thread[i].m_in_queue[thr_no].m_buffers[0];
> + selfptr->m_write_states[i].m_pending_signals = 0;
> + }
> +}
> +
> static
> void
> init(struct thr_repository* rep, unsigned int cnt)
> @@ -1400,6 +1451,10 @@ init(struct thr_repository* rep, unsigne
> {
> init(rep, rep->m_thread + i, cnt, i);
> }
> + for (unsigned int i = 0; i<cnt; i++)
> + {
> + init2(rep, rep->m_thread + i, cnt, i);
> + }
>
> rep->stopped_threads = 0;
> pthread_mutex_init(&rep->stop_for_crash_mutex, NULL);
> @@ -1678,16 +1733,13 @@ FastScheduler::dumpSignalMemory(Uint32 t
> */
>
> /* Scan all available buffers with already executed signals. */
> - Uint32 jbb_current_thread = 0;
> + Uint32 jbb_current_thr = 0;
> Uint32 jbb_start = thr_ptr->m_first_free;
> Uint32 jb_end = thr_ptr->m_first_unused;
> const thr_job_buffer *jbb = NULL;
> Uint32 jbb_pos, jbb_len;
> bool no_more_priob = false;
> const thr_job_buffer *jba = NULL;
> - Uint32 jba_read_index = thr_ptr->m_jba.m_read_index;
> - Uint32 jba_current = jba_read_index >> 16;
> - Uint32 jba_execute_pos = jba_read_index & 0xffff;
> Uint32 jba_start = jbb_start;
> Uint32 jba_pos, jba_len;
> bool no_more_prioa = false;
> @@ -1718,8 +1770,10 @@ FastScheduler::dumpSignalMemory(Uint32 t
> No more released prio A buffers, but there might be some stuff
> in the current buffer.
> */
> + Uint32 jba_execute_pos = thr_ptr->m_jba_read_state.m_read_pos;
> if (jba_execute_pos > 0)
> {
> + Uint32 jba_current = thr_ptr->m_jba_read_state.m_read_index;
> jba = thr_ptr->m_jba.m_buffers[jba_current];
> jba_pos = 0;
> jba_len = jba_execute_pos;
> @@ -1751,15 +1805,16 @@ FastScheduler::dumpSignalMemory(Uint32 t
> No more released prio B buffers, but there might be some stuff
> in the buffer(s) currently being processed.
> */
> - while (jbb_current_thread < g_thr_repository.m_thread_count)
> + while (jbb_current_thr < g_thr_repository.m_thread_count)
> {
> - const thr_job_queue *q =
> &(thr_ptr->m_in_queue[jbb_current_thread]);
> - jbb_current_thread++;
> - Uint32 read_index = q->m_read_index;
> - Uint32 read_pos = read_index & 0xffff;
> + const thr_job_queue *q = &(thr_ptr->m_in_queue[jbb_current_thr]);
> + const thr_jb_read_state *r =
> &(thr_ptr->m_read_states[jbb_current_thr]);
> + jbb_current_thr++;
> + Uint32 read_index = r->m_read_index;
> + Uint32 read_pos = r->m_read_pos;
> if (read_pos > 0)
> {
> - jbb = q->m_buffers[read_index >> 16];
> + jbb = q->m_buffers[read_index];
> jbb_pos = 0;
> jbb_len = read_pos;
> break;
>