From: Mikael Ronstrom Date: January 23 2012 9:41am Subject: bzr push into mysql-5.5-cluster-7.2 branch (mikael.ronstrom:3717 to 3719) List-Archive: http://lists.mysql.com/commits/142511 Message-Id: <201201230942.q0N9gw2G011131@dator6> MIME-Version: 1.0 Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit 3719 Mikael Ronstrom 2012-01-23 Merge step 8 modified: storage/ndb/src/common/transporter/TransporterRegistry.cpp storage/ndb/src/kernel/blocks/trpman.cpp storage/ndb/src/kernel/vm/TransporterCallback.cpp storage/ndb/src/kernel/vm/TransporterCallbackKernel.hpp storage/ndb/src/kernel/vm/mt.cpp storage/ndb/src/kernel/vm/mt.hpp 3718 Mikael Ronstrom 2012-01-20 Merge step 7 modified: storage/ndb/src/kernel/blocks/trpman.cpp storage/ndb/src/kernel/vm/mt.cpp 3717 Mikael Ronstrom 2012-01-20 Sixth step of merge modified: storage/ndb/src/kernel/vm/TransporterCallback.cpp storage/ndb/src/kernel/vm/TransporterCallbackKernel.hpp storage/ndb/src/kernel/vm/mt.cpp === modified file 'storage/ndb/src/common/transporter/TransporterRegistry.cpp' --- a/storage/ndb/src/common/transporter/TransporterRegistry.cpp revid:mikael.ronstrom@stripped +++ b/storage/ndb/src/common/transporter/TransporterRegistry.cpp revid:mikael.ronstrom@stripped @@ -1241,6 +1241,7 @@ TransporterRegistry::poll_TCP(Uint32 tim if (m_has_extra_wakeup_socket && recvdata.m_transporters.get(0)) { const NDB_SOCKET_TYPE socket = m_extra_wakeup_sockets[0]; + assert(receiveHandle); // Poll the wakup-socket for read recvdata.m_socket_poller.add(socket, true, false, false); @@ -1313,6 +1314,7 @@ TransporterRegistry::performReceive(Tran if (recvdata.m_has_data_transporters.get(0)) { assert(recvdata.m_transporters.get(0)); + assert(receiveHandle); recvdata.m_has_data_transporters.clear(Uint32(0)); consume_extra_sockets(); } === modified file 'storage/ndb/src/kernel/blocks/trpman.cpp' --- a/storage/ndb/src/kernel/blocks/trpman.cpp revid:mikael.ronstrom@stripped +++ b/storage/ndb/src/kernel/blocks/trpman.cpp revid:mikael.ronstrom@stripped @@ -54,17 +54,17 @@ extern Uint32 MAX_RECEIVED_SIGNALS; class TransporterReceiveHandle * mt_get_trp_receive_handle(unsigned instance); #endif +Uint32 mt_get_recv_thread_idx(NodeId nodeId); bool Trpman::handles_this_node(Uint32 nodeId) { -#if MAX_NDBMT_RECEIVE_THREADS == 1 +#ifndef NDBD_MULTITHREADED return true; #else if (globalData.ndbMtReceiveThreads <= (Uint32)1) return true; - return (instance()== - (globalTransporterRegistry.getReceiveThreadId(nodeId) + 1)); + return (instance()== (mt_get_recv_thread_idx(nodeId) + 1)); #endif } @@ -780,10 +780,10 @@ TrpmanProxy::execROUTE_ORD(Signal* signa jamEntry(); ndbassert(nodeId != 0); -#if MAX_NDBMT_RECEIVE_THREADS == 1 +#ifndef NDBD_MULTITHREADED Uint32 workerId = 0; #else - Uint32 workerId = globalTransporterRegistry.getReceiveThreadId(nodeId); + Uint32 workerId = mt_get_recv_thread_idx(nodeId); #endif SectionHandle handle(this, signal); sendSignal(workerRef(workerId), GSN_ROUTE_ORD, signal, === modified file 'storage/ndb/src/kernel/vm/TransporterCallback.cpp' --- a/storage/ndb/src/kernel/vm/TransporterCallback.cpp revid:mikael.ronstrom@stripped +++ b/storage/ndb/src/kernel/vm/TransporterCallback.cpp revid:mikael.ronstrom@stripped @@ -489,8 +489,9 @@ TransporterReceiveHandleKernel::checkJob } void -TransporterReceiveHandleKernel::assign_nodes(Uint32 *recv_thread_idx_array) +TransporterReceiveHandleKernel::assign_nodes(NodeId *recv_thread_idx_array) { +#ifdef NDBD_MULTITHREADED m_transporters.clear(); /* Clear all first */ for (Uint32 nodeId = 1; nodeId < MAX_NODES; nodeId++) { @@ -498,6 +499,7 @@ TransporterReceiveHandleKernel::assign_n m_transporters.set(nodeId); /* Belongs to our receive thread */ } return; +#endif } void === modified file 'storage/ndb/src/kernel/vm/TransporterCallbackKernel.hpp' --- a/storage/ndb/src/kernel/vm/TransporterCallbackKernel.hpp revid:mikael.ronstrom@stripped +++ b/storage/ndb/src/kernel/vm/TransporterCallbackKernel.hpp revid:mikael.ronstrom@stripped @@ -49,7 +49,7 @@ public: void reportError(NodeId nodeId, TransporterError errorCode, const char *info = 0); void transporter_recv_from(NodeId node); - void assign_nodes(Uint32 *recv_thread_idx_array); + void assign_nodes(NodeId *recv_thread_idx_array); int checkJobBuffer(); virtual ~TransporterReceiveHandleKernel() { } }; === modified file 'storage/ndb/src/kernel/vm/mt.cpp' --- a/storage/ndb/src/kernel/vm/mt.cpp revid:mikael.ronstrom@stripped +++ b/storage/ndb/src/kernel/vm/mt.cpp revid:mikael.ronstrom@stripped @@ -37,6 +37,16 @@ #include "mt-asm.h" +/** + * Array for mapping nodes to receiver threads and function to access it. + */ +static NodeId g_node_to_recv_thr_map[MAX_NODES]; + +Uint32 mt_get_recv_thread_idx(NodeId nodeId) +{ + return g_node_to_recv_thr_map[nodeId]; +} + inline SimulatedBlock* GlobalData::mt_getBlock(BlockNumber blockNo, Uint32 instanceNo) @@ -2199,7 +2209,7 @@ trp_callback::reportSendLen(NodeId nodeI void trp_callback::lock_transporter(NodeId node) { - Uint32 recv_thread_id = globalTransporterRegistry.getReceiveThreadId(node); + Uint32 recv_thread_idx = mt_get_recv_thread_idx(node); struct thr_repository* rep = &g_thr_repository; /** * Note: take the send lock _first_, so that we will not hold the receive @@ -2211,15 +2221,15 @@ trp_callback::lock_transporter(NodeId no * non-waiting (so we will not block sending on other transporters). */ lock(&rep->m_send_buffers[node].m_send_lock); - lock(&rep->m_receive_lock[recv_thread_id]); + lock(&rep->m_receive_lock[recv_thread_idx]); } void trp_callback::unlock_transporter(NodeId node) { - Uint32 recv_thread_id = globalTransporterRegistry.getReceiveThreadId(node); + Uint32 recv_thread_idx = mt_get_recv_thread_idx(node); struct thr_repository* rep = &g_thr_repository; - unlock(&rep->m_receive_lock[recv_thread_id]); + unlock(&rep->m_receive_lock[recv_thread_idx]); unlock(&rep->m_send_buffers[node].m_send_lock); } @@ -3412,7 +3422,7 @@ mt_receiver_thread_main(void *thr_arg) * Object that keeps track of our pollReceive-state */ TransporterReceiveHandleKernel recvdata(thr_no, recv_thread_idx); - recvdata.assign_nodes(g_nodes_to_thr_map); + recvdata.assign_nodes(g_node_to_recv_thr_map); globalTransporterRegistry.init(recvdata); /** @@ -4156,18 +4166,13 @@ setcpuaffinity(struct thr_repository* re } } -/** - * Array for mapping nodes to receiver threads - */ -static NodeId g_node_to_recv_thr_map[MAX_NODES]; - static void mt_assign_receiver_threads(void) { Uint32 num_recv_threads = globalData.ndbMtReceiveThreads; Uint32 recv_thread_idx = 0; - for (nodeId = 0; nodeId < MAX_NODES; nodeId++) + for (Uint32 nodeId = 1; nodeId < MAX_NODES; nodeId++) { Transporter *node_trp = globalTransporterRegistry.get_transporter(nodeId); === modified file 'storage/ndb/src/kernel/vm/mt.hpp' --- a/storage/ndb/src/kernel/vm/mt.hpp revid:mikael.ronstrom@stripped +++ b/storage/ndb/src/kernel/vm/mt.hpp revid:mikael.ronstrom@stripped @@ -48,7 +48,7 @@ SendStatus mt_send_remote(Uint32 self, c void mt_section_lock(); void mt_section_unlock(); -int mt_checkDoJob(); +int mt_checkDoJob(Uint32 receiver_thread_idx); /** * Are we (not) multi threaded No bundle (reason: useless for push emails).