3719 Mikael Ronstrom 2012-01-23
Merge step 8
modified:
storage/ndb/src/common/transporter/TransporterRegistry.cpp
storage/ndb/src/kernel/blocks/trpman.cpp
storage/ndb/src/kernel/vm/TransporterCallback.cpp
storage/ndb/src/kernel/vm/TransporterCallbackKernel.hpp
storage/ndb/src/kernel/vm/mt.cpp
storage/ndb/src/kernel/vm/mt.hpp
3718 Mikael Ronstrom 2012-01-20
Merge step 7
modified:
storage/ndb/src/kernel/blocks/trpman.cpp
storage/ndb/src/kernel/vm/mt.cpp
3717 Mikael Ronstrom 2012-01-20
Sixth step of merge
modified:
storage/ndb/src/kernel/vm/TransporterCallback.cpp
storage/ndb/src/kernel/vm/TransporterCallbackKernel.hpp
storage/ndb/src/kernel/vm/mt.cpp
=== modified file 'storage/ndb/src/common/transporter/TransporterRegistry.cpp'
--- a/storage/ndb/src/common/transporter/TransporterRegistry.cpp revid:mikael.ronstrom@stripped
+++ b/storage/ndb/src/common/transporter/TransporterRegistry.cpp revid:mikael.ronstrom@stripped
@@ -1241,6 +1241,7 @@ TransporterRegistry::poll_TCP(Uint32 tim
if (m_has_extra_wakeup_socket && recvdata.m_transporters.get(0))
{
const NDB_SOCKET_TYPE socket = m_extra_wakeup_sockets[0];
+ assert(receiveHandle);
// Poll the wakup-socket for read
recvdata.m_socket_poller.add(socket, true, false, false);
@@ -1313,6 +1314,7 @@ TransporterRegistry::performReceive(Tran
if (recvdata.m_has_data_transporters.get(0))
{
assert(recvdata.m_transporters.get(0));
+ assert(receiveHandle);
recvdata.m_has_data_transporters.clear(Uint32(0));
consume_extra_sockets();
}
=== modified file 'storage/ndb/src/kernel/blocks/trpman.cpp'
--- a/storage/ndb/src/kernel/blocks/trpman.cpp revid:mikael.ronstrom@stripped
+++ b/storage/ndb/src/kernel/blocks/trpman.cpp revid:mikael.ronstrom@stripped
@@ -54,17 +54,17 @@ extern Uint32 MAX_RECEIVED_SIGNALS;
class TransporterReceiveHandle *
mt_get_trp_receive_handle(unsigned instance);
#endif
+Uint32 mt_get_recv_thread_idx(NodeId nodeId);
bool
Trpman::handles_this_node(Uint32 nodeId)
{
-#if MAX_NDBMT_RECEIVE_THREADS == 1
+#ifndef NDBD_MULTITHREADED
return true;
#else
if (globalData.ndbMtReceiveThreads <= (Uint32)1)
return true;
- return (instance()==
- (globalTransporterRegistry.getReceiveThreadId(nodeId) + 1));
+ return (instance()== (mt_get_recv_thread_idx(nodeId) + 1));
#endif
}
@@ -780,10 +780,10 @@ TrpmanProxy::execROUTE_ORD(Signal* signa
jamEntry();
ndbassert(nodeId != 0);
-#if MAX_NDBMT_RECEIVE_THREADS == 1
+#ifndef NDBD_MULTITHREADED
Uint32 workerId = 0;
#else
- Uint32 workerId = globalTransporterRegistry.getReceiveThreadId(nodeId);
+ Uint32 workerId = mt_get_recv_thread_idx(nodeId);
#endif
SectionHandle handle(this, signal);
sendSignal(workerRef(workerId), GSN_ROUTE_ORD, signal,
=== modified file 'storage/ndb/src/kernel/vm/TransporterCallback.cpp'
--- a/storage/ndb/src/kernel/vm/TransporterCallback.cpp revid:mikael.ronstrom@stripped
+++ b/storage/ndb/src/kernel/vm/TransporterCallback.cpp revid:mikael.ronstrom@stripped
@@ -489,8 +489,9 @@ TransporterReceiveHandleKernel::checkJob
}
void
-TransporterReceiveHandleKernel::assign_nodes(Uint32 *recv_thread_idx_array)
+TransporterReceiveHandleKernel::assign_nodes(NodeId *recv_thread_idx_array)
{
+#ifdef NDBD_MULTITHREADED
m_transporters.clear(); /* Clear all first */
for (Uint32 nodeId = 1; nodeId < MAX_NODES; nodeId++)
{
@@ -498,6 +499,7 @@ TransporterReceiveHandleKernel::assign_n
m_transporters.set(nodeId); /* Belongs to our receive thread */
}
return;
+#endif
}
void
=== modified file 'storage/ndb/src/kernel/vm/TransporterCallbackKernel.hpp'
--- a/storage/ndb/src/kernel/vm/TransporterCallbackKernel.hpp revid:mikael.ronstrom@stripped
+++ b/storage/ndb/src/kernel/vm/TransporterCallbackKernel.hpp revid:mikael.ronstrom@stripped
@@ -49,7 +49,7 @@ public:
void reportError(NodeId nodeId, TransporterError errorCode,
const char *info = 0);
void transporter_recv_from(NodeId node);
- void assign_nodes(Uint32 *recv_thread_idx_array);
+ void assign_nodes(NodeId *recv_thread_idx_array);
int checkJobBuffer();
virtual ~TransporterReceiveHandleKernel() { }
};
=== modified file 'storage/ndb/src/kernel/vm/mt.cpp'
--- a/storage/ndb/src/kernel/vm/mt.cpp revid:mikael.ronstrom@stripped
+++ b/storage/ndb/src/kernel/vm/mt.cpp revid:mikael.ronstrom@stripped
@@ -37,6 +37,16 @@
#include "mt-asm.h"
+/**
+ * Array for mapping nodes to receiver threads and function to access it.
+ */
+static NodeId g_node_to_recv_thr_map[MAX_NODES];
+
+Uint32 mt_get_recv_thread_idx(NodeId nodeId)
+{
+ return g_node_to_recv_thr_map[nodeId];
+}
+
inline
SimulatedBlock*
GlobalData::mt_getBlock(BlockNumber blockNo, Uint32 instanceNo)
@@ -2199,7 +2209,7 @@ trp_callback::reportSendLen(NodeId nodeI
void
trp_callback::lock_transporter(NodeId node)
{
- Uint32 recv_thread_id = globalTransporterRegistry.getReceiveThreadId(node);
+ Uint32 recv_thread_idx = mt_get_recv_thread_idx(node);
struct thr_repository* rep = &g_thr_repository;
/**
* Note: take the send lock _first_, so that we will not hold the receive
@@ -2211,15 +2221,15 @@ trp_callback::lock_transporter(NodeId no
* non-waiting (so we will not block sending on other transporters).
*/
lock(&rep->m_send_buffers[node].m_send_lock);
- lock(&rep->m_receive_lock[recv_thread_id]);
+ lock(&rep->m_receive_lock[recv_thread_idx]);
}
void
trp_callback::unlock_transporter(NodeId node)
{
- Uint32 recv_thread_id = globalTransporterRegistry.getReceiveThreadId(node);
+ Uint32 recv_thread_idx = mt_get_recv_thread_idx(node);
struct thr_repository* rep = &g_thr_repository;
- unlock(&rep->m_receive_lock[recv_thread_id]);
+ unlock(&rep->m_receive_lock[recv_thread_idx]);
unlock(&rep->m_send_buffers[node].m_send_lock);
}
@@ -3412,7 +3422,7 @@ mt_receiver_thread_main(void *thr_arg)
* Object that keeps track of our pollReceive-state
*/
TransporterReceiveHandleKernel recvdata(thr_no, recv_thread_idx);
- recvdata.assign_nodes(g_nodes_to_thr_map);
+ recvdata.assign_nodes(g_node_to_recv_thr_map);
globalTransporterRegistry.init(recvdata);
/**
@@ -4156,18 +4166,13 @@ setcpuaffinity(struct thr_repository* re
}
}
-/**
- * Array for mapping nodes to receiver threads
- */
-static NodeId g_node_to_recv_thr_map[MAX_NODES];
-
static
void
mt_assign_receiver_threads(void)
{
Uint32 num_recv_threads = globalData.ndbMtReceiveThreads;
Uint32 recv_thread_idx = 0;
- for (nodeId = 0; nodeId < MAX_NODES; nodeId++)
+ for (Uint32 nodeId = 1; nodeId < MAX_NODES; nodeId++)
{
Transporter *node_trp =
globalTransporterRegistry.get_transporter(nodeId);
=== modified file 'storage/ndb/src/kernel/vm/mt.hpp'
--- a/storage/ndb/src/kernel/vm/mt.hpp revid:mikael.ronstrom@stripped
+++ b/storage/ndb/src/kernel/vm/mt.hpp revid:mikael.ronstrom@stripped
@@ -48,7 +48,7 @@ SendStatus mt_send_remote(Uint32 self, c
void mt_section_lock();
void mt_section_unlock();
-int mt_checkDoJob();
+int mt_checkDoJob(Uint32 receiver_thread_idx);
/**
* Are we (not) multi threaded
No bundle (reason: useless for push emails).
| Thread |
|---|
| • bzr push into mysql-5.5-cluster-7.2 branch (mikael.ronstrom:3717 to 3719) | Mikael Ronstrom | 23 Jan |