From: Jonas Oreland Date: January 25 2012 10:41am Subject: bzr push into mysql-5.5-cluster-7.2 branch (jonas.oreland:3783 to 3785) List-Archive: http://lists.mysql.com/commits/142559 Message-Id: <20120125104133.4BB0E55C2B5@perch.localdomain> MIME-Version: 1.0 Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit 3785 Jonas Oreland 2012-01-25 [merge] ndb - merge 71 to 72 modified: storage/ndb/src/kernel/vm/mt.cpp storage/ndb/src/mgmsrv/ConfigInfo.cpp 3784 mikael.ronstrom@stripped 2012-01-25 add --no-static-linking to BUILD/build_mccge.sh. Patch from mikael modified: BUILD/build_mccge.sh 3783 Martin Skold 2012-01-24 [merge] Merge modified: mysql-test/suite/ndb/r/ndb_condition_pushdown.result mysql-test/suite/ndb/t/ndb_condition_pushdown.test sql/ha_ndbcluster_cond.cc sql/item_cmpfunc.h === modified file 'BUILD/build_mccge.sh' --- a/BUILD/build_mccge.sh 2011-06-30 15:46:53 +0000 +++ b/BUILD/build_mccge.sh 2012-01-25 09:10:25 +0000 @@ -856,6 +856,9 @@ parse_options() --static-linking) static_linking_flag="yes" ;; + --no-static-linking) + static_linking_flag="no" + ;; --strip) strip_flag="yes" ;; @@ -1763,8 +1766,10 @@ set_static_link_configs() loc_static_link="--with-mysqld-ldflags=\"-all-static\"" loc_static_link="$loc_static_link --with-client-ldflags=\"-all-static\"" else - loc_static_link="--with-mysqld-ldflags=\"-static\"" - loc_static_link="$loc_static_link --with-client-ldflags=\"-static\"" + if test "x$static_linking_flag" != "xno" ; then + loc_static_link="--with-mysqld-ldflags=\"-static\"" + loc_static_link="$loc_static_link --with-client-ldflags=\"-static\"" + fi fi base_configs="$base_configs $loc_static_link" } === modified file 'storage/ndb/src/kernel/vm/mt.cpp' --- a/storage/ndb/src/kernel/vm/mt.cpp 2012-01-24 06:20:13 +0000 +++ b/storage/ndb/src/kernel/vm/mt.cpp 2012-01-25 10:39:40 +0000 @@ -576,6 +576,15 @@ public: validate(); } + /** + * release everything if more than m_max_free + * else do nothing + */ + void release_chunk(Ndbd_mem_manager *mm, Uint32 rg) { + if (m_free > m_max_free) + release_all(mm, rg); + } + void set_pool(thr_safe_pool * pool) { m_global_pool = pool; } private: @@ -1023,6 +1032,477 @@ struct thr_repository Uint32 stopped_threads; }; +/** + * Class to handle send threads + * ---------------------------- + * We can have up to 8 send threads. + * + * This class will handle when a block thread needs to send, it will + * handle the running of the send thread and will also start the + * send thread. + */ +#define is_send_thread(thr_no) (thr_no >= num_threads) + +struct thr_send_thread_instance +{ + thr_send_thread_instance() : + m_instance_no(0), + m_watchdog_counter(0), + m_awake(FALSE), + m_thread(NULL), + m_waiter_struct(), + m_send_buffer_pool(0, THR_FREE_BUF_MAX) + {} + Uint32 m_instance_no; + Uint32 m_watchdog_counter; + Uint32 m_awake; + NdbThread *m_thread; + thr_wait m_waiter_struct; + class thread_local_pool m_send_buffer_pool; +}; + +struct thr_send_nodes +{ + /* 0 means NULL */ + Uint16 m_next; + Uint16 m_data_available; +}; + +class thr_send_threads +{ +public: + /* Create send thread environment */ + thr_send_threads(); + + /* Destroy send thread environment and ensure threads are stopped */ + ~thr_send_threads(); + + /* A block thread has flushed data for a node and wants it sent */ + void alert_send_thread(NodeId node); + + /* Method used to run the send thread */ + void run_send_thread(Uint32 instance_no); + + /* Method to start the send threads */ + void start_send_threads(); + + /* Check if at least one node to send to */ + bool data_available() + { + rmb(); + return m_first_node; + } + + bool data_available(NodeId node) + { + struct thr_send_nodes *node_state; + + node_state = &m_node_state[node]; + return node_state->m_data_available; + } + + /* Get send buffer pool for send thread */ + thread_local_pool* get_send_buffer_pool(Uint32 thr_no) + { + return &m_send_threads[thr_no - num_threads].m_send_buffer_pool; + } + +private: + /* Insert a node in list of nodes that has data available to send */ + void insert_node(NodeId node); + + /* Get a node from the list in order to send to it */ + NodeId get_node(); + + /* Get a send thread which isn't awake currently */ + struct thr_send_thread_instance* get_not_awake_send_thread(); + /* + * Try to lock send for this node, if not successful, try another + * node ready for sending, unlock send thread mutex as part of + * the call. + */ + NodeId check_and_lock_send_node(NodeId node); + + /* Perform the actual send to the node */ + int perform_send(NodeId node, Uint32 instance_no); + + /* Have threads been started */ + Uint32 m_started_threads; + + /* First node that has data to be sent */ + Uint32 m_first_node; + + /* Last node in list of nodes with data available for sending */ + Uint32 m_last_node; + + /* Is data available and next reference for each node in cluster */ + struct thr_send_nodes m_node_state[MAX_NODES]; + + /** + * Very few compiler (gcc) allow zero length arrays + */ +#if MAX_NDBMT_SEND_THREADS == 0 +#define _MAX_SEND_THREADS 1 +#else +#define _MAX_SEND_THREADS MAX_NDBMT_SEND_THREADS +#endif + + /* Data and state for the send threads */ + struct thr_send_thread_instance m_send_threads[_MAX_SEND_THREADS]; + + /** + * Mutex protecting the linked list of nodes awaiting sending + * and also the not_awake variable of the send thread. + */ + NdbMutex *send_thread_mutex; +}; + + +/* + * The single instance of the thr_send_threads class, if this variable + * is non-NULL, then we're using send threads, otherwise if NULL, there + * are no send threads. + */ +static thr_send_threads *g_send_threads = NULL; + +extern "C" +void * +mt_send_thread_main(void *thr_arg) +{ + struct thr_send_thread_instance *this_send_thread = + (thr_send_thread_instance*)thr_arg; + + Uint32 instance_no = this_send_thread->m_instance_no; + ndbout_c("Send thread : %u is started", instance_no); + g_send_threads->run_send_thread(instance_no); + return NULL; +} + +thr_send_threads::thr_send_threads() +{ + struct thr_repository *rep = &g_thr_repository; + + m_started_threads = FALSE; + m_first_node = 0; + m_last_node = 0; + for (Uint32 i = 0; i < NDB_ARRAY_SIZE(m_node_state); i++) + { + m_node_state[i].m_next = 0; + m_node_state[i].m_data_available = FALSE; + } + for (Uint32 i = 0; i < NDB_ARRAY_SIZE(m_send_threads); i++) + { + m_send_threads[i].m_waiter_struct.init(); + m_send_threads[i].m_instance_no = i; + m_send_threads[i].m_send_buffer_pool.set_pool(&rep->m_sb_pool); + } + send_thread_mutex = NdbMutex_Create(); +} + +thr_send_threads::~thr_send_threads() +{ + if (!m_started_threads) + return; + + for (Uint32 i = 0; i < globalData.ndbMtSendThreads; i++) + { + void *dummy_return_status; + + /* Ensure thread is woken up to die */ + wakeup(&(m_send_threads[i].m_waiter_struct)); + NdbThread_WaitFor(m_send_threads[i].m_thread, &dummy_return_status); + NdbThread_Destroy(&(m_send_threads[i].m_thread)); + } +} + +void +thr_send_threads::start_send_threads() +{ + for (Uint32 i = 0; i < globalData.ndbMtSendThreads; i++) + { + ndbout_c("Start send thread: %u", i); + m_send_threads[i].m_thread = + NdbThread_Create(mt_send_thread_main, + (void **)&m_send_threads[i], + 1024*1024, + "send thread", //ToDo add number + NDB_THREAD_PRIO_MEAN); + } + m_started_threads = TRUE; +} + +/* Called under mutex protection of send_thread_mutex */ +void +thr_send_threads::insert_node(NodeId node) +{ + Uint8 last_node = m_last_node; + Uint8 first_node = m_first_node; + struct thr_send_nodes *last_node_state = &m_node_state[last_node]; + struct thr_send_nodes *node_state = &m_node_state[node]; + + node_state->m_next = 0; + node_state->m_data_available = TRUE; + m_last_node = node; + if (first_node == 0) + { + m_first_node = node; + wmb(); /* Ensure send threads see the change to m_first_node != 0 */ + } + else + last_node_state->m_next = node; +} + +/* Called under mutex protection of send_thread_mutex */ +NodeId +thr_send_threads::get_node() +{ + Uint32 first_node = m_first_node; + struct thr_send_nodes *node_state; + Uint32 next; + + if (first_node) + { + node_state = &m_node_state[first_node]; + next = node_state->m_next; + + node_state->m_next = 0; + node_state->m_data_available = FALSE; + + m_first_node = next; + if (next == 0) + { + m_last_node = 0; + wmb(); /* Ensure send threads see the change to m_first_node == 0 */ + } + return (NodeId)first_node; + } + return 0; +} + +struct thr_send_thread_instance* +thr_send_threads::get_not_awake_send_thread() +{ + struct thr_send_thread_instance *used_send_thread; + + for (Uint32 i = 0; i < globalData.ndbMtSendThreads; i++) + { + if (!m_send_threads[i].m_awake) + { + used_send_thread= &m_send_threads[i]; + return used_send_thread; + } + } + return NULL; +} + +void +thr_send_threads::alert_send_thread(NodeId node) +{ + struct thr_send_thread_instance *used_send_thread = NULL; + + NdbMutex_Lock(send_thread_mutex); + if (m_node_state[node].m_data_available) + { + /* + * The node is already flagged that it has data needing to be sent. + * This means that someone else also ensured that a send thread + * was woken up if necessary, so we can rely on this that the send + * threads will now take care of it. + * + * We are safe that the buffers we have flushed will be read by a send + * thread since this variable is always reset before reading the send + * buffers of the node. + * + * The thread that set this flag should also have ensured that a send + * thread is awoken if needed, there is no need to wake even more + * threads up in this case since we piggyback on someone else's request. + */ + assert(m_first_node); + NdbMutex_Unlock(send_thread_mutex); + return; + } + else + insert_node(node); + /* + * Search for a send thread which is asleep, if there is one, wake it + * + * If everyone is already awake we don't need to wake anyone up since + * the threads will check if there is nodes available to send to before + * they go to sleep. + * + * The reason to look for anyone asleep is to ensure proper use of CPU + * resources and ensure that we use all the send thread CPUs available + * to our disposal when necessary. + */ + used_send_thread = get_not_awake_send_thread(); + + NdbMutex_Unlock(send_thread_mutex); + + if (used_send_thread) + { + /* + * Wake the assigned sleeping send thread, potentially a spurious wakeup, + * but this is not a problem, important is to ensure that at least one + * send thread is awoken to handle our request. If someone is already + * awake and takes of our request before we get to wake someone up it's + * not a problem. + */ + wakeup(&(used_send_thread->m_waiter_struct)); + } +} + +extern "C" +bool +check_available_send_data(struct thr_data *not_used) +{ + (void)not_used; + return !g_send_threads->data_available(); +} + +NodeId +thr_send_threads::check_and_lock_send_node(NodeId node) +{ + thr_repository::send_buffer * sb = g_thr_repository.m_send_buffers+node; + NodeId in_node = node; + + /* We own send_thread_mutex when entering */ + while (trylock(&sb->m_send_lock) != 0) + { + /* + * We failed to acquire lock, another send thread is busy writing to + * this node most likely, we'll post it last if there is other nodes + * to send data to. + */ + insert_node(node); + node = get_node(); + sb = g_thr_repository.m_send_buffers+node; + if (node == in_node) + { + /* + * No other node available to send to, we'll wait for this one to + * become free by using normal lock. + */ + NdbMutex_Unlock(send_thread_mutex); + lock(&sb->m_send_lock); + return node; + } + } + /* Only gets here when own the send lock of node */ + NdbMutex_Unlock(send_thread_mutex); + /* We don't want to have send thread mutex while sending */ + return node; +} + +int +thr_send_threads::perform_send(NodeId node, Uint32 instance_no) +{ + int res; + thr_repository::send_buffer * sb = g_thr_repository.m_send_buffers+node; + sb->m_send_thread = num_threads + instance_no; + res = globalTransporterRegistry.performSend(node); + sb->m_send_thread = NO_SEND_THREAD; + unlock(&sb->m_send_lock); + return res; +} + +void +thr_send_threads::run_send_thread(Uint32 instance_no) +{ + struct thr_send_thread_instance *this_send_thread = + &m_send_threads[instance_no]; + const Uint32 thr_no = num_threads + instance_no; + + { + /** + * Wait for thread object to be visible + */ + while(this_send_thread->m_thread == 0) + NdbSleep_MilliSleep(30); + } + + { + /** + * Print out information about starting thread + * (number, tid, name, the CPU it's locked into (if locked at all)) + * Also perform the locking to CPU. + */ + BaseString tmp; + THRConfigApplier & conf = globalEmulatorData.theConfiguration->m_thr_config; + tmp.appfmt("thr: %u ", thr_no); + int tid = NdbThread_GetTid(this_send_thread->m_thread); + if (tid != -1) + { + tmp.appfmt("tid: %u ", tid); + } + conf.appendInfoSendThread(tmp, instance_no); + int res = conf.do_bind_send(this_send_thread->m_thread, instance_no); + if (res < 0) + { + tmp.appfmt("err: %d ", -res); + } + else if (res > 0) + { + tmp.appfmt("OK "); + } + printf("%s\n", tmp.c_str()); + fflush(stdout); + } + + /** + * register watchdog + */ + globalEmulatorData.theWatchDog-> + registerWatchedThread(&this_send_thread->m_watchdog_counter, thr_no); + + NdbMutex_Lock(send_thread_mutex); + this_send_thread->m_awake = FALSE; + NdbMutex_Unlock(send_thread_mutex); + + while (globalData.theRestartFlag != perform_stop) + { + this_send_thread->m_watchdog_counter = 1; + + /* Yield for a maximum of 1ms */ + const Uint32 wait = 1000000; + yield(&this_send_thread->m_waiter_struct, wait, + check_available_send_data, NULL); + + NdbMutex_Lock(send_thread_mutex); + this_send_thread->m_awake = TRUE; + + NodeId node; + while ((node = get_node()) != 0 && + globalData.theRestartFlag != perform_stop) + { + this_send_thread->m_watchdog_counter = 2; + + /* We enter this method with send thread mutex and come + * back with send thread mutex released and instead owning + * the spin lock to send to the node returned + */ + node = check_and_lock_send_node(node); + + int res = perform_send(node, instance_no); + /* We return with no spin locks or mutexes held */ + + /* Release chunk-wise to decrease pressure on spin lock */ + this_send_thread->m_watchdog_counter = 3; + this_send_thread->m_send_buffer_pool. + release_chunk(g_thr_repository.m_mm, RG_TRANSPORTER_BUFFERS); + + NdbMutex_Lock(send_thread_mutex); + if (res && !data_available(node)) + insert_node(node); + } + + /* No data to send, prepare to sleep */ + this_send_thread->m_awake = FALSE; + NdbMutex_Unlock(send_thread_mutex); + } + + globalEmulatorData.theWatchDog->unregisterWatchedThread(thr_no); +} + #if 0 static Uint32 @@ -1642,6 +2122,14 @@ trp_callback::reportSendLen(NodeId nodeI Signal &signal = * new (&signalT) Signal(0); memset(&signal.header, 0, sizeof(signal.header)); + if (g_send_threads) + { + /** + * TODO: Implement this also when using send threads!! + */ + return; + } + signal.header.theLength = 3; signal.header.theSendersSignalId = 0; signal.header.theSendersBlockRef = numberToRef(0, globalData.ownId); @@ -1928,8 +2416,16 @@ trp_callback::bytes_sent(NodeId node, Ui thr_repository::send_buffer * sb = g_thr_repository.m_send_buffers+node; Uint32 thr_no = sb->m_send_thread; assert(thr_no != NO_SEND_THREAD); - return ::bytes_sent(&g_thr_repository.m_thread[thr_no].m_send_buffer_pool, - sb, bytes); + if (!is_send_thread(thr_no)) + { + return ::bytes_sent(&g_thr_repository.m_thread[thr_no].m_send_buffer_pool, + sb, bytes); + } + else + { + return ::bytes_sent(g_send_threads->get_send_buffer_pool(thr_no), + sb, bytes); + } } bool @@ -1940,6 +2436,7 @@ trp_callback::has_data_to_send(NodeId no thr_repository::send_buffer * sb = g_thr_repository.m_send_buffers + node; Uint32 thr_no = sb->m_send_thread; assert(thr_no != NO_SEND_THREAD); + assert(!is_send_thread(thr_no)); assert((sb->m_bytes > 0) == (sb->m_buffer.m_first_page != 0)); if (sb->m_bytes > 0 || sb->m_force_send) return true; @@ -2146,6 +2643,11 @@ do_send(struct thr_data* selfptr, bool m flush_send_buffer(selfptr, node); + if (g_send_threads) + { + g_send_threads->alert_send_thread(node); + continue; + } thr_repository::send_buffer * sb = rep->m_send_buffers + node; /** @@ -2230,7 +2732,8 @@ mt_send_handle::getWritePtr(NodeId node, { // TODO: maybe dont always flush on page-boundary ??? flush_send_buffer(m_selfptr, node); - try_send(m_selfptr, node); + if (!g_send_threads) + try_send(m_selfptr, node); } if ((p = m_selfptr->m_send_buffer_pool.seize(g_thr_repository.m_mm, @@ -3619,16 +4122,26 @@ ThreadConfig::ipControlLoop(NdbThread* p */ setcpuaffinity(rep); + if (globalData.ndbMtSendThreads) + { + g_send_threads = new thr_send_threads(); + } + /** * assign nodes to receiver threads */ assign_receiver_threads(); + /* Start the send thread(s) */ + if (g_send_threads) + { + g_send_threads->start_send_threads(); + } + /* * Start threads for all execution threads, except for the receiver * thread, which runs in the main thread. */ - for (thr_no = 0; thr_no < num_threads; thr_no++) { rep->m_thread[thr_no].m_time = NdbTick_CurrentMillisecond(); @@ -3677,6 +4190,12 @@ ThreadConfig::ipControlLoop(NdbThread* p NdbThread_WaitFor(rep->m_thread[thr_no].m_thread, &dummy_return_status); NdbThread_Destroy(&(rep->m_thread[thr_no].m_thread)); } + + /* Delete send threads, includes waiting for threads to shutdown */ + if (g_send_threads) + { + delete g_send_threads; + } } int === modified file 'storage/ndb/src/mgmsrv/ConfigInfo.cpp' --- a/storage/ndb/src/mgmsrv/ConfigInfo.cpp 2012-01-24 06:20:13 +0000 +++ b/storage/ndb/src/mgmsrv/ConfigInfo.cpp 2012-01-25 10:39:40 +0000 @@ -1120,7 +1120,7 @@ const ConfigInfo::ParamInfo ConfigInfo:: ConfigInfo::CI_INT, "4", "4", - "4" + STR_VALUE(NDB_MAX_LOG_PARTS) }, { No bundle (reason: useless for push emails).