#At file:///net/vidar01/export/home/tmp/oleja/mysql/mysql-5.1-telco-6.4/ based on revid:jonas@stripped
3167 Ole John Aske 2008-12-11
Commit for Bug#41301 after Jonas' first review.
modified:
storage/ndb/src/common/transporter/TransporterRegistry.cpp
storage/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp
storage/ndb/src/kernel/vm/Emulator.hpp
storage/ndb/src/kernel/vm/mt.cpp
=== modified file 'storage/ndb/src/common/transporter/TransporterRegistry.cpp'
--- a/storage/ndb/src/common/transporter/TransporterRegistry.cpp 2008-11-26 10:36:20 +0000
+++ b/storage/ndb/src/common/transporter/TransporterRegistry.cpp 2008-12-11 10:40:24 +0000
@@ -1085,6 +1085,7 @@ TransporterRegistry::get_tcp_data(TCP_Tr
void
TransporterRegistry::performReceive()
{
+ bool hasReceived = false;
#ifdef NDB_TCP_TRANSPORTER
#if defined(HAVE_EPOLL_CREATE)
if (likely(m_epoll_fd != -1))
@@ -1128,7 +1129,9 @@ TransporterRegistry::performReceive()
if (t->hasReceiveData())
{
- callbackObj->checkJobBuffer();
+ if (hasReceived)
+ callbackObj->checkJobBuffer();
+ hasReceived = true;
Uint32 * ptr;
Uint32 sz = t->getReceiveData(&ptr);
callbackObj->transporter_recv_from(nodeId);
@@ -1152,7 +1155,9 @@ TransporterRegistry::performReceive()
{
if(t->isConnected() && t->checkConnected())
{
- callbackObj->checkJobBuffer();
+ if (hasReceived)
+ callbackObj->checkJobBuffer();
+ hasReceived = true;
Uint32 * readPtr, * eodPtr;
t->getReceivePtr(&readPtr, &eodPtr);
callbackObj->transporter_recv_from(nodeId);
@@ -1170,7 +1175,9 @@ TransporterRegistry::performReceive()
if(is_connected(nodeId)){
if(t->isConnected() && t->checkConnected())
{
- callbackObj->checkJobBuffer();
+ if (hasReceived)
+ callbackObj->checkJobBuffer();
+ hasReceived = true;
Uint32 * readPtr, * eodPtr;
t->getReceivePtr(&readPtr, &eodPtr);
callbackObj->transporter_recv_from(nodeId);
=== modified file 'storage/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp 2008-11-03 08:34:28 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp 2008-12-11 10:40:24 +0000
@@ -479,16 +479,26 @@ Dbtup::readFixedSizeTHTwoWordNotNULL(Uin
}
}
-static
-inline
+
+static inline
void
-zero32(Uint8* dstPtr, Uint32 len)
+zero32(Uint8* dstPtr, const Uint32 len)
{
- while ((len & 3) != 0)
+ Uint32 odd = len & 3;
+ if (odd != 0)
{
- dstPtr[len++] = 0;
+ Uint32 aligned = len & ~3;
+ Uint8* dst = dstPtr+aligned;
+ switch(odd){ /* odd is: {1..3} */
+ case 1:
+ dst[1] = 0;
+ case 2:
+ dst[2] = 0;
+ default: /* Known to be odd==3 */
+ dst[3] = 0;
+ }
}
-}
+}
bool
Dbtup::readFixedSizeTHManyWordNotNULL(Uint8* outBuffer,
=== modified file 'storage/ndb/src/kernel/vm/Emulator.hpp'
--- a/storage/ndb/src/kernel/vm/Emulator.hpp 2008-01-01 12:45:11 +0000
+++ b/storage/ndb/src/kernel/vm/Emulator.hpp 2008-12-11 10:40:24 +0000
@@ -31,7 +31,6 @@ extern class TimeQueue global
extern class FastScheduler globalScheduler;
extern class TransporterRegistry globalTransporterRegistry;
extern struct GlobalData globalData;
-extern struct thr_repository g_thr_repository;
#ifdef VM_TRACE
extern class SignalLoggerManager globalSignalLoggers;
=== modified file 'storage/ndb/src/kernel/vm/mt.cpp'
--- a/storage/ndb/src/kernel/vm/mt.cpp 2008-11-15 15:43:59 +0000
+++ b/storage/ndb/src/kernel/vm/mt.cpp 2008-12-11 10:40:24 +0000
@@ -131,7 +131,7 @@ struct thr_wait
volatile unsigned m_futex_state;
enum {
FS_RUNNING = 0,
- FS_SLEEPING = 1,
+ FS_SLEEPING = 1
};
thr_wait() { xcng(&m_futex_state, FS_RUNNING);}
void init () {}
@@ -144,10 +144,10 @@ struct thr_wait
* if that returns true will it actually sleep, else it will return
* immediately. This is needed to avoid races with wakeup.
*/
-static
+static inline
void
yield(struct thr_wait* wait, const struct timespec *timeout,
- bool (*check_callback)(void *), void *check_arg)
+ bool (*check_callback)(struct thr_data *), struct thr_data *check_arg)
{
volatile unsigned * val = &wait->m_futex_state;
#ifndef NDEBUG
@@ -173,7 +173,7 @@ yield(struct thr_wait* wait, const struc
xcng(val, thr_wait::FS_RUNNING);
}
-static
+static inline
int
wakeup(struct thr_wait* wait)
{
@@ -195,12 +195,11 @@ wakeup(struct thr_wait* wait)
struct thr_wait
{
+ bool m_need_wakeup;
+
NdbMutex *m_mutex;
NdbCondition *m_cond;
- thr_wait() {
- m_mutex = 0;
- m_cond = 0;
- }
+ thr_wait() : m_mutex(0), m_cond(0), m_need_wakeup(false) {}
void init() {
m_mutex = NdbMutex_Create();
@@ -208,29 +207,43 @@ struct thr_wait
}
};
-static
+static inline
void
yield(struct thr_wait* wait, const struct timespec *timeout,
- bool (*check_callback)(void *), void *check_arg)
+ bool (*check_callback)(struct thr_data *), struct thr_data *check_arg)
{
Uint32 msec =
(1000 * timeout->tv_sec) +
(timeout->tv_nsec / 1000000);
NdbMutex_Lock(wait->m_mutex);
- if ((*check_callback)(check_arg))
- NdbCondition_WaitTimeout(wait->m_cond, wait->m_mutex, msec);
+ while ((*check_callback)(check_arg)) /* May have spurious wakeups: Always recheck condition predicate */
+ {
+ wait->m_need_wakeup = true;
+ if (NdbCondition_WaitTimeout(wait->m_cond, wait->m_mutex, msec) == ETIMEDOUT)
+ {
+ wait->m_need_wakeup = false;
+ break;
+ }
+ }
NdbMutex_Unlock(wait->m_mutex);
}
-static
+
+static inline
int
wakeup(struct thr_wait* wait)
{
NdbMutex_Lock(wait->m_mutex);
- NdbCondition_Signal(wait->m_cond);
+ // We should avoid signaling when not waiting for wakeup
+ if (wait->m_need_wakeup)
+ {
+ wait->m_need_wakeup = false;
+ NdbCondition_Signal(wait->m_cond);
+ }
NdbMutex_Unlock(wait->m_mutex);
return 0;
}
+
#endif
static inline void
@@ -489,12 +502,27 @@ struct thr_job_buffer // 32k
Uint32 m_data[SIZE];
};
+
+/**
+ * thr_job_queue is shared between consumer / producer.
+ *
+ * The hot spots of the thr_job_queue are the read/write indexes.
+ * As they are updated and read frequently, they have been placed
+ * in their own thr_job_queue_head[] in order to make them fit inside a
+ * single (or a few) cache lines and thereby avoid complete L1-cache replacement
+ * every time the job_queue is scanned.
+ */
+struct thr_job_queue_head
+{
+ unsigned m_read_index; // Read/written by consumer, read by producer
+ unsigned m_write_index; // Read/written by producer, read by consumer
+};
+
struct thr_job_queue
{
static const unsigned SIZE = 30;
- unsigned m_read_index; // Read/written by consumer, read by producer
- unsigned m_write_index; // Read/written by producer, read by consumer
+ struct thr_job_queue_head* m_head;
struct thr_job_buffer* m_buffers[SIZE];
};
@@ -561,8 +589,15 @@ struct thr_jb_read_state
* execution loop and used to determine when the end of available signals is
* reached.
*/
- Uint32 m_write_index;
- Uint32 m_write_pos;
+ Uint32 m_read_end; // End within current thr_job_buffer. (*m_read_buffer)
+
+ Uint32 m_write_index; // Last available thr_job_buffer.
+
+ bool is_empty() const
+ {
+// assert (m_read_index != m_write_index || m_read_pos <= m_read_end);
+ return (m_read_index == m_write_index) && (m_read_pos >= m_read_end);
+ }
};
/**
@@ -662,6 +697,7 @@ struct thr_data
struct thr_tq m_tq;
/* Prio A signal incoming queue. */
+ struct thr_job_queue_head m_jba_head;
struct thr_job_queue m_jba;
struct thr_spin_lock m_jba_write_lock;
/*
@@ -692,6 +728,7 @@ struct thr_data
* These are the thread input queues, where other threads deliver signals
* into.
*/
+ struct thr_job_queue_head m_in_queue_head[MAX_THREADS];
struct thr_job_queue m_in_queue[MAX_THREADS];
/* These are the write states of m_in_queue[self] in each thread. */
struct thr_jb_write_state m_write_states[MAX_THREADS];
@@ -763,6 +800,7 @@ struct trp_callback : public Transporter
};
extern trp_callback g_trp_callback; // Forward declaration
+extern struct thr_repository g_thr_repository;
struct thr_repository
{
@@ -1033,7 +1071,7 @@ handle_time_wrap(struct thr_data* selfpt
}
}
-static
+static inline
void
scan_time_queues(struct thr_data* selfptr)
{
@@ -1204,10 +1242,33 @@ senddelay(Uint32 thr_no, const SignalHea
/*
* Flush the write state to the job queue, making any new signals available to
* receiving threads.
+ *
+ * Two versions:
+ * - The general version flush_write_state_other() which may flush to any thread,
+ * and possibly signal any waiters.
+ * - The special version flush_write_state_self() which should only be used
+ * to flush messages to itself.
+ *
+ * Calls to these functions are encapsulated by flush_write_state(), which
+ * decides which of them to call.
*/
static inline
void
-flush_write_state(Uint32 dst, thr_job_queue *q, thr_jb_write_state *w)
+flush_write_state_self(thr_job_queue_head *q_head, thr_jb_write_state *w)
+{
+ /*
+ * The flush can be simplified when a thread writes to its own queue:
+ * simply update the write references without mutex, memory barrier or signaling.
+ */
+ w->m_write_buffer->m_len = w->m_write_pos;
+ q_head->m_write_index = w->m_write_index;
+ w->m_pending_signals_wakeup = 0;
+ w->m_pending_signals = 0;
+}
+
+static inline
+void
+flush_write_state_other(thr_data *dstptr, thr_job_queue_head *q_head, thr_jb_write_state *w)
{
/*
* Two write memory barriers here, as assigning m_len may make signal data
@@ -1219,20 +1280,37 @@ flush_write_state(Uint32 dst, thr_job_qu
*
* But wmb() is a no-op anyway in x86 ...
*/
+
wmb();
w->m_write_buffer->m_len = w->m_write_pos;
wmb();
- q->m_write_index = w->m_write_index;
+ q_head->m_write_index = w->m_write_index;
+
w->m_pending_signals_wakeup += w->m_pending_signals;
w->m_pending_signals = 0;
if (w->m_pending_signals_wakeup >= MAX_SIGNALS_BEFORE_WAKEUP)
{
w->m_pending_signals_wakeup = 0;
- wakeup(&(g_thr_repository.m_thread[dst].m_waiter));
+ wakeup(&(dstptr->m_waiter));
+ }
+}
+
+static inline
+void
+flush_write_state(const thr_data *selfptr, thr_data *dstptr, thr_job_queue_head *q_head, thr_jb_write_state *w)
+{
+ if (dstptr == selfptr)
+ {
+ flush_write_state_self(q_head, w);
+ }
+ else
+ {
+ flush_write_state_other(dstptr, q_head, w);
}
}
+
static
void
flush_jbb_write_state(thr_data *selfptr)
@@ -1240,14 +1318,15 @@ flush_jbb_write_state(thr_data *selfptr)
Uint32 thr_count = g_thr_repository.m_thread_count;
Uint32 self = selfptr->m_thr_no;
- for (Uint32 thr_no = 0; thr_no < thr_count; thr_no ++)
+ thr_jb_write_state *w = selfptr->m_write_states;
+ thr_data *thrptr = g_thr_repository.m_thread;
+ for (Uint32 thr_no = 0; thr_no < thr_count; thr_no++, thrptr++, w++)
{
- thr_jb_write_state *w = selfptr->m_write_states + thr_no;
if (w->m_pending_signals || w->m_pending_signals_wakeup)
{
w->m_pending_signals_wakeup = MAX_SIGNALS_BEFORE_WAKEUP;
- thr_job_queue *q = g_thr_repository.m_thread[thr_no].m_in_queue + self;
- flush_write_state(thr_no, q, w);
+ thr_job_queue_head *q_head = thrptr->m_in_queue_head + self;
+ flush_write_state(selfptr, thrptr, q_head, w);
}
}
}
@@ -1259,10 +1338,11 @@ flush_jbb_write_state(thr_data *selfptr)
static int
check_job_buffers(struct thr_repository* rep)
{
- for (unsigned i = 0; i<num_threads; i++)
+ thr_data *thrptr = rep->m_thread;
+ for (unsigned i = 0; i<num_threads; i++, thrptr++)
{
- thr_data * thrptr = rep->m_thread+i;
- for (unsigned j = 0; j<num_threads; j++)
+ const thr_job_queue_head *q_head = thrptr->m_in_queue_head;
+ for (unsigned j = 0; j<num_threads; j++,q_head++)
{
/**
* These values are read wo/ locks...
@@ -1276,8 +1356,8 @@ check_job_buffers(struct thr_repository*
* conservative (i.e it can be better than guess, if read-index has
* moved but we didnt see it)
*/
- unsigned ri = thrptr->m_in_queue[j].m_read_index;
- unsigned wi = thrptr->m_in_queue[j].m_write_index;
+ unsigned ri = q_head->m_read_index;
+ unsigned wi = q_head->m_write_index;
unsigned busy = (wi >= ri) ? wi - ri : (thr_job_queue::SIZE - ri) + wi;
if (4*busy >= thr_job_queue::SIZE)
{
@@ -1386,6 +1466,15 @@ trp_callback::checkJobBuffer()
* except for QMGR open/close connection, but that is not
* (i think) sufficient to create a deadlock
*/
+
+ /** FIXME:
+ * On a CMT chip where #CPU >= #NDB-threads, sched_yield() is effectively a no-op, as
+ * there will normally be an idle CPU available to immediately resume thread execution.
+ * On a Niagara chip this may severely impact performance, as the CPUs are virtualized
+ * by time-multiplexing the physical core. The thread should really be 'parked' on
+ * a condition to free its execution resources.
+ */
+// usleep(a-few-usec); /* A micro-sleep would likely have been better... */
sched_yield();
} while (check_job_buffers(rep));
}
@@ -1941,7 +2030,7 @@ insert_signal(thr_job_queue *q, thr_jb_w
*
* Or alternatively, ndbrequire() ?
*/
- assert(write_index != q->m_read_index);
+ assert(write_index != q->m_head->m_read_index);
new_buffer->m_len = 0;
new_buffer->m_prioa = prioa;
q->m_buffers[write_index] = new_buffer;
@@ -1958,44 +2047,47 @@ static
void
read_jbb_state(thr_data *selfptr, Uint32 count)
{
- for (Uint32 i = 0; i < count; i++)
- {
- thr_jb_read_state *r = selfptr->m_read_states + i;
- const thr_job_queue *q = selfptr->m_in_queue +i;
- Uint32 index = q->m_write_index;
- r->m_write_index = index;
- read_barrier_depends();
- r->m_write_pos = q->m_buffers[index]->m_len;
+
+ thr_jb_read_state *r = selfptr->m_read_states;
+ const thr_job_queue *q = selfptr->m_in_queue;
+ for (Uint32 i = 0; i < count; i++,r++,q++)
+ {
+ Uint32 read_index = r->m_read_index;
+
+ if (r->m_write_index == read_index) // Optimization: Only reload when possibly empty.
+ { // (Avoid cache reload of shared thr_job_queue_head)
+ r->m_write_index = q->m_head->m_write_index;
+ read_barrier_depends();
+ r->m_read_end = q->m_buffers[read_index]->m_len;
+ }
}
}
static
-void
+bool
read_jba_state(thr_data *selfptr)
{
- const thr_job_queue *jba = &(selfptr->m_jba);
- Uint32 index = jba->m_write_index;
- selfptr->m_jba_read_state.m_write_index = index;
+ thr_jb_read_state *r = &(selfptr->m_jba_read_state);
+ r->m_write_index = selfptr->m_jba_head.m_write_index;
read_barrier_depends();
- selfptr->m_jba_read_state.m_write_pos = jba->m_buffers[index]->m_len;
+ r->m_read_end = selfptr->m_jba.m_buffers[r->m_read_index]->m_len;
+ return r->is_empty();
}
/* Check all job queues, return true only if all are empty. */
static bool
-check_queues_empty(void *data)
+check_queues_empty(thr_data *selfptr)
{
Uint32 thr_count = g_thr_repository.m_thread_count;
- thr_data *selfptr = reinterpret_cast<thr_data *>(data);
+ bool empty = read_jba_state(selfptr);
+ if (!empty)
+ return false;
read_jbb_state(selfptr, thr_count);
- read_jba_state(selfptr);
- const thr_jb_read_state *r = &(selfptr->m_jba_read_state);
- if (r->m_read_index < r->m_write_index || r->m_read_pos < r->m_write_pos)
- return false;
- for (Uint32 i = 0; i < thr_count; i++)
+ const thr_jb_read_state *r = selfptr->m_read_states;
+ for (Uint32 i = 0; i < thr_count; i++,r++)
{
- r = selfptr->m_read_states + i;;
- if (r->m_read_index < r->m_write_index || r->m_read_pos < r->m_write_pos)
+ if (!r->is_empty())
return false;
}
return true;
@@ -2049,19 +2141,20 @@ execute_signals(thr_data *selfptr, thr_j
Signal *sig, Uint32 max_signals,
Uint32 *watchDogCounter, Uint32 *signalIdCounter)
{
- Uint32 num_signals = 0;
-
+ Uint32 num_signals;
Uint32 read_index = r->m_read_index;
Uint32 write_index = r->m_write_index;
Uint32 read_pos = r->m_read_pos;
- Uint32 write_pos = (read_index == write_index ?
- r->m_write_pos :
- q->m_buffers[read_index]->m_len);
+ Uint32 read_end = r->m_read_end;
+
+ if (read_index == write_index && read_pos >= read_end)
+ return 0; // empty read_state
+
thr_job_buffer *read_buffer = r->m_read_buffer;
- while (num_signals < max_signals)
+ for (num_signals = 0; num_signals < max_signals; num_signals++)
{
- while (read_pos >= write_pos)
+ while (read_pos >= read_end)
{
if (read_index == write_index)
{
@@ -2075,13 +2168,12 @@ execute_signals(thr_data *selfptr, thr_j
release_buffer(&g_thr_repository, selfptr->m_thr_no, read_buffer);
read_buffer = q->m_buffers[read_index];
read_pos = 0;
- write_pos = (read_index == write_index ?
- r->m_write_pos :
- q->m_buffers[read_index]->m_len);
+ read_end = read_buffer->m_len;
/* Update thread-local read state. */
- r->m_read_index = q->m_read_index = read_index;
+ r->m_read_index = q->m_head->m_read_index = read_index;
r->m_read_buffer = read_buffer;
r->m_read_pos = read_pos;
+ r->m_read_end = read_end;
}
}
@@ -2090,20 +2182,18 @@ execute_signals(thr_data *selfptr, thr_j
* (Though on Intel Core 2, they do not give much speedup, as apparently
* the hardware prefetcher is already doing a fairly good job).
*/
-#ifdef __GNUC__
- __builtin_prefetch (read_buffer->m_data + read_pos + 16, 0, 3);
- __builtin_prefetch ((Uint32 *)&sig->header + 16, 1, 3);
-#endif
+ PREFETCH_READ (read_buffer->m_data + read_pos + 16);
+ PREFETCH_WRITE ((Uint32 *)&sig->header + 16);
/* Now execute the signal. */
SignalHeader* s =
reinterpret_cast<SignalHeader*>(read_buffer->m_data + read_pos);
Uint32 seccnt = s->m_noOfSections;
Uint32 siglen = (sizeof(*s)>>2) + s->theLength;
-#ifdef __GNUC__
if(siglen>16)
- __builtin_prefetch (read_buffer->m_data + read_pos + 32, 0, 3);
-#endif
+ {
+ PREFETCH_READ (read_buffer->m_data + read_pos + 32);
+ }
Uint32 bno = blockToMain(s->theReceiversBlockNumber);
Uint32 ino = map_instance(s);
SimulatedBlock* block = globalData.getBlock(bno, ino);
@@ -2143,8 +2233,6 @@ execute_signals(thr_data *selfptr, thr_j
#endif
block->executeFunction(gsn, sig);
-
- num_signals++;
}
return num_signals;
@@ -2165,19 +2253,21 @@ run_job_buffers(thr_data *selfptr, Signa
*/
rmb();
- for (Uint32 send_thr_no = 0; send_thr_no < thr_count; send_thr_no++)
+ thr_job_queue *queue = selfptr->m_in_queue;
+ thr_jb_read_state *read_state = selfptr->m_read_states;
+ for (Uint32 send_thr_no = 0; send_thr_no < thr_count; send_thr_no++,queue++,read_state++)
{
/* Read the prio A state often, to avoid starvation of prio A. */
- read_jba_state(selfptr);
- static Uint32 max_prioA = thr_job_queue::SIZE * thr_job_buffer::SIZE;
- signal_count += execute_signals(selfptr, &(selfptr->m_jba),
- &(selfptr->m_jba_read_state), sig,
- max_prioA, watchDogCounter,
- signalIdCounter);
-
+ bool jba_empty = read_jba_state(selfptr);
+ if (!jba_empty)
+ {
+ static Uint32 max_prioA = thr_job_queue::SIZE * thr_job_buffer::SIZE;
+ signal_count += execute_signals(selfptr, &(selfptr->m_jba),
+ &(selfptr->m_jba_read_state), sig,
+ max_prioA, watchDogCounter,
+ signalIdCounter);
+ }
/* Now execute prio B signals from one thread. */
- thr_job_queue *queue = selfptr->m_in_queue + send_thr_no;
- thr_jb_read_state *read_state = selfptr->m_read_states + send_thr_no;
signal_count += execute_signals(selfptr, queue, read_state,
sig, MAX_SIGNALS_PER_JB,
watchDogCounter, signalIdCounter);
@@ -2430,6 +2520,7 @@ mt_receiver_thread_main(void *thr_arg)
unsigned thr_no = selfptr->m_thr_no;
Uint32& watchDogCounter = selfptr->m_watchdog_counter;
Uint32 thrSignalId = 0;
+ bool has_received = false;
init_thread(selfptr);
receiverThreadId = thr_no;
@@ -2454,7 +2545,7 @@ mt_receiver_thread_main(void *thr_arg)
Uint32 sum = run_job_buffers(selfptr, signal,
&watchDogCounter, &thrSignalId);
- if (sum)
+ if (sum || has_received)
{
watchDogCounter = 6;
flush_jbb_write_state(selfptr);
@@ -2464,18 +2555,18 @@ mt_receiver_thread_main(void *thr_arg)
watchDogCounter = 7;
+ has_received = false;
if (globalTransporterRegistry.pollReceive(1))
{
if (check_job_buffers(rep) == 0)
{
watchDogCounter = 8;
- lock(&rep->m_receive_lock);
- globalTransporterRegistry.performReceive();
- unlock(&rep->m_receive_lock);
+ lock(&rep->m_receive_lock);
+ globalTransporterRegistry.performReceive();
+ unlock(&rep->m_receive_lock);
+ has_received = true;
}
}
-
- flush_jbb_write_state(selfptr);
}
globalEmulatorData.theWatchDog->unregisterWatchedThread(thr_no);
@@ -2483,6 +2574,7 @@ mt_receiver_thread_main(void *thr_arg)
}
static
+inline
void
sendpacked(struct thr_data* thr_ptr, Signal* signal)
{
@@ -2586,20 +2678,20 @@ sendlocal(Uint32 self, const SignalHeade
Uint32 dst = block2ThreadId(block, instance);
struct thr_repository* rep = &g_thr_repository;
struct thr_data * selfptr = rep->m_thread + self;
+ struct thr_data * dstptr = rep->m_thread + dst;
selfptr->m_priob_count++;
Uint32 siglen = (sizeof(*s) >> 2) + s->theLength + s->m_noOfSections;
selfptr->m_priob_size += siglen;
- thr_job_queue *q = rep->m_thread[dst].m_in_queue + self;
+ thr_job_queue *q = dstptr->m_in_queue + self;
thr_jb_write_state *w = selfptr->m_write_states + dst;
if (insert_signal(q, w, false, s, data, secPtr, selfptr->m_next_buffer))
{
selfptr->m_next_buffer = seize_buffer(rep, self, false);
}
-
if (w->m_pending_signals >= MAX_SIGNALS_BEFORE_FLUSH)
- flush_write_state(dst, q, w);
+ flush_write_state(selfptr, dstptr, q->m_head, w);
}
void
@@ -2611,7 +2703,7 @@ sendprioa(Uint32 self, const SignalHeade
Uint32 dst = block2ThreadId(block, instance);
struct thr_repository* rep = &g_thr_repository;
- struct thr_data * selfptr = rep->m_thread + self;
+ struct thr_data *selfptr = rep->m_thread + self;
struct thr_data *dstptr = rep->m_thread + dst;
selfptr->m_prioa_count++;
@@ -2623,7 +2715,7 @@ sendprioa(Uint32 self, const SignalHeade
lock(&dstptr->m_jba_write_lock);
- Uint32 index = q->m_write_index;
+ Uint32 index = q->m_head->m_write_index;
w.m_write_index = index;
thr_job_buffer *buffer = q->m_buffers[index];
w.m_write_buffer = buffer;
@@ -2632,7 +2724,7 @@ sendprioa(Uint32 self, const SignalHeade
w.m_pending_signals_wakeup = MAX_SIGNALS_BEFORE_WAKEUP;
bool buf_used = insert_signal(q, &w, true, s, data, secPtr,
selfptr->m_next_buffer);
- flush_write_state(dst, q, &w);
+ flush_write_state(selfptr, dstptr, q->m_head, &w);
unlock(&dstptr->m_jba_write_lock);
@@ -2689,7 +2781,7 @@ mt_send_remote(Uint32 self, const Signal
*/
static
void
-sendprioa_STOP_FOR_CRASH(Uint32 dst)
+sendprioa_STOP_FOR_CRASH(const struct thr_data *selfptr, Uint32 dst)
{
SignalT<StopForCrash::SignalLength> signalT;
struct thr_repository* rep = &g_thr_repository;
@@ -2738,7 +2830,7 @@ sendprioa_STOP_FOR_CRASH(Uint32 dst)
lock(&dstptr->m_jba_write_lock);
- Uint32 index = q->m_write_index;
+ Uint32 index = q->m_head->m_write_index;
w.m_write_index = index;
thr_job_buffer *buffer = q->m_buffers[index];
w.m_write_buffer = buffer;
@@ -2747,7 +2839,7 @@ sendprioa_STOP_FOR_CRASH(Uint32 dst)
w.m_pending_signals_wakeup = MAX_SIGNALS_BEFORE_WAKEUP;
insert_signal(q, &w, true, &signalT.header, signalT.theData, NULL,
&dummy_buffer);
- flush_write_state(dst, q, &w);
+ flush_write_state(selfptr, dstptr, q->m_head, &w);
unlock(&dstptr->m_jba_write_lock);
}
@@ -2777,31 +2869,32 @@ thr_init(struct thr_repository* rep, str
selfptr->m_first_free = 0;
selfptr->m_first_unused = 0;
- selfptr->m_jba.m_read_index = 0;
- selfptr->m_jba.m_write_index = 0;
+ selfptr->m_jba_head.m_read_index = 0;
+ selfptr->m_jba_head.m_write_index = 0;
+ selfptr->m_jba.m_head = &selfptr->m_jba_head;
thr_job_buffer *buffer = seize_buffer(rep, thr_no, true);
selfptr->m_jba.m_buffers[0] = buffer;
selfptr->m_jba_read_state.m_read_index = 0;
selfptr->m_jba_read_state.m_read_buffer = buffer;
selfptr->m_jba_read_state.m_read_pos = 0;
+ selfptr->m_jba_read_state.m_read_end = 0;
selfptr->m_jba_read_state.m_write_index = 0;
- selfptr->m_jba_read_state.m_write_pos = 0;
selfptr->m_next_buffer = seize_buffer(rep, thr_no, false);
selfptr->m_send_buffer_pool.set_pool(&rep->m_free_list);
for (i = 0; i<cnt; i++)
{
- selfptr->m_in_queue[i].m_read_index = 0;
- selfptr->m_in_queue[i].m_write_index = 0;
+ selfptr->m_in_queue_head[i].m_read_index = 0;
+ selfptr->m_in_queue_head[i].m_write_index = 0;
+ selfptr->m_in_queue[i].m_head = &selfptr->m_in_queue_head[i];
buffer = seize_buffer(rep, thr_no, false);
selfptr->m_in_queue[i].m_buffers[0] = buffer;
selfptr->m_read_states[i].m_read_index = 0;
selfptr->m_read_states[i].m_read_buffer = buffer;
selfptr->m_read_states[i].m_read_pos = 0;
+ selfptr->m_read_states[i].m_read_end = 0;
selfptr->m_read_states[i].m_write_index = 0;
- selfptr->m_read_states[i].m_write_pos = 0;
}
-
queue_init(&selfptr->m_tq);
selfptr->m_prioa_count = 0;
@@ -3167,7 +3260,7 @@ FastScheduler::traceDumpPrepare(NdbShutd
continue;
}
- sendprioa_STOP_FOR_CRASH(thr_no);
+ sendprioa_STOP_FOR_CRASH(selfptr, thr_no);
waitFor_count++;
}
| Thread |
|---|
| • bzr commit into mysql-5.1 branch (ole.john.aske:3167) Bug#41301 | Ole John Aske | 11 Dec |