From: Date: December 11 2008 11:40am Subject: bzr commit into mysql-5.1 branch (ole.john.aske:3167) Bug#41301 List-Archive: http://lists.mysql.com/commits/61323 X-Bug: 41301 Message-Id: <20081211104035.9022F1BF@fimafeng09.norway.sun.com> MIME-Version: 1.0 Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit #At file:///net/vidar01/export/home/tmp/oleja/mysql/mysql-5.1-telco-6.4/ based on revid:jonas@stripped 3167 Ole John Aske 2008-12-11 Commit for Bug#41301 after Jonas first review. modified: storage/ndb/src/common/transporter/TransporterRegistry.cpp storage/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp storage/ndb/src/kernel/vm/Emulator.hpp storage/ndb/src/kernel/vm/mt.cpp === modified file 'storage/ndb/src/common/transporter/TransporterRegistry.cpp' --- a/storage/ndb/src/common/transporter/TransporterRegistry.cpp 2008-11-26 10:36:20 +0000 +++ b/storage/ndb/src/common/transporter/TransporterRegistry.cpp 2008-12-11 10:40:24 +0000 @@ -1085,6 +1085,7 @@ TransporterRegistry::get_tcp_data(TCP_Tr void TransporterRegistry::performReceive() { + bool hasReceived = false; #ifdef NDB_TCP_TRANSPORTER #if defined(HAVE_EPOLL_CREATE) if (likely(m_epoll_fd != -1)) @@ -1128,7 +1129,9 @@ TransporterRegistry::performReceive() if (t->hasReceiveData()) { - callbackObj->checkJobBuffer(); + if (hasReceived) + callbackObj->checkJobBuffer(); + hasReceived = true; Uint32 * ptr; Uint32 sz = t->getReceiveData(&ptr); callbackObj->transporter_recv_from(nodeId); @@ -1152,7 +1155,9 @@ TransporterRegistry::performReceive() { if(t->isConnected() && t->checkConnected()) { - callbackObj->checkJobBuffer(); + if (hasReceived) + callbackObj->checkJobBuffer(); + hasReceived = true; Uint32 * readPtr, * eodPtr; t->getReceivePtr(&readPtr, &eodPtr); callbackObj->transporter_recv_from(nodeId); @@ -1170,7 +1175,9 @@ TransporterRegistry::performReceive() if(is_connected(nodeId)){ if(t->isConnected() && t->checkConnected()) { - callbackObj->checkJobBuffer(); + if (hasReceived) + 
callbackObj->checkJobBuffer(); + hasReceived = true; Uint32 * readPtr, * eodPtr; t->getReceivePtr(&readPtr, &eodPtr); callbackObj->transporter_recv_from(nodeId); === modified file 'storage/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp' --- a/storage/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp 2008-11-03 08:34:28 +0000 +++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp 2008-12-11 10:40:24 +0000 @@ -479,16 +479,26 @@ Dbtup::readFixedSizeTHTwoWordNotNULL(Uin } } -static -inline + +static inline void -zero32(Uint8* dstPtr, Uint32 len) +zero32(Uint8* dstPtr, const Uint32 len) { - while ((len & 3) != 0) + Uint32 odd = len & 3; + if (odd != 0) { - dstPtr[len++] = 0; + Uint32 aligned = len & ~3; + Uint8* dst = dstPtr+aligned; + switch(odd){ /* odd is: {1..3} */ + case 1: + dst[1] = 0; + case 2: + dst[2] = 0; + default: /* Known to be odd==3 */ + dst[3] = 0; + } } -} +} bool Dbtup::readFixedSizeTHManyWordNotNULL(Uint8* outBuffer, === modified file 'storage/ndb/src/kernel/vm/Emulator.hpp' --- a/storage/ndb/src/kernel/vm/Emulator.hpp 2008-01-01 12:45:11 +0000 +++ b/storage/ndb/src/kernel/vm/Emulator.hpp 2008-12-11 10:40:24 +0000 @@ -31,7 +31,6 @@ extern class TimeQueue global extern class FastScheduler globalScheduler; extern class TransporterRegistry globalTransporterRegistry; extern struct GlobalData globalData; -extern struct thr_repository g_thr_repository; #ifdef VM_TRACE extern class SignalLoggerManager globalSignalLoggers; === modified file 'storage/ndb/src/kernel/vm/mt.cpp' --- a/storage/ndb/src/kernel/vm/mt.cpp 2008-11-15 15:43:59 +0000 +++ b/storage/ndb/src/kernel/vm/mt.cpp 2008-12-11 10:40:24 +0000 @@ -131,7 +131,7 @@ struct thr_wait volatile unsigned m_futex_state; enum { FS_RUNNING = 0, - FS_SLEEPING = 1, + FS_SLEEPING = 1 }; thr_wait() { xcng(&m_futex_state, FS_RUNNING);} void init () {} @@ -144,10 +144,10 @@ struct thr_wait * if that returns true will it actually sleep, else it will return * immediately. This is needed to avoid races with wakeup. 
*/ -static +static inline void yield(struct thr_wait* wait, const struct timespec *timeout, - bool (*check_callback)(void *), void *check_arg) + bool (*check_callback)(struct thr_data *), struct thr_data *check_arg) { volatile unsigned * val = &wait->m_futex_state; #ifndef NDEBUG @@ -173,7 +173,7 @@ yield(struct thr_wait* wait, const struc xcng(val, thr_wait::FS_RUNNING); } -static +static inline int wakeup(struct thr_wait* wait) { @@ -195,12 +195,11 @@ wakeup(struct thr_wait* wait) struct thr_wait { + bool m_need_wakeup; + NdbMutex *m_mutex; NdbCondition *m_cond; - thr_wait() { - m_mutex = 0; - m_cond = 0; - } + thr_wait() : m_mutex(0), m_cond(0), m_need_wakeup(false) {} void init() { m_mutex = NdbMutex_Create(); @@ -208,29 +207,43 @@ struct thr_wait } }; -static +static inline void yield(struct thr_wait* wait, const struct timespec *timeout, - bool (*check_callback)(void *), void *check_arg) + bool (*check_callback)(struct thr_data *), struct thr_data *check_arg) { Uint32 msec = (1000 * timeout->tv_sec) + (timeout->tv_nsec / 1000000); NdbMutex_Lock(wait->m_mutex); - if ((*check_callback)(check_arg)) - NdbCondition_WaitTimeout(wait->m_cond, wait->m_mutex, msec); + while ((*check_callback)(check_arg)) /* May have spurious wakeups: Always recheck condition predicate */ + { + wait->m_need_wakeup = true; + if (NdbCondition_WaitTimeout(wait->m_cond, wait->m_mutex, msec) == ETIMEDOUT) + { + wait->m_need_wakeup = false; + break; + } + } NdbMutex_Unlock(wait->m_mutex); } -static + +static inline int wakeup(struct thr_wait* wait) { NdbMutex_Lock(wait->m_mutex); - NdbCondition_Signal(wait->m_cond); + // We should avoid signaling when not waiting for wakeup + if (wait->m_need_wakeup) + { + wait->m_need_wakeup = false; + NdbCondition_Signal(wait->m_cond); + } NdbMutex_Unlock(wait->m_mutex); return 0; } + #endif static inline void @@ -489,12 +502,27 @@ struct thr_job_buffer // 32k Uint32 m_data[SIZE]; }; + +/** + * thr_job_queue is shared between consumer / producer. 
+ * + * The hot-spot of the thr_job_queue are the read/write indexes. + * As they are updated and read frequently they have been placed + * in its own thr_job_queue_head[] in order to make them fit inside a + * single/few cache lines and thereby avoid complete L1-cache replacement + * every time the job_queue is scanned. + */ +struct thr_job_queue_head +{ + unsigned m_read_index; // Read/written by consumer, read by producer + unsigned m_write_index; // Read/written by producer, read by consumer +}; + struct thr_job_queue { static const unsigned SIZE = 30; - unsigned m_read_index; // Read/written by consumer, read by producer - unsigned m_write_index; // Read/written by producer, read by consumer + struct thr_job_queue_head* m_head; struct thr_job_buffer* m_buffers[SIZE]; }; @@ -561,8 +589,15 @@ struct thr_jb_read_state * execution loop and used to determine when the end of available signals is * reached. */ - Uint32 m_write_index; - Uint32 m_write_pos; + Uint32 m_read_end; // End within current thr_job_buffer. (*m_read_buffer) + + Uint32 m_write_index; // Last available thr_job_buffer. + + bool is_empty() const + { +// assert (m_read_index != m_write_index || m_read_pos <= m_read_end); + return (m_read_index == m_write_index) && (m_read_pos >= m_read_end); + } }; /** @@ -662,6 +697,7 @@ struct thr_data struct thr_tq m_tq; /* Prio A signal incoming queue. */ + struct thr_job_queue_head m_jba_head; struct thr_job_queue m_jba; struct thr_spin_lock m_jba_write_lock; /* @@ -692,6 +728,7 @@ struct thr_data * These are the thread input queues, where other threads deliver signals * into. */ + struct thr_job_queue_head m_in_queue_head[MAX_THREADS]; struct thr_job_queue m_in_queue[MAX_THREADS]; /* These are the write states of m_in_queue[self] in each thread. 
*/ struct thr_jb_write_state m_write_states[MAX_THREADS]; @@ -763,6 +800,7 @@ struct trp_callback : public Transporter }; extern trp_callback g_trp_callback; // Forward declaration +extern struct thr_repository g_thr_repository; struct thr_repository { @@ -1033,7 +1071,7 @@ handle_time_wrap(struct thr_data* selfpt } } -static +static inline void scan_time_queues(struct thr_data* selfptr) { @@ -1204,10 +1242,33 @@ senddelay(Uint32 thr_no, const SignalHea /* * Flush the write state to the job queue, making any new signals available to * receiving threads. + * + * Two versions: + * - The general version flush_write_state_other() which may flush to any thread, + * and possibly signal any waiters. + * - The special version flush_write_state_self() which should only be used + * to flush messages to itself. + * + * Call to these functions are encapsulated through flush_write_state which decides + * which of these functions to call. */ static inline void -flush_write_state(Uint32 dst, thr_job_queue *q, thr_jb_write_state *w) +flush_write_state_self(thr_job_queue_head *q_head, thr_jb_write_state *w) +{ + /* + * Can simplify the flush_write_state when writing to myself: + * Simply update write references wo/ mutex, memory barrier and signaling + */ + w->m_write_buffer->m_len = w->m_write_pos; + q_head->m_write_index = w->m_write_index; + w->m_pending_signals_wakeup = 0; + w->m_pending_signals = 0; +} + +static inline +void +flush_write_state_other(thr_data *dstptr, thr_job_queue_head *q_head, thr_jb_write_state *w) { /* * Two write memory barriers here, as assigning m_len may make signal data @@ -1219,20 +1280,37 @@ flush_write_state(Uint32 dst, thr_job_qu * * But wmb() is a no-op anyway in x86 ... 
*/ + wmb(); w->m_write_buffer->m_len = w->m_write_pos; wmb(); - q->m_write_index = w->m_write_index; + q_head->m_write_index = w->m_write_index; + w->m_pending_signals_wakeup += w->m_pending_signals; w->m_pending_signals = 0; if (w->m_pending_signals_wakeup >= MAX_SIGNALS_BEFORE_WAKEUP) { w->m_pending_signals_wakeup = 0; - wakeup(&(g_thr_repository.m_thread[dst].m_waiter)); + wakeup(&(dstptr->m_waiter)); + } +} + +static inline +void +flush_write_state(const thr_data *selfptr, thr_data *dstptr, thr_job_queue_head *q_head, thr_jb_write_state *w) +{ + if (dstptr == selfptr) + { + flush_write_state_self(q_head, w); + } + else + { + flush_write_state_other(dstptr, q_head, w); } } + static void flush_jbb_write_state(thr_data *selfptr) @@ -1240,14 +1318,15 @@ flush_jbb_write_state(thr_data *selfptr) Uint32 thr_count = g_thr_repository.m_thread_count; Uint32 self = selfptr->m_thr_no; - for (Uint32 thr_no = 0; thr_no < thr_count; thr_no ++) + thr_jb_write_state *w = selfptr->m_write_states; + thr_data *thrptr = g_thr_repository.m_thread; + for (Uint32 thr_no = 0; thr_no < thr_count; thr_no++, thrptr++, w++) { - thr_jb_write_state *w = selfptr->m_write_states + thr_no; if (w->m_pending_signals || w->m_pending_signals_wakeup) { w->m_pending_signals_wakeup = MAX_SIGNALS_BEFORE_WAKEUP; - thr_job_queue *q = g_thr_repository.m_thread[thr_no].m_in_queue + self; - flush_write_state(thr_no, q, w); + thr_job_queue_head *q_head = thrptr->m_in_queue_head + self; + flush_write_state(selfptr, thrptr, q_head, w); } } } @@ -1259,10 +1338,11 @@ flush_jbb_write_state(thr_data *selfptr) static int check_job_buffers(struct thr_repository* rep) { - for (unsigned i = 0; im_thread; + for (unsigned i = 0; im_thread+i; - for (unsigned j = 0; jm_in_queue_head; + for (unsigned j = 0; jm_in_queue[j].m_read_index; - unsigned wi = thrptr->m_in_queue[j].m_write_index; + unsigned ri = q_head->m_read_index; + unsigned wi = q_head->m_write_index; unsigned busy = (wi >= ri) ? 
wi - ri : (thr_job_queue::SIZE - ri) + wi; if (4*busy >= thr_job_queue::SIZE) { @@ -1386,6 +1466,15 @@ trp_callback::checkJobBuffer() * except for QMGR open/close connection, but that is not * (i think) sufficient to create a deadlock */ + + /** FIXME: + * On a CMT chip where #CPU >= #NDB-threads sched_yield() is effectively a NOOP as + * there will normally be an idle CPU available to immediately resume thread execution. + * On a Niagara chip this may severely impact performance as the CPUs are virtualized + * by timemultiplexing the physical core. The thread should really be 'parked' on + * a condition to free its execution resources. + */ +// usleep(a-few-usec); /* A micro-sleep would likely have been better... */ sched_yield(); } while (check_job_buffers(rep)); } @@ -1941,7 +2030,7 @@ insert_signal(thr_job_queue *q, thr_jb_w * * Or alternatively, ndbrequire() ? */ - assert(write_index != q->m_read_index); + assert(write_index != q->m_head->m_read_index); new_buffer->m_len = 0; new_buffer->m_prioa = prioa; q->m_buffers[write_index] = new_buffer; @@ -1958,44 +2047,47 @@ static void read_jbb_state(thr_data *selfptr, Uint32 count) { - for (Uint32 i = 0; i < count; i++) - { - thr_jb_read_state *r = selfptr->m_read_states + i; - const thr_job_queue *q = selfptr->m_in_queue +i; - Uint32 index = q->m_write_index; - r->m_write_index = index; - read_barrier_depends(); - r->m_write_pos = q->m_buffers[index]->m_len; + + thr_jb_read_state *r = selfptr->m_read_states; + const thr_job_queue *q = selfptr->m_in_queue; + for (Uint32 i = 0; i < count; i++,r++,q++) + { + Uint32 read_index = r->m_read_index; + + if (r->m_write_index == read_index) // Optimization: Only reload when possibly empty. 
+ { // (Avoid cache reload of shared thr_job_queue_head) + r->m_write_index = q->m_head->m_write_index; + read_barrier_depends(); + r->m_read_end = q->m_buffers[read_index]->m_len; + } } } static -void +bool read_jba_state(thr_data *selfptr) { - const thr_job_queue *jba = &(selfptr->m_jba); - Uint32 index = jba->m_write_index; - selfptr->m_jba_read_state.m_write_index = index; + thr_jb_read_state *r = &(selfptr->m_jba_read_state); + r->m_write_index = selfptr->m_jba_head.m_write_index; read_barrier_depends(); - selfptr->m_jba_read_state.m_write_pos = jba->m_buffers[index]->m_len; + r->m_read_end = selfptr->m_jba.m_buffers[r->m_read_index]->m_len; + return r->is_empty(); } /* Check all job queues, return true only if all are empty. */ static bool -check_queues_empty(void *data) +check_queues_empty(thr_data *selfptr) { Uint32 thr_count = g_thr_repository.m_thread_count; - thr_data *selfptr = reinterpret_cast<thr_data *>(data); + bool empty = read_jba_state(selfptr); + if (!empty) + return false; read_jbb_state(selfptr, thr_count); - read_jba_state(selfptr); - const thr_jb_read_state *r = &(selfptr->m_jba_read_state); - if (r->m_read_index < r->m_write_index || r->m_read_pos < r->m_write_pos) - return false; - for (Uint32 i = 0; i < thr_count; i++) + const thr_jb_read_state *r = selfptr->m_read_states; + for (Uint32 i = 0; i < thr_count; i++,r++) { - r = selfptr->m_read_states + i;; - if (r->m_read_index < r->m_write_index || r->m_read_pos < r->m_write_pos) + if (!r->is_empty()) return false; } return true; @@ -2049,19 +2141,20 @@ execute_signals(thr_data *selfptr, thr_j Signal *sig, Uint32 max_signals, Uint32 *watchDogCounter, Uint32 *signalIdCounter) { - Uint32 num_signals = 0; - + Uint32 num_signals; Uint32 read_index = r->m_read_index; Uint32 write_index = r->m_write_index; Uint32 read_pos = r->m_read_pos; - Uint32 write_pos = (read_index == write_index ? 
- r->m_write_pos : - q->m_buffers[read_index]->m_len); + Uint32 read_end = r->m_read_end; + + if (read_index == write_index && read_pos >= read_end) + return 0; // empty read_state + thr_job_buffer *read_buffer = r->m_read_buffer; - while (num_signals < max_signals) + for (num_signals = 0; num_signals < max_signals; num_signals++) { - while (read_pos >= write_pos) + while (read_pos >= read_end) { if (read_index == write_index) { @@ -2075,13 +2168,12 @@ execute_signals(thr_data *selfptr, thr_j release_buffer(&g_thr_repository, selfptr->m_thr_no, read_buffer); read_buffer = q->m_buffers[read_index]; read_pos = 0; - write_pos = (read_index == write_index ? - r->m_write_pos : - q->m_buffers[read_index]->m_len); + read_end = read_buffer->m_len; /* Update thread-local read state. */ - r->m_read_index = q->m_read_index = read_index; + r->m_read_index = q->m_head->m_read_index = read_index; r->m_read_buffer = read_buffer; r->m_read_pos = read_pos; + r->m_read_end = read_end; } } @@ -2090,20 +2182,18 @@ execute_signals(thr_data *selfptr, thr_j * (Though on Intel Core 2, they do not give much speedup, as apparently * the hardware prefetcher is already doing a fairly good job). */ -#ifdef __GNUC__ - __builtin_prefetch (read_buffer->m_data + read_pos + 16, 0, 3); - __builtin_prefetch ((Uint32 *)&sig->header + 16, 1, 3); -#endif + PREFETCH_READ (read_buffer->m_data + read_pos + 16); + PREFETCH_WRITE ((Uint32 *)&sig->header + 16); /* Now execute the signal. 
*/ SignalHeader* s = reinterpret_cast<SignalHeader*>(read_buffer->m_data + read_pos); Uint32 seccnt = s->m_noOfSections; Uint32 siglen = (sizeof(*s)>>2) + s->theLength; -#ifdef __GNUC__ if(siglen>16) - __builtin_prefetch (read_buffer->m_data + read_pos + 32, 0, 3); -#endif + { + PREFETCH_READ (read_buffer->m_data + read_pos + 32); + } Uint32 bno = blockToMain(s->theReceiversBlockNumber); Uint32 ino = map_instance(s); SimulatedBlock* block = globalData.getBlock(bno, ino); @@ -2143,8 +2233,6 @@ execute_signals(thr_data *selfptr, thr_j #endif block->executeFunction(gsn, sig); - - num_signals++; } return num_signals; @@ -2165,19 +2253,21 @@ run_job_buffers(thr_data *selfptr, Signa */ rmb(); - for (Uint32 send_thr_no = 0; send_thr_no < thr_count; send_thr_no++) + thr_job_queue *queue = selfptr->m_in_queue; + thr_jb_read_state *read_state = selfptr->m_read_states; + for (Uint32 send_thr_no = 0; send_thr_no < thr_count; send_thr_no++,queue++,read_state++) { /* Read the prio A state often, to avoid starvation of prio A. */ - read_jba_state(selfptr); - static Uint32 max_prioA = thr_job_queue::SIZE * thr_job_buffer::SIZE; - signal_count += execute_signals(selfptr, &(selfptr->m_jba), - &(selfptr->m_jba_read_state), sig, - max_prioA, watchDogCounter, - signalIdCounter); - + bool jba_empty = read_jba_state(selfptr); + if (!jba_empty) + { + static Uint32 max_prioA = thr_job_queue::SIZE * thr_job_buffer::SIZE; + signal_count += execute_signals(selfptr, &(selfptr->m_jba), + &(selfptr->m_jba_read_state), sig, + max_prioA, watchDogCounter, + signalIdCounter); + } /* Now execute prio B signals from one thread. 
*/ - thr_job_queue *queue = selfptr->m_in_queue + send_thr_no; - thr_jb_read_state *read_state = selfptr->m_read_states + send_thr_no; signal_count += execute_signals(selfptr, queue, read_state, sig, MAX_SIGNALS_PER_JB, watchDogCounter, signalIdCounter); @@ -2430,6 +2520,7 @@ mt_receiver_thread_main(void *thr_arg) unsigned thr_no = selfptr->m_thr_no; Uint32& watchDogCounter = selfptr->m_watchdog_counter; Uint32 thrSignalId = 0; + bool has_received = false; init_thread(selfptr); receiverThreadId = thr_no; @@ -2454,7 +2545,7 @@ mt_receiver_thread_main(void *thr_arg) Uint32 sum = run_job_buffers(selfptr, signal, &watchDogCounter, &thrSignalId); - if (sum) + if (sum || has_received) { watchDogCounter = 6; flush_jbb_write_state(selfptr); @@ -2464,18 +2555,18 @@ mt_receiver_thread_main(void *thr_arg) watchDogCounter = 7; + has_received = false; if (globalTransporterRegistry.pollReceive(1)) { if (check_job_buffers(rep) == 0) { watchDogCounter = 8; - lock(&rep->m_receive_lock); - globalTransporterRegistry.performReceive(); - unlock(&rep->m_receive_lock); + lock(&rep->m_receive_lock); + globalTransporterRegistry.performReceive(); + unlock(&rep->m_receive_lock); + has_received = true; } } - - flush_jbb_write_state(selfptr); } globalEmulatorData.theWatchDog->unregisterWatchedThread(thr_no); @@ -2483,6 +2574,7 @@ mt_receiver_thread_main(void *thr_arg) } static +inline void sendpacked(struct thr_data* thr_ptr, Signal* signal) { @@ -2586,20 +2678,20 @@ sendlocal(Uint32 self, const SignalHeade Uint32 dst = block2ThreadId(block, instance); struct thr_repository* rep = &g_thr_repository; struct thr_data * selfptr = rep->m_thread + self; + struct thr_data * dstptr = rep->m_thread + dst; selfptr->m_priob_count++; Uint32 siglen = (sizeof(*s) >> 2) + s->theLength + s->m_noOfSections; selfptr->m_priob_size += siglen; - thr_job_queue *q = rep->m_thread[dst].m_in_queue + self; + thr_job_queue *q = dstptr->m_in_queue + self; thr_jb_write_state *w = selfptr->m_write_states + dst; if 
(insert_signal(q, w, false, s, data, secPtr, selfptr->m_next_buffer)) { selfptr->m_next_buffer = seize_buffer(rep, self, false); } - if (w->m_pending_signals >= MAX_SIGNALS_BEFORE_FLUSH) - flush_write_state(dst, q, w); + flush_write_state(selfptr, dstptr, q->m_head, w); } void @@ -2611,7 +2703,7 @@ sendprioa(Uint32 self, const SignalHeade Uint32 dst = block2ThreadId(block, instance); struct thr_repository* rep = &g_thr_repository; - struct thr_data * selfptr = rep->m_thread + self; + struct thr_data *selfptr = rep->m_thread + self; struct thr_data *dstptr = rep->m_thread + dst; selfptr->m_prioa_count++; @@ -2623,7 +2715,7 @@ sendprioa(Uint32 self, const SignalHeade lock(&dstptr->m_jba_write_lock); - Uint32 index = q->m_write_index; + Uint32 index = q->m_head->m_write_index; w.m_write_index = index; thr_job_buffer *buffer = q->m_buffers[index]; w.m_write_buffer = buffer; @@ -2632,7 +2724,7 @@ sendprioa(Uint32 self, const SignalHeade w.m_pending_signals_wakeup = MAX_SIGNALS_BEFORE_WAKEUP; bool buf_used = insert_signal(q, &w, true, s, data, secPtr, selfptr->m_next_buffer); - flush_write_state(dst, q, &w); + flush_write_state(selfptr, dstptr, q->m_head, &w); unlock(&dstptr->m_jba_write_lock); @@ -2689,7 +2781,7 @@ mt_send_remote(Uint32 self, const Signal */ static void -sendprioa_STOP_FOR_CRASH(Uint32 dst) +sendprioa_STOP_FOR_CRASH(const struct thr_data *selfptr, Uint32 dst) { SignalT signalT; struct thr_repository* rep = &g_thr_repository; @@ -2738,7 +2830,7 @@ sendprioa_STOP_FOR_CRASH(Uint32 dst) lock(&dstptr->m_jba_write_lock); - Uint32 index = q->m_write_index; + Uint32 index = q->m_head->m_write_index; w.m_write_index = index; thr_job_buffer *buffer = q->m_buffers[index]; w.m_write_buffer = buffer; @@ -2747,7 +2839,7 @@ sendprioa_STOP_FOR_CRASH(Uint32 dst) w.m_pending_signals_wakeup = MAX_SIGNALS_BEFORE_WAKEUP; insert_signal(q, &w, true, &signalT.header, signalT.theData, NULL, &dummy_buffer); - flush_write_state(dst, q, &w); + flush_write_state(selfptr, dstptr, 
q->m_head, &w); unlock(&dstptr->m_jba_write_lock); } @@ -2777,31 +2869,32 @@ thr_init(struct thr_repository* rep, str selfptr->m_first_free = 0; selfptr->m_first_unused = 0; - selfptr->m_jba.m_read_index = 0; - selfptr->m_jba.m_write_index = 0; + selfptr->m_jba_head.m_read_index = 0; + selfptr->m_jba_head.m_write_index = 0; + selfptr->m_jba.m_head = &selfptr->m_jba_head; thr_job_buffer *buffer = seize_buffer(rep, thr_no, true); selfptr->m_jba.m_buffers[0] = buffer; selfptr->m_jba_read_state.m_read_index = 0; selfptr->m_jba_read_state.m_read_buffer = buffer; selfptr->m_jba_read_state.m_read_pos = 0; + selfptr->m_jba_read_state.m_read_end = 0; selfptr->m_jba_read_state.m_write_index = 0; - selfptr->m_jba_read_state.m_write_pos = 0; selfptr->m_next_buffer = seize_buffer(rep, thr_no, false); selfptr->m_send_buffer_pool.set_pool(&rep->m_free_list); for (i = 0; im_in_queue[i].m_read_index = 0; - selfptr->m_in_queue[i].m_write_index = 0; + selfptr->m_in_queue_head[i].m_read_index = 0; + selfptr->m_in_queue_head[i].m_write_index = 0; + selfptr->m_in_queue[i].m_head = &selfptr->m_in_queue_head[i]; buffer = seize_buffer(rep, thr_no, false); selfptr->m_in_queue[i].m_buffers[0] = buffer; selfptr->m_read_states[i].m_read_index = 0; selfptr->m_read_states[i].m_read_buffer = buffer; selfptr->m_read_states[i].m_read_pos = 0; + selfptr->m_read_states[i].m_read_end = 0; selfptr->m_read_states[i].m_write_index = 0; - selfptr->m_read_states[i].m_write_pos = 0; } - queue_init(&selfptr->m_tq); selfptr->m_prioa_count = 0; @@ -3167,7 +3260,7 @@ FastScheduler::traceDumpPrepare(NdbShutd continue; } - sendprioa_STOP_FOR_CRASH(thr_no); + sendprioa_STOP_FOR_CRASH(selfptr, thr_no); waitFor_count++; }