3960 Mikael Ronstrom 2012-07-12
Added options for setting receiver thread options in cluster connection interface and in MySQL Server
modified:
sql/ha_ndbcluster.cc
sql/ha_ndbcluster_connection.cc
sql/ha_ndbcluster_connection.h
storage/ndb/include/ndbapi/ndb_cluster_connection.hpp
storage/ndb/src/ndbapi/TransporterFacade.cpp
storage/ndb/src/ndbapi/TransporterFacade.hpp
storage/ndb/src/ndbapi/ndb_cluster_connection.cpp
3959 Mikael Ronstrom 2012-07-12
Made adaptive solution that uses receive thread in high-load situations
modified:
storage/ndb/src/ndbapi/TransporterFacade.cpp
storage/ndb/src/ndbapi/TransporterFacade.hpp
storage/ndb/src/ndbapi/trp_client.cpp
storage/ndb/src/ndbapi/trp_client.hpp
3958 Mikael Ronstrom 2012-07-12
Set m_poll_owner on client object when becoming poll owner
modified:
storage/ndb/src/ndbapi/TransporterFacade.cpp
3957 Mikael Ronstrom 2012-07-06
Streamlined receiver thread code, added use of start_poll and complete_poll on receiver thread trp client
modified:
storage/ndb/src/ndbapi/TransporterFacade.cpp
3956 Mikael Ronstrom 2012-07-05
fix
modified:
storage/ndb/src/ndbapi/TransporterFacade.cpp
=== modified file 'sql/ha_ndbcluster.cc'
--- a/sql/ha_ndbcluster.cc revid:mikael.ronstrom@stripped
+++ b/sql/ha_ndbcluster.cc revid:mikael.ronstrom@stripped
@@ -73,6 +73,9 @@ static ulong opt_ndb_wait_connected;
ulong opt_ndb_wait_setup;
static ulong opt_ndb_cache_check_time;
static uint opt_ndb_cluster_connection_pool;
+static uint opt_ndb_num_recv_threads;
+static uint opt_ndb_recv_thread_activation_threshold;
+static char* opt_ndb_recv_thread_cpu_mask;
static char* opt_ndb_index_stat_option;
static char* opt_ndb_connectstring;
static uint opt_ndb_nodeid;
@@ -11924,7 +11927,9 @@ static int ndbcluster_init(void *p)
opt_ndb_cluster_connection_pool,
(global_opti_node_select & 1),
opt_ndb_connectstring,
- opt_ndb_nodeid))
+ opt_ndb_nodeid,
+ opt_ndb_num_recv_threads,
+ opt_ndb_recv_thread_activation_threshold))
{
DBUG_PRINT("error", ("Could not initiate connection to cluster"));
goto ndbcluster_init_error;
@@ -17082,7 +17087,7 @@ static MYSQL_SYSVAR_ULONG(
0 /* block */
);
-
+#define MAX_CLUSTER_CONNECTIONS 63
static MYSQL_SYSVAR_UINT(
cluster_connection_pool, /* name */
opt_ndb_cluster_connection_pool, /* var */
@@ -17092,10 +17097,170 @@ static MYSQL_SYSVAR_UINT(
NULL, /* update func. */
1, /* default */
1, /* min */
- 63, /* max */
+ MAX_CLUSTER_CONNECTIONS, /* max */
+ 0 /* block */
+);
+
+#define MAX_NUM_RECV_THREADS 4
+static MYSQL_SYSVAR_UINT(
+ num_recv_threads, /* name */
+ opt_ndb_num_recv_threads,/* var */
+ PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_READONLY,
+ "Number of receive threads per cluster connection",
+ NULL, /* check func. */
+ NULL, /* update func. */
+ 1, /* default */
+ 1, /* min */
+ MAX_NUM_RECV_THREADS, /* max */
0 /* block */
);
+static
+void
+ndb_recv_thread_activation_threshold_update(MYSQL_THD,
+ struct st_mysql_sys_var *var,
+ void *var_ptr,
+ const void *save)
+{
+ ndb_set_recv_thread_activation_threshold(
+ opt_ndb_recv_thread_activation_threshold);
+}
+
+static MYSQL_SYSVAR_UINT(
+ recv_thread_activation_threshold, /* name */
+ opt_ndb_recv_thread_activation_threshold, /* var */
+ PLUGIN_VAR_RQCMDARG,
+ "Activation threshold when receive thread takes over the polling "
+ "of the cluster connection (measured in concurrently active "
+ "threads)",
+ NULL, /* check func. */
+ ndb_recv_thread_activation_threshold_update, /* update func. */
+ 8, /* default */
+ 0, /* min */
+ 16, /* max */
+ 0 /* block */
+);
+
+static const int ndb_recv_thread_cpu_mask_option_buf_size = 512;
+char ndb_recv_thread_cpu_mask_option_buf[ndb_recv_thread_cpu_mask_option_buf_size];
+Uint16 recv_thread_cpuid_array[MAX_NUM_RECV_THREADS * MAX_CLUSTER_CONNECTIONS];
+Uint32 recv_thread_num_cpus;
+
+static int
+count_bits_in_hex_number(Uint32 hex_number)
+{
+ int num_cpus = 0;
+ for (int i = 0; i < 4; i++)
+ {
+ if (hex_number & (1 << i))
+ {
+ num_cpus++;
+ }
+ }
+ return num_cpus;
+}
+
+static int
+assign_cpuid_in_array_from_hexnumber(Uint32 hex_number, Uint32 base_cpuid)
+{
+ if ((recv_thread_num_cpus + count_bits_in_hex_number(hex_number)) >=
+ (MAX_NUM_RECV_THREADS * MAX_CLUSTER_CONNECTIONS))
+ {
+ return -1;
+ }
+ for (int i = 0; i < 4; i++)
+ {
+ if (hex_number & (1 << i))
+ {
+ recv_thread_cpuid_array[recv_thread_num_cpus++] = (base_cpuid + i);
+ }
+ }
+ return 0;
+}
+
+static int
+assign_cpus_from_character(char c, Uint32 base_cpuid)
+{
+ Uint32 hex_number;
+
+ if (c >= '0' && c <= '9')
+ hex_number = (c - '0');
+ else if (c >= 'A' && c <= 'F')
+ hex_number = (c - 'A') + 10;
+ else if (c >= 'a' && c <= 'f')
+ hex_number = (c - 'a') + 10;
+ else
+ return -1;
+ return assign_cpuid_in_array_from_hexnumber(hex_number, base_cpuid);
+}
+
+static int
+cpu_mask_parse(const char *str, int mask_str_len)
+{
+ Uint32 base_cpu_id = 0;
+ recv_thread_num_cpus = 0;
+ if (str[0] != '0' || str[1] != 'x')
+ goto error;
+ for (int i = mask_str_len - 1; i >= 2; i++)
+ {
+ /*
+ Parse string backwards, last character is representing
+ CPU 0-3, next to last represents 4-7 and so forth.
+ */
+ if (assign_cpus_from_character(str[i], base_cpu_id) != 0)
+ goto error;
+ base_cpu_id += 4;
+ }
+ return 0;
+error:
+ return -1;
+}
+
+static
+int
+ndb_recv_thread_cpu_mask_check(MYSQL_THD thd,
+ struct st_mysql_sys_var *var,
+ void *save,
+ struct st_mysql_value *value)
+{
+ int mask_str_len;
+ char buf[ndb_recv_thread_cpu_mask_option_buf_size];
+ int len = sizeof(buf);
+ const char *str = value->val_str(value, buf, &len);
+ if (str != 0)
+ {
+ mask_str_len = strlen(str);
+ if ((mask_str_len >= len) ||
+ (cpu_mask_parse(str, mask_str_len) != 0))
+ goto error;
+ }
+error:
+ return 1;
+}
+
+static
+void
+ndb_recv_thread_cpu_mask_update(MYSQL_THD,
+ struct st_mysql_sys_var *var,
+ void *var_ptr,
+ const void *save)
+{
+ ndb_set_recv_thread_cpu(recv_thread_cpuid_array,
+ opt_ndb_num_recv_threads,
+ recv_thread_num_cpus);
+}
+
+static MYSQL_SYSVAR_STR(
+ recv_thread_cpu_mask, /* name */
+ opt_ndb_recv_thread_cpu_mask, /* var */
+ PLUGIN_VAR_RQCMDARG,
+ "CPU mask for locking receiver threads to specific CPU, specified "
+ " as hexadecimal as e.g. 0x33, one CPU is used per receiver thread.",
+ ndb_recv_thread_cpu_mask_check, /* check func. */
+ ndb_recv_thread_cpu_mask_update, /* update func. */
+ ndb_recv_thread_cpu_mask_option_buf
+);
+
/* should be in index_stat.h */
extern int
@@ -17373,6 +17538,9 @@ static struct st_mysql_sys_var* system_v
MYSQL_SYSVAR(wait_connected),
MYSQL_SYSVAR(wait_setup),
MYSQL_SYSVAR(cluster_connection_pool),
+ MYSQL_SYSVAR(num_recv_threads),
+ MYSQL_SYSVAR(recv_thread_activation_threshold),
+ MYSQL_SYSVAR(recv_thread_cpu_mask),
MYSQL_SYSVAR(report_thresh_binlog_mem_usage),
MYSQL_SYSVAR(report_thresh_binlog_epoch_slip),
MYSQL_SYSVAR(log_update_as_write),
=== modified file 'sql/ha_ndbcluster_connection.cc'
--- a/sql/ha_ndbcluster_connection.cc revid:mikael.ronstrom@stripped
+++ b/sql/ha_ndbcluster_connection.cc revid:mikael.ronstrom@stripped
@@ -45,7 +45,9 @@ ndbcluster_connect(int (*connect_callbac
uint connection_pool_size,
bool optimized_node_select,
const char* connect_string,
- uint force_nodeid)
+ uint force_nodeid,
+ uint num_recv_threads,
+ uint recv_thread_activation_threshold)
{
NDB_TICKS end_time;
@@ -77,6 +79,9 @@ ndbcluster_connect(int (*connect_callbac
g_ndb_cluster_connection->set_name(buf);
}
g_ndb_cluster_connection->set_optimized_node_selection(optimized_node_select);
+ g_ndb_cluster_connection->set_num_recv_threads(num_recv_threads);
+ g_ndb_cluster_connection->set_recv_thread_activation_threshold(
+ recv_thread_activation_threshold);
// Create a Ndb object to open the connection to NDB
if ( (g_ndb= new Ndb(g_ndb_cluster_connection, "sys")) == 0 )
@@ -135,6 +140,8 @@ ndbcluster_connect(int (*connect_callbac
g_pool[i]->set_name(buf);
}
g_pool[i]->set_optimized_node_selection(optimized_node_select);
+ g_pool[i]->set_num_recv_threads(num_recv_threads);
+ g_pool[i]->set_recv_thread_activation_threshold(recv_thread_activation_threshold);
}
}
@@ -289,6 +296,40 @@ int ndb_has_node_id(uint id)
return 0;
}
+int ndb_set_recv_thread_activation_threshold(Uint32 threshold)
+{
+ for (uint i= 0; i < g_pool_alloc; i++)
+ {
+ g_pool[i]->set_recv_thread_activation_threshold(threshold);
+ }
+ return 0;
+}
+
+int
+ndb_set_recv_thread_cpu(Uint16 *cpuid_array,
+ Uint32 num_recv_threads,
+ Uint32 cpuid_array_size)
+{
+ Uint32 num_cpu_needed = num_recv_threads * g_pool_alloc;
+ Uint32 cpu_index = 0;
+ if (cpuid_array_size < num_cpu_needed)
+ {
+ /* Ignore cpu masks that is too short */
+ return 0;
+ }
+ for (Uint32 i = 0; i < g_pool_alloc; i++)
+ {
+ for (Uint32 j = 0; j < num_recv_threads; j++)
+ {
+ g_pool[i]->set_recv_thread_cpu(&cpuid_array[cpu_index],
+ (Uint32)1,
+ j);
+ cpu_index++;
+ }
+ }
+ return 0;
+}
+
void ndb_get_connection_stats(Uint64* statsArr)
{
Uint64 connectionStats[ Ndb::NumClientStatistics ];
=== modified file 'sql/ha_ndbcluster_connection.h'
--- a/sql/ha_ndbcluster_connection.h revid:mikael.ronstrom@stripped
+++ b/sql/ha_ndbcluster_connection.h revid:mikael.ronstrom@stripped
@@ -19,13 +19,21 @@ int ndbcluster_connect(int (*connect_cal
ulong wait_connected,
uint connection_pool_size,
bool optimized_node_select,
- const char* connect_string, uint force_nodeid);
+ const char* connect_string,
+ uint force_nodeid,
+ uint num_recv_threads,
+ uint recv_thread_activation_threshold);
+
void ndbcluster_disconnect(void);
Ndb_cluster_connection *ndb_get_cluster_connection();
ulonglong ndb_get_latest_trans_gci();
void ndb_set_latest_trans_gci(ulonglong val);
int ndb_has_node_id(uint id);
+int ndb_set_recv_thread_activation_threshold(Uint32 threshold);
+int ndb_set_recv_thread_cpu(Uint16 *cpuid_array,
+ Uint32 num_recv_threads,
+ Uint32 cpuid_array_size);
void ndb_get_connection_stats(Uint64* statsArr);
/* perform random sleep in the range milli_sleep to 2*milli_sleep */
=== modified file 'storage/ndb/include/ndbapi/ndb_cluster_connection.hpp'
--- a/storage/ndb/include/ndbapi/ndb_cluster_connection.hpp revid:mikael.ronstrom@stripped
+++ b/storage/ndb/include/ndbapi/ndb_cluster_connection.hpp revid:mikael.ronstrom@stripped
@@ -189,6 +189,30 @@ public:
void set_max_adaptive_send_time(Uint32 milliseconds);
Uint32 get_max_adaptive_send_time();
+ /**
+ * Configuration handling of the receiver thread(s).
+ * We can set the number of receiver threads, we can set the cpu to bind
+ * the receiver thread to. We can also set the level of when we activate
+ * the receiver thread as the receiver, before this level the normal
+ * user threads are used to receive signals. If we set the level to
+ * 16 or higher we will never use receive threads as receivers.
+ *
+ * By default we have one receiver thread, this thread is not locked to
+ * any specific CPU and the level is 8.
+ *
+ * The number of receive threads can only be set at a time before the
+ * connect call is made to connect to the other nodes in the cluster.
+ * The other methods can be called at any time.
+ *
+ * All methods return -1 as an error indication
+ */
+ int set_num_recv_threads(Uint32 num_recv_threads);
+ int get_num_recv_threads() const;
+ int set_recv_thread_cpu(Uint16 *cpuid_array,
+ Uint32 array_len,
+ Uint32 recv_thread_id = 0);
+ int set_recv_thread_activation_threshold(Uint32 threshold);
+ int get_recv_thread_activation_threshold() const;
#ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
int get_no_ready();
=== modified file 'storage/ndb/src/ndbapi/TransporterFacade.cpp'
--- a/storage/ndb/src/ndbapi/TransporterFacade.cpp revid:mikael.ronstrom@stripped
+++ b/storage/ndb/src/ndbapi/TransporterFacade.cpp revid:mikael.ronstrom@stripped
@@ -1,5 +1,5 @@
/*
- Copyright (c) 2003, 2011, Oracle and/or its affiliates. All rights reserved.
+ Copyright (c) 2003, 2012, Oracle and/or its affiliates. All rights reserved.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
@@ -330,7 +330,7 @@ TransporterFacade::deliver_signal(Signal
* one packed signal to the NDB API.
*/
const Uint32 MAX_MESSAGES_IN_LOCKED_CLIENTS =
- NDB_ARRAY_SIZE(m_poll_owner->m_poll.m_locked_clients) - 6;
+ m_poll_owner->m_poll.m_lock_array_size - 6;
return m_poll_owner->m_poll.m_locked_cnt >= MAX_MESSAGES_IN_LOCKED_CLIENTS;
}
@@ -581,7 +581,7 @@ runReceiveResponse_C(void * me)
ReceiveThreadClient::ReceiveThreadClient(TransporterFacade * facade)
{
DBUG_ENTER("ReceiveThreadClient::ReceiveThreadClient");
- Uint32 ret = this->open(facade);
+ Uint32 ret = this->open(facade, -1, true);
if (unlikely(ret == 0))
{
ndbout_c("Failed to register receive thread, ret = %d", ret);
@@ -636,13 +636,16 @@ TransporterFacade::checkClusterMgr(NDB_T
}
bool
-TransporterFacade::become_poll_owner(trp_client* clnt)
+TransporterFacade::become_poll_owner(trp_client* clnt,
+ NDB_TICKS currTime)
{
bool poll_owner = false;
lock_poll_mutex();
if (m_poll_owner == NULL)
{
poll_owner = true;
+ m_num_active_clients = 0;
+ m_receive_activation_time = currTime;
m_poll_owner = clnt;
}
unlock_poll_mutex();
@@ -654,6 +657,71 @@ TransporterFacade::become_poll_owner(trp
return false;
}
+int
+TransporterFacade::set_num_recv_threads(Uint32 num_recv_threads)
+{
+ return -1;
+}
+
+int
+TransporterFacade::get_num_recv_threads() const
+{
+ return 1;
+}
+
+int
+TransporterFacade::set_recv_thread_cpu(Uint16 *cpuid_array,
+ Uint32 array_len,
+ Uint32 recv_thread_id)
+{
+ if (array_len > 1 || array_len == 0)
+ {
+ return -1;
+ }
+ if (recv_thread_id != 0)
+ {
+ return -1;
+ }
+ recv_thread_cpu_id = cpuid_array[0];
+ if (theTransporterRegistry)
+ {
+ /* Receiver thread already started, lock cpu now */
+ lock_recv_thread_cpu();
+ }
+ return 0;
+}
+
+int
+TransporterFacade::set_recv_thread_activation_threshold(Uint32 threshold)
+{
+ if (threshold >= 16)
+ {
+ threshold = 256;
+ }
+ min_active_clients_recv_thread = threshold;
+ return 0;
+}
+
+void
+TransporterFacade::lock_recv_thread_cpu()
+{
+ while (theReceiveThread == NULL)
+ {
+ NdbSleep_MilliSleep(1);
+ }
+ if (recv_thread_cpu_id != NO_RECV_THREAD_CPU_ID)
+ {
+ NdbThread_LockCPU(theReceiveThread, (Uint32)recv_thread_cpu_id);
+ }
+}
+
+int
+TransporterFacade::get_recv_thread_activation_threshold() const
+{
+ return min_active_clients_recv_thread;
+}
+
+#define MIN_ACTIVE_CLIENTS_RECV_THREAD 8
/*
The receiver thread is changed to only wake up once every 10 milliseconds
to poll. It will first check that nobody owns the poll "right" before
@@ -672,20 +740,47 @@ void TransporterFacade::threadMainReceiv
NdbThread_set_shm_sigmask(TRUE);
#endif
ReceiveThreadClient *recv_client = new ReceiveThreadClient(this);
- while (theReceiveThread == NULL)
- {
- NdbSleep_MilliSleep(1);
- }
- // Temporarily hardcoded
- NdbThread_LockCPU(theReceiveThread, (Uint32)7);
+ lock_recv_thread_cpu();
while(!theStopReceive)
{
currTime = NdbTick_CurrentMillisecond();
checkClusterMgr(currTime, lastTime);
if (!poll_owner)
- poll_owner = become_poll_owner(recv_client);
+ {
+ /*
+ We only take the step to become poll owner in receive thread if
+ we are sufficiently active, at least 16 threads active.
+ */
+ if (m_num_active_clients > min_active_clients_recv_thread)
+ {
+ poll_owner = become_poll_owner(recv_client, currTime);
+ }
+ else
+ {
+ NdbSleep_MilliSleep(100);
+ }
+ }
if (poll_owner)
- do_poll(recv_client, 100, true);
+ {
+ bool stay_poll_owner = true;
+ if ((currTime - m_receive_activation_time) > (NDB_TICKS)1000)
+ {
+ /* Reset timer for next activation check time */
+ m_receive_activation_time = currTime;
+ lock_poll_mutex();
+ if (m_num_active_clients < (min_active_clients_recv_thread / 2))
+ {
+ /* Go back to not have an active receive thread */
+ stay_poll_owner = false;
+ poll_owner = false;
+ }
+ m_num_active_clients = 0; /* Reset active clients for next timeslot */
+ unlock_poll_mutex();
+ }
+ recv_client->start_poll();
+ do_poll(recv_client, 10, true, stay_poll_owner);
+ recv_client->complete_poll();
+ }
}//while
delete recv_client;
theTransporterRegistry->stopReceiving();
@@ -724,9 +819,12 @@ TransporterFacade::external_poll(trp_cli
}
TransporterFacade::TransporterFacade(GlobalDictCache *cache) :
+ min_active_clients_recv_thread(MIN_ACTIVE_CLIENTS_RECV_THREAD),
+ recv_thread_cpu_id(NO_RECV_THREAD_CPU_ID),
m_poll_owner(NULL),
m_poll_queue_head(NULL),
m_poll_queue_tail(NULL),
+ m_num_active_clients(0),
theTransporterRegistry(0),
theOwnId(0),
theStartNodeId(1),
@@ -1953,7 +2051,7 @@ TransporterFacade::finish_poll(trp_clien
{
Uint32 lock_cnt = clnt->m_poll.m_locked_cnt;
assert(lock_cnt >= 1);
- assert(lock_cnt <= NDB_ARRAY_SIZE(clnt->m_poll.m_locked_clients));
+ assert(lock_cnt <= clnt->m_poll.m_lock_array_size);
assert(clnt->m_poll.m_locked_clients[0] == clnt);
// no duplicates
if (DBG_POLL) printf("after external_poll: cnt: %u ", lock_cnt);
@@ -2079,15 +2177,16 @@ TransporterFacade::try_lock_last_client(
void
TransporterFacade::do_poll(trp_client* clnt,
Uint32 wait_time,
- bool poll_owner)
+ bool is_poll_owner,
+ bool stay_poll_owner)
{
dbg("do_poll(%p)", clnt);
clnt->m_poll.m_waiting = trp_client::PollQueue::PQ_WAITING;
- if (!poll_owner)
+ assert(clnt->m_poll.m_locked == true);
+ assert(clnt->m_poll.m_poll_owner == false);
+ assert(clnt->m_poll.m_poll_queue == false);
+ if (!is_poll_owner)
{
- assert(clnt->m_poll.m_locked == true);
- assert(clnt->m_poll.m_poll_owner == false);
- assert(clnt->m_poll.m_poll_queue == false);
if (!get_poll_owner(clnt, wait_time))
return;
}
@@ -2105,12 +2204,15 @@ TransporterFacade::do_poll(trp_client* c
Uint32 cnt_woken = 0;
Uint32 cnt = clnt->m_poll.m_locked_cnt - 1; // skip self
trp_client ** arr = clnt->m_poll.m_locked_clients + 1; // skip self
- if (!poll_owner)
- clnt->m_poll.m_poll_owner = false;
+ clnt->m_poll.m_poll_owner = false;
finish_poll(clnt, cnt, cnt_woken, arr);
lock_poll_mutex();
+ if ((cnt + 1) > m_num_active_clients)
+ {
+ m_num_active_clients = cnt + 1;
+ }
/**
* now remove all woken from poll queue
* note: poll mutex held
@@ -2119,7 +2221,7 @@ TransporterFacade::do_poll(trp_client* c
bool new_owner_locked = true;
trp_client * new_owner = NULL;
- if (poll_owner)
+ if (stay_poll_owner)
{
unlock_poll_mutex();
}
@@ -2142,7 +2244,7 @@ TransporterFacade::do_poll(trp_client* c
NdbMutex_Unlock(arr[i]->m_mutex);
}
- if (poll_owner)
+ if (stay_poll_owner)
{
clnt->m_poll.m_locked_cnt = 0;
dbg("%p->do_poll return", clnt);
@@ -2274,13 +2376,13 @@ inline
void
trp_client::PollQueue::lock_client (trp_client* clnt)
{
- assert(m_locked_cnt <= NDB_ARRAY_SIZE(m_locked_clients));
+ assert(m_locked_cnt <= m_lock_array_size);
if (check_if_locked(clnt))
return;
dbg("lock_client(%p)", clnt);
- assert(m_locked_cnt < NDB_ARRAY_SIZE(m_locked_clients));
+ assert(m_locked_cnt < m_lock_array_size);
m_locked_clients[m_locked_cnt++] = clnt;
NdbMutex_Lock(clnt->m_mutex);
return;
=== modified file 'storage/ndb/src/ndbapi/TransporterFacade.hpp'
--- a/storage/ndb/src/ndbapi/TransporterFacade.hpp revid:mikael.ronstrom@stripped
+++ b/storage/ndb/src/ndbapi/TransporterFacade.hpp revid:mikael.ronstrom@stripped
@@ -158,7 +158,8 @@ public:
void start_poll(trp_client*);
void do_poll(trp_client* clnt,
Uint32 wait_time,
- bool poll_owner = false);
+ bool is_poll_owner = false,
+ bool stay_poll_owner = false);
void complete_poll(trp_client*);
void wakeup(trp_client*);
@@ -172,6 +173,25 @@ public:
void add_to_poll_queue(trp_client* clnt);
void remove_from_poll_queue(trp_client* clnt);
+ /*
+ Configuration handling of the receiver threads handling of polling
+ These methods implement methods on the ndb_cluster_connection
+ interface.
+ */
+#define NO_RECV_THREAD_CPU_ID 0xFFFF
+ int set_num_recv_threads(Uint32 num_recv_threads);
+ int get_num_recv_threads() const;
+ int set_recv_thread_cpu(Uint16 *cpuid_array,
+ Uint32 array_len,
+ Uint32 recv_thread_id);
+ int set_recv_thread_activation_threshold(Uint32 threshold);
+ int get_recv_thread_activation_threshold() const;
+ /* Variables to support configuration of receiver thread handling */
+ Uint32 min_active_clients_recv_thread;
+ Uint16 recv_thread_cpu_id;
+ /* Support method to lock the receiver thread to its CPU */
+ void lock_recv_thread_cpu();
+
trp_client * m_poll_owner;
trp_client * m_poll_queue_head; // First in queue
trp_client * m_poll_queue_tail; // Last in queue
@@ -222,7 +242,7 @@ private:
void checkClusterMgr(NDB_TICKS currTime, NDB_TICKS & lastTime);
bool get_poll_owner(trp_client* clnt, Uint32 wait_time);
- bool become_poll_owner(trp_client* clnt);
+ bool become_poll_owner(trp_client* clnt, NDB_TICKS currtime);
void finish_poll(trp_client* clnt,
Uint32 cnt,
Uint32& cnt_woken,
@@ -231,6 +251,9 @@ private:
bool &new_owner_locked,
trp_client** new_owner_ptr);
+ Uint32 m_num_active_clients;
+ NDB_TICKS m_receive_activation_time;
+
bool isConnected(NodeId aNodeId);
void doStop();
=== modified file 'storage/ndb/src/ndbapi/ndb_cluster_connection.cpp'
--- a/storage/ndb/src/ndbapi/ndb_cluster_connection.cpp revid:mikael.ronstrom@stripped
+++ b/storage/ndb/src/ndbapi/ndb_cluster_connection.cpp revid:mikael.ronstrom@stripped
@@ -91,6 +91,61 @@ const char *Ndb_cluster_connection::get_
return 0;
}
+int
+Ndb_cluster_connection::set_num_recv_threads(Uint32 num_recv_threads)
+{
+ if (m_impl.m_transporter_facade)
+ {
+ return m_impl.m_transporter_facade->set_num_recv_threads(num_recv_threads);
+ }
+ return -1;
+}
+
+int
+Ndb_cluster_connection::get_num_recv_threads() const
+{
+ if (m_impl.m_transporter_facade)
+ {
+ return m_impl.m_transporter_facade->get_num_recv_threads();
+ }
+ return -1;
+}
+
+int
+Ndb_cluster_connection::set_recv_thread_cpu(Uint16 *cpuid_array,
+ Uint32 array_len,
+ Uint32 recv_thread_id)
+{
+ if (m_impl.m_transporter_facade)
+ {
+ return m_impl.m_transporter_facade->set_recv_thread_cpu(cpuid_array,
+ array_len,
+ recv_thread_id);
+ }
+ return -1;
+}
+
+int
+Ndb_cluster_connection::set_recv_thread_activation_threshold(Uint32 threshold)
+{
+ TransporterFacade *fac = m_impl.m_transporter_facade;
+ if (fac)
+ {
+ return fac->set_recv_thread_activation_threshold(threshold);
+ }
+ return -1;
+}
+
+int
+Ndb_cluster_connection::get_recv_thread_activation_threshold() const
+{
+ if (m_impl.m_transporter_facade)
+ {
+ return m_impl.m_transporter_facade->get_recv_thread_activation_threshold();
+ }
+ return -1;
+}
+
const char *Ndb_cluster_connection::get_connectstring(char *buf,
int buf_sz) const
{
=== modified file 'storage/ndb/src/ndbapi/trp_client.cpp'
--- a/storage/ndb/src/ndbapi/trp_client.cpp revid:mikael.ronstrom@stripped
+++ b/storage/ndb/src/ndbapi/trp_client.cpp revid:mikael.ronstrom@stripped
@@ -17,6 +17,7 @@
#include "trp_client.hpp"
#include "TransporterFacade.hpp"
+#include <NdbMem.h>
trp_client::trp_client()
: m_blockNo(~Uint32(0)), m_facade(0)
@@ -44,13 +45,28 @@ trp_client::~trp_client()
}
Uint32
-trp_client::open(TransporterFacade* tf, int blockNo)
+trp_client::open(TransporterFacade* tf, int blockNo,
+ bool receive_thread)
{
Uint32 res = 0;
assert(m_facade == 0);
if (m_facade == 0)
{
m_facade = tf;
+ if (receive_thread)
+ {
+ m_poll.m_lock_array_size = 256;
+ }
+ else
+ {
+ m_poll.m_lock_array_size = 16;
+ }
+ m_poll.m_locked_clients = (trp_client**)
+ NdbMem_Allocate(sizeof(trp_client**) * m_poll.m_lock_array_size);
+ if (m_poll.m_locked_clients == NULL)
+ {
+ return 0;
+ }
res = tf->open_clnt(this, blockNo);
if (res != 0)
{
@@ -58,6 +74,8 @@ trp_client::open(TransporterFacade* tf,
}
else
{
+ NdbMem_Free((void*)m_poll.m_locked_clients);
+ m_poll.m_locked_clients = NULL;
m_facade = 0;
}
}
@@ -79,6 +97,11 @@ trp_client::close()
m_facade = 0;
m_blockNo = ~Uint32(0);
+ if (m_poll.m_locked_clients)
+ {
+ NdbMem_Free((void*)m_poll.m_locked_clients);
+ m_poll.m_locked_clients = NULL;
+ }
}
}
=== modified file 'storage/ndb/src/ndbapi/trp_client.hpp'
--- a/storage/ndb/src/ndbapi/trp_client.hpp revid:mikael.ronstrom@stripped
+++ b/storage/ndb/src/ndbapi/trp_client.hpp revid:mikael.ronstrom@stripped
@@ -42,7 +42,8 @@ public:
virtual void trp_wakeup()
{};
- Uint32 open(class TransporterFacade*, int blockNo = -1);
+ Uint32 open(class TransporterFacade*, int blockNo = -1,
+ bool receive_thread = false);
void close();
void start_poll();
@@ -131,7 +132,8 @@ private:
void lock_client(trp_client*);
bool check_if_locked(const trp_client*) const;
Uint32 m_locked_cnt;
- trp_client * m_locked_clients[64];
+ Uint32 m_lock_array_size;
+ trp_client ** m_locked_clients;
} m_poll;
/**
@@ -243,6 +245,8 @@ trp_client::PollQueue::PollQueue()
m_prev = 0;
m_condition = NdbCondition_Create();
m_locked_cnt = 0;
+ m_lock_array_size = 0;
+ m_locked_clients = NULL;
}
inline
No bundle (reason: useless for push emails).
| Thread |
|---|
| • bzr push into mysql-5.5-cluster-7.2 branch (mikael.ronstrom:3956 to 3960) | Mikael Ronstrom | 13 Jul |