List:Commits« Previous MessageNext Message »
From:Mikael Ronstrom Date:September 4 2012 1:11pm
Subject:bzr push into mysql-5.5-cluster-7.2 branch (mikael.ronstrom:3968 to 3969)
View as plain text  
 3969 Mikael Ronstrom	2012-09-04
      Attempt at improving CPU usage for NDB API with improved API thread contention

    modified:
      sql/ha_ndbcluster.cc
      storage/ndb/src/ndbapi/TransporterFacade.cpp
      storage/ndb/src/ndbapi/TransporterFacade.hpp
      storage/ndb/src/ndbapi/trp_client.hpp
 3968 Mikael Ronstrom	2012-08-28
      Fixed m_node_active

    modified:
      storage/ndb/src/ndbapi/TransporterFacade.cpp
=== modified file 'sql/ha_ndbcluster.cc'
--- a/sql/ha_ndbcluster.cc	revid:mikael.ronstrom@stripped
+++ b/sql/ha_ndbcluster.cc	revid:mikael.ronstrom@stripped
@@ -59,6 +59,7 @@
 #include "ndb_util_thread.h"
 #include "ndb_local_connection.h"
 #include "ndb_local_schema.h"
+#include "../storage/ndb/src/common/util/parse_mask.hpp"
 
 // ndb interface initialization/cleanup
 extern "C" void ndb_init_internal();

=== modified file 'storage/ndb/src/ndbapi/TransporterFacade.cpp'
--- a/storage/ndb/src/ndbapi/TransporterFacade.cpp	revid:mikael.ronstrom@stripped
+++ b/storage/ndb/src/ndbapi/TransporterFacade.cpp	revid:mikael.ronstrom@stripped
@@ -59,7 +59,7 @@ static int indexToNumber(int index)
 #define TRP_DEBUG(t)
 #endif
 
-#define DBG_POLL 0
+#define DBG_POLL 1
 #define dbg(x,y) if (DBG_POLL) printf("%llu : " x "\n", NdbTick_CurrentNanosecond() / 1000, y)
 
 /*****************************************************************************
@@ -1032,6 +1032,10 @@ TransporterFacade::for_each(trp_client* 
   trp_client * woken[16];
   Uint32 cnt_woken = 0;
   Uint32 sz = m_threads.m_statusNext.size();
+  Uint32 save_imm_wakeups = m_imm_wakeups;
+
+  /* No immediate wakeups due to for_each handling to simplify code */
+  m_imm_wakeups = 0;
   for (Uint32 i = 0; i < sz ; i ++) 
   {
     trp_client * clnt = m_threads.m_objectExecute[i];
@@ -1075,6 +1079,7 @@ TransporterFacade::for_each(trp_client* 
     unlock_poll_mutex();
     unlock_and_signal(woken, cnt_woken);
   }
+  m_imm_wakeups = save_imm_wakeups;
 }
 
 void
@@ -2013,6 +2018,7 @@ TransporterFacade::get_poll_owner(trp_cl
 
     switch(clnt->m_poll.m_waiting) {
     case trp_client::PollQueue::PQ_WOKEN:
+    case trp_client::PollQueue::PQ_IMM_WOKEN:
       dbg("%p - PQ_WOKEN", clnt);
       // we have already been taken out of poll queue
       assert(clnt->m_poll.m_poll_queue == false);
@@ -2204,6 +2210,8 @@ TransporterFacade::try_lock_last_client(
   }
 }
 
+#define MAX_IMMEDIATE_WAKEUPS 1
+
 void
 TransporterFacade::do_poll(trp_client* clnt,
                            Uint32 wait_time,
@@ -2229,7 +2237,10 @@ TransporterFacade::do_poll(trp_client* c
   clnt->m_poll.m_poll_owner = true;
   clnt->m_poll.start_poll(clnt);
   dbg("%p->external_poll", clnt);
+  /* Allow for a number of immediate wakeups */
+  m_imm_wakeups = MAX_IMMEDIATE_WAKEUPS;
   external_poll(clnt, wait_time);
+  m_imm_wakeups = 0;
 
   Uint32 cnt_woken = 0;
   Uint32 cnt = clnt->m_poll.m_locked_cnt - 1; // skip self
@@ -2342,12 +2353,69 @@ TransporterFacade::do_poll(trp_client* c
 }
 
 void
+TransporterFacade::handle_immediate_wakeup(trp_client *clnt)
+{
+  trp_client *poll_owner = m_poll_owner;
+  Uint32 cnt = poll_owner->m_poll.m_locked_cnt;
+  trp_client *last_client = poll_owner->m_poll.m_locked_clients[cnt - 1];
+
+  assert(poll_owner->m_poll.m_locked_clients[0] != clnt);
+  assert(poll_owner->m_poll.m_locked_clients[0] == m_poll_owner);
+
+  lock_poll_mutex();
+  remove_from_poll_queue(clnt);
+  unlock_poll_mutex();
+
+  assert(poll_owner->m_poll.m_locked_clients[0] == m_poll_owner);
+  for (Uint32 i = 1; i < cnt; i++)
+  {
+    assert(poll_owner->m_poll.m_locked_clients[0] == m_poll_owner);
+    if (poll_owner->m_poll.m_locked_clients[i] == clnt)
+    {
+      assert(poll_owner->m_poll.m_locked_clients[0] == m_poll_owner);
+      poll_owner->m_poll.m_locked_clients[i] = last_client;
+      assert(poll_owner->m_poll.m_locked_clients[0] == m_poll_owner);
+      poll_owner->m_poll.m_locked_cnt--;
+      assert(poll_owner->m_poll.m_locked_clients[0] == m_poll_owner);
+      NdbCondition_Signal(clnt->m_poll.m_condition);
+      NdbMutex_Unlock(clnt->m_mutex);
+      return;
+    }
+  }
+  assert(false);
+}
+
+void
 TransporterFacade::wakeup(trp_client* clnt)
 {
   switch(clnt->m_poll.m_waiting) {
   case trp_client::PollQueue::PQ_WAITING:
-    dbg("TransporterFacade::wakeup(%p) PQ_WAITING => PQ_WOKEN", clnt);
-    clnt->m_poll.m_waiting = trp_client::PollQueue::PQ_WOKEN;
+  {
+    if (m_imm_wakeups == 0 ||
+        clnt == m_poll_owner ||
+        clnt->m_poll.m_poll_owner)
+    {
+      dbg("TransporterFacade::wakeup(%p) PQ_WAITING => PQ_WOKEN", clnt);
+      clnt->m_poll.m_waiting = trp_client::PollQueue::PQ_WOKEN;
+      break;
+    }
+    else
+    {
+      /*
+        To ensure we make full use of the available CPUs we wake a few
+        threads early on instead of waiting until all signals have been
+        processed. Also decreases response time slightly in single
+        thread case.
+      */
+      dbg("TransporterFacade::wakeup(%p) PQ_WAITING => PQ_IMM_WOKEN", clnt);
+      clnt->m_poll.m_waiting = trp_client::PollQueue::PQ_IMM_WOKEN;
+      m_imm_wakeups--;
+      handle_immediate_wakeup(clnt);
+      break;
+    }
+  }
+  case trp_client::PollQueue::PQ_IMM_WOKEN:
+    dbg("TransporterFacade::wakeup(%p) PQ_IMM_WOKEN", clnt);
     break;
   case trp_client::PollQueue::PQ_WOKEN:
     dbg("TransporterFacade::wakeup(%p) PQ_WOKEN", clnt);
@@ -2421,6 +2489,7 @@ trp_client::PollQueue::lock_client (trp_
 void
 TransporterFacade::add_to_poll_queue(trp_client* clnt)
 {
+  dbg("%p->add_to_poll_queue", clnt);
   assert(clnt != 0);
   assert(clnt->m_poll.m_prev == 0);
   assert(clnt->m_poll.m_next == 0);
@@ -2459,6 +2528,7 @@ TransporterFacade::remove_from_poll_queu
 void
 TransporterFacade::remove_from_poll_queue(trp_client* clnt)
 {
+  dbg("%p->remove_from_poll_queue", clnt);
   assert(clnt != 0);
   assert(clnt->m_poll.m_locked == true);
   assert(clnt->m_poll.m_poll_owner == false);

=== modified file 'storage/ndb/src/ndbapi/TransporterFacade.hpp'
--- a/storage/ndb/src/ndbapi/TransporterFacade.hpp	revid:mikael.ronstrom@stripped
+++ b/storage/ndb/src/ndbapi/TransporterFacade.hpp	revid:mikael.ronstrom@stripped
@@ -113,6 +113,7 @@ private:
   bool   get_node_alive(NodeId nodeId) const;
   bool   getIsNodeSendable(NodeId nodeId) const;
 
+  void handle_immediate_wakeup(trp_client*);
 public:
   Uint32 getMinDbNodeVersion() const;
 
@@ -197,6 +198,7 @@ public:
   /* Support method to lock the receiver thread to its CPU */
   void lock_recv_thread_cpu();
 
+  Uint32       m_imm_wakeups;
   trp_client * m_poll_owner;
   trp_client * m_poll_queue_head; // First in queue
   trp_client * m_poll_queue_tail; // Last in queue

=== modified file 'storage/ndb/src/ndbapi/trp_client.hpp'
--- a/storage/ndb/src/ndbapi/trp_client.hpp	revid:mikael.ronstrom@stripped
+++ b/storage/ndb/src/ndbapi/trp_client.hpp	revid:mikael.ronstrom@stripped
@@ -118,7 +118,7 @@ private:
     bool m_locked;
     bool m_poll_owner;
     bool m_poll_queue;
-    enum { PQ_WOKEN, PQ_IDLE, PQ_WAITING } m_waiting;
+    enum { PQ_IMM_WOKEN, PQ_WOKEN, PQ_IDLE, PQ_WAITING } m_waiting;
     trp_client *m_prev;
     trp_client *m_next;
     NdbCondition * m_condition;

No bundle (reason: useless for push emails).
Thread
bzr push into mysql-5.5-cluster-7.2 branch (mikael.ronstrom:3968 to 3969) Mikael Ronstrom4 Sep