From: Mikael Ronstrom Date: July 12 2012 9:19pm Subject: bzr push into mysql-5.5-cluster-7.2 branch (mikael.ronstrom:3956 to 3960) List-Archive: http://lists.mysql.com/commits/144435 Message-Id: <201207122120.q6CLKVSh026659@dator6> MIME-Version: 1.0 Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit 3960 Mikael Ronstrom 2012-07-12 Added options for setting receiver thread options in cluster connection interface and in MySQL Server modified: sql/ha_ndbcluster.cc sql/ha_ndbcluster_connection.cc sql/ha_ndbcluster_connection.h storage/ndb/include/ndbapi/ndb_cluster_connection.hpp storage/ndb/src/ndbapi/TransporterFacade.cpp storage/ndb/src/ndbapi/TransporterFacade.hpp storage/ndb/src/ndbapi/ndb_cluster_connection.cpp 3959 Mikael Ronstrom 2012-07-12 Made adaptive solution that uses receive thread in high-load situations modified: storage/ndb/src/ndbapi/TransporterFacade.cpp storage/ndb/src/ndbapi/TransporterFacade.hpp storage/ndb/src/ndbapi/trp_client.cpp storage/ndb/src/ndbapi/trp_client.hpp 3958 Mikael Ronstrom 2012-07-12 Set m_poll_owner on client object when becoming poll owner modified: storage/ndb/src/ndbapi/TransporterFacade.cpp 3957 Mikael Ronstrom 2012-07-06 Streamlined receiver thread code, added use of start_poll and complete_poll on receiver thread trp client modified: storage/ndb/src/ndbapi/TransporterFacade.cpp 3956 Mikael Ronstrom 2012-07-05 fix modified: storage/ndb/src/ndbapi/TransporterFacade.cpp === modified file 'sql/ha_ndbcluster.cc' --- a/sql/ha_ndbcluster.cc revid:mikael.ronstrom@stripped +++ b/sql/ha_ndbcluster.cc revid:mikael.ronstrom@stripped @@ -73,6 +73,9 @@ static ulong opt_ndb_wait_connected; ulong opt_ndb_wait_setup; static ulong opt_ndb_cache_check_time; static uint opt_ndb_cluster_connection_pool; +static uint opt_ndb_num_recv_threads; +static uint opt_ndb_recv_thread_activation_threshold; +static char* opt_ndb_recv_thread_cpu_mask; static char* opt_ndb_index_stat_option; static char* opt_ndb_connectstring; static uint opt_ndb_nodeid; @@ -11924,7 +11927,9 @@ static int ndbcluster_init(void *p) opt_ndb_cluster_connection_pool, (global_opti_node_select & 1), opt_ndb_connectstring, - opt_ndb_nodeid)) + opt_ndb_nodeid, + opt_ndb_num_recv_threads, + opt_ndb_recv_thread_activation_threshold)) { DBUG_PRINT("error", ("Could not initiate connection to cluster")); goto ndbcluster_init_error; @@ -17082,7 +17087,7 @@ static MYSQL_SYSVAR_ULONG( 0 /* block */ ); - +#define MAX_CLUSTER_CONNECTIONS 63 static MYSQL_SYSVAR_UINT( cluster_connection_pool, /* name */ opt_ndb_cluster_connection_pool, /* var */ @@ -17092,10 +17097,170 @@ static MYSQL_SYSVAR_UINT( NULL, /* update func. */ 1, /* default */ 1, /* min */ - 63, /* max */ + MAX_CLUSTER_CONNECTIONS, /* max */ + 0 /* block */ +); + +#define MAX_NUM_RECV_THREADS 4 +static MYSQL_SYSVAR_UINT( + num_recv_threads, /* name */ + opt_ndb_num_recv_threads,/* var */ + PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_READONLY, + "Number of receive threads per cluster connection", + NULL, /* check func. */ + NULL, /* update func. */ + 1, /* default */ + 1, /* min */ + MAX_NUM_RECV_THREADS, /* max */ 0 /* block */ ); +static +void +ndb_recv_thread_activation_threshold_update(MYSQL_THD, + struct st_mysql_sys_var *var, + void *var_ptr, + const void *save) +{ + ndb_set_recv_thread_activation_threshold( + opt_ndb_recv_thread_activation_threshold); +} + +static MYSQL_SYSVAR_UINT( + recv_thread_activation_threshold, /* name */ + opt_ndb_recv_thread_activation_threshold, /* var */ + PLUGIN_VAR_RQCMDARG, + "Activation threshold when receive thread takes over the polling " + "of the cluster connection (measured in concurrently active " + "threads)", + NULL, /* check func. */ + ndb_recv_thread_activation_threshold_update, /* update func. */ + 8, /* default */ + 0, /* min */ + 16, /* max */ + 0 /* block */ +); + +static const int ndb_recv_thread_cpu_mask_option_buf_size = 512; +char ndb_recv_thread_cpu_mask_option_buf[ndb_recv_thread_cpu_mask_option_buf_size]; +Uint16 recv_thread_cpuid_array[MAX_NUM_RECV_THREADS * MAX_CLUSTER_CONNECTIONS]; +Uint32 recv_thread_num_cpus; + +static int +count_bits_in_hex_number(Uint32 hex_number) +{ + int num_cpus = 0; + for (int i = 0; i < 4; i++) + { + if (hex_number & (1 << i)) + { + num_cpus++; + } + } + return num_cpus; +} + +static int +assign_cpuid_in_array_from_hexnumber(Uint32 hex_number, Uint32 base_cpuid) +{ + if ((recv_thread_num_cpus + count_bits_in_hex_number(hex_number)) >= + (MAX_NUM_RECV_THREADS * MAX_CLUSTER_CONNECTIONS)) + { + return -1; + } + for (int i = 0; i < 4; i++) + { + if (hex_number & (1 << i)) + { + recv_thread_cpuid_array[recv_thread_num_cpus++] = (base_cpuid + i); + } + } + return 0; +} + +static int +assign_cpus_from_character(char c, Uint32 base_cpuid) +{ + Uint32 hex_number; + + if (c >= '0' && c <= '9') + hex_number = (c - '0'); + else if (c >= 'A' && c <= 'F') + hex_number = (c - 'A') + 10; + else if (c >= 'a' && c <= 'f') + hex_number = (c - 'a') + 10; + else + return -1; + return assign_cpuid_in_array_from_hexnumber(hex_number, base_cpuid); +} + +static int +cpu_mask_parse(const char *str, int mask_str_len) +{ + Uint32 base_cpu_id = 0; + recv_thread_num_cpus = 0; + if (str[0] != '0' || str[1] != 'x') + goto error; + for (int i = mask_str_len - 1; i >= 2; i++) + { + /* + Parse string backwards, last character is representing + CPU 0-3, next to last represents 4-7 and so forth. + */ + if (assign_cpus_from_character(str[i], base_cpu_id) != 0) + goto error; + base_cpu_id += 4; + } + return 0; +error: + return -1; +} + +static +int +ndb_recv_thread_cpu_mask_check(MYSQL_THD thd, + struct st_mysql_sys_var *var, + void *save, + struct st_mysql_value *value) +{ + int mask_str_len; + char buf[ndb_recv_thread_cpu_mask_option_buf_size]; + int len = sizeof(buf); + const char *str = value->val_str(value, buf, &len); + if (str != 0) + { + mask_str_len = strlen(str); + if ((mask_str_len >= len) || + (cpu_mask_parse(str, mask_str_len) != 0)) + goto error; + } +error: + return 1; +} + +static +void +ndb_recv_thread_cpu_mask_update(MYSQL_THD, + struct st_mysql_sys_var *var, + void *var_ptr, + const void *save) +{ + ndb_set_recv_thread_cpu(recv_thread_cpuid_array, + opt_ndb_num_recv_threads, + recv_thread_num_cpus); +} + +static MYSQL_SYSVAR_STR( + recv_thread_cpu_mask, /* name */ + opt_ndb_recv_thread_cpu_mask, /* var */ + PLUGIN_VAR_RQCMDARG, + "CPU mask for locking receiver threads to specific CPU, specified " + " as hexadecimal as e.g. 0x33, one CPU is used per receiver thread.", + ndb_recv_thread_cpu_mask_check, /* check func. */ + ndb_recv_thread_cpu_mask_update, /* update func. */ + ndb_recv_thread_cpu_mask_option_buf +); + /* should be in index_stat.h */ extern int @@ -17373,6 +17538,9 @@ static struct st_mysql_sys_var* system_v MYSQL_SYSVAR(wait_connected), MYSQL_SYSVAR(wait_setup), MYSQL_SYSVAR(cluster_connection_pool), + MYSQL_SYSVAR(num_recv_threads), + MYSQL_SYSVAR(recv_thread_activation_threshold), + MYSQL_SYSVAR(recv_thread_cpu_mask), MYSQL_SYSVAR(report_thresh_binlog_mem_usage), MYSQL_SYSVAR(report_thresh_binlog_epoch_slip), MYSQL_SYSVAR(log_update_as_write), === modified file 'sql/ha_ndbcluster_connection.cc' --- a/sql/ha_ndbcluster_connection.cc revid:mikael.ronstrom@stripped +++ b/sql/ha_ndbcluster_connection.cc revid:mikael.ronstrom@stripped @@ -45,7 +45,9 @@ ndbcluster_connect(int (*connect_callbac uint connection_pool_size, bool optimized_node_select, const char* connect_string, - uint force_nodeid) + uint force_nodeid, + uint num_recv_threads, + uint recv_thread_activation_threshold) { NDB_TICKS end_time; @@ -77,6 +79,9 @@ ndbcluster_connect(int (*connect_callbac g_ndb_cluster_connection->set_name(buf); } g_ndb_cluster_connection->set_optimized_node_selection(optimized_node_select); + g_ndb_cluster_connection->set_num_recv_threads(num_recv_threads); + g_ndb_cluster_connection->set_recv_thread_activation_threshold( + recv_thread_activation_threshold); // Create a Ndb object to open the connection to NDB if ( (g_ndb= new Ndb(g_ndb_cluster_connection, "sys")) == 0 ) @@ -135,6 +140,8 @@ ndbcluster_connect(int (*connect_callbac g_pool[i]->set_name(buf); } g_pool[i]->set_optimized_node_selection(optimized_node_select); + g_pool[i]->set_num_recv_threads(num_recv_threads); + g_pool[i]->set_recv_thread_activation_threshold(recv_thread_activation_threshold); } } @@ -289,6 +296,40 @@ int ndb_has_node_id(uint id) return 0; } +int ndb_set_recv_thread_activation_threshold(Uint32 threshold) +{ + for (uint i= 0; i < g_pool_alloc; i++) + { + g_pool[i]->set_recv_thread_activation_threshold(threshold); + } + return 0; +} + +int +ndb_set_recv_thread_cpu(Uint16 *cpuid_array, + Uint32 num_recv_threads, + Uint32 cpuid_array_size) +{ + Uint32 num_cpu_needed = num_recv_threads * g_pool_alloc; + Uint32 cpu_index = 0; + if (cpuid_array_size < num_cpu_needed) + { + /* Ignore cpu masks that is too short */ + return 0; + } + for (Uint32 i = 0; i < g_pool_alloc; i++) + { + for (Uint32 j = 0; j < num_recv_threads; j++) + { + g_pool[i]->set_recv_thread_cpu(&cpuid_array[cpu_index], + (Uint32)1, + j); + cpu_index++; + } + } + return 0; +} + void ndb_get_connection_stats(Uint64* statsArr) { Uint64 connectionStats[ Ndb::NumClientStatistics ]; === modified file 'sql/ha_ndbcluster_connection.h' --- a/sql/ha_ndbcluster_connection.h revid:mikael.ronstrom@stripped +++ b/sql/ha_ndbcluster_connection.h revid:mikael.ronstrom@stripped @@ -19,13 +19,21 @@ int ndbcluster_connect(int (*connect_cal ulong wait_connected, uint connection_pool_size, bool optimized_node_select, - const char* connect_string, uint force_nodeid); + const char* connect_string, + uint force_nodeid, + uint num_recv_threads, + uint recv_thread_activation_threshold); + void ndbcluster_disconnect(void); Ndb_cluster_connection *ndb_get_cluster_connection(); ulonglong ndb_get_latest_trans_gci(); void ndb_set_latest_trans_gci(ulonglong val); int ndb_has_node_id(uint id); +int ndb_set_recv_thread_activation_threshold(Uint32 threshold); +int ndb_set_recv_thread_cpu(Uint16 *cpuid_array, + Uint32 num_recv_threads, + Uint32 cpuid_array_size); void ndb_get_connection_stats(Uint64* statsArr); /* perform random sleep in the range milli_sleep to 2*milli_sleep */ === modified file 'storage/ndb/include/ndbapi/ndb_cluster_connection.hpp' --- a/storage/ndb/include/ndbapi/ndb_cluster_connection.hpp revid:mikael.ronstrom@stripped +++ b/storage/ndb/include/ndbapi/ndb_cluster_connection.hpp revid:mikael.ronstrom@stripped @@ -189,6 +189,30 @@ public: void set_max_adaptive_send_time(Uint32 milliseconds); Uint32 get_max_adaptive_send_time(); + /** + * Configuration handling of the receiver thread(s). + * We can set the number of receiver threads, we can set the cpu to bind + * the receiver thread to. We can also set the level of when we activate + * the receiver thread as the receiver, before this level the normal + * user threads are used to receive signals. If we set the level to + * 16 or higher we will never use receive threads as receivers. + * + * By default we have one receiver thread, this thread is not locked to + * any specific CPU and the level is 8. + * + * The number of receive threads can only be set at a time before the + * connect call is made to connect to the other nodes in the cluster. + * The other methods can be called at any time. + * + * All methods return -1 as an error indication + */ + int set_num_recv_threads(Uint32 num_recv_threads); + int get_num_recv_threads() const; + int set_recv_thread_cpu(Uint16 *cpuid_array, + Uint32 array_len, + Uint32 recv_thread_id = 0); + int set_recv_thread_activation_threshold(Uint32 threshold); + int get_recv_thread_activation_threshold() const; #ifndef DOXYGEN_SHOULD_SKIP_INTERNAL int get_no_ready(); === modified file 'storage/ndb/src/ndbapi/TransporterFacade.cpp' --- a/storage/ndb/src/ndbapi/TransporterFacade.cpp revid:mikael.ronstrom@stripped +++ b/storage/ndb/src/ndbapi/TransporterFacade.cpp revid:mikael.ronstrom@stripped @@ -1,5 +1,5 @@ /* - Copyright (c) 2003, 2011, Oracle and/or its affiliates. All rights reserved. + Copyright (c) 2003, 2012, Oracle and/or its affiliates. All rights reserved. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -330,7 +330,7 @@ TransporterFacade::deliver_signal(Signal * one packed signal to the NDB API. */ const Uint32 MAX_MESSAGES_IN_LOCKED_CLIENTS = - NDB_ARRAY_SIZE(m_poll_owner->m_poll.m_locked_clients) - 6; + m_poll_owner->m_poll.m_lock_array_size - 6; return m_poll_owner->m_poll.m_locked_cnt >= MAX_MESSAGES_IN_LOCKED_CLIENTS; } @@ -581,7 +581,7 @@ runReceiveResponse_C(void * me) ReceiveThreadClient::ReceiveThreadClient(TransporterFacade * facade) { DBUG_ENTER("ReceiveThreadClient::ReceiveThreadClient"); - Uint32 ret = this->open(facade); + Uint32 ret = this->open(facade, -1, true); if (unlikely(ret == 0)) { ndbout_c("Failed to register receive thread, ret = %d", ret); @@ -636,13 +636,16 @@ TransporterFacade::checkClusterMgr(NDB_T } bool -TransporterFacade::become_poll_owner(trp_client* clnt) +TransporterFacade::become_poll_owner(trp_client* clnt, + NDB_TICKS currTime) { bool poll_owner = false; lock_poll_mutex(); if (m_poll_owner == NULL) { poll_owner = true; + m_num_active_clients = 0; + m_receive_activation_time = currTime; m_poll_owner = clnt; } unlock_poll_mutex(); @@ -654,6 +657,71 @@ TransporterFacade::become_poll_owner(trp return false; } +int +TransporterFacade::set_num_recv_threads(Uint32 num_recv_threads) +{ + return -1; +} + +int +TransporterFacade::get_num_recv_threads() const +{ + return 1; +} + +int +TransporterFacade::set_recv_thread_cpu(Uint16 *cpuid_array, + Uint32 array_len, + Uint32 recv_thread_id) +{ + if (array_len > 1 || array_len == 0) + { + return -1; + } + if (recv_thread_id != 0) + { + return -1; + } + recv_thread_cpu_id = cpuid_array[0]; + if (theTransporterRegistry) + { + /* Receiver thread already started, lock cpu now */ + lock_recv_thread_cpu(); + } + return 0; +} + +int +TransporterFacade::set_recv_thread_activation_threshold(Uint32 threshold) +{ + if (threshold >= 16) + { + threshold = 256; + } + min_active_clients_recv_thread = threshold; + return 0; +} + +void +TransporterFacade::lock_recv_thread_cpu() +{ + while (theReceiveThread == NULL) + { + NdbSleep_MilliSleep(1); + } + if (recv_thread_cpu_id != NO_RECV_THREAD_CPU_ID) + { + NdbThread_LockCPU(theReceiveThread, (Uint32)recv_thread_cpu_id); + } +} + +int +TransporterFacade::get_recv_thread_activation_threshold() const +{ + return min_active_clients_recv_thread; +} + +#define MIN_ACTIVE_CLIENTS_RECV_THREAD 8 /* The receiver thread is changed to only wake up once every 10 milliseconds to poll. It will first check that nobody owns the poll "right" before @@ -672,20 +740,47 @@ void TransporterFacade::threadMainReceiv NdbThread_set_shm_sigmask(TRUE); #endif ReceiveThreadClient *recv_client = new ReceiveThreadClient(this); - while (theReceiveThread == NULL) - { - NdbSleep_MilliSleep(1); - } - // Temporarily hardcoded - NdbThread_LockCPU(theReceiveThread, (Uint32)7); + lock_recv_thread_cpu(); while(!theStopReceive) { currTime = NdbTick_CurrentMillisecond(); checkClusterMgr(currTime, lastTime); if (!poll_owner) - poll_owner = become_poll_owner(recv_client); + { + /* + We only take the step to become poll owner in receive thread if + we are sufficiently active, at least 16 threads active. + */ + if (m_num_active_clients > min_active_clients_recv_thread) + { + poll_owner = become_poll_owner(recv_client, currTime); + } + else + { + NdbSleep_MilliSleep(100); + } + } if (poll_owner) - do_poll(recv_client, 100, true); + { + bool stay_poll_owner = true; + if ((currTime - m_receive_activation_time) > (NDB_TICKS)1000) + { + /* Reset timer for next activation check time */ + m_receive_activation_time = currTime; + lock_poll_mutex(); + if (m_num_active_clients < (min_active_clients_recv_thread / 2)) + { + /* Go back to not have an active receive thread */ + stay_poll_owner = false; + poll_owner = false; + } + m_num_active_clients = 0; /* Reset active clients for next timeslot */ + unlock_poll_mutex(); + } + recv_client->start_poll(); + do_poll(recv_client, 10, true, stay_poll_owner); + recv_client->complete_poll(); + } }//while delete recv_client; theTransporterRegistry->stopReceiving(); @@ -724,9 +819,12 @@ TransporterFacade::external_poll(trp_cli } TransporterFacade::TransporterFacade(GlobalDictCache *cache) : + min_active_clients_recv_thread(MIN_ACTIVE_CLIENTS_RECV_THREAD), + recv_thread_cpu_id(NO_RECV_THREAD_CPU_ID), m_poll_owner(NULL), m_poll_queue_head(NULL), m_poll_queue_tail(NULL), + m_num_active_clients(0), theTransporterRegistry(0), theOwnId(0), theStartNodeId(1), @@ -1953,7 +2051,7 @@ TransporterFacade::finish_poll(trp_clien { Uint32 lock_cnt = clnt->m_poll.m_locked_cnt; assert(lock_cnt >= 1); - assert(lock_cnt <= NDB_ARRAY_SIZE(clnt->m_poll.m_locked_clients)); + assert(lock_cnt <= clnt->m_poll.m_lock_array_size); assert(clnt->m_poll.m_locked_clients[0] == clnt); // no duplicates if (DBG_POLL) printf("after external_poll: cnt: %u ", lock_cnt); @@ -2079,15 +2177,16 @@ TransporterFacade::try_lock_last_client( void TransporterFacade::do_poll(trp_client* clnt, Uint32 wait_time, - bool poll_owner) + bool is_poll_owner, + bool stay_poll_owner) { dbg("do_poll(%p)", clnt); clnt->m_poll.m_waiting = trp_client::PollQueue::PQ_WAITING; - if (!poll_owner) + assert(clnt->m_poll.m_locked == true); + assert(clnt->m_poll.m_poll_owner == false); + assert(clnt->m_poll.m_poll_queue == false); + if (!is_poll_owner) { - assert(clnt->m_poll.m_locked == true); - assert(clnt->m_poll.m_poll_owner == false); - assert(clnt->m_poll.m_poll_queue == false); if (!get_poll_owner(clnt, wait_time)) return; } @@ -2105,12 +2204,15 @@ TransporterFacade::do_poll(trp_client* c Uint32 cnt_woken = 0; Uint32 cnt = clnt->m_poll.m_locked_cnt - 1; // skip self trp_client ** arr = clnt->m_poll.m_locked_clients + 1; // skip self - if (!poll_owner) - clnt->m_poll.m_poll_owner = false; + clnt->m_poll.m_poll_owner = false; finish_poll(clnt, cnt, cnt_woken, arr); lock_poll_mutex(); + if ((cnt + 1) > m_num_active_clients) + { + m_num_active_clients = cnt + 1; + } /** * now remove all woken from poll queue * note: poll mutex held @@ -2119,7 +2221,7 @@ TransporterFacade::do_poll(trp_client* c bool new_owner_locked = true; trp_client * new_owner = NULL; - if (poll_owner) + if (stay_poll_owner) { unlock_poll_mutex(); } @@ -2142,7 +2244,7 @@ TransporterFacade::do_poll(trp_client* c NdbMutex_Unlock(arr[i]->m_mutex); } - if (poll_owner) + if (stay_poll_owner) { clnt->m_poll.m_locked_cnt = 0; dbg("%p->do_poll return", clnt); @@ -2274,13 +2376,13 @@ inline void trp_client::PollQueue::lock_client (trp_client* clnt) { - assert(m_locked_cnt <= NDB_ARRAY_SIZE(m_locked_clients)); + assert(m_locked_cnt <= m_lock_array_size); if (check_if_locked(clnt)) return; dbg("lock_client(%p)", clnt); - assert(m_locked_cnt < NDB_ARRAY_SIZE(m_locked_clients)); + assert(m_locked_cnt < m_lock_array_size); m_locked_clients[m_locked_cnt++] = clnt; NdbMutex_Lock(clnt->m_mutex); return; === modified file 'storage/ndb/src/ndbapi/TransporterFacade.hpp' --- a/storage/ndb/src/ndbapi/TransporterFacade.hpp revid:mikael.ronstrom@stripped +++ b/storage/ndb/src/ndbapi/TransporterFacade.hpp revid:mikael.ronstrom@stripped @@ -158,7 +158,8 @@ public: void start_poll(trp_client*); void do_poll(trp_client* clnt, Uint32 wait_time, - bool poll_owner = false); + bool is_poll_owner = false, + bool stay_poll_owner = false); void complete_poll(trp_client*); void wakeup(trp_client*); @@ -172,6 +173,25 @@ public: void add_to_poll_queue(trp_client* clnt); void remove_from_poll_queue(trp_client* clnt); + /* + Configuration handling of the receiver threads handling of polling + These methods implement methods on the ndb_cluster_connection + interface. + */ +#define NO_RECV_THREAD_CPU_ID 0xFFFF + int set_num_recv_threads(Uint32 num_recv_threads); + int get_num_recv_threads() const; + int set_recv_thread_cpu(Uint16 *cpuid_array, + Uint32 array_len, + Uint32 recv_thread_id); + int set_recv_thread_activation_threshold(Uint32 threshold); + int get_recv_thread_activation_threshold() const; + /* Variables to support configuration of receiver thread handling */ + Uint32 min_active_clients_recv_thread; + Uint16 recv_thread_cpu_id; + /* Support method to lock the receiver thread to its CPU */ + void lock_recv_thread_cpu(); + trp_client * m_poll_owner; trp_client * m_poll_queue_head; // First in queue trp_client * m_poll_queue_tail; // Last in queue @@ -222,7 +242,7 @@ private: void checkClusterMgr(NDB_TICKS currTime, NDB_TICKS & lastTime); bool get_poll_owner(trp_client* clnt, Uint32 wait_time); - bool become_poll_owner(trp_client* clnt); + bool become_poll_owner(trp_client* clnt, NDB_TICKS currtime); void finish_poll(trp_client* clnt, Uint32 cnt, Uint32& cnt_woken, @@ -231,6 +251,9 @@ private: bool &new_owner_locked, trp_client** new_owner_ptr); + Uint32 m_num_active_clients; + NDB_TICKS m_receive_activation_time; + bool isConnected(NodeId aNodeId); void doStop(); === modified file 'storage/ndb/src/ndbapi/ndb_cluster_connection.cpp' --- a/storage/ndb/src/ndbapi/ndb_cluster_connection.cpp revid:mikael.ronstrom@stripped +++ b/storage/ndb/src/ndbapi/ndb_cluster_connection.cpp revid:mikael.ronstrom@stripped @@ -91,6 +91,61 @@ const char *Ndb_cluster_connection::get_ return 0; } +int +Ndb_cluster_connection::set_num_recv_threads(Uint32 num_recv_threads) +{ + if (m_impl.m_transporter_facade) + { + return m_impl.m_transporter_facade->set_num_recv_threads(num_recv_threads); + } + return -1; +} + +int +Ndb_cluster_connection::get_num_recv_threads() const +{ + if (m_impl.m_transporter_facade) + { + return m_impl.m_transporter_facade->get_num_recv_threads(); + } + return -1; +} + +int +Ndb_cluster_connection::set_recv_thread_cpu(Uint16 *cpuid_array, + Uint32 array_len, + Uint32 recv_thread_id) +{ + if (m_impl.m_transporter_facade) + { + return m_impl.m_transporter_facade->set_recv_thread_cpu(cpuid_array, + array_len, + recv_thread_id); + } + return -1; +} + +int +Ndb_cluster_connection::set_recv_thread_activation_threshold(Uint32 threshold) +{ + TransporterFacade *fac = m_impl.m_transporter_facade; + if (fac) + { + return fac->set_recv_thread_activation_threshold(threshold); + } + return -1; +} + +int +Ndb_cluster_connection::get_recv_thread_activation_threshold() const +{ + if (m_impl.m_transporter_facade) + { + return m_impl.m_transporter_facade->get_recv_thread_activation_threshold(); + } + return -1; +} + const char *Ndb_cluster_connection::get_connectstring(char *buf, int buf_sz) const { === modified file 'storage/ndb/src/ndbapi/trp_client.cpp' --- a/storage/ndb/src/ndbapi/trp_client.cpp revid:mikael.ronstrom@stripped +++ b/storage/ndb/src/ndbapi/trp_client.cpp revid:mikael.ronstrom@stripped @@ -17,6 +17,7 @@ #include "trp_client.hpp" #include "TransporterFacade.hpp" +#include trp_client::trp_client() : m_blockNo(~Uint32(0)), m_facade(0) @@ -44,13 +45,28 @@ trp_client::~trp_client() } Uint32 -trp_client::open(TransporterFacade* tf, int blockNo) +trp_client::open(TransporterFacade* tf, int blockNo, + bool receive_thread) { Uint32 res = 0; assert(m_facade == 0); if (m_facade == 0) { m_facade = tf; + if (receive_thread) + { + m_poll.m_lock_array_size = 256; + } + else + { + m_poll.m_lock_array_size = 16; + } + m_poll.m_locked_clients = (trp_client**) + NdbMem_Allocate(sizeof(trp_client**) * m_poll.m_lock_array_size); + if (m_poll.m_locked_clients == NULL) + { + return 0; + } res = tf->open_clnt(this, blockNo); if (res != 0) { @@ -58,6 +74,8 @@ trp_client::open(TransporterFacade* tf, } else { + NdbMem_Free((void*)m_poll.m_locked_clients); + m_poll.m_locked_clients = NULL; m_facade = 0; } } @@ -79,6 +97,11 @@ trp_client::close() m_facade = 0; m_blockNo = ~Uint32(0); + if (m_poll.m_locked_clients) + { + NdbMem_Free((void*)m_poll.m_locked_clients); + m_poll.m_locked_clients = NULL; + } } } === modified file 'storage/ndb/src/ndbapi/trp_client.hpp' --- a/storage/ndb/src/ndbapi/trp_client.hpp revid:mikael.ronstrom@stripped +++ b/storage/ndb/src/ndbapi/trp_client.hpp revid:mikael.ronstrom@stripped @@ -42,7 +42,8 @@ public: virtual void trp_wakeup() {}; - Uint32 open(class TransporterFacade*, int blockNo = -1); + Uint32 open(class TransporterFacade*, int blockNo = -1, + bool receive_thread = false); void close(); void start_poll(); @@ -131,7 +132,8 @@ private: void lock_client(trp_client*); bool check_if_locked(const trp_client*) const; Uint32 m_locked_cnt; - trp_client * m_locked_clients[64]; + Uint32 m_lock_array_size; + trp_client ** m_locked_clients; } m_poll; /** @@ -243,6 +245,8 @@ trp_client::PollQueue::PollQueue() m_prev = 0; m_condition = NdbCondition_Create(); m_locked_cnt = 0; + m_lock_array_size = 0; + m_locked_clients = NULL; } inline No bundle (reason: useless for push emails).