4769 jonas oreland 2012-01-04
ndb - add handles_this_node (dummy) to trpman
modified:
storage/ndb/src/kernel/blocks/trpman.cpp
storage/ndb/src/kernel/blocks/trpman.hpp
4768 jonas oreland 2012-01-04
ndb - add embryo to multiple send/recv threads
modified:
storage/ndb/include/kernel/ndb_limits.h
storage/ndb/src/kernel/blocks/cmvmi/Cmvmi.cpp
storage/ndb/src/kernel/vm/SimulatedBlock.hpp
storage/ndb/src/kernel/vm/TransporterCallback.cpp
storage/ndb/src/kernel/vm/mt.cpp
storage/ndb/src/kernel/vm/mt_thr_config.cpp
storage/ndb/src/kernel/vm/mt_thr_config.hpp
4767 jonas oreland 2012-01-04
ndb - add padding in ArrayPool to avoid false sharing (readers/writers)
modified:
storage/ndb/src/kernel/vm/ArrayPool.hpp
4766 jonas oreland 2012-01-04
ndb - only update overloaded bitmask if it actually changed...NOTE: this code is really not thread-safebzr log -l 5 and should be fixed
modified:
storage/ndb/include/transporter/TransporterRegistry.hpp
4765 jonas oreland 2012-01-04
ndb - rename MAX_THREADS to MAX_BLOCK_THREADS
modified:
storage/ndb/src/kernel/vm/mt.cpp
4764 jonas oreland 2012-01-04
ndb - add jamBuffer argument to extractNodeInfo to increase mttc scalability
modified:
storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp
storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp
4763 jonas oreland 2012-01-02 [merge]
ndb - merge 63 to 70
modified:
mysql-test/t/ctype_cp932_binlog_stm.test
=== modified file 'storage/ndb/include/kernel/ndb_limits.h'
--- a/storage/ndb/include/kernel/ndb_limits.h 2011-12-13 13:20:08 +0000
+++ b/storage/ndb/include/kernel/ndb_limits.h 2012-01-04 11:13:53 +0000
@@ -215,6 +215,9 @@
#define MAX_NDBMT_TC_THREADS 4
#endif
+#define MAX_NDBMT_SEND_THREADS 0
+#define MAX_NDBMT_RECEIVE_THREADS 1
+
#define NDB_FILE_BUFFER_SIZE (256*1024)
/**
=== modified file 'storage/ndb/include/transporter/TransporterRegistry.hpp'
--- a/storage/ndb/include/transporter/TransporterRegistry.hpp 2011-06-30 15:59:25 +0000
+++ b/storage/ndb/include/transporter/TransporterRegistry.hpp 2012-01-04 08:53:15 +0000
@@ -534,7 +534,8 @@ inline void
TransporterRegistry::set_status_overloaded(Uint32 nodeId, bool val)
{
assert(nodeId < MAX_NODES);
- m_status_overloaded.set(nodeId, val);
+ if (val != m_status_overloaded.get(nodeId))
+ m_status_overloaded.set(nodeId, val);
}
inline const NodeBitmask&
=== modified file 'storage/ndb/src/kernel/blocks/cmvmi/Cmvmi.cpp'
--- a/storage/ndb/src/kernel/blocks/cmvmi/Cmvmi.cpp 2011-11-24 14:14:24 +0000
+++ b/storage/ndb/src/kernel/blocks/cmvmi/Cmvmi.cpp 2012-01-04 11:13:53 +0000
@@ -57,6 +57,7 @@ extern int simulate_error_during_shutdow
// Index pages used by ACC instances
Uint32 g_acc_pages_used[1 + MAX_NDBMT_LQH_WORKERS];
+extern void mt_init_receiver_cache();
extern void mt_set_section_chunk_size();
Cmvmi::Cmvmi(Block_context& ctx) :
@@ -83,6 +84,7 @@ Cmvmi::Cmvmi(Block_context& ctx) :
g_sectionSegmentPool.setSize(long_sig_buffer_size,
true,true,true,CFG_DB_LONG_SIGNAL_BUFFER);
+ mt_init_receiver_cache();
mt_set_section_chunk_size();
// Add received signals
=== modified file 'storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp'
--- a/storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp 2011-12-15 10:01:04 +0000
+++ b/storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp 2012-01-04 08:17:53 +0000
@@ -1127,7 +1127,9 @@ private:
void allocStoredReplica(FragmentstorePtr regFragptr,
ReplicaRecordPtr& newReplicaPtr,
Uint32 nodeId);
- Uint32 extractNodeInfo(const Fragmentstore * fragPtr, Uint32 nodes[]);
+ Uint32 extractNodeInfo(EmulatedJamBuffer *jambuf,
+ const Fragmentstore * fragPtr,
+ Uint32 nodes[]);
bool findBestLogNode(CreateReplicaRecord* createReplica,
FragmentstorePtr regFragptr,
Uint32 startGci,
=== modified file 'storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp 2011-12-15 10:01:04 +0000
+++ b/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp 2012-01-04 08:17:53 +0000
@@ -3846,7 +3846,7 @@ done:
FragmentstorePtr fragPtr;
getFragstore(tabPtr.p, takeOverPtr.p->toCurrentFragid, fragPtr);
Uint32 nodes[MAX_REPLICAS];
- extractNodeInfo(fragPtr.p, nodes);
+ extractNodeInfo(jamBuffer(), fragPtr.p, nodes);
req->lqhLogNode[0] = nodes[0]; // Source
req->requestInfo = StartFragReq::SFR_COPY_FRAG;
@@ -4179,7 +4179,7 @@ void Dbdih::toCopyFragLab(Signal* signal
FragmentstorePtr fragPtr;
getFragstore(tabPtr.p, takeOverPtr.p->toCurrentFragid, fragPtr);
Uint32 nodes[MAX_REPLICAS];
- extractNodeInfo(fragPtr.p, nodes);
+ extractNodeInfo(jamBuffer(), fragPtr.p, nodes);
takeOverPtr.p->toCopyNode = nodes[0];
PrepareCopyFragReq* req= (PrepareCopyFragReq*)signal->getDataPtrSend();
@@ -4376,7 +4376,7 @@ Dbdih::toStartCopyFrag(Signal* signal, T
copyFragReq->distributionKey = fragPtr.p->distributionKey;
copyFragReq->gci = gci;
Uint32 len = copyFragReq->nodeCount =
- extractNodeInfo(fragPtr.p,
+ extractNodeInfo(jamBuffer(), fragPtr.p,
copyFragReq->nodeList);
copyFragReq->nodeList[len] = takeOverPtr.p->maxPage;
copyFragReq->nodeList[len+1] = CopyFragReq::CFR_TRANSACTIONAL;
@@ -9227,7 +9227,7 @@ loop:
return;
}
getFragstore(tabPtr.p, fragId, fragPtr);
- Uint32 nodeCount = extractNodeInfo(fragPtr.p, conf->nodes);
+ Uint32 nodeCount = extractNodeInfo(jambuf, fragPtr.p, conf->nodes);
Uint32 sig2 = (nodeCount - 1) +
(fragPtr.p->distributionKey << 16) +
(dihGetInstanceKey(fragPtr) << 24);
@@ -9240,7 +9240,9 @@ loop:
thrjam(jambuf);
conf->reqinfo |= DiGetNodesConf::REORG_MOVING;
getFragstore(tabPtr.p, newFragId, fragPtr);
- nodeCount = extractNodeInfo(fragPtr.p, conf->nodes + 2 + MAX_REPLICAS);
+ nodeCount = extractNodeInfo(jambuf,
+ fragPtr.p,
+ conf->nodes + 2 + MAX_REPLICAS);
conf->nodes[MAX_REPLICAS] = newFragId;
conf->nodes[MAX_REPLICAS + 1] = (nodeCount - 1) +
(fragPtr.p->distributionKey << 16) +
@@ -9251,18 +9253,20 @@ loop:
goto loop;
}//Dbdih::execDIGETNODESREQ()
-Uint32 Dbdih::extractNodeInfo(const Fragmentstore * fragPtr, Uint32 nodes[])
+Uint32 Dbdih::extractNodeInfo(EmulatedJamBuffer *jambuf,
+ const Fragmentstore * fragPtr,
+ Uint32 nodes[])
{
Uint32 nodeCount = 0;
nodes[0] = nodes[1] = nodes[2] = nodes[3] = 0;
for (Uint32 i = 0; i < fragPtr->fragReplicas; i++) {
- jam();
+ thrjam(jambuf);
NodeRecordPtr nodePtr;
ndbrequire(i < MAX_REPLICAS);
nodePtr.i = fragPtr->activeNodes[i];
ptrCheckGuard(nodePtr, MAX_NDB_NODES, nodeRecord);
if (nodePtr.p->useInTransactions) {
- jam();
+ thrjam(jambuf);
nodes[nodeCount] = nodePtr.i;
nodeCount++;
}//if
@@ -9543,7 +9547,7 @@ void Dbdih::execDIH_SCAN_GET_NODES_REQ(S
Uint32 nodes[MAX_REPLICAS];
getFragstore(tabPtr.p, fragId, fragPtr);
- Uint32 count = extractNodeInfo(fragPtr.p, nodes);
+ Uint32 count = extractNodeInfo(jamBuffer(), fragPtr.p, nodes);
DihScanGetNodesConf* conf = (DihScanGetNodesConf*)signal->getDataPtrSend();
conf->senderData = senderData;
@@ -17519,7 +17523,9 @@ Dbdih::execDUMP_STATE_ORD(Signal* signal
getFragstore(tabPtr.p, j, fragPtr);
Uint32 nodeOrder[MAX_REPLICAS];
- const Uint32 noOfReplicas = extractNodeInfo(fragPtr.p, nodeOrder);
+ const Uint32 noOfReplicas = extractNodeInfo(jamBuffer(),
+ fragPtr.p,
+ nodeOrder);
char buf[100];
BaseString::snprintf(buf, sizeof(buf), " Table %d Fragment %d(%u) LP: %u - ", tabPtr.i, j, dihGetInstanceKey(fragPtr), fragPtr.p->m_log_part_id);
for(Uint32 k = 0; k < noOfReplicas; k++){
@@ -18296,7 +18302,9 @@ void Dbdih::execDIH_SWITCH_REPLICA_REQ(S
* Do funky stuff
*/
Uint32 oldOrder[MAX_REPLICAS];
- const Uint32 noOfReplicas = extractNodeInfo(fragPtr.p, oldOrder);
+ const Uint32 noOfReplicas = extractNodeInfo(jamBuffer(),
+ fragPtr.p,
+ oldOrder);
if (noOfReplicas < req->noOfReplicas) {
jam();
@@ -18434,7 +18442,9 @@ Dbdih::switchReplica(Signal* signal,
getFragstore(tabPtr.p, fragNo, fragPtr);
Uint32 oldOrder[MAX_REPLICAS];
- const Uint32 noOfReplicas = extractNodeInfo(fragPtr.p, oldOrder);
+ const Uint32 noOfReplicas = extractNodeInfo(jamBuffer(),
+ fragPtr.p,
+ oldOrder);
if(oldOrder[0] != nodeId) {
jam();
=== modified file 'storage/ndb/src/kernel/blocks/trpman.cpp'
--- a/storage/ndb/src/kernel/blocks/trpman.cpp 2011-11-22 13:03:47 +0000
+++ b/storage/ndb/src/kernel/blocks/trpman.cpp 2012-01-04 12:12:59 +0000
@@ -51,6 +51,13 @@ static NodeBitmask c_error_9000_nodes_ma
extern Uint32 MAX_RECEIVED_SIGNALS;
#endif
+static
+bool
+handles_this_node(Uint32 nodeId)
+{
+ return true;
+}
+
void
Trpman::execOPEN_COMREQ(Signal* signal)
{
@@ -70,6 +77,12 @@ Trpman::execOPEN_COMREQ(Signal* signal)
&& c_error_9000_nodes_mask.get(tStartingNode)))
#endif
{
+ if (!handles_this_node(tStartingNode))
+ {
+ jam();
+ goto done;
+ }
+
globalTransporterRegistry.do_connect(tStartingNode);
globalTransporterRegistry.setIOState(tStartingNode, HaltIO);
@@ -87,7 +100,8 @@ Trpman::execOPEN_COMREQ(Signal* signal)
for(unsigned int i = 1; i < MAX_NODES; i++ )
{
jam();
- if (i != getOwnNodeId() && getNodeInfo(i).m_type == tData2)
+ if (i != getOwnNodeId() && getNodeInfo(i).m_type == tData2 &&
+ handles_this_node(i))
{
jam();
@@ -106,12 +120,13 @@ Trpman::execOPEN_COMREQ(Signal* signal)
}
}
+done:
if (userRef != 0)
{
jam();
signal->theData[0] = tStartingNode;
signal->theData[1] = tData2;
- sendSignal(userRef, GSN_OPEN_COMCONF, signal, len - 1,JBA);
+ sendSignal(userRef, GSN_OPEN_COMCONF, signal, 2, JBA);
}
}
@@ -167,9 +182,10 @@ Trpman::execCLOSE_COMREQ(Signal* signal)
// Uint32 noOfNodes = closeCom->noOfNodes;
jamEntry();
- for (unsigned i = 0; i < MAX_NODES; i++)
+ for (unsigned i = 1; i < MAX_NODES; i++)
{
- if (NodeBitmask::get(closeCom->theNodes, i))
+ if (NodeBitmask::get(closeCom->theNodes, i) &&
+ handles_this_node(i))
{
jam();
@@ -204,6 +220,18 @@ Trpman::execCLOSE_COMREQ(Signal* signal)
}
}
+/*
+ We need to implement CLOSE_COMCONF signal for the non-multithreaded
+ case where message should go to QMGR, for multithreaded case it
+ needs to pass through TRPMAN proxy on its way back.
+*/
+void
+Trpman::execCLOSE_COMCONF(Signal *signal)
+{
+ jamEntry();
+ sendSignal(QMGR_REF, GSN_CLOSE_COMCONF, signal, 19, JBA);
+}
+
void
Trpman::execENABLE_COMREQ(Signal* signal)
{
@@ -225,6 +253,8 @@ Trpman::execENABLE_COMREQ(Signal* signal
break;
search_from = tStartingNode + 1;
+ if (!handles_this_node(tStartingNode))
+ continue;
globalTransporterRegistry.setIOState(tStartingNode, NoHalt);
setNodeInfo(tStartingNode).m_connected = true;
@@ -353,6 +383,12 @@ Trpman::execDBINFO_SCANREQ(Signal *signa
while (rnode < MAX_NODES)
{
+ if (!handles_this_node(rnode))
+ {
+ rnode++;
+ continue;
+ }
+
switch(getNodeInfo(rnode).m_type)
{
default:
@@ -429,14 +465,15 @@ Trpman::execDUMP_STATE_ORD(Signal* signa
CLEAR_ERROR_INSERT_VALUE;
if (signal->getLength() == 1 || signal->theData[1])
{
- for (Uint32 i = 0; i<MAX_NODES; i++)
+ for (Uint32 i = 1; i<MAX_NODES; i++)
{
- if (c_error_9000_nodes_mask.get(i))
- {
- signal->theData[0] = 0;
- signal->theData[1] = i;
+ if (c_error_9000_nodes_mask.get(i) &&
+ handles_this_node(i))
+ {
+ signal->theData[0] = 0;
+ signal->theData[1] = i;
execOPEN_COMREQ(signal);
- }
+ }
}
}
c_error_9000_nodes_mask.clear();
@@ -452,10 +489,13 @@ Trpman::execDUMP_STATE_ORD(Signal* signa
if (arg == 9005 && signal->getLength() == 2 && ERROR_INSERTED(9004))
{
Uint32 db = signal->theData[1];
- Uint32 i = c_error_9000_nodes_mask.find(0);
- signal->theData[0] = i;
- sendSignal(calcQmgrBlockRef(db),GSN_API_FAILREQ, signal, 1, JBA);
- ndbout_c("stopping %u using %u", i, db);
+ Uint32 i = c_error_9000_nodes_mask.find(1);
+ if (handles_this_node(i))
+ {
+ signal->theData[0] = i;
+ sendSignal(calcQmgrBlockRef(db),GSN_API_FAILREQ, signal, 1, JBA);
+ ndbout_c("stopping %u using %u", i, db);
+ }
CLEAR_ERROR_INSERT_VALUE;
}
#endif
@@ -482,26 +522,28 @@ Trpman::execDUMP_STATE_ORD(Signal* signa
for (Uint32 n = 1; n < signal->getLength(); n++)
{
Uint32 nodeId = signal->theData[n];
+ if (!handles_this_node(nodeId))
+ continue;
if ((nodeId > 0) &&
(nodeId < MAX_NODES))
{
if (block)
{
- ndbout_c("CMVMI : Blocking receive from node %u", nodeId);
+ ndbout_c("TRPMAN : Blocking receive from node %u", nodeId);
globalTransporterRegistry.blockReceive(nodeId);
}
else
{
- ndbout_c("CMVMI : Unblocking receive from node %u", nodeId);
+ ndbout_c("TRPMAN : Unblocking receive from node %u", nodeId);
globalTransporterRegistry.unblockReceive(nodeId);
}
}
else
{
- ndbout_c("CMVMI : Ignoring dump %u for node %u",
+ ndbout_c("TRPMAN : Ignoring dump %u for node %u",
arg, nodeId);
}
}
@@ -512,12 +554,14 @@ Trpman::execDUMP_STATE_ORD(Signal* signa
if (signal->getLength() > 1)
{
pattern = signal->theData[1];
- ndbout_c("CMVMI : Blocking receive from all ndbds matching pattern -%s-",
+ ndbout_c("TRPMAN : Blocking receive from all ndbds matching pattern -%s-",
((pattern == 1)? "Other side":"Unknown"));
}
for (Uint32 node = 1; node < MAX_NDB_NODES; node++)
{
+ if (!handles_this_node(node))
+ continue;
if (globalTransporterRegistry.is_connected(node))
{
if (getNodeInfo(node).m_type == NodeInfo::DB)
@@ -542,7 +586,7 @@ Trpman::execDUMP_STATE_ORD(Signal* signa
default:
break;
}
- ndbout_c("CMVMI : Blocking receive from node %u", node);
+ ndbout_c("TRPMAN : Blocking receive from node %u", node);
globalTransporterRegistry.blockReceive(node);
}
}
@@ -551,8 +595,10 @@ Trpman::execDUMP_STATE_ORD(Signal* signa
}
if (arg == 9991) /* Unblock recv from all blocked */
{
- for (Uint32 node = 0; node < MAX_NODES; node++)
+ for (Uint32 node = 1; node < MAX_NODES; node++)
{
+ if (!handles_this_node(node))
+ continue;
if (globalTransporterRegistry.isBlocked(node))
{
ndbout_c("CMVMI : Unblocking receive from node %u", node);
=== modified file 'storage/ndb/src/kernel/blocks/trpman.hpp'
--- a/storage/ndb/src/kernel/blocks/trpman.hpp 2011-11-22 13:03:47 +0000
+++ b/storage/ndb/src/kernel/blocks/trpman.hpp 2012-01-04 12:12:59 +0000
@@ -30,6 +30,7 @@ public:
BLOCK_DEFINES(Trpman);
void execCLOSE_COMREQ(Signal *signal);
+ void execCLOSE_COMCONF(Signal * signal);
void execOPEN_COMREQ(Signal *signal);
void execENABLE_COMREQ(Signal *signal);
void execDISCONNECT_REP(Signal *signal);
=== modified file 'storage/ndb/src/kernel/vm/ArrayPool.hpp'
--- a/storage/ndb/src/kernel/vm/ArrayPool.hpp 2011-09-27 06:44:06 +0000
+++ b/storage/ndb/src/kernel/vm/ArrayPool.hpp 2012-01-04 09:08:22 +0000
@@ -166,6 +166,11 @@ public:
struct Cache
{
Cache(Uint32 a0 = 512, Uint32 a1 = 256) { m_first_free = RNIL; m_free_cnt = 0; m_alloc_cnt = a0; m_max_free_cnt = a1; }
+ void init_cache(Uint32 a0, Uint32 a1)
+ {
+ m_alloc_cnt = a0;
+ m_max_free_cnt = a1;
+ }
Uint32 m_first_free;
Uint32 m_free_cnt;
Uint32 m_alloc_cnt;
@@ -248,17 +253,24 @@ public:
#endif
protected:
- Uint32 firstFree;
+ T * theArray;
Uint32 size;
+ /*
+ * Protect size and theArray which are very seldomly updated from
+ * updates of often updated variables such as firstFree, noOfFree.
+ * Protect here means to have them on separate CPU cache lines to
+ * avoid false CPU cache line sharing.
+ */
+ char protect_read_var[64 - (sizeof(Uint32) + sizeof(void*))];
+ Uint32 firstFree;
Uint32 noOfFree;
Uint32 noOfFreeMin;
- T * theArray;
- void * alloc_ptr;
#ifdef ARRAY_GUARD
+ bool chunk;
Uint32 bitmaskSz;
Uint32 *theAllocatedBitmask;
- bool chunk;
#endif
+ void * alloc_ptr;
};
template <class T>
=== modified file 'storage/ndb/src/kernel/vm/SimulatedBlock.hpp'
--- a/storage/ndb/src/kernel/vm/SimulatedBlock.hpp 2011-11-11 07:46:17 +0000
+++ b/storage/ndb/src/kernel/vm/SimulatedBlock.hpp 2012-01-04 11:13:53 +0000
@@ -635,7 +635,10 @@ private:
* are real LQHs run by multiple threads.
*/
protected:
- enum { MaxInstances = 3 + MAX_NDBMT_TC_THREADS + MAX_NDBMT_LQH_WORKERS + 1 };
+ enum { MaxInstances = 3 +
+ MAX_NDBMT_TC_THREADS +
+ MAX_NDBMT_LQH_WORKERS +
+ MAX_NDBMT_RECEIVE_THREADS };
private:
SimulatedBlock** theInstanceList; // set in main, indexed by instance
SimulatedBlock* theMainInstance; // set in all
=== modified file 'storage/ndb/src/kernel/vm/TransporterCallback.cpp'
--- a/storage/ndb/src/kernel/vm/TransporterCallback.cpp 2011-11-22 13:03:47 +0000
+++ b/storage/ndb/src/kernel/vm/TransporterCallback.cpp 2012-01-04 11:13:53 +0000
@@ -110,7 +110,20 @@ TransporterRegistry globalTransporterReg
#endif
#ifdef NDBD_MULTITHREADED
-static SectionSegmentPool::Cache cache(1024,1024);
+static struct ReceiverThreadCache
+{
+ SectionSegmentPool::Cache cache_instance;
+ char pad[64 - sizeof(SectionSegmentPool::Cache)];
+} g_receiver_thread_cache[MAX_NDBMT_RECEIVE_THREADS];
+
+void
+mt_init_receiver_cache()
+{
+ for (unsigned i = 0; i < NDB_ARRAY_SIZE(g_receiver_thread_cache); i++)
+ {
+ g_receiver_thread_cache[i].cache_instance.init_cache(1024,1024);
+ }
+}
void
mt_set_section_chunk_size()
@@ -119,6 +132,7 @@ mt_set_section_chunk_size()
}
#else
+void mt_init_receiver_cache(){}
void mt_set_section_chunk_size(){}
#endif
@@ -128,6 +142,9 @@ TransporterCallbackKernel::deliver_signa
Uint32 * const theData,
LinearSectionPtr ptr[3])
{
+#ifdef NDBD_MULTITHREADED
+ SectionSegmentPool::Cache & cache = g_receiver_thread_cache[0].cache_instance;
+#endif
const Uint32 secCount = header->m_noOfSections;
const Uint32 length = header->theLength;
=== modified file 'storage/ndb/src/kernel/vm/mt.cpp'
--- a/storage/ndb/src/kernel/vm/mt.cpp 2011-11-22 13:03:47 +0000
+++ b/storage/ndb/src/kernel/vm/mt.cpp 2012-01-04 11:13:53 +0000
@@ -73,10 +73,18 @@ static const Uint32 MAX_SIGNALS_BEFORE_W
//#define NDB_MT_LOCK_TO_CPU
#define NUM_MAIN_THREADS 2 // except receiver
-#define MAX_THREADS (NUM_MAIN_THREADS + \
- MAX_NDBMT_LQH_THREADS + \
- MAX_NDBMT_TC_THREADS + 1)
-#define MAX_BLOCK_INSTANCES (MAX_THREADS+1)
+/*
+ MAX_BLOCK_THREADS need not include the send threads since it's
+ used to set size of arrays used by all threads that contains a
+ job buffer and executes signals. The send threads only sends
+ messages directed to other nodes and contains no blocks and
+ executes thus no signals.
+*/
+#define MAX_BLOCK_THREADS (NUM_MAIN_THREADS + \
+ MAX_NDBMT_LQH_THREADS + \
+ MAX_NDBMT_TC_THREADS + \
+ MAX_NDBMT_RECEIVE_THREADS)
+#define MAX_BLOCK_INSTANCES (MAX_BLOCK_THREADS+1)
/* If this is too small it crashes before first signal. */
#define MAX_INSTANCES_PER_THREAD (16 + 8 * MAX_NDBMT_LQH_THREADS)
@@ -87,7 +95,7 @@ static Uint32 num_tc_threads = 0;
static Uint32 num_threads = 0;
static Uint32 receiver_thread_no = 0;
-#define NO_SEND_THREAD (MAX_THREADS + 1)
+#define NO_SEND_THREAD (MAX_BLOCK_THREADS + MAX_NDBMT_SEND_THREADS + 1)
/* max signal is 32 words, 7 for signal header and 25 datawords */
#define MIN_SIGNALS_PER_PAGE (thr_job_buffer::SIZE / 32)
@@ -864,12 +872,12 @@ struct thr_data
* These are the thread input queues, where other threads deliver signals
* into.
*/
- struct thr_job_queue_head m_in_queue_head[MAX_THREADS];
- struct thr_job_queue m_in_queue[MAX_THREADS];
+ struct thr_job_queue_head m_in_queue_head[MAX_BLOCK_THREADS];
+ struct thr_job_queue m_in_queue[MAX_BLOCK_THREADS];
/* These are the write states of m_in_queue[self] in each thread. */
- struct thr_jb_write_state m_write_states[MAX_THREADS];
+ struct thr_jb_write_state m_write_states[MAX_BLOCK_THREADS];
/* These are the read states of all of our own m_in_queue[]. */
- struct thr_jb_read_state m_read_states[MAX_THREADS];
+ struct thr_jb_read_state m_read_states[MAX_BLOCK_THREADS];
/* Jam buffers for making trace files at crashes. */
EmulatedJamBuffer m_jam;
@@ -951,21 +959,20 @@ extern struct thr_repository g_thr_repos
struct thr_repository
{
thr_repository()
- : m_receive_lock("recvlock"),
- m_section_lock("sectionlock"),
+ : m_section_lock("sectionlock"),
m_mem_manager_lock("memmanagerlock"),
m_jb_pool("jobbufferpool"),
m_sb_pool("sendbufferpool")
{}
- struct thr_spin_lock<64> m_receive_lock;
+ struct thr_spin_lock<64> m_receive_lock[MAX_NDBMT_RECEIVE_THREADS];
struct thr_spin_lock<64> m_section_lock;
struct thr_spin_lock<64> m_mem_manager_lock;
struct thr_safe_pool<thr_job_buffer> m_jb_pool;
struct thr_safe_pool<thr_send_page> m_sb_pool;
Ndbd_mem_manager * m_mm;
unsigned m_thread_count;
- struct thr_data m_thread[MAX_THREADS];
+ struct thr_data m_thread[MAX_BLOCK_THREADS];
/**
* send buffer handling
@@ -1006,11 +1013,11 @@ struct thr_repository
Uint32 m_bytes;
/* read index(es) in thr_send_queue */
- Uint32 m_read_index[MAX_THREADS];
+ Uint32 m_read_index[MAX_BLOCK_THREADS];
} m_send_buffers[MAX_NTRANSPORTERS];
/* The buffers published by threads */
- thr_send_queue m_thread_send_buffers[MAX_NTRANSPORTERS][MAX_THREADS];
+ thr_send_queue m_thread_send_buffers[MAX_NTRANSPORTERS][MAX_BLOCK_THREADS];
/*
* These are used to synchronize during crash / trace dumps.
@@ -1667,6 +1674,7 @@ trp_callback::reportSendLen(NodeId nodeI
void
trp_callback::lock_transporter(NodeId node)
{
+ Uint32 recv_thread_no = 0;
struct thr_repository* rep = &g_thr_repository;
/**
* Note: take the send lock _first_, so that we will not hold the receive
@@ -1678,14 +1686,15 @@ trp_callback::lock_transporter(NodeId no
* non-waiting (so we will not block sending on other transporters).
*/
lock(&rep->m_send_buffers[node].m_send_lock);
- lock(&rep->m_receive_lock);
+ lock(&rep->m_receive_lock[recv_thread_no]);
}
void
trp_callback::unlock_transporter(NodeId node)
{
+ Uint32 recv_thread_no = 0;
struct thr_repository* rep = &g_thr_repository;
- unlock(&rep->m_receive_lock);
+ unlock(&rep->m_receive_lock[recv_thread_no]);
unlock(&rep->m_send_buffers[node].m_send_lock);
}
@@ -1742,8 +1751,8 @@ static
Uint32
link_thread_send_buffers(thr_repository::send_buffer * sb, Uint32 node)
{
- Uint32 ri[MAX_THREADS];
- Uint32 wi[MAX_THREADS];
+ Uint32 ri[MAX_BLOCK_THREADS];
+ Uint32 wi[MAX_BLOCK_THREADS];
thr_send_queue * src = g_thr_repository.m_thread_send_buffers[node];
for (unsigned thr = 0; thr < num_threads; thr++)
{
@@ -2803,7 +2812,7 @@ init_thread(thr_data *selfptr)
* Align signal buffer for better cache performance.
* Also skew it a litte for each thread to avoid cache pollution.
*/
-#define SIGBUF_SIZE (sizeof(Signal) + 63 + 256 * MAX_THREADS)
+#define SIGBUF_SIZE (sizeof(Signal) + 63 + 256 * MAX_BLOCK_THREADS)
static Signal *
aligned_signal(unsigned char signal_buf[SIGBUF_SIZE], unsigned thr_no)
{
@@ -2837,6 +2846,7 @@ mt_receiver_thread_main(void *thr_arg)
Uint32& watchDogCounter = selfptr->m_watchdog_counter;
Uint32 thrSignalId = 0;
bool has_received = false;
+ const unsigned recv_thread_no = 0;
init_thread(selfptr);
receiverThreadId = thr_no;
@@ -2876,9 +2886,9 @@ mt_receiver_thread_main(void *thr_arg)
if (check_job_buffers(rep) == 0)
{
watchDogCounter = 8;
- lock(&rep->m_receive_lock);
+ lock(&rep->m_receive_lock[recv_thread_no]);
globalTransporterRegistry.performReceive();
- unlock(&rep->m_receive_lock);
+ unlock(&rep->m_receive_lock[recv_thread_no]);
has_received = true;
}
}
@@ -3393,6 +3403,16 @@ thr_init2(struct thr_repository* rep, st
static
void
+receive_lock_init(Uint32 recv_thread_id, thr_repository *rep)
+{
+ char buf[100];
+ BaseString::snprintf(buf, sizeof(buf), "receive lock thread id %d",
+ recv_thread_id);
+ register_lock(&rep->m_receive_lock[recv_thread_id], buf);
+}
+
+static
+void
send_buffer_init(Uint32 node, thr_repository::send_buffer * sb)
{
char buf[100];
@@ -3425,6 +3445,10 @@ rep_init(struct thr_repository* rep, uns
NdbMutex_Init(&rep->stop_for_crash_mutex);
NdbCondition_Init(&rep->stop_for_crash_cond);
+ for (Uint32 i = 0; i < NDB_ARRAY_SIZE(rep->m_receive_lock); i++)
+ {
+ receive_lock_init(i, rep);
+ }
for (int i = 0 ; i < MAX_NTRANSPORTERS; i++)
{
send_buffer_init(i, rep->m_send_buffers+i);
@@ -3496,7 +3520,7 @@ ThreadConfig::init()
num_lqh_threads = globalData.ndbMtLqhThreads;
num_tc_threads = globalData.ndbMtTcThreads;
num_threads = NUM_MAIN_THREADS + num_tc_threads + num_lqh_threads + 1;
- require(num_threads <= MAX_THREADS);
+ require(num_threads <= MAX_BLOCK_THREADS);
receiver_thread_no = num_threads - 1;
ndbout << "NDBMT: num_threads=" << num_threads << endl;
@@ -3785,14 +3809,14 @@ FastScheduler::dumpSignalMemory(Uint32 t
* the same order they were executed (order obtained from signal id).
*
* We may need to keep track of THR_FREE_BUF_MAX buffers for fully executed
- * (and freed) buffers, plus MAX_THREADS buffers for currently active
+ * (and freed) buffers, plus MAX_BLOCK_THREADS buffers for currently active
* prio B buffers, plus one active prio A buffer.
*/
struct {
const thr_job_buffer *m_jb;
Uint32 m_pos;
Uint32 m_max;
- } jbs[THR_FREE_BUF_MAX + MAX_THREADS + 1];
+ } jbs[THR_FREE_BUF_MAX + MAX_BLOCK_THREADS + 1];
Uint32 num_jbs = 0;
@@ -4032,7 +4056,7 @@ mt_get_thread_references_for_blocks(cons
Uint32 dst[], Uint32 len)
{
Uint32 cnt = 0;
- Bitmask<(MAX_THREADS+31)/32> mask;
+ Bitmask<(MAX_BLOCK_THREADS+31)/32> mask;
mask.set(threadId);
for (Uint32 i = 0; blocks[i] != 0; i++)
{
=== modified file 'storage/ndb/src/kernel/vm/mt_thr_config.cpp'
--- a/storage/ndb/src/kernel/vm/mt_thr_config.cpp 2011-12-13 13:14:03 +0000
+++ b/storage/ndb/src/kernel/vm/mt_thr_config.cpp 2012-01-04 11:13:53 +0000
@@ -30,10 +30,11 @@ static const struct THRConfig::Entries m
// name type min max
{ "main", THRConfig::T_MAIN, 1, 1 },
{ "ldm", THRConfig::T_LDM, 1, MAX_NDBMT_LQH_THREADS },
- { "recv", THRConfig::T_RECV, 1, 1 },
+ { "recv", THRConfig::T_RECV, 1, MAX_NDBMT_RECEIVE_THREADS },
{ "rep", THRConfig::T_REP, 1, 1 },
{ "io", THRConfig::T_IO, 1, 1 },
- { "tc", THRConfig::T_TC, 0, MAX_NDBMT_TC_THREADS }
+ { "tc", THRConfig::T_TC, 0, MAX_NDBMT_TC_THREADS },
+ { "send", THRConfig::T_SEND, 0, MAX_NDBMT_SEND_THREADS }
};
static const struct THRConfig::Param m_params[] =
@@ -144,6 +145,8 @@ THRConfig::do_parse(unsigned MaxNoOfExec
Uint32 tcthreads = 0;
Uint32 lqhthreads = 0;
+ Uint32 sendthreads = 0;
+ Uint32 recvthreads = 1;
switch(MaxNoOfExecutionThreads){
case 0:
case 1:
@@ -165,24 +168,32 @@ THRConfig::do_parse(unsigned MaxNoOfExec
lqhthreads = __ndbmt_lqh_threads;
}
- add(T_MAIN);
- add(T_REP);
- add(T_RECV);
+ add(T_MAIN); /* Global */
+ add(T_REP); /* Local, main consumer is SUMA */
+ for(Uint32 i = 0; i < recvthreads; i++)
+ {
+ add(T_RECV);
+ }
add(T_IO);
for(Uint32 i = 0; i < lqhthreads; i++)
{
add(T_LDM);
}
-
for(Uint32 i = 0; i < tcthreads; i++)
{
add(T_TC);
}
+ for(Uint32 i = 0; i < sendthreads; i++)
+ {
+ add(T_SEND);
+ }
// If we have set TC-threads...we say that this is "new" code
// and give error for having too few CPU's in mask compared to #threads
// started
- const bool allow_too_few_cpus = (tcthreads == 0);
+ const bool allow_too_few_cpus = (tcthreads == 0 &&
+ sendthreads == 0 &&
+ recvthreads == 1);
return do_bindings(allow_too_few_cpus) || do_validate();
}
@@ -451,11 +462,15 @@ THRConfig::do_validate()
}
/**
- * LDM can be 1 2 4
+ * LDM can be 1 2 4 8 16
*/
- if (m_threads[T_LDM].size() == 3)
+ if (m_threads[T_LDM].size() != 1 &&
+ m_threads[T_LDM].size() != 2 &&
+ m_threads[T_LDM].size() != 4 &&
+ m_threads[T_LDM].size() != 8 &&
+ m_threads[T_LDM].size() != 16)
{
- m_err_msg.assfmt("No of LDM-instances can be 1,2,4. Specified: %u",
+ m_err_msg.assfmt("No of LDM-instances can be 1,2,4,8,16. Specified: %u",
m_threads[T_LDM].size());
return -1;
}
@@ -886,7 +901,11 @@ THRConfig::do_parse(const char * ThreadC
add((T_Type)i);
}
- const bool allow_too_few_cpus = m_threads[T_TC].size() == 0;
+ const bool allow_too_few_cpus =
+ m_threads[T_TC].size() == 0 &&
+ m_threads[T_SEND].size() == 0 &&
+ m_threads[T_RECV].size() == 1;
+
int res = do_bindings(allow_too_few_cpus);
if (res != 0)
{
@@ -958,6 +977,21 @@ THRConfigApplier::appendInfo(BaseString&
const unsigned short list[], unsigned cnt) const
{
const T_Thread* thr = find_thread(list, cnt);
+ appendInfo(str, thr);
+}
+
+void
+THRConfigApplier::appendInfoSendThread(BaseString& str,
+ unsigned instance_no) const
+{
+ const T_Thread* thr = &m_threads[T_SEND][instance_no];
+ appendInfo(str, thr);
+}
+
+void
+THRConfigApplier::appendInfo(BaseString& str,
+ const T_Thread* thr) const
+{
assert(thr != 0);
str.appfmt("(%s) ", getEntryName(thr->m_type));
if (thr->m_bind_type == T_Thread::B_CPU_BOUND)
@@ -989,27 +1023,27 @@ THRConfigApplier::do_bind(NdbThread* thr
const unsigned short list[], unsigned cnt)
{
const T_Thread* thr = find_thread(list, cnt);
- if (thr->m_bind_type == T_Thread::B_CPU_BOUND)
- {
- int res = NdbThread_LockCPU(thread, thr->m_bind_no);
- if (res == 0)
- return 1;
- else
- return -res;
- }
-#if TODO
- else if (thr->m_bind_type == T_Thread::B_CPUSET_BOUND)
- {
- }
-#endif
-
- return 0;
+ return do_bind(thread, thr);
}
int
THRConfigApplier::do_bind_io(NdbThread* thread)
{
const T_Thread* thr = &m_threads[T_IO][0];
+ return do_bind(thread, thr);
+}
+
+int
+THRConfigApplier::do_bind_send(NdbThread* thread, unsigned instance)
+{
+ const T_Thread* thr = &m_threads[T_SEND][instance];
+ return do_bind(thread, thr);
+}
+
+int
+THRConfigApplier::do_bind(NdbThread* thread,
+ const T_Thread* thr)
+{
if (thr->m_bind_type == T_Thread::B_CPU_BOUND)
{
int res = NdbThread_LockCPU(thread, thr->m_bind_no);
=== modified file 'storage/ndb/src/kernel/vm/mt_thr_config.hpp'
--- a/storage/ndb/src/kernel/vm/mt_thr_config.hpp 2011-12-13 13:14:03 +0000
+++ b/storage/ndb/src/kernel/vm/mt_thr_config.hpp 2012-01-04 11:13:53 +0000
@@ -44,8 +44,9 @@ public:
T_REP = 3, /* SUMA */
T_IO = 4, /* FS, SocketServer etc */
T_TC = 5, /* TC+SPJ */
+ T_SEND = 6, /* No blocks */
- T_END = 6
+ T_END = 7
};
THRConfig();
@@ -126,11 +127,15 @@ public:
const char * getName(const unsigned short list[], unsigned cnt) const;
void appendInfo(BaseString&, const unsigned short list[], unsigned cnt) const;
+ void appendInfoSendThread(BaseString&, unsigned instance_no) const;
int do_bind(NdbThread*, const unsigned short list[], unsigned cnt);
int do_bind_io(NdbThread*);
+ int do_bind_send(NdbThread*, unsigned);
protected:
const T_Thread* find_thread(const unsigned short list[], unsigned cnt) const;
+ void appendInfo(BaseString&, const T_Thread*) const;
+ int do_bind(NdbThread*, const T_Thread*);
};
#endif // IPCConfig_H
No bundle (reason: useless for push emails).
| Thread |
|---|
| • bzr push into mysql-5.1-telco-7.0 branch (jonas.oreland:4763 to 4769) | jonas oreland | 9 Jan |