#At file:///net/vidar01/export/home/tmp/oleja/mysql/mysql-5.1-telco-6.4/ based on revid:jonas@stripped
3163 Ole John Aske 2008-12-08
Bug#41301, collection of optimization improvements
modified:
storage/ndb/include/ndb_global.h.in
storage/ndb/src/common/transporter/TransporterRegistry.cpp
storage/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp
storage/ndb/src/kernel/blocks/dbtux/Dbtux.hpp
storage/ndb/src/kernel/error/ErrorReporter.hpp
storage/ndb/src/kernel/vm/Emulator.hpp
storage/ndb/src/kernel/vm/SafeCounter.hpp
storage/ndb/src/kernel/vm/SimulatedBlock.hpp
storage/ndb/src/kernel/vm/mt.cpp
per-file messages:
storage/ndb/include/ndb_global.h.in
no-branch hinting to move abort,exit out of common execution path.
=== modified file 'storage/ndb/include/ndb_global.h.in'
=== modified file 'storage/ndb/include/ndb_global.h.in'
--- a/storage/ndb/include/ndb_global.h.in 2008-10-30 10:43:10 +0000
+++ b/storage/ndb/include/ndb_global.h.in 2008-12-08 14:16:30 +0000
@@ -41,6 +41,15 @@
#include <my_global.h>
+/**
+ * Provide branch predict hinting for SunStudio compilers:
+ * Consider moving this into my_global.h...
+ */
+#if defined(__SUNPRO_CC) && (__SUNPRO_CC>=0x540)
+# pragma rarely_called(abort,exit)
+# pragma does_not_return(abort,exit)
+#endif
+
#if ! (NDB_SIZEOF_CHAR == SIZEOF_CHAR)
#error "Invalid define for Uint8"
#endif
=== modified file 'storage/ndb/src/common/transporter/TransporterRegistry.cpp'
--- a/storage/ndb/src/common/transporter/TransporterRegistry.cpp 2008-11-26 10:36:20 +0000
+++ b/storage/ndb/src/common/transporter/TransporterRegistry.cpp 2008-12-08 14:16:30 +0000
@@ -1085,6 +1085,7 @@
void
TransporterRegistry::performReceive()
{
+ bool hasReceived = false;
#ifdef NDB_TCP_TRANSPORTER
#if defined(HAVE_EPOLL_CREATE)
if (likely(m_epoll_fd != -1))
@@ -1128,7 +1129,8 @@
if (t->hasReceiveData())
{
- callbackObj->checkJobBuffer();
+ if (hasReceived) callbackObj->checkJobBuffer();
+ hasReceived = true;
Uint32 * ptr;
Uint32 sz = t->getReceiveData(&ptr);
callbackObj->transporter_recv_from(nodeId);
@@ -1152,7 +1154,8 @@
{
if(t->isConnected() && t->checkConnected())
{
- callbackObj->checkJobBuffer();
+ if (hasReceived) callbackObj->checkJobBuffer();
+ hasReceived = true;
Uint32 * readPtr, * eodPtr;
t->getReceivePtr(&readPtr, &eodPtr);
callbackObj->transporter_recv_from(nodeId);
@@ -1170,7 +1173,8 @@
if(is_connected(nodeId)){
if(t->isConnected() && t->checkConnected())
{
- callbackObj->checkJobBuffer();
+ if (hasReceived) callbackObj->checkJobBuffer();
+ hasReceived = true;
Uint32 * readPtr, * eodPtr;
t->getReceivePtr(&readPtr, &eodPtr);
callbackObj->transporter_recv_from(nodeId);
=== modified file 'storage/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp 2008-11-03 08:34:28 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp 2008-12-08 14:16:30 +0000
@@ -479,14 +479,20 @@
}
}
-static
-inline
+static inline
void
-zero32(Uint8* dstPtr, Uint32 len)
-{
- while ((len & 3) != 0)
- {
- dstPtr[len++] = 0;
+zero32(Uint8* dstPtr, const Uint32 len)
+{ Uint32 odd = len & 3; /* odd: {0..3} */
+
+ if (odd != 0) /* odd: {1..3} */
+ { Uint8* dst = dstPtr+len;
+ dst[0] = 0;
+ if (odd <= 2) /* odd: {1..2} */
+ { dst[1] = 0;
+ if (odd == 1)
+ { dst[2] = 0;
+ }
+ }
}
}
=== modified file 'storage/ndb/src/kernel/blocks/dbtux/Dbtux.hpp'
--- a/storage/ndb/src/kernel/blocks/dbtux/Dbtux.hpp 2008-10-05 07:14:21 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtux/Dbtux.hpp 2008-12-08 14:16:30 +0000
@@ -541,6 +541,10 @@
TreeEnt getMinMax(unsigned i);
// for ndbrequire and ndbassert
void progError(int line, int cause, const char* file);
+
+#if defined(__SUNPRO_CC) && (__SUNPRO_CC>=0x540)
+# pragma rarely_called(progError)
+#endif
};
// methods
=== modified file 'storage/ndb/src/kernel/error/ErrorReporter.hpp'
--- a/storage/ndb/src/kernel/error/ErrorReporter.hpp 2008-01-01 12:45:11 +0000
+++ b/storage/ndb/src/kernel/error/ErrorReporter.hpp 2008-12-08 14:16:30 +0000
@@ -48,7 +48,13 @@
static int get_trace_no();
static const char* formatTimeStampString();
-
+
+#if defined(__SUNPRO_CC) && (__SUNPRO_CC>=0x540)
+# pragma rarely_called(handleAssert)
+# pragma rarely_called(handleError)
+# pragma rarely_called(handleWarning)
+#endif
+
private:
static enum NdbShutdownType s_errorHandlerShutdownType;
};
=== modified file 'storage/ndb/src/kernel/vm/Emulator.hpp'
--- a/storage/ndb/src/kernel/vm/Emulator.hpp 2008-01-01 12:45:11 +0000
+++ b/storage/ndb/src/kernel/vm/Emulator.hpp 2008-12-08 14:16:30 +0000
@@ -31,7 +31,6 @@
extern class FastScheduler globalScheduler;
extern class TransporterRegistry globalTransporterRegistry;
extern struct GlobalData globalData;
-extern struct thr_repository g_thr_repository;
#ifdef VM_TRACE
extern class SignalLoggerManager globalSignalLoggers;
=== modified file 'storage/ndb/src/kernel/vm/SafeCounter.hpp'
--- a/storage/ndb/src/kernel/vm/SafeCounter.hpp 2008-02-20 09:04:29 +0000
+++ b/storage/ndb/src/kernel/vm/SafeCounter.hpp 2008-12-08 14:16:30 +0000
@@ -102,6 +102,10 @@
BlockReference reference() const;
void progError(int line, int err_code, const char* extra = 0);
+
+#if defined(__SUNPRO_CC) && (__SUNPRO_CC>=0x540)
+# pragma rarely_called(progError)
+#endif
};
=== modified file 'storage/ndb/src/kernel/vm/SimulatedBlock.hpp'
--- a/storage/ndb/src/kernel/vm/SimulatedBlock.hpp 2008-12-01 18:05:11 +0000
+++ b/storage/ndb/src/kernel/vm/SimulatedBlock.hpp 2008-12-08 14:16:30 +0000
@@ -307,6 +307,11 @@
void handle_lingering_sections_after_execute(Signal*) const;
void handle_lingering_sections_after_execute(SectionHandle*) const;
+#if defined(__SUNPRO_CC) && (__SUNPRO_CC>=0x540)
+# pragma rarely_called(handle_invalid_sections_in_send_signal)
+# pragma rarely_called(handle_lingering_sections_after_execute)
+#endif
+
/**********************************************************
* Fragmented signals
*/
@@ -487,6 +492,11 @@
* errormessage describing the problem
*/
void progError(int line, int err_code, const char* extradata=NULL) const ;
+
+#if defined(__SUNPRO_CC) && (__SUNPRO_CC>=0x540)
+# pragma rarely_called(progError)
+#endif
+
private:
void signal_error(Uint32, Uint32, Uint32, const char*, int) const ;
const NodeId theNodeId;
@@ -693,6 +703,10 @@
BlockReference reference() const;
void progError(int line, int err_code, const char* extra = 0);
+
+#if defined(__SUNPRO_CC) && (__SUNPRO_CC>=0x540)
+# pragma rarely_called(progError)
+#endif
};
friend class MutexManager;
=== modified file 'storage/ndb/src/kernel/vm/mt.cpp'
--- a/storage/ndb/src/kernel/vm/mt.cpp 2008-11-15 15:43:59 +0000
+++ b/storage/ndb/src/kernel/vm/mt.cpp 2008-12-08 14:16:30 +0000
@@ -131,7 +131,7 @@
volatile unsigned m_futex_state;
enum {
FS_RUNNING = 0,
- FS_SLEEPING = 1,
+ FS_SLEEPING = 1
};
thr_wait() { xcng(&m_futex_state, FS_RUNNING);}
void init () {}
@@ -144,10 +144,10 @@
* if that returns true will it actually sleep, else it will return
* immediately. This is needed to avoid races with wakeup.
*/
-static
+static inline
void
yield(struct thr_wait* wait, const struct timespec *timeout,
- bool (*check_callback)(void *), void *check_arg)
+ bool (*check_callback)(struct thr_data *), struct thr_data *check_arg)
{
volatile unsigned * val = &wait->m_futex_state;
#ifndef NDEBUG
@@ -173,7 +173,7 @@
xcng(val, thr_wait::FS_RUNNING);
}
-static
+static inline
int
wakeup(struct thr_wait* wait)
{
@@ -195,12 +195,11 @@
struct thr_wait
{
+ bool m_need_wakeup;
+
NdbMutex *m_mutex;
NdbCondition *m_cond;
- thr_wait() {
- m_mutex = 0;
- m_cond = 0;
- }
+ thr_wait() : m_mutex(0), m_cond(0), m_need_wakeup(false) {}
void init() {
m_mutex = NdbMutex_Create();
@@ -208,29 +207,43 @@
}
};
-static
+static inline
void
yield(struct thr_wait* wait, const struct timespec *timeout,
- bool (*check_callback)(void *), void *check_arg)
+ bool (*check_callback)(struct thr_data *), struct thr_data *check_arg)
{
Uint32 msec =
(1000 * timeout->tv_sec) +
(timeout->tv_nsec / 1000000);
NdbMutex_Lock(wait->m_mutex);
- if ((*check_callback)(check_arg))
- NdbCondition_WaitTimeout(wait->m_cond, wait->m_mutex, msec);
+ while ((*check_callback)(check_arg)) /* May have spurious wakeups: Always recheck condition predicate */
+ {
+ wait->m_need_wakeup = true;
+ if (NdbCondition_WaitTimeout(wait->m_cond, wait->m_mutex, msec) == ETIMEDOUT)
+ {
+ wait->m_need_wakeup = false;
+ break;
+ }
+ }
NdbMutex_Unlock(wait->m_mutex);
}
-static
+
+static inline
int
wakeup(struct thr_wait* wait)
{
NdbMutex_Lock(wait->m_mutex);
- NdbCondition_Signal(wait->m_cond);
+ // We should avoid signaling when not waiting for wakeup
+ if (wait->m_need_wakeup)
+ {
+ wait->m_need_wakeup = false;
+ NdbCondition_Signal(wait->m_cond);
+ }
NdbMutex_Unlock(wait->m_mutex);
return 0;
}
+
#endif
static inline void
@@ -246,6 +259,7 @@
#define assert(x) require(x)
#ifdef NDB_HAVE_XCNG
+
struct thr_spin_lock
{
thr_spin_lock(const char * name = 0)
@@ -489,12 +503,27 @@
Uint32 m_data[SIZE];
};
+
+/**
+ * thr_job_queue is shared between consumer / producer.
+ *
+ * The hot spots of the thr_job_queue are the read/write indexes.
+ * As they are updated and read frequently, they have been placed
+ * in their own thr_job_queue_head[] so that they fit inside a
+ * single (or a few) cache lines, thereby avoiding complete L1-cache
+ * replacement every time the job_queue is scanned.
+ */
+struct thr_job_queue_head
+{
+ unsigned m_read_index; // Read/written by consumer, read by producer
+ unsigned m_write_index; // Read/written by producer, read by consumer
+};
+
struct thr_job_queue
{
static const unsigned SIZE = 30;
- unsigned m_read_index; // Read/written by consumer, read by producer
- unsigned m_write_index; // Read/written by producer, read by consumer
+ struct thr_job_queue_head* head;
struct thr_job_buffer* m_buffers[SIZE];
};
@@ -561,8 +590,12 @@
* execution loop and used to determine when the end of available signals is
* reached.
*/
- Uint32 m_write_index;
- Uint32 m_write_pos;
+ Uint32 m_read_end; // End within current thr_job_buffer. (*m_read_buffer)
+
+ Uint32 m_write_index; // Last available thr_job_buffer.
+
+ bool is_empty() const
+ { return (m_read_index == m_write_index) && (m_read_pos >= m_read_end); }
};
/**
@@ -662,6 +695,7 @@
struct thr_tq m_tq;
/* Prio A signal incoming queue. */
+ struct thr_job_queue_head m_jba_head;
struct thr_job_queue m_jba;
struct thr_spin_lock m_jba_write_lock;
/*
@@ -692,6 +726,7 @@
* These are the thread input queues, where other threads deliver signals
* into.
*/
+ struct thr_job_queue_head m_in_queue_head[MAX_THREADS];
struct thr_job_queue m_in_queue[MAX_THREADS];
/* These are the write states of m_in_queue[self] in each thread. */
struct thr_jb_write_state m_write_states[MAX_THREADS];
@@ -763,6 +798,7 @@
};
extern trp_callback g_trp_callback; // Forward declaration
+extern struct thr_repository g_thr_repository;
struct thr_repository
{
@@ -1033,7 +1069,7 @@
}
}
-static
+static inline
void
scan_time_queues(struct thr_data* selfptr)
{
@@ -1204,10 +1240,29 @@
/*
* Flush the write state to the job queue, making any new signals available to
* receiving threads.
+ *
+ * Two versions:
+ * - The general version flush_write_state() which may flush to any thread,
+ * - The special version flush_write_state_self() which should only be used
+ * to flush messages to itself.
*/
static inline
void
-flush_write_state(Uint32 dst, thr_job_queue *q, thr_jb_write_state *w)
+flush_write_state_self(thr_data *selfptr, thr_job_queue_head *q_head, thr_jb_write_state *w)
+{
+ /*
+ * flush_write_state can be simplified when a thread writes to itself:
+ * simply update the write references without mutex, memory barrier or signaling.
+ */
+ w->m_write_buffer->m_len = w->m_write_pos;
+ q_head->m_write_index = w->m_write_index;
+ w->m_pending_signals_wakeup = 0;
+ w->m_pending_signals = 0;
+}
+
+static inline
+void
+flush_write_state(thr_data *dstptr, thr_job_queue_head *q_head, thr_jb_write_state *w)
{
/*
* Two write memory barriers here, as assigning m_len may make signal data
@@ -1219,17 +1274,19 @@
*
* But wmb() is a no-op anyway in x86 ...
*/
+
wmb();
w->m_write_buffer->m_len = w->m_write_pos;
wmb();
- q->m_write_index = w->m_write_index;
+ q_head->m_write_index = w->m_write_index;
+
w->m_pending_signals_wakeup += w->m_pending_signals;
w->m_pending_signals = 0;
if (w->m_pending_signals_wakeup >= MAX_SIGNALS_BEFORE_WAKEUP)
{
w->m_pending_signals_wakeup = 0;
- wakeup(&(g_thr_repository.m_thread[dst].m_waiter));
+ wakeup(&(dstptr->m_waiter));
}
}
@@ -1240,14 +1297,20 @@
Uint32 thr_count = g_thr_repository.m_thread_count;
Uint32 self = selfptr->m_thr_no;
- for (Uint32 thr_no = 0; thr_no < thr_count; thr_no ++)
+ thr_jb_write_state *w = selfptr->m_write_states;
+ thr_data *thrptr = g_thr_repository.m_thread;
+ for (Uint32 thr_no = 0; thr_no < thr_count; thr_no++, thrptr++, w++)
{
- thr_jb_write_state *w = selfptr->m_write_states + thr_no;
if (w->m_pending_signals || w->m_pending_signals_wakeup)
{
w->m_pending_signals_wakeup = MAX_SIGNALS_BEFORE_WAKEUP;
- thr_job_queue *q = g_thr_repository.m_thread[thr_no].m_in_queue + self;
- flush_write_state(thr_no, q, w);
+ thr_job_queue_head *q_head = thrptr->m_in_queue_head + self;
+ if (thrptr == selfptr)
+ { flush_write_state_self(thrptr, q_head, w);
+ }
+ else
+ { flush_write_state(thrptr, q_head, w);
+ }
}
}
}
@@ -1259,10 +1322,11 @@
static int
check_job_buffers(struct thr_repository* rep)
{
- for (unsigned i = 0; i<num_threads; i++)
+ thr_data *thrptr = rep->m_thread;
+ for (unsigned i = 0; i<num_threads; i++, thrptr++)
{
- thr_data * thrptr = rep->m_thread+i;
- for (unsigned j = 0; j<num_threads; j++)
+ const thr_job_queue_head *q_head = thrptr->m_in_queue_head;
+ for (unsigned j = 0; j<num_threads; j++,q_head++)
{
/**
* These values are read wo/ locks...
@@ -1276,8 +1340,8 @@
* conservative (i.e it can be better than guess, if read-index has
* moved but we didnt see it)
*/
- unsigned ri = thrptr->m_in_queue[j].m_read_index;
- unsigned wi = thrptr->m_in_queue[j].m_write_index;
+ unsigned ri = q_head->m_read_index;
+ unsigned wi = q_head->m_write_index;
unsigned busy = (wi >= ri) ? wi - ri : (thr_job_queue::SIZE - ri) + wi;
if (4*busy >= thr_job_queue::SIZE)
{
@@ -1386,6 +1450,15 @@
* except for QMGR open/close connection, but that is not
* (i think) sufficient to create a deadlock
*/
+
+ /** FIXME:
+ * On a CMT chip where #CPU >= #NDB-threads sched_yield() is effectively a NOOP as
+ * there will normally be an idle CPU available to immediately resume thread execution.
+ * On a Niagara chip this may severely impact performance as the CPUs are virtualized
+ * by time-multiplexing the physical core. The thread should really be 'parked' on
+ * a condition to free its execution resources.
+ */
+// usleep(a-few-usec); /* A micro-sleep would likely have been better... */
sched_yield();
} while (check_job_buffers(rep));
}
@@ -1941,7 +2014,7 @@
*
* Or alternatively, ndbrequire() ?
*/
- assert(write_index != q->m_read_index);
+ assert(write_index != q->head->m_read_index);
new_buffer->m_len = 0;
new_buffer->m_prioa = prioa;
q->m_buffers[write_index] = new_buffer;
@@ -1955,50 +2028,50 @@
}
static
-void
+bool
read_jbb_state(thr_data *selfptr, Uint32 count)
{
- for (Uint32 i = 0; i < count; i++)
+ bool empty = true;
+
+ thr_jb_read_state *r = selfptr->m_read_states;
+ const thr_job_queue *q = selfptr->m_in_queue;
+ for (Uint32 i = 0; i < count; i++,r++,q++)
{
- thr_jb_read_state *r = selfptr->m_read_states + i;
- const thr_job_queue *q = selfptr->m_in_queue +i;
- Uint32 index = q->m_write_index;
- r->m_write_index = index;
- read_barrier_depends();
- r->m_write_pos = q->m_buffers[index]->m_len;
+ Uint32 read_index = r->m_read_index;
+
+ if (r->m_write_index != read_index) // Optimization: Avoid reload when known non-empty.
+ { empty = false; // (To prevent extensive cache reload of (invalidated)
+ } // shared thr_job_queue_head)
+ else
+ { r->m_write_index = q->head->m_write_index;
+ read_barrier_depends();
+ r->m_read_end = q->m_buffers[read_index]->m_len;
+ empty &= r->is_empty();
+ }
}
+ return empty;
}
static
-void
+bool
read_jba_state(thr_data *selfptr)
{
- const thr_job_queue *jba = &(selfptr->m_jba);
- Uint32 index = jba->m_write_index;
- selfptr->m_jba_read_state.m_write_index = index;
+ thr_jb_read_state *r = &(selfptr->m_jba_read_state);
+ r->m_write_index = selfptr->m_jba_head.m_write_index;
read_barrier_depends();
- selfptr->m_jba_read_state.m_write_pos = jba->m_buffers[index]->m_len;
+ r->m_read_end = selfptr->m_jba.m_buffers[r->m_read_index]->m_len;
+ return r->is_empty();
}
/* Check all job queues, return true only if all are empty. */
static bool
-check_queues_empty(void *data)
+check_queues_empty(thr_data *selfptr)
{
- Uint32 thr_count = g_thr_repository.m_thread_count;
- thr_data *selfptr = reinterpret_cast<thr_data *>(data);
+ bool empty = read_jba_state(selfptr);
+ if (!empty) return false;
- read_jbb_state(selfptr, thr_count);
- read_jba_state(selfptr);
- const thr_jb_read_state *r = &(selfptr->m_jba_read_state);
- if (r->m_read_index < r->m_write_index || r->m_read_pos < r->m_write_pos)
- return false;
- for (Uint32 i = 0; i < thr_count; i++)
- {
- r = selfptr->m_read_states + i;;
- if (r->m_read_index < r->m_write_index || r->m_read_pos < r->m_write_pos)
- return false;
- }
- return true;
+ empty = read_jbb_state(selfptr, g_thr_repository.m_thread_count);
+ return empty;
}
/*
@@ -2049,19 +2122,20 @@
Signal *sig, Uint32 max_signals,
Uint32 *watchDogCounter, Uint32 *signalIdCounter)
{
- Uint32 num_signals = 0;
-
+ Uint32 num_signals;
Uint32 read_index = r->m_read_index;
Uint32 write_index = r->m_write_index;
Uint32 read_pos = r->m_read_pos;
- Uint32 write_pos = (read_index == write_index ?
- r->m_write_pos :
- q->m_buffers[read_index]->m_len);
+ Uint32 read_end = r->m_read_end;
+
+ if (read_index == write_index && read_pos >= read_end)
+ return 0; // empty read_state
+
thr_job_buffer *read_buffer = r->m_read_buffer;
- while (num_signals < max_signals)
+ for (num_signals = 0; num_signals < max_signals; num_signals++)
{
- while (read_pos >= write_pos)
+ while (read_pos >= read_end)
{
if (read_index == write_index)
{
@@ -2075,13 +2149,12 @@
release_buffer(&g_thr_repository, selfptr->m_thr_no, read_buffer);
read_buffer = q->m_buffers[read_index];
read_pos = 0;
- write_pos = (read_index == write_index ?
- r->m_write_pos :
- q->m_buffers[read_index]->m_len);
+ read_end = read_buffer->m_len;
/* Update thread-local read state. */
- r->m_read_index = q->m_read_index = read_index;
+ r->m_read_index = q->head->m_read_index = read_index;
r->m_read_buffer = read_buffer;
r->m_read_pos = read_pos;
+ r->m_read_end = read_end;
}
}
@@ -2090,20 +2163,17 @@
* (Though on Intel Core 2, they do not give much speedup, as apparently
* the hardware prefetcher is already doing a fairly good job).
*/
-#ifdef __GNUC__
- __builtin_prefetch (read_buffer->m_data + read_pos + 16, 0, 3);
- __builtin_prefetch ((Uint32 *)&sig->header + 16, 1, 3);
-#endif
+ PREFETCH_READ (read_buffer->m_data + read_pos + 16);
+ PREFETCH_WRITE ((Uint32 *)&sig->header + 16);
/* Now execute the signal. */
SignalHeader* s =
reinterpret_cast<SignalHeader*>(read_buffer->m_data + read_pos);
Uint32 seccnt = s->m_noOfSections;
Uint32 siglen = (sizeof(*s)>>2) + s->theLength;
-#ifdef __GNUC__
- if(siglen>16)
- __builtin_prefetch (read_buffer->m_data + read_pos + 32, 0, 3);
-#endif
+ if(siglen>16) {
+ PREFETCH_READ (read_buffer->m_data + read_pos + 32);
+ }
Uint32 bno = blockToMain(s->theReceiversBlockNumber);
Uint32 ino = map_instance(s);
SimulatedBlock* block = globalData.getBlock(bno, ino);
@@ -2143,8 +2213,6 @@
#endif
block->executeFunction(gsn, sig);
-
- num_signals++;
}
return num_signals;
@@ -2158,26 +2226,40 @@
Uint32 thr_count = g_thr_repository.m_thread_count;
Uint32 signal_count = 0;
- read_jbb_state(selfptr, thr_count);
+ bool jbb_empty = read_jbb_state(selfptr, thr_count);
/*
* A load memory barrier to ensure that we see any prio A signal sent later
* than loaded prio B signals.
*/
rmb();
+ if (jbb_empty)
+ {
+ /* Prio B's are empty, check and possibly execute prio A signals */
+ bool jba_empty = read_jba_state(selfptr);
+ if (jba_empty)
+ return 0;
- for (Uint32 send_thr_no = 0; send_thr_no < thr_count; send_thr_no++)
- {
- /* Read the prio A state often, to avoid starvation of prio A. */
- read_jba_state(selfptr);
static Uint32 max_prioA = thr_job_queue::SIZE * thr_job_buffer::SIZE;
- signal_count += execute_signals(selfptr, &(selfptr->m_jba),
+ return execute_signals(selfptr, &(selfptr->m_jba),
&(selfptr->m_jba_read_state), sig,
max_prioA, watchDogCounter,
signalIdCounter);
+ }
+ thr_job_queue *queue = selfptr->m_in_queue;
+ thr_jb_read_state *read_state = selfptr->m_read_states;
+ for (Uint32 send_thr_no = 0; send_thr_no < thr_count; send_thr_no++,queue++,read_state++)
+ {
+ /* Read the prio A state often, to avoid starvation of prio A. */
+ bool jba_empty = read_jba_state(selfptr);
+ if (!jba_empty)
+ { static Uint32 max_prioA = thr_job_queue::SIZE * thr_job_buffer::SIZE;
+ signal_count += execute_signals(selfptr, &(selfptr->m_jba),
+ &(selfptr->m_jba_read_state), sig,
+ max_prioA, watchDogCounter,
+ signalIdCounter);
+ }
/* Now execute prio B signals from one thread. */
- thr_job_queue *queue = selfptr->m_in_queue + send_thr_no;
- thr_jb_read_state *read_state = selfptr->m_read_states + send_thr_no;
signal_count += execute_signals(selfptr, queue, read_state,
sig, MAX_SIGNALS_PER_JB,
watchDogCounter, signalIdCounter);
@@ -2430,6 +2512,7 @@
unsigned thr_no = selfptr->m_thr_no;
Uint32& watchDogCounter = selfptr->m_watchdog_counter;
Uint32 thrSignalId = 0;
+ bool has_received = false;
init_thread(selfptr);
receiverThreadId = thr_no;
@@ -2454,7 +2537,7 @@
Uint32 sum = run_job_buffers(selfptr, signal,
&watchDogCounter, &thrSignalId);
- if (sum)
+ if (sum || has_received)
{
watchDogCounter = 6;
flush_jbb_write_state(selfptr);
@@ -2464,18 +2547,18 @@
watchDogCounter = 7;
+ has_received = false;
if (globalTransporterRegistry.pollReceive(1))
{
if (check_job_buffers(rep) == 0)
{
watchDogCounter = 8;
- lock(&rep->m_receive_lock);
- globalTransporterRegistry.performReceive();
- unlock(&rep->m_receive_lock);
+ lock(&rep->m_receive_lock);
+ globalTransporterRegistry.performReceive();
+ unlock(&rep->m_receive_lock);
+ has_received = true;
}
}
-
- flush_jbb_write_state(selfptr);
}
globalEmulatorData.theWatchDog->unregisterWatchedThread(thr_no);
@@ -2483,6 +2566,7 @@
}
static
+inline
void
sendpacked(struct thr_data* thr_ptr, Signal* signal)
{
@@ -2558,7 +2642,7 @@
}
if (sum == 0)
- {
+ {
yield(&selfptr->m_waiter, &nowait, check_queues_empty, selfptr);
}
}
@@ -2586,20 +2670,24 @@
Uint32 dst = block2ThreadId(block, instance);
struct thr_repository* rep = &g_thr_repository;
struct thr_data * selfptr = rep->m_thread + self;
+ struct thr_data * dstptr = rep->m_thread + dst;
selfptr->m_priob_count++;
Uint32 siglen = (sizeof(*s) >> 2) + s->theLength + s->m_noOfSections;
selfptr->m_priob_size += siglen;
- thr_job_queue *q = rep->m_thread[dst].m_in_queue + self;
+ thr_job_queue *q = dstptr->m_in_queue + self;
thr_jb_write_state *w = selfptr->m_write_states + dst;
if (insert_signal(q, w, false, s, data, secPtr, selfptr->m_next_buffer))
{
selfptr->m_next_buffer = seize_buffer(rep, self, false);
}
-
if (w->m_pending_signals >= MAX_SIGNALS_BEFORE_FLUSH)
- flush_write_state(dst, q, w);
+ { if (dstptr == selfptr)
+ flush_write_state_self(dstptr, q->head, w);
+ else
+ flush_write_state(dstptr, q->head, w);
+ }
}
void
@@ -2623,7 +2711,7 @@
lock(&dstptr->m_jba_write_lock);
- Uint32 index = q->m_write_index;
+ Uint32 index = q->head->m_write_index;
w.m_write_index = index;
thr_job_buffer *buffer = q->m_buffers[index];
w.m_write_buffer = buffer;
@@ -2632,7 +2720,8 @@
w.m_pending_signals_wakeup = MAX_SIGNALS_BEFORE_WAKEUP;
bool buf_used = insert_signal(q, &w, true, s, data, secPtr,
selfptr->m_next_buffer);
- flush_write_state(dst, q, &w);
+
+ flush_write_state(dstptr, q->head, &w);
unlock(&dstptr->m_jba_write_lock);
@@ -2738,7 +2827,7 @@
lock(&dstptr->m_jba_write_lock);
- Uint32 index = q->m_write_index;
+ Uint32 index = q->head->m_write_index;
w.m_write_index = index;
thr_job_buffer *buffer = q->m_buffers[index];
w.m_write_buffer = buffer;
@@ -2747,7 +2836,7 @@
w.m_pending_signals_wakeup = MAX_SIGNALS_BEFORE_WAKEUP;
insert_signal(q, &w, true, &signalT.header, signalT.theData, NULL,
&dummy_buffer);
- flush_write_state(dst, q, &w);
+ flush_write_state(dstptr, q->head, &w);
unlock(&dstptr->m_jba_write_lock);
}
@@ -2777,31 +2866,32 @@
selfptr->m_first_free = 0;
selfptr->m_first_unused = 0;
- selfptr->m_jba.m_read_index = 0;
- selfptr->m_jba.m_write_index = 0;
+ selfptr->m_jba_head.m_read_index = 0;
+ selfptr->m_jba_head.m_write_index = 0;
+ selfptr->m_jba.head = &selfptr->m_jba_head;
thr_job_buffer *buffer = seize_buffer(rep, thr_no, true);
selfptr->m_jba.m_buffers[0] = buffer;
selfptr->m_jba_read_state.m_read_index = 0;
selfptr->m_jba_read_state.m_read_buffer = buffer;
selfptr->m_jba_read_state.m_read_pos = 0;
+ selfptr->m_jba_read_state.m_read_end = 0;
selfptr->m_jba_read_state.m_write_index = 0;
- selfptr->m_jba_read_state.m_write_pos = 0;
selfptr->m_next_buffer = seize_buffer(rep, thr_no, false);
selfptr->m_send_buffer_pool.set_pool(&rep->m_free_list);
for (i = 0; i<cnt; i++)
{
- selfptr->m_in_queue[i].m_read_index = 0;
- selfptr->m_in_queue[i].m_write_index = 0;
+ selfptr->m_in_queue_head[i].m_read_index = 0;
+ selfptr->m_in_queue_head[i].m_write_index = 0;
+ selfptr->m_in_queue[i].head = &selfptr->m_in_queue_head[i];
buffer = seize_buffer(rep, thr_no, false);
selfptr->m_in_queue[i].m_buffers[0] = buffer;
selfptr->m_read_states[i].m_read_index = 0;
selfptr->m_read_states[i].m_read_buffer = buffer;
selfptr->m_read_states[i].m_read_pos = 0;
+ selfptr->m_read_states[i].m_read_end = 0;
selfptr->m_read_states[i].m_write_index = 0;
- selfptr->m_read_states[i].m_write_pos = 0;
}
-
queue_init(&selfptr->m_tq);
selfptr->m_prioa_count = 0;
| Thread |
|---|
| • bzr commit into mysql-5.1 branch (ole.john.aske:3163) Bug#41301 | Ole John Aske | 8 Dec |