List:Commits« Previous MessageNext Message »
From:Mikael Ronstrom Date:July 12 2012 9:19pm
Subject:bzr push into mysql-5.5-cluster-7.2 branch (mikael.ronstrom:3956 to 3960)
View as plain text  
 3960 Mikael Ronstrom	2012-07-12
      Added options for setting receiver thread options in cluster connection interface and in MySQL Server

    modified:
      sql/ha_ndbcluster.cc
      sql/ha_ndbcluster_connection.cc
      sql/ha_ndbcluster_connection.h
      storage/ndb/include/ndbapi/ndb_cluster_connection.hpp
      storage/ndb/src/ndbapi/TransporterFacade.cpp
      storage/ndb/src/ndbapi/TransporterFacade.hpp
      storage/ndb/src/ndbapi/ndb_cluster_connection.cpp
 3959 Mikael Ronstrom	2012-07-12
      Made adaptive solution that uses receive thread in high-load situations

    modified:
      storage/ndb/src/ndbapi/TransporterFacade.cpp
      storage/ndb/src/ndbapi/TransporterFacade.hpp
      storage/ndb/src/ndbapi/trp_client.cpp
      storage/ndb/src/ndbapi/trp_client.hpp
 3958 Mikael Ronstrom	2012-07-12
      Set m_poll_owner on client object when becoming poll owner

    modified:
      storage/ndb/src/ndbapi/TransporterFacade.cpp
 3957 Mikael Ronstrom	2012-07-06
      Streamlined receiver thread code, added use of start_poll and complete_poll on receiver thread trp client

    modified:
      storage/ndb/src/ndbapi/TransporterFacade.cpp
 3956 Mikael Ronstrom	2012-07-05
      fix

    modified:
      storage/ndb/src/ndbapi/TransporterFacade.cpp
=== modified file 'sql/ha_ndbcluster.cc'
--- a/sql/ha_ndbcluster.cc	revid:mikael.ronstrom@stripped
+++ b/sql/ha_ndbcluster.cc	revid:mikael.ronstrom@stripped
@@ -73,6 +73,9 @@ static ulong opt_ndb_wait_connected;
 ulong opt_ndb_wait_setup;
 static ulong opt_ndb_cache_check_time;
 static uint opt_ndb_cluster_connection_pool;
+static uint opt_ndb_num_recv_threads;
+static uint opt_ndb_recv_thread_activation_threshold;
+static char* opt_ndb_recv_thread_cpu_mask;
 static char* opt_ndb_index_stat_option;
 static char* opt_ndb_connectstring;
 static uint opt_ndb_nodeid;
@@ -11924,7 +11927,9 @@ static int ndbcluster_init(void *p)
                          opt_ndb_cluster_connection_pool,
                          (global_opti_node_select & 1),
                          opt_ndb_connectstring,
-                         opt_ndb_nodeid))
+                         opt_ndb_nodeid,
+                         opt_ndb_num_recv_threads,
+                         opt_ndb_recv_thread_activation_threshold))
   {
     DBUG_PRINT("error", ("Could not initiate connection to cluster"));
     goto ndbcluster_init_error;
@@ -17082,7 +17087,7 @@ static MYSQL_SYSVAR_ULONG(
   0                                  /* block */
 );
 
-
+#define MAX_CLUSTER_CONNECTIONS 63
 static MYSQL_SYSVAR_UINT(
   cluster_connection_pool,           /* name */
   opt_ndb_cluster_connection_pool,   /* var */
@@ -17092,10 +17097,170 @@ static MYSQL_SYSVAR_UINT(
   NULL,                              /* update func. */
   1,                                 /* default */
   1,                                 /* min */
-  63,                                /* max */
+  MAX_CLUSTER_CONNECTIONS,           /* max */
+  0                                  /* block */
+);
+
+#define MAX_NUM_RECV_THREADS 4
+static MYSQL_SYSVAR_UINT(
+  num_recv_threads,   /* name */
+  opt_ndb_num_recv_threads,/* var */
+  PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_READONLY,
+  "Number of receive threads per cluster connection",
+  NULL,                              /* check func. */
+  NULL,                              /* update func. */
+  1,                                 /* default */
+  1,                                 /* min */
+  MAX_NUM_RECV_THREADS,              /* max */
   0                                  /* block */
 );
 
+static
+void
+ndb_recv_thread_activation_threshold_update(MYSQL_THD,
+                                            struct st_mysql_sys_var *var,
+                                            void *var_ptr,
+                                            const void *save)
+{
+  ndb_set_recv_thread_activation_threshold(
+    opt_ndb_recv_thread_activation_threshold);
+}
+
+static MYSQL_SYSVAR_UINT(
+  recv_thread_activation_threshold,         /* name */
+  opt_ndb_recv_thread_activation_threshold, /* var */
+  PLUGIN_VAR_RQCMDARG,
+  "Activation threshold when receive thread takes over the polling "
+  "of the cluster connection (measured in concurrently active "
+  "threads)",
+  NULL,                                        /* check func. */
+  ndb_recv_thread_activation_threshold_update, /* update func. */
+  8,                                           /* default */
+  0,                                           /* min */
+  16,                                          /* max */
+  0                                            /* block */
+);
+
+static const int ndb_recv_thread_cpu_mask_option_buf_size = 512;
+char ndb_recv_thread_cpu_mask_option_buf[ndb_recv_thread_cpu_mask_option_buf_size];
+Uint16 recv_thread_cpuid_array[MAX_NUM_RECV_THREADS * MAX_CLUSTER_CONNECTIONS];
+Uint32 recv_thread_num_cpus;
+
+static int
+count_bits_in_hex_number(Uint32 hex_number)
+{
+  int num_cpus = 0;
+  for (int i = 0; i < 4; i++)
+  {
+    if (hex_number & (1 << i))
+    {
+      num_cpus++;
+    }
+  }
+  return num_cpus;
+}
+
+static int
+assign_cpuid_in_array_from_hexnumber(Uint32 hex_number, Uint32 base_cpuid)
+{
+  if ((recv_thread_num_cpus + count_bits_in_hex_number(hex_number)) >
+      (MAX_NUM_RECV_THREADS * MAX_CLUSTER_CONNECTIONS))
+  {
+    return -1;
+  }
+  for (int i = 0; i < 4; i++)
+  {
+    if (hex_number & (1 << i))
+    {
+      recv_thread_cpuid_array[recv_thread_num_cpus++] = (base_cpuid + i);
+    }
+  }
+  return 0;
+}
+
+static int
+assign_cpus_from_character(char c, Uint32 base_cpuid)
+{
+  Uint32 hex_number;
+
+  if (c >= '0' && c <= '9')
+    hex_number = (c - '0');
+  else if (c >= 'A' && c <= 'F')
+    hex_number = (c - 'A') + 10;
+  else if (c >= 'a' && c <= 'f')
+    hex_number = (c - 'a') + 10;
+  else
+    return -1;
+  return assign_cpuid_in_array_from_hexnumber(hex_number, base_cpuid);
+}
+
+static int
+cpu_mask_parse(const char *str, int mask_str_len)
+{
+  Uint32 base_cpu_id = 0;
+  recv_thread_num_cpus = 0;
+  if (str[0] != '0' || str[1] != 'x')
+    goto error;
+  for (int i = mask_str_len - 1; i >= 2; i--)
+  {
+    /*
+      Parse string backwards, last character is representing
+      CPU 0-3, next to last represents 4-7 and so forth.
+    */
+    if (assign_cpus_from_character(str[i], base_cpu_id) != 0)
+      goto error;
+    base_cpu_id += 4;
+  }
+  return 0;
+error:
+  return -1;
+}
+
+static
+int
+ndb_recv_thread_cpu_mask_check(MYSQL_THD thd,
+                               struct st_mysql_sys_var *var,
+                               void *save,
+                               struct st_mysql_value *value)
+{
+  int mask_str_len;
+  char buf[ndb_recv_thread_cpu_mask_option_buf_size];
+  int len = sizeof(buf);
+  const char *str = value->val_str(value, buf, &len);
+  if (str != 0)
+  {
+    mask_str_len = strlen(str);
+    if ((mask_str_len >= len) ||
+        (cpu_mask_parse(str, mask_str_len) != 0))
+      goto error;
+    /* Mask parsed OK, accept the value */
+    return 0;
+  }
+error:
+  return 1;
+}
+
+static
+void
+ndb_recv_thread_cpu_mask_update(MYSQL_THD,
+                                struct st_mysql_sys_var *var,
+                                void *var_ptr,
+                                const void *save)
+{
+  ndb_set_recv_thread_cpu(recv_thread_cpuid_array,
+                          opt_ndb_num_recv_threads,
+                          recv_thread_num_cpus);
+}
+
+static MYSQL_SYSVAR_STR(
+  recv_thread_cpu_mask,             /* name */
+  opt_ndb_recv_thread_cpu_mask,     /* var */
+  PLUGIN_VAR_RQCMDARG,
+  "CPU mask for locking receiver threads to specific CPU, specified "
+  "in hexadecimal, e.g. 0x33; one CPU is used per receiver thread.",
+  ndb_recv_thread_cpu_mask_check,      /* check func. */
+  ndb_recv_thread_cpu_mask_update,     /* update func. */
+  ndb_recv_thread_cpu_mask_option_buf
+);
+
 /* should be in index_stat.h */
 
 extern int
@@ -17373,6 +17538,9 @@ static struct st_mysql_sys_var* system_v
   MYSQL_SYSVAR(wait_connected),
   MYSQL_SYSVAR(wait_setup),
   MYSQL_SYSVAR(cluster_connection_pool),
+  MYSQL_SYSVAR(num_recv_threads),
+  MYSQL_SYSVAR(recv_thread_activation_threshold),
+  MYSQL_SYSVAR(recv_thread_cpu_mask),
   MYSQL_SYSVAR(report_thresh_binlog_mem_usage),
   MYSQL_SYSVAR(report_thresh_binlog_epoch_slip),
   MYSQL_SYSVAR(log_update_as_write),

=== modified file 'sql/ha_ndbcluster_connection.cc'
--- a/sql/ha_ndbcluster_connection.cc	revid:mikael.ronstrom@stripped
+++ b/sql/ha_ndbcluster_connection.cc	revid:mikael.ronstrom@stripped
@@ -45,7 +45,9 @@ ndbcluster_connect(int (*connect_callbac
                    uint connection_pool_size,
                    bool optimized_node_select,
                    const char* connect_string,
-                   uint force_nodeid)
+                   uint force_nodeid,
+                   uint num_recv_threads,
+                   uint recv_thread_activation_threshold)
 {
   NDB_TICKS end_time;
 
@@ -77,6 +79,9 @@ ndbcluster_connect(int (*connect_callbac
     g_ndb_cluster_connection->set_name(buf);
   }
   g_ndb_cluster_connection->set_optimized_node_selection(optimized_node_select);
+  g_ndb_cluster_connection->set_num_recv_threads(num_recv_threads);
+  g_ndb_cluster_connection->set_recv_thread_activation_threshold(
+                                      recv_thread_activation_threshold);
 
   // Create a Ndb object to open the connection  to NDB
   if ( (g_ndb= new Ndb(g_ndb_cluster_connection, "sys")) == 0 )
@@ -135,6 +140,8 @@ ndbcluster_connect(int (*connect_callbac
         g_pool[i]->set_name(buf);
       }
       g_pool[i]->set_optimized_node_selection(optimized_node_select);
+      g_pool[i]->set_num_recv_threads(num_recv_threads);
+      g_pool[i]->set_recv_thread_activation_threshold(recv_thread_activation_threshold);
     }
   }
 
@@ -289,6 +296,40 @@ int ndb_has_node_id(uint id)
   return 0;
 }
 
+int ndb_set_recv_thread_activation_threshold(Uint32 threshold)
+{
+  for (uint i= 0; i < g_pool_alloc; i++)
+  {
+    g_pool[i]->set_recv_thread_activation_threshold(threshold);
+  }
+  return 0;
+}
+
+int
+ndb_set_recv_thread_cpu(Uint16 *cpuid_array,
+                        Uint32 num_recv_threads,
+                        Uint32 cpuid_array_size)
+{
+  Uint32 num_cpu_needed = num_recv_threads * g_pool_alloc;
+  Uint32 cpu_index = 0;
+  if (cpuid_array_size < num_cpu_needed)
+  {
+    /* Ignore cpu masks that are too short */
+    return 0;
+  }
+  for (Uint32 i = 0; i < g_pool_alloc; i++)
+  {
+    for (Uint32 j = 0; j < num_recv_threads; j++)
+    {
+      g_pool[i]->set_recv_thread_cpu(&cpuid_array[cpu_index],
+                                     (Uint32)1,
+                                     j);
+      cpu_index++;
+    }
+  }
+  return 0;
+}
+
 void ndb_get_connection_stats(Uint64* statsArr)
 {
   Uint64 connectionStats[ Ndb::NumClientStatistics ];

=== modified file 'sql/ha_ndbcluster_connection.h'
--- a/sql/ha_ndbcluster_connection.h	revid:mikael.ronstrom@stripped
+++ b/sql/ha_ndbcluster_connection.h	revid:mikael.ronstrom@stripped
@@ -19,13 +19,21 @@ int ndbcluster_connect(int (*connect_cal
                        ulong wait_connected,
                        uint connection_pool_size,
                        bool optimized_node_select,
-                       const char* connect_string, uint force_nodeid);
+                       const char* connect_string,
+                       uint force_nodeid,
+                       uint num_recv_threads,
+                       uint recv_thread_activation_threshold);
+
 void ndbcluster_disconnect(void);
 
 Ndb_cluster_connection *ndb_get_cluster_connection();
 ulonglong ndb_get_latest_trans_gci();
 void ndb_set_latest_trans_gci(ulonglong val);
 int ndb_has_node_id(uint id);
+int ndb_set_recv_thread_activation_threshold(Uint32 threshold);
+int ndb_set_recv_thread_cpu(Uint16 *cpuid_array,
+                            Uint32 num_recv_threads,
+                            Uint32 cpuid_array_size);
 void ndb_get_connection_stats(Uint64* statsArr);
 
 /* perform random sleep in the range milli_sleep to 2*milli_sleep */

=== modified file 'storage/ndb/include/ndbapi/ndb_cluster_connection.hpp'
--- a/storage/ndb/include/ndbapi/ndb_cluster_connection.hpp	revid:mikael.ronstrom@stripped
+++ b/storage/ndb/include/ndbapi/ndb_cluster_connection.hpp	revid:mikael.ronstrom@stripped
@@ -189,6 +189,30 @@ public:
  void set_max_adaptive_send_time(Uint32 milliseconds);
  Uint32 get_max_adaptive_send_time();
 
+  /**
+   * Configuration handling of the receiver thread(s).
+   * We can set the number of receiver threads, we can set the cpu to bind
+   * the receiver thread to. We can also set the level of when we activate
+   * the receiver thread as the receiver, before this level the normal
+   * user threads are used to receive signals. If we set the level to
+   * 16 or higher we will never use receive threads as receivers.
+   *
+   * By default we have one receiver thread, this thread is not locked to
+   * any specific CPU and the level is 8.
+   * 
+   * The number of receive threads can only be set at a time before the
+   * connect call is made to connect to the other nodes in the cluster.
+   * The other methods can be called at any time.
+   *
+   * All methods return -1 as an error indication
+   */
+  int set_num_recv_threads(Uint32 num_recv_threads);
+  int get_num_recv_threads() const;
+  int set_recv_thread_cpu(Uint16 *cpuid_array,
+                          Uint32 array_len,
+                          Uint32 recv_thread_id = 0);
+  int set_recv_thread_activation_threshold(Uint32 threshold);
+  int get_recv_thread_activation_threshold() const;
 
 #ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
   int get_no_ready();

=== modified file 'storage/ndb/src/ndbapi/TransporterFacade.cpp'
--- a/storage/ndb/src/ndbapi/TransporterFacade.cpp	revid:mikael.ronstrom@stripped
+++ b/storage/ndb/src/ndbapi/TransporterFacade.cpp	revid:mikael.ronstrom@stripped
@@ -1,5 +1,5 @@
 /*
-   Copyright (c) 2003, 2011, Oracle and/or its affiliates. All rights reserved.
+   Copyright (c) 2003, 2012, Oracle and/or its affiliates. All rights reserved.
 
    This program is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
@@ -330,7 +330,7 @@ TransporterFacade::deliver_signal(Signal
    * one packed signal to the NDB API.
    */
   const Uint32 MAX_MESSAGES_IN_LOCKED_CLIENTS =
-    NDB_ARRAY_SIZE(m_poll_owner->m_poll.m_locked_clients) - 6;
+    m_poll_owner->m_poll.m_lock_array_size - 6;
    return m_poll_owner->m_poll.m_locked_cnt >= MAX_MESSAGES_IN_LOCKED_CLIENTS;
 }
 
@@ -581,7 +581,7 @@ runReceiveResponse_C(void * me)
 ReceiveThreadClient::ReceiveThreadClient(TransporterFacade * facade)
 {
   DBUG_ENTER("ReceiveThreadClient::ReceiveThreadClient");
-  Uint32 ret = this->open(facade);
+  Uint32 ret = this->open(facade, -1, true);
   if (unlikely(ret == 0))
   {
     ndbout_c("Failed to register receive thread, ret = %d", ret);
@@ -636,13 +636,16 @@ TransporterFacade::checkClusterMgr(NDB_T
 }
 
 bool
-TransporterFacade::become_poll_owner(trp_client* clnt)
+TransporterFacade::become_poll_owner(trp_client* clnt,
+                                     NDB_TICKS currTime)
 {
   bool poll_owner = false;
   lock_poll_mutex();
   if (m_poll_owner == NULL)
   {
     poll_owner = true;
+    m_num_active_clients = 0;
+    m_receive_activation_time = currTime;
     m_poll_owner = clnt;
   }
   unlock_poll_mutex();
@@ -654,6 +657,71 @@ TransporterFacade::become_poll_owner(trp
   return false;
 }
 
+int
+TransporterFacade::set_num_recv_threads(Uint32 num_recv_threads)
+{
+  return -1;
+}
+
+int
+TransporterFacade::get_num_recv_threads() const
+{
+  return 1;
+}
+
+int
+TransporterFacade::set_recv_thread_cpu(Uint16 *cpuid_array,
+                                       Uint32 array_len,
+                                       Uint32 recv_thread_id)
+{
+  if (array_len > 1 || array_len == 0)
+  {
+    return -1;
+  }
+  if (recv_thread_id != 0)
+  {
+    return -1;
+  }
+  recv_thread_cpu_id = cpuid_array[0];
+  if (theTransporterRegistry)
+  {
+    /* Receiver thread already started, lock cpu now */
+    lock_recv_thread_cpu();
+  }
+  return 0;
+}
+
+int
+TransporterFacade::set_recv_thread_activation_threshold(Uint32 threshold)
+{
+  if (threshold >= 16)
+  {
+    threshold = 256;
+  }
+  min_active_clients_recv_thread = threshold;
+  return 0;
+}
+
+void
+TransporterFacade::lock_recv_thread_cpu()
+{
+  while (theReceiveThread == NULL)
+  {
+    NdbSleep_MilliSleep(1);
+  }
+  if (recv_thread_cpu_id != NO_RECV_THREAD_CPU_ID)
+  {
+    NdbThread_LockCPU(theReceiveThread, (Uint32)recv_thread_cpu_id);
+  }
+}
+
+int
+TransporterFacade::get_recv_thread_activation_threshold() const
+{
+  return min_active_clients_recv_thread;
+}
+
+#define MIN_ACTIVE_CLIENTS_RECV_THREAD 8
 /*
   The receiver thread is changed to only wake up once every 10 milliseconds
   to poll. It will first check that nobody owns the poll "right" before
@@ -672,20 +740,47 @@ void TransporterFacade::threadMainReceiv
   NdbThread_set_shm_sigmask(TRUE);
 #endif
   ReceiveThreadClient *recv_client = new ReceiveThreadClient(this);
-  while (theReceiveThread == NULL)
-  {
-    NdbSleep_MilliSleep(1);
-  }
-  // Temporarily hardcoded
-  NdbThread_LockCPU(theReceiveThread, (Uint32)7);
+  lock_recv_thread_cpu();
   while(!theStopReceive)
   {
     currTime = NdbTick_CurrentMillisecond();
     checkClusterMgr(currTime, lastTime);
     if (!poll_owner)
-      poll_owner = become_poll_owner(recv_client);
+    {
+      /*
+         We only take the step to become poll owner in the receive thread
+         when enough clients are concurrently active, i.e. more than
+         min_active_clients_recv_thread of them (default 8, configurable
+         through the activation threshold).
+      */
+      if (m_num_active_clients > min_active_clients_recv_thread)
+      {
+        poll_owner = become_poll_owner(recv_client, currTime);
+      }
+      else
+      {
+        NdbSleep_MilliSleep(100);
+      }
+    }
     if (poll_owner)
-      do_poll(recv_client, 100, true);
+    {
+      bool stay_poll_owner = true;
+      if ((currTime - m_receive_activation_time) > (NDB_TICKS)1000)
+      {
+        /* Reset timer for next activation check time */
+        m_receive_activation_time = currTime;
+        lock_poll_mutex();
+        if (m_num_active_clients < (min_active_clients_recv_thread / 2))
+        {
+          /* Go back to not have an active receive thread */
+          stay_poll_owner = false;
+          poll_owner = false;
+        }
+        m_num_active_clients = 0; /* Reset active clients for next timeslot */
+        unlock_poll_mutex();
+      }
+      recv_client->start_poll();
+      do_poll(recv_client, 10, true, stay_poll_owner);
+      recv_client->complete_poll();
+    }
   }//while
   delete recv_client;
   theTransporterRegistry->stopReceiving();
@@ -724,9 +819,12 @@ TransporterFacade::external_poll(trp_cli
 }
 
 TransporterFacade::TransporterFacade(GlobalDictCache *cache) :
+  min_active_clients_recv_thread(MIN_ACTIVE_CLIENTS_RECV_THREAD),
+  recv_thread_cpu_id(NO_RECV_THREAD_CPU_ID),
   m_poll_owner(NULL),
   m_poll_queue_head(NULL),
   m_poll_queue_tail(NULL),
+  m_num_active_clients(0),
   theTransporterRegistry(0),
   theOwnId(0),
   theStartNodeId(1),
@@ -1953,7 +2051,7 @@ TransporterFacade::finish_poll(trp_clien
   {
     Uint32 lock_cnt = clnt->m_poll.m_locked_cnt;
     assert(lock_cnt >= 1);
-    assert(lock_cnt <= NDB_ARRAY_SIZE(clnt->m_poll.m_locked_clients));
+    assert(lock_cnt <= clnt->m_poll.m_lock_array_size);
     assert(clnt->m_poll.m_locked_clients[0] == clnt);
     // no duplicates
     if (DBG_POLL) printf("after external_poll: cnt: %u ", lock_cnt);
@@ -2079,15 +2177,16 @@ TransporterFacade::try_lock_last_client(
 void
 TransporterFacade::do_poll(trp_client* clnt,
                            Uint32 wait_time,
-                           bool poll_owner)
+                           bool is_poll_owner,
+                           bool stay_poll_owner)
 {
   dbg("do_poll(%p)", clnt);
   clnt->m_poll.m_waiting = trp_client::PollQueue::PQ_WAITING;
-  if (!poll_owner)
+  assert(clnt->m_poll.m_locked == true);
+  assert(clnt->m_poll.m_poll_owner == false);
+  assert(clnt->m_poll.m_poll_queue == false);
+  if (!is_poll_owner)
   {
-    assert(clnt->m_poll.m_locked == true);
-    assert(clnt->m_poll.m_poll_owner == false);
-    assert(clnt->m_poll.m_poll_queue == false);
     if (!get_poll_owner(clnt, wait_time))
       return;
   }
@@ -2105,12 +2204,15 @@ TransporterFacade::do_poll(trp_client* c
   Uint32 cnt_woken = 0;
   Uint32 cnt = clnt->m_poll.m_locked_cnt - 1; // skip self
   trp_client ** arr = clnt->m_poll.m_locked_clients + 1; // skip self
-  if (!poll_owner)
-    clnt->m_poll.m_poll_owner = false;
+  clnt->m_poll.m_poll_owner = false;
   finish_poll(clnt, cnt, cnt_woken, arr);
 
   lock_poll_mutex();
 
+  if ((cnt + 1) > m_num_active_clients)
+  {
+    m_num_active_clients = cnt + 1;
+  }
   /**
    * now remove all woken from poll queue
    * note: poll mutex held
@@ -2119,7 +2221,7 @@ TransporterFacade::do_poll(trp_client* c
 
   bool new_owner_locked = true;
   trp_client * new_owner = NULL;
-  if (poll_owner)
+  if (stay_poll_owner)
   {
     unlock_poll_mutex();
   }
@@ -2142,7 +2244,7 @@ TransporterFacade::do_poll(trp_client* c
     NdbMutex_Unlock(arr[i]->m_mutex);
   }
 
-  if (poll_owner)
+  if (stay_poll_owner)
   {
     clnt->m_poll.m_locked_cnt = 0;
     dbg("%p->do_poll return", clnt);
@@ -2274,13 +2376,13 @@ inline
 void
 trp_client::PollQueue::lock_client (trp_client* clnt)
 {
-  assert(m_locked_cnt <= NDB_ARRAY_SIZE(m_locked_clients));
+  assert(m_locked_cnt <= m_lock_array_size);
   if (check_if_locked(clnt))
     return;
 
   dbg("lock_client(%p)", clnt);
 
-  assert(m_locked_cnt < NDB_ARRAY_SIZE(m_locked_clients));
+  assert(m_locked_cnt < m_lock_array_size);
   m_locked_clients[m_locked_cnt++] = clnt;
   NdbMutex_Lock(clnt->m_mutex);
   return;

=== modified file 'storage/ndb/src/ndbapi/TransporterFacade.hpp'
--- a/storage/ndb/src/ndbapi/TransporterFacade.hpp	revid:mikael.ronstrom@stripped
+++ b/storage/ndb/src/ndbapi/TransporterFacade.hpp	revid:mikael.ronstrom@stripped
@@ -158,7 +158,8 @@ public:
   void start_poll(trp_client*);
   void do_poll(trp_client* clnt,
                Uint32 wait_time,
-               bool poll_owner = false);
+               bool is_poll_owner = false,
+               bool stay_poll_owner = false);
   void complete_poll(trp_client*);
   void wakeup(trp_client*);
 
@@ -172,6 +173,25 @@ public:
   void add_to_poll_queue(trp_client* clnt);
   void remove_from_poll_queue(trp_client* clnt);
 
+  /*
+    Configuration handling of the receiver threads handling of polling
+    These methods implement methods on the ndb_cluster_connection
+    interface.
+  */
+#define NO_RECV_THREAD_CPU_ID 0xFFFF
+  int set_num_recv_threads(Uint32 num_recv_threads);
+  int get_num_recv_threads() const;
+  int set_recv_thread_cpu(Uint16 *cpuid_array,
+                          Uint32 array_len,
+                          Uint32 recv_thread_id);
+  int set_recv_thread_activation_threshold(Uint32 threshold);
+  int get_recv_thread_activation_threshold() const;
+  /* Variables to support configuration of receiver thread handling */
+  Uint32 min_active_clients_recv_thread;
+  Uint16 recv_thread_cpu_id;
+  /* Support method to lock the receiver thread to its CPU */
+  void lock_recv_thread_cpu();
+
   trp_client * m_poll_owner;
   trp_client * m_poll_queue_head; // First in queue
   trp_client * m_poll_queue_tail; // Last in queue
@@ -222,7 +242,7 @@ private:
 
   void checkClusterMgr(NDB_TICKS currTime, NDB_TICKS & lastTime);
   bool get_poll_owner(trp_client* clnt, Uint32 wait_time);
-  bool become_poll_owner(trp_client* clnt);
+  bool become_poll_owner(trp_client* clnt, NDB_TICKS currtime);
   void finish_poll(trp_client* clnt,
                    Uint32 cnt,
                    Uint32& cnt_woken,
@@ -231,6 +251,9 @@ private:
                             bool &new_owner_locked,
                             trp_client** new_owner_ptr);
 
+  Uint32 m_num_active_clients;
+  NDB_TICKS m_receive_activation_time;
+
   bool isConnected(NodeId aNodeId);
   void doStop();
 

=== modified file 'storage/ndb/src/ndbapi/ndb_cluster_connection.cpp'
--- a/storage/ndb/src/ndbapi/ndb_cluster_connection.cpp	revid:mikael.ronstrom@stripped
+++ b/storage/ndb/src/ndbapi/ndb_cluster_connection.cpp	revid:mikael.ronstrom@stripped
@@ -91,6 +91,61 @@ const char *Ndb_cluster_connection::get_
   return 0;
 }
 
+int
+Ndb_cluster_connection::set_num_recv_threads(Uint32 num_recv_threads)
+{
+  if (m_impl.m_transporter_facade)
+  {
+    return m_impl.m_transporter_facade->set_num_recv_threads(num_recv_threads);
+  }
+  return -1;
+}
+
+int
+Ndb_cluster_connection::get_num_recv_threads() const
+{
+  if (m_impl.m_transporter_facade)
+  {
+    return m_impl.m_transporter_facade->get_num_recv_threads();
+  }
+  return -1;
+}
+
+int
+Ndb_cluster_connection::set_recv_thread_cpu(Uint16 *cpuid_array,
+                                            Uint32 array_len,
+                                            Uint32 recv_thread_id)
+{
+  if (m_impl.m_transporter_facade)
+  {
+    return m_impl.m_transporter_facade->set_recv_thread_cpu(cpuid_array,
+                                                            array_len,
+                                                            recv_thread_id);
+  }
+  return -1;
+}
+
+int
+Ndb_cluster_connection::set_recv_thread_activation_threshold(Uint32 threshold)
+{
+  TransporterFacade *fac = m_impl.m_transporter_facade;
+  if (fac)
+  {
+    return fac->set_recv_thread_activation_threshold(threshold);
+  }
+  return -1;
+}
+
+int
+Ndb_cluster_connection::get_recv_thread_activation_threshold() const
+{
+  if (m_impl.m_transporter_facade)
+  {
+    return m_impl.m_transporter_facade->get_recv_thread_activation_threshold();
+  }
+  return -1;
+}
+
 const char *Ndb_cluster_connection::get_connectstring(char *buf,
 						      int buf_sz) const
 {

=== modified file 'storage/ndb/src/ndbapi/trp_client.cpp'
--- a/storage/ndb/src/ndbapi/trp_client.cpp	revid:mikael.ronstrom@stripped
+++ b/storage/ndb/src/ndbapi/trp_client.cpp	revid:mikael.ronstrom@stripped
@@ -17,6 +17,7 @@
 
 #include "trp_client.hpp"
 #include "TransporterFacade.hpp"
+#include <NdbMem.h>
 
 trp_client::trp_client()
   : m_blockNo(~Uint32(0)), m_facade(0)
@@ -44,13 +45,28 @@ trp_client::~trp_client()
 }
 
 Uint32
-trp_client::open(TransporterFacade* tf, int blockNo)
+trp_client::open(TransporterFacade* tf, int blockNo,
+                 bool receive_thread)
 {
   Uint32 res = 0;
   assert(m_facade == 0);
   if (m_facade == 0)
   {
     m_facade = tf;
+    if (receive_thread)
+    {
+      m_poll.m_lock_array_size = 256;
+    }
+    else
+    {
+      m_poll.m_lock_array_size = 16;
+    }
+    m_poll.m_locked_clients = (trp_client**)
+      NdbMem_Allocate(sizeof(trp_client*) * m_poll.m_lock_array_size);
+    if (m_poll.m_locked_clients == NULL)
+    {
+      return 0;
+    }
     res = tf->open_clnt(this, blockNo);
     if (res != 0)
     {
@@ -58,6 +74,8 @@ trp_client::open(TransporterFacade* tf, 
     }
     else
     {
+      NdbMem_Free((void*)m_poll.m_locked_clients);
+      m_poll.m_locked_clients = NULL;
       m_facade = 0;
     }
   }
@@ -79,6 +97,11 @@ trp_client::close()
 
     m_facade = 0;
     m_blockNo = ~Uint32(0);
+    if (m_poll.m_locked_clients)
+    {
+      NdbMem_Free((void*)m_poll.m_locked_clients);
+      m_poll.m_locked_clients = NULL;
+    }
   }
 }
 

=== modified file 'storage/ndb/src/ndbapi/trp_client.hpp'
--- a/storage/ndb/src/ndbapi/trp_client.hpp	revid:mikael.ronstrom@stripped
+++ b/storage/ndb/src/ndbapi/trp_client.hpp	revid:mikael.ronstrom@stripped
@@ -42,7 +42,8 @@ public:
   virtual void trp_wakeup()
     {};
 
-  Uint32 open(class TransporterFacade*, int blockNo = -1);
+  Uint32 open(class TransporterFacade*, int blockNo = -1,
+              bool receive_thread = false);
   void close();
 
   void start_poll();
@@ -131,7 +132,8 @@ private:
     void lock_client(trp_client*);
     bool check_if_locked(const trp_client*) const;
     Uint32 m_locked_cnt;
-    trp_client * m_locked_clients[64];
+    Uint32 m_lock_array_size;
+    trp_client ** m_locked_clients;
   } m_poll;
 
   /**
@@ -243,6 +245,8 @@ trp_client::PollQueue::PollQueue()
   m_prev = 0;
   m_condition = NdbCondition_Create();
   m_locked_cnt = 0;
+  m_lock_array_size = 0;
+  m_locked_clients = NULL;
 }
 
 inline

No bundle (reason: useless for push emails).
Thread
bzr push into mysql-5.5-cluster-7.2 branch (mikael.ronstrom:3956 to 3960) Mikael Ronstrom13 Jul