3969 Mikael Ronstrom 2012-09-04
Attempt at improving CPU usage for NDB API with improved API thread contention
modified:
sql/ha_ndbcluster.cc
storage/ndb/src/ndbapi/TransporterFacade.cpp
storage/ndb/src/ndbapi/TransporterFacade.hpp
storage/ndb/src/ndbapi/trp_client.hpp
3968 Mikael Ronstrom 2012-08-28
Fixed m_node_active
modified:
storage/ndb/src/ndbapi/TransporterFacade.cpp
=== modified file 'sql/ha_ndbcluster.cc'
--- a/sql/ha_ndbcluster.cc revid:mikael.ronstrom@stripped
+++ b/sql/ha_ndbcluster.cc revid:mikael.ronstrom@stripped
@@ -59,6 +59,7 @@
#include "ndb_util_thread.h"
#include "ndb_local_connection.h"
#include "ndb_local_schema.h"
+#include "../storage/ndb/src/common/util/parse_mask.hpp"
// ndb interface initialization/cleanup
extern "C" void ndb_init_internal();
=== modified file 'storage/ndb/src/ndbapi/TransporterFacade.cpp'
--- a/storage/ndb/src/ndbapi/TransporterFacade.cpp revid:mikael.ronstrom@stripped
+++ b/storage/ndb/src/ndbapi/TransporterFacade.cpp revid:mikael.ronstrom@stripped
@@ -59,7 +59,7 @@ static int indexToNumber(int index)
#define TRP_DEBUG(t)
#endif
-#define DBG_POLL 0
+#define DBG_POLL 1
#define dbg(x,y) if (DBG_POLL) printf("%llu : " x "\n", NdbTick_CurrentNanosecond() / 1000, y)
/*****************************************************************************
@@ -1032,6 +1032,10 @@ TransporterFacade::for_each(trp_client*
trp_client * woken[16];
Uint32 cnt_woken = 0;
Uint32 sz = m_threads.m_statusNext.size();
+ Uint32 save_imm_wakeups = m_imm_wakeups;
+
+ /* No immediate wakeups due to for_each handling to simplify code */
+ m_imm_wakeups = 0;
for (Uint32 i = 0; i < sz ; i ++)
{
trp_client * clnt = m_threads.m_objectExecute[i];
@@ -1075,6 +1079,7 @@ TransporterFacade::for_each(trp_client*
unlock_poll_mutex();
unlock_and_signal(woken, cnt_woken);
}
+ m_imm_wakeups = save_imm_wakeups;
}
void
@@ -2013,6 +2018,7 @@ TransporterFacade::get_poll_owner(trp_cl
switch(clnt->m_poll.m_waiting) {
case trp_client::PollQueue::PQ_WOKEN:
+ case trp_client::PollQueue::PQ_IMM_WOKEN:
dbg("%p - PQ_WOKEN", clnt);
// we have already been taken out of poll queue
assert(clnt->m_poll.m_poll_queue == false);
@@ -2204,6 +2210,8 @@ TransporterFacade::try_lock_last_client(
}
}
+#define MAX_IMMEDIATE_WAKEUPS 1
+
void
TransporterFacade::do_poll(trp_client* clnt,
Uint32 wait_time,
@@ -2229,7 +2237,10 @@ TransporterFacade::do_poll(trp_client* c
clnt->m_poll.m_poll_owner = true;
clnt->m_poll.start_poll(clnt);
dbg("%p->external_poll", clnt);
+ /* Allow for a number of immediate wakeups */
+ m_imm_wakeups = MAX_IMMEDIATE_WAKEUPS;
external_poll(clnt, wait_time);
+ m_imm_wakeups = 0;
Uint32 cnt_woken = 0;
Uint32 cnt = clnt->m_poll.m_locked_cnt - 1; // skip self
@@ -2342,12 +2353,69 @@ TransporterFacade::do_poll(trp_client* c
}
void
+TransporterFacade::handle_immediate_wakeup(trp_client *clnt)
+{
+ trp_client *poll_owner = m_poll_owner;
+ Uint32 cnt = poll_owner->m_poll.m_locked_cnt;
+ trp_client *last_client = poll_owner->m_poll.m_locked_clients[cnt - 1];
+
+ assert(poll_owner->m_poll.m_locked_clients[0] != clnt);
+ assert(poll_owner->m_poll.m_locked_clients[0] == m_poll_owner);
+
+ lock_poll_mutex();
+ remove_from_poll_queue(clnt);
+ unlock_poll_mutex();
+
+ assert(poll_owner->m_poll.m_locked_clients[0] == m_poll_owner);
+ for (Uint32 i = 1; i < cnt; i++)
+ {
+ assert(poll_owner->m_poll.m_locked_clients[0] == m_poll_owner);
+ if (poll_owner->m_poll.m_locked_clients[i] == clnt)
+ {
+ assert(poll_owner->m_poll.m_locked_clients[0] == m_poll_owner);
+ poll_owner->m_poll.m_locked_clients[i] = last_client;
+ assert(poll_owner->m_poll.m_locked_clients[0] == m_poll_owner);
+ poll_owner->m_poll.m_locked_cnt--;
+ assert(poll_owner->m_poll.m_locked_clients[0] == m_poll_owner);
+ NdbCondition_Signal(clnt->m_poll.m_condition);
+ NdbMutex_Unlock(clnt->m_mutex);
+ return;
+ }
+ }
+ assert(false);
+}
+
+void
TransporterFacade::wakeup(trp_client* clnt)
{
switch(clnt->m_poll.m_waiting) {
case trp_client::PollQueue::PQ_WAITING:
- dbg("TransporterFacade::wakeup(%p) PQ_WAITING => PQ_WOKEN", clnt);
- clnt->m_poll.m_waiting = trp_client::PollQueue::PQ_WOKEN;
+ {
+ if (m_imm_wakeups == 0 ||
+ clnt == m_poll_owner ||
+ clnt->m_poll.m_poll_owner)
+ {
+ dbg("TransporterFacade::wakeup(%p) PQ_WAITING => PQ_WOKEN", clnt);
+ clnt->m_poll.m_waiting = trp_client::PollQueue::PQ_WOKEN;
+ break;
+ }
+ else
+ {
+ /*
+ To ensure we make full use of the available CPUs we wake a few
+ threads early on instead of waiting until all signals have been
+ processed. Also decreases response time slightly in single
+ thread case.
+ */
+ dbg("TransporterFacade::wakeup(%p) PQ_WAITING => PQ_IMM_WOKEN", clnt);
+ clnt->m_poll.m_waiting = trp_client::PollQueue::PQ_IMM_WOKEN;
+ m_imm_wakeups--;
+ handle_immediate_wakeup(clnt);
+ break;
+ }
+ }
+ case trp_client::PollQueue::PQ_IMM_WOKEN:
+ dbg("TransporterFacade::wakeup(%p) PQ_IMM_WOKEN", clnt);
break;
case trp_client::PollQueue::PQ_WOKEN:
dbg("TransporterFacade::wakeup(%p) PQ_WOKEN", clnt);
@@ -2421,6 +2489,7 @@ trp_client::PollQueue::lock_client (trp_
void
TransporterFacade::add_to_poll_queue(trp_client* clnt)
{
+ dbg("%p->add_to_poll_queue", clnt);
assert(clnt != 0);
assert(clnt->m_poll.m_prev == 0);
assert(clnt->m_poll.m_next == 0);
@@ -2459,6 +2528,7 @@ TransporterFacade::remove_from_poll_queu
void
TransporterFacade::remove_from_poll_queue(trp_client* clnt)
{
+ dbg("%p->remove_from_poll_queue", clnt);
assert(clnt != 0);
assert(clnt->m_poll.m_locked == true);
assert(clnt->m_poll.m_poll_owner == false);
=== modified file 'storage/ndb/src/ndbapi/TransporterFacade.hpp'
--- a/storage/ndb/src/ndbapi/TransporterFacade.hpp revid:mikael.ronstrom@stripped
+++ b/storage/ndb/src/ndbapi/TransporterFacade.hpp revid:mikael.ronstrom@stripped
@@ -113,6 +113,7 @@ private:
bool get_node_alive(NodeId nodeId) const;
bool getIsNodeSendable(NodeId nodeId) const;
+ void handle_immediate_wakeup(trp_client*);
public:
Uint32 getMinDbNodeVersion() const;
@@ -197,6 +198,7 @@ public:
/* Support method to lock the receiver thread to its CPU */
void lock_recv_thread_cpu();
+ Uint32 m_imm_wakeups;
trp_client * m_poll_owner;
trp_client * m_poll_queue_head; // First in queue
trp_client * m_poll_queue_tail; // Last in queue
=== modified file 'storage/ndb/src/ndbapi/trp_client.hpp'
--- a/storage/ndb/src/ndbapi/trp_client.hpp revid:mikael.ronstrom@stripped
+++ b/storage/ndb/src/ndbapi/trp_client.hpp revid:mikael.ronstrom@stripped
@@ -118,7 +118,7 @@ private:
bool m_locked;
bool m_poll_owner;
bool m_poll_queue;
- enum { PQ_WOKEN, PQ_IDLE, PQ_WAITING } m_waiting;
+ enum { PQ_IMM_WOKEN, PQ_WOKEN, PQ_IDLE, PQ_WAITING } m_waiting;
trp_client *m_prev;
trp_client *m_next;
NdbCondition * m_condition;
No bundle (reason: useless for push emails).
| Thread |
|---|
| • bzr push into mysql-5.5-cluster-7.2 branch (mikael.ronstrom:3968 to 3969) | Mikael Ronstrom | 4 Sep |