#At file:///home/jonas/src/telco-7.0/ based on revid:pekka@stripped
4089 jonas oreland 2011-01-04
ndb - micro optimizations of mt.cpp
- make sure each mutex/spinlock is in a separate 64-byte cacheline
- reduce contention on thr_safe_pool<T> by adding release_list/release_all
(instead of always releasing individual items)
Johan benchmarked 10% throughput increase in heavy benchmark
modified:
storage/ndb/src/kernel/vm/mt.cpp
=== modified file 'storage/ndb/src/kernel/vm/mt.cpp'
--- a/storage/ndb/src/kernel/vm/mt.cpp 2010-11-25 12:53:32 +0000
+++ b/storage/ndb/src/kernel/vm/mt.cpp 2011-01-04 19:22:29 +0000
@@ -49,6 +49,9 @@ GlobalData::mt_getBlock(BlockNumber bloc
#define memcpy __builtin_memcpy
#endif
+/* size of a cacheline */
+#define NDB_CL 64
+
/* Constants found by benchmarks to be reasonable values. */
/* Maximum number of signals to execute before sending to remote nodes. */
@@ -263,6 +266,7 @@ wakeup(struct thr_wait* wait)
#endif
#ifdef NDB_HAVE_XCNG
+template <unsigned SZ>
struct thr_spin_lock
{
thr_spin_lock(const char * name = 0)
@@ -271,14 +275,17 @@ struct thr_spin_lock
register_lock(this, name);
}
- volatile Uint32 m_lock;
+ union {
+ volatile Uint32 m_lock;
+ char pad[SZ];
+ };
};
static
+ATTRIBUTE_NOINLINE
void
-lock_slow(struct thr_spin_lock* sl)
+lock_slow(void * sl, volatile unsigned * val)
{
- volatile unsigned* val = &sl->m_lock;
mt_lock_stat* s = lookup_lock(sl); // lookup before owning lock
loop:
@@ -303,22 +310,24 @@ loop:
}
}
+template <unsigned SZ>
static
inline
void
-lock(struct thr_spin_lock* sl)
+lock(struct thr_spin_lock<SZ>* sl)
{
volatile unsigned* val = &sl->m_lock;
if (likely(xcng(val, 1) == 0))
return;
- lock_slow(sl);
+ lock_slow(sl, val);
}
+template <unsigned SZ>
static
inline
void
-unlock(struct thr_spin_lock* sl)
+unlock(struct thr_spin_lock<SZ>* sl)
{
/**
* Memory barrier here, to make sure all of our stores are visible before
@@ -328,10 +337,11 @@ unlock(struct thr_spin_lock* sl)
sl->m_lock = 0;
}
+template <unsigned SZ>
static
inline
int
-trylock(struct thr_spin_lock* sl)
+trylock(struct thr_spin_lock<SZ>* sl)
{
volatile unsigned* val = &sl->m_lock;
return xcng(val, 1);
@@ -340,38 +350,45 @@ trylock(struct thr_spin_lock* sl)
#define thr_spin_lock thr_mutex
#endif
+template <unsigned SZ>
struct thr_mutex
{
thr_mutex(const char * name = 0) {
- m_mutex = NdbMutex_Create();
+ NdbMutex_Init(&m_mutex);
register_lock(this, name);
}
- NdbMutex * m_mutex;
+ union {
+ NdbMutex m_mutex;
+ char pad[SZ];
+ };
};
+template <unsigned SZ>
static
inline
void
-lock(struct thr_mutex* sl)
+lock(struct thr_mutex<SZ>* sl)
{
- NdbMutex_Lock(sl->m_mutex);
+ NdbMutex_Lock(&sl->m_mutex); // sl is a pointer; m_mutex is now embedded by value, so pass its address
}
+template <unsigned SZ>
static
inline
void
-unlock(struct thr_mutex* sl)
+unlock(struct thr_mutex<SZ>* sl)
{
- NdbMutex_Unlock(sl->m_mutex);
+ NdbMutex_Unlock(&sl->m_mutex); // sl is a pointer; m_mutex is now embedded by value, so pass its address
}
+template <unsigned SZ>
static
inline
int
-trylock(struct thr_mutex * sl)
+trylock(struct thr_mutex<SZ> * sl)
{
- return NdbMutex_Trylock(sl->m_mutex);
+ return NdbMutex_Trylock(&sl->m_mutex); // sl is a pointer; m_mutex is now embedded by value, so pass its address
}
/**
@@ -384,7 +401,7 @@ struct thr_safe_pool
T* m_free_list;
Uint32 m_cnt;
- thr_spin_lock m_lock;
+ thr_spin_lock<NDB_CL - (sizeof(T*) + sizeof(Uint32))> m_lock;
T* seize(Ndbd_mem_manager *mm, Uint32 rg) {
T* ret = 0;
@@ -394,7 +411,7 @@ struct thr_safe_pool
assert(m_cnt);
m_cnt--;
ret = m_free_list;
- m_free_list = *reinterpret_cast<T**>(m_free_list);
+ m_free_list = ret->m_next;
unlock(&m_lock);
}
else
@@ -413,12 +430,20 @@ struct thr_safe_pool
void release(Ndbd_mem_manager *mm, Uint32 rg, T* t) {
lock(&m_lock);
- T** nextptr = reinterpret_cast<T**>(t);
- * nextptr = m_free_list;
+ t->m_next = m_free_list;
m_free_list = t;
m_cnt++;
unlock(&m_lock);
}
+
+ void release_list(Ndbd_mem_manager *mm, Uint32 rg,
+ T* head, T* tail, Uint32 cnt) {
+ lock(&m_lock);
+ tail->m_next = m_free_list;
+ m_free_list = head;
+ m_cnt += cnt;
+ unlock(&m_lock);
+ }
};
/**
@@ -446,6 +471,8 @@ public:
}
else
tmp = m_global_pool->seize(mm, rg);
+
+ validate();
return tmp;
}
@@ -459,6 +486,8 @@ public:
}
else
m_global_pool->release(mm, rg, t);
+
+ validate();
}
/**
@@ -469,24 +498,74 @@ public:
m_free++;
t->m_next = m_freelist;
m_freelist = t;
+
+ validate();
}
+ void validate() const {
+#ifdef VM_TRACE
+ Uint32 cnt = 0;
+ T* t = m_freelist;
+ while (t)
+ {
+ cnt++;
+ t = t->m_next;
+ }
+ assert(cnt == m_free);
+#endif
+ }
/**
* Release entries so that m_max_free is honored
* (likely used together with release_local)
*/
void release_global(Ndbd_mem_manager *mm, Uint32 rg) {
+ validate();
+ unsigned cnt = 0;
unsigned free = m_free;
Uint32 maxfree = m_max_free;
+ assert(maxfree > 0);
- while (free > maxfree)
+ T* head = m_freelist;
+ T* tail = m_freelist;
+ if (free > maxfree)
{
- T* t = seize(0, 0);
- m_global_pool->release(mm, rg, t);
+ cnt++;
free--;
+
+ while (free > maxfree)
+ {
+ cnt++;
+ free--;
+ tail = tail->m_next;
+ }
+
+ assert(free == maxfree);
+
+ m_free = free;
+ m_freelist = tail->m_next;
+ m_global_pool->release_list(mm, rg, head, tail, cnt);
+ }
+ validate();
+ }
+
+ void release_all(Ndbd_mem_manager *mm, Uint32 rg) {
+ validate();
+ T* head = m_freelist;
+ T* tail = m_freelist;
+ if (tail)
+ {
+ unsigned cnt = 1;
+ while (tail->m_next != 0)
+ {
+ cnt++;
+ tail = tail->m_next;
+ }
+ m_global_pool->release_list(mm, rg, head, tail, cnt);
+ m_free = 0;
+ m_freelist = 0;
}
- assert(free == m_free);
+ validate();
}
void set_pool(thr_safe_pool<T> * pool) { m_global_pool = pool; }
@@ -520,7 +599,11 @@ struct thr_job_buffer // 32k
* signals from released buffers.
*/
Uint32 m_prioa;
- Uint32 m_data[SIZE];
+ union {
+ Uint32 m_data[SIZE];
+
+ thr_job_buffer * m_next; // For free-list
+ };
};
static
@@ -729,7 +812,6 @@ struct thr_data
thr_wait m_waiter;
unsigned m_thr_no;
- pthread_t m_thr_id;
/**
* max signals to execute per JBB buffer
@@ -745,15 +827,11 @@ struct thr_data
struct thr_tq m_tq;
/* Prio A signal incoming queue. */
- struct thr_job_queue_head m_jba_head;
+ struct thr_spin_lock<64> m_jba_write_lock;
struct thr_job_queue m_jba;
- struct thr_spin_lock m_jba_write_lock;
- /*
- * In m_next_buffer we keep a free buffer at all times, so that when
- * we hold the lock and find we need a new buffer, we can use this and this
- * way defer allocation to after releasing the lock.
- */
- struct thr_job_buffer* m_next_buffer;
+
+ struct thr_job_queue_head m_jba_head;
+
/* Thread-local read state of prio A buffer. */
struct thr_jb_read_state m_jba_read_state;
/*
@@ -762,6 +840,13 @@ struct thr_data
*/
/*
+ * In m_next_buffer we keep a free buffer at all times, so that when
+ * we hold the lock and find we need a new buffer, we can use this and this
+ * way defer allocation to after releasing the lock.
+ */
+ struct thr_job_buffer* m_next_buffer;
+
+ /*
* We keep a small number of buffers in a thread-local cyclic FIFO, so that
* we can avoid going to the global pool in most cases, and so that we have
* recent buffers available for dumping in trace files.
@@ -818,6 +903,7 @@ struct thr_data
SectionSegmentPool::Cache m_sectionPoolCache;
Uint32 m_cpu;
+ pthread_t m_thr_id;
NdbThread* m_thread;
};
@@ -863,15 +949,14 @@ struct thr_repository
m_sb_pool("sendbufferpool")
{}
- unsigned m_thread_count;
-
- struct thr_spin_lock m_receive_lock;
- struct thr_spin_lock m_section_lock;
- struct thr_spin_lock m_mem_manager_lock;
- Ndbd_mem_manager * m_mm;
- struct thr_data m_thread[MAX_THREADS];
+ struct thr_spin_lock<64> m_receive_lock;
+ struct thr_spin_lock<64> m_section_lock;
+ struct thr_spin_lock<64> m_mem_manager_lock;
struct thr_safe_pool<thr_job_buffer> m_jb_pool;
struct thr_safe_pool<thr_send_page> m_sb_pool;
+ Ndbd_mem_manager * m_mm;
+ unsigned m_thread_count;
+ struct thr_data m_thread[MAX_THREADS];
/**
* send buffer handling
@@ -881,14 +966,14 @@ struct thr_repository
struct send_buffer
{
/**
- * pending data
+ * lock
*/
- struct thr_send_buffer m_buffer;
+ struct thr_spin_lock<8> m_send_lock;
/**
- * lock
+ * pending data
*/
- struct thr_spin_lock m_send_lock;
+ struct thr_send_buffer m_buffer;
/**
* Flag used to coordinate sending to same remote node from different
@@ -1878,7 +1963,7 @@ trp_callback::reset_send_buffer(NodeId n
unlock(&sb->m_send_lock);
- pool.release_global(rep->m_mm, RG_TRANSPORTER_BUFFERS);
+ pool.release_all(rep->m_mm, RG_TRANSPORTER_BUFFERS);
}
static inline
Attachment: [text/bzr-bundle] bzr/jonas@mysql.com-20110104192229-nd1lndv16qcby6gh.bundle