From: Mikael Ronstrom Date: September 4 2012 1:11pm Subject: bzr push into mysql-5.5-cluster-7.2 branch (mikael.ronstrom:3968 to 3969) List-Archive: http://lists.mysql.com/commits/144688 Message-Id: <201209041312.q84DCSWY006634@dator6> MIME-Version: 1.0 Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit 3969 Mikael Ronstrom 2012-09-04 Attempt at improving CPU usage for NDB API with improved API thread contention modified: sql/ha_ndbcluster.cc storage/ndb/src/ndbapi/TransporterFacade.cpp storage/ndb/src/ndbapi/TransporterFacade.hpp storage/ndb/src/ndbapi/trp_client.hpp 3968 Mikael Ronstrom 2012-08-28 Fixed m_node_active modified: storage/ndb/src/ndbapi/TransporterFacade.cpp === modified file 'sql/ha_ndbcluster.cc' --- a/sql/ha_ndbcluster.cc revid:mikael.ronstrom@stripped +++ b/sql/ha_ndbcluster.cc revid:mikael.ronstrom@stripped @@ -59,6 +59,7 @@ #include "ndb_util_thread.h" #include "ndb_local_connection.h" #include "ndb_local_schema.h" +#include "../storage/ndb/src/common/util/parse_mask.hpp" // ndb interface initialization/cleanup extern "C" void ndb_init_internal(); === modified file 'storage/ndb/src/ndbapi/TransporterFacade.cpp' --- a/storage/ndb/src/ndbapi/TransporterFacade.cpp revid:mikael.ronstrom@stripped +++ b/storage/ndb/src/ndbapi/TransporterFacade.cpp revid:mikael.ronstrom@stripped @@ -59,7 +59,7 @@ static int indexToNumber(int index) #define TRP_DEBUG(t) #endif -#define DBG_POLL 0 +#define DBG_POLL 1 #define dbg(x,y) if (DBG_POLL) printf("%llu : " x "\n", NdbTick_CurrentNanosecond() / 1000, y) /***************************************************************************** @@ -1032,6 +1032,10 @@ TransporterFacade::for_each(trp_client* trp_client * woken[16]; Uint32 cnt_woken = 0; Uint32 sz = m_threads.m_statusNext.size(); + Uint32 save_imm_wakeups = m_imm_wakeups; + + /* No immediate wakeups due to for_each handling to simplify code */ + m_imm_wakeups = 0; for (Uint32 i = 0; i < sz ; i ++) { trp_client * clnt = m_threads.m_objectExecute[i]; @@ -1075,6 +1079,7 @@ TransporterFacade::for_each(trp_client* unlock_poll_mutex(); unlock_and_signal(woken, cnt_woken); } + m_imm_wakeups = save_imm_wakeups; } void @@ -2013,6 +2018,7 @@ TransporterFacade::get_poll_owner(trp_cl switch(clnt->m_poll.m_waiting) { case trp_client::PollQueue::PQ_WOKEN: + case trp_client::PollQueue::PQ_IMM_WOKEN: dbg("%p - PQ_WOKEN", clnt); // we have already been taken out of poll queue assert(clnt->m_poll.m_poll_queue == false); @@ -2204,6 +2210,8 @@ TransporterFacade::try_lock_last_client( } } +#define MAX_IMMEDIATE_WAKEUPS 1 + void TransporterFacade::do_poll(trp_client* clnt, Uint32 wait_time, @@ -2229,7 +2237,10 @@ TransporterFacade::do_poll(trp_client* c clnt->m_poll.m_poll_owner = true; clnt->m_poll.start_poll(clnt); dbg("%p->external_poll", clnt); + /* Allow for a number of immediate wakeups */ + m_imm_wakeups = MAX_IMMEDIATE_WAKEUPS; external_poll(clnt, wait_time); + m_imm_wakeups = 0; Uint32 cnt_woken = 0; Uint32 cnt = clnt->m_poll.m_locked_cnt - 1; // skip self @@ -2342,12 +2353,69 @@ TransporterFacade::do_poll(trp_client* c } void +TransporterFacade::handle_immediate_wakeup(trp_client *clnt) +{ + trp_client *poll_owner = m_poll_owner; + Uint32 cnt = poll_owner->m_poll.m_locked_cnt; + trp_client *last_client = poll_owner->m_poll.m_locked_clients[cnt - 1]; + + assert(poll_owner->m_poll.m_locked_clients[0] != clnt); + assert(poll_owner->m_poll.m_locked_clients[0] == m_poll_owner); + + lock_poll_mutex(); + remove_from_poll_queue(clnt); + unlock_poll_mutex(); + + assert(poll_owner->m_poll.m_locked_clients[0] == m_poll_owner); + for (Uint32 i = 1; i < cnt; i++) + { + assert(poll_owner->m_poll.m_locked_clients[0] == m_poll_owner); + if (poll_owner->m_poll.m_locked_clients[i] == clnt) + { + assert(poll_owner->m_poll.m_locked_clients[0] == m_poll_owner); + poll_owner->m_poll.m_locked_clients[i] = last_client; + assert(poll_owner->m_poll.m_locked_clients[0] == m_poll_owner); + poll_owner->m_poll.m_locked_cnt--; + assert(poll_owner->m_poll.m_locked_clients[0] == m_poll_owner); + NdbCondition_Signal(clnt->m_poll.m_condition); + NdbMutex_Unlock(clnt->m_mutex); + return; + } + } + assert(false); +} + +void TransporterFacade::wakeup(trp_client* clnt) { switch(clnt->m_poll.m_waiting) { case trp_client::PollQueue::PQ_WAITING: - dbg("TransporterFacade::wakeup(%p) PQ_WAITING => PQ_WOKEN", clnt); - clnt->m_poll.m_waiting = trp_client::PollQueue::PQ_WOKEN; + { + if (m_imm_wakeups == 0 || + clnt == m_poll_owner || + clnt->m_poll.m_poll_owner) + { + dbg("TransporterFacade::wakeup(%p) PQ_WAITING => PQ_WOKEN", clnt); + clnt->m_poll.m_waiting = trp_client::PollQueue::PQ_WOKEN; + break; + } + else + { + /* + To ensure we make full use of the available CPUs we wake a few + threads early on instead of waiting until all signals have been + processed. Also decreases response time slightly in single + thread case. + */ + dbg("TransporterFacade::wakeup(%p) PQ_WAITING => PQ_IMM_WOKEN", clnt); + clnt->m_poll.m_waiting = trp_client::PollQueue::PQ_IMM_WOKEN; + m_imm_wakeups--; + handle_immediate_wakeup(clnt); + break; + } + } + case trp_client::PollQueue::PQ_IMM_WOKEN: + dbg("TransporterFacade::wakeup(%p) PQ_IMM_WOKEN", clnt); break; case trp_client::PollQueue::PQ_WOKEN: dbg("TransporterFacade::wakeup(%p) PQ_WOKEN", clnt); @@ -2421,6 +2489,7 @@ trp_client::PollQueue::lock_client (trp_ void TransporterFacade::add_to_poll_queue(trp_client* clnt) { + dbg("%p->add_to_poll_queue", clnt); assert(clnt != 0); assert(clnt->m_poll.m_prev == 0); assert(clnt->m_poll.m_next == 0); @@ -2459,6 +2528,7 @@ TransporterFacade::remove_from_poll_queu void TransporterFacade::remove_from_poll_queue(trp_client* clnt) { + dbg("%p->remove_from_poll_queue", clnt); assert(clnt != 0); assert(clnt->m_poll.m_locked == true); assert(clnt->m_poll.m_poll_owner == false); === modified file 'storage/ndb/src/ndbapi/TransporterFacade.hpp' --- a/storage/ndb/src/ndbapi/TransporterFacade.hpp revid:mikael.ronstrom@stripped +++ b/storage/ndb/src/ndbapi/TransporterFacade.hpp revid:mikael.ronstrom@stripped @@ -113,6 +113,7 @@ private: bool get_node_alive(NodeId nodeId) const; bool getIsNodeSendable(NodeId nodeId) const; + void handle_immediate_wakeup(trp_client*); public: Uint32 getMinDbNodeVersion() const; @@ -197,6 +198,7 @@ public: /* Support method to lock the receiver thread to its CPU */ void lock_recv_thread_cpu(); + Uint32 m_imm_wakeups; trp_client * m_poll_owner; trp_client * m_poll_queue_head; // First in queue trp_client * m_poll_queue_tail; // Last in queue === modified file 'storage/ndb/src/ndbapi/trp_client.hpp' --- a/storage/ndb/src/ndbapi/trp_client.hpp revid:mikael.ronstrom@stripped +++ b/storage/ndb/src/ndbapi/trp_client.hpp revid:mikael.ronstrom@stripped @@ -118,7 +118,7 @@ private: bool m_locked; bool m_poll_owner; bool m_poll_queue; - enum { PQ_WOKEN, PQ_IDLE, PQ_WAITING } m_waiting; + enum { PQ_IMM_WOKEN, PQ_WOKEN, PQ_IDLE, PQ_WAITING } m_waiting; trp_client *m_prev; trp_client *m_next; NdbCondition * m_condition; No bundle (reason: useless for push emails).