#At file:///export/space/pekka/ndb/version/my51-wl4391/
3080 Pekka Nousiainen 2008-11-15
wl#4391 25a_pgman.diff
DD - mt pgman: extra worker in proxy thread
modified:
storage/ndb/src/kernel/blocks/LocalProxy.cpp
storage/ndb/src/kernel/blocks/LocalProxy.hpp
storage/ndb/src/kernel/vm/SimulatedBlock.cpp
storage/ndb/src/kernel/vm/SimulatedBlock.hpp
storage/ndb/src/kernel/vm/dummy_nonmt.cpp
storage/ndb/src/kernel/vm/mt.cpp
storage/ndb/src/kernel/vm/mt.hpp
=== modified file 'storage/ndb/src/kernel/blocks/LocalProxy.cpp'
--- a/storage/ndb/src/kernel/blocks/LocalProxy.cpp 2008-08-27 08:00:52 +0000
+++ b/storage/ndb/src/kernel/blocks/LocalProxy.cpp 2008-11-15 15:43:59 +0000
@@ -23,6 +23,8 @@ LocalProxy::LocalProxy(BlockNumber block
BLOCK_CONSTRUCTOR(LocalProxy);
ndbrequire(instance() == 0); // this is main block
+ c_lqhWorkers = 0;
+ c_extraWorkers = 0; // sub-class constructor can set
c_workers = 0;
Uint32 i;
for (i = 0; i < MaxWorkers; i++)
@@ -162,7 +164,8 @@ LocalProxy::recvCONF(Signal* signal, SsP
BlockReference ref = signal->getSendersBlockRef();
ndbrequire(refToMain(ref) == number());
- ss.m_worker = refToInstance(ref) - 1;
+ Uint32 ino = refToInstance(ref);
+ ss.m_worker = workerIndex(ino);
ndbrequire(ref == workerRef(ss.m_worker));
ndbrequire(ss.m_worker < c_workers);
ndbrequire(ss.m_workerMask.get(ss.m_worker));
@@ -219,17 +222,24 @@ LocalProxy::lastReply(const SsParallel&
void
LocalProxy::loadWorkers()
{
- c_workers = getLqhWorkers();
+ c_lqhWorkers = getLqhWorkers();
+ c_workers = c_lqhWorkers + c_extraWorkers;
Uint32 i;
for (i = 0; i < c_workers; i++) {
- const Uint32 instanceNo = 1 + i;
+ jam();
+ Uint32 instanceNo = workerInstance(i);
+
SimulatedBlock* worker = newWorker(instanceNo);
ndbrequire(worker->instance() == instanceNo);
ndbrequire(this->getInstance(instanceNo) == worker);
c_worker[i] = worker;
- add_lqh_worker_thr_map(number(), instanceNo);
+ if (i < c_lqhWorkers) {
+ add_lqh_worker_thr_map(number(), instanceNo);
+ } else {
+ add_extra_worker_thr_map(number(), instanceNo);
+ }
}
}
=== modified file 'storage/ndb/src/kernel/blocks/LocalProxy.hpp'
--- a/storage/ndb/src/kernel/blocks/LocalProxy.hpp 2008-08-27 08:00:52 +0000
+++ b/storage/ndb/src/kernel/blocks/LocalProxy.hpp 2008-11-15 15:43:59 +0000
@@ -33,10 +33,17 @@
*
* The LQH proxy is the LQH block seen by other nodes and blocks,
* unless by-passed for efficiency. Real LQH instances (workers)
- * run behind it.
+ * run behind it. The instance number is 1 + worker index.
*
- * There are also ACC,TUP,TUX,BACKUP,RESTORE proxies and workers.
- * All proxy classes are subclasses of LocalProxy.
+ * There are also proxies and workers for ACC, TUP, TUX, BACKUP,
+ * RESTORE, and PGMAN. Proxy classes are subclasses of LocalProxy.
+ * Workers with same instance number (one from each class) run in
+ * same thread.
+ *
+ * After LQH workers there is an optional extra worker. It runs
+ * in the thread of the main block (i.e. the proxy). Its instance
+ * number is fixed as 1 + MaxLqhWorkers (currently 5) i.e. it skips
+ * over any unused LQH instance numbers.
*/
class LocalProxy : public SimulatedBlock {
@@ -46,24 +53,64 @@ public:
BLOCK_DEFINES(LocalProxy);
protected:
- enum { MaxWorkers = MAX_NDBMT_LQH_WORKERS };
- typedef Bitmask<MaxWorkers> WorkerMask;
+ enum { MaxLqhWorkers = MAX_NDBMT_LQH_WORKERS };
+ enum { MaxExtraWorkers = 1 };
+ enum { MaxWorkers = MaxLqhWorkers + MaxExtraWorkers };
+ typedef Bitmask<(MaxWorkers+31)/32> WorkerMask;
+ Uint32 c_lqhWorkers;
+ Uint32 c_extraWorkers;
Uint32 c_workers;
+ // no gaps - extra worker has index c_lqhWorkers (not MaxLqhWorkers)
SimulatedBlock* c_worker[MaxWorkers];
virtual SimulatedBlock* newWorker(Uint32 instanceNo) = 0;
virtual void loadWorkers();
- // worker index to worker instance
- Uint32 workerInstance(Uint32 i) {
+ // get worker block by index (not by instance)
+
+ SimulatedBlock* workerBlock(Uint32 i) {
ndbrequire(i < c_workers);
- return 1 + i;
+ ndbrequire(c_worker[i] != 0);
+ return c_worker[i];
+ }
+
+ SimulatedBlock* extraWorkerBlock() {
+ return workerBlock(c_lqhWorkers);
}
- // worker index to worker ref
+ // get worker block reference by index (not by instance)
+
BlockReference workerRef(Uint32 i) {
+ return numberToRef(number(), workerInstance(i), getOwnNodeId());
+ }
+
+ BlockReference extraWorkerRef() {
+ ndbrequire(c_workers == c_lqhWorkers + 1);
+ Uint32 i = c_lqhWorkers;
+ return workerRef(i);
+ }
+
+ // convert between worker index and worker instance
+
+ Uint32 workerInstance(Uint32 i) {
+ ndbrequire(i < c_workers);
+ Uint32 ino;
+ if (i < c_lqhWorkers)
+ ino = 1 + i;
+ else
+ ino = 1 + MaxLqhWorkers;
+ return ino;
+ }
+
+ Uint32 workerIndex(Uint32 ino) {
+ ndbrequire(ino != 0);
+ Uint32 i;
+ if (ino != 1 + MaxLqhWorkers)
+ i = ino - 1;
+ else
+ i = c_lqhWorkers;
ndbrequire(i < c_workers);
- return numberToRef(number(), 1 + i, getOwnNodeId());
+ return i;
}
// support routines and classes ("Ss" = signal state)
=== modified file 'storage/ndb/src/kernel/vm/SimulatedBlock.cpp'
--- a/storage/ndb/src/kernel/vm/SimulatedBlock.cpp 2008-10-20 13:23:01 +0000
+++ b/storage/ndb/src/kernel/vm/SimulatedBlock.cpp 2008-11-15 15:43:59 +0000
@@ -66,7 +66,6 @@ SimulatedBlock::SimulatedBlock(BlockNumb
theNumber(blockNumber),
theInstance(instanceNumber),
theReference(numberToRef(blockNumber, instanceNumber, globalData.ownId)),
- theInstanceCount(0),
theInstanceList(0),
theMainInstance(0),
m_ctx(ctx),
@@ -89,31 +88,26 @@ SimulatedBlock::SimulatedBlock(BlockNumb
m_jamBuffer = (EmulatedJamBuffer *)NdbThread_GetTlsKey(NDB_THREAD_TLS_JAM);
NewVarRef = 0;
- SimulatedBlock* main = globalData.getBlock(blockNumber);
+ SimulatedBlock* mainBlock = globalData.getBlock(blockNumber);
if (theInstance == 0) {
- ndbrequire(main == 0);
- main = this;
- globalData.setBlock(blockNumber, main);
- main->theInstanceCount = 1;
+ ndbrequire(mainBlock == 0);
+ mainBlock = this;
+ globalData.setBlock(blockNumber, mainBlock);
} else {
- ndbrequire(main != 0);
- ndbrequire(theInstance == main->theInstanceCount);
- ndbrequire(theInstance < MaxInstances);
- if (theInstance == 1) {
- ndbrequire(main->theInstanceList == 0);
- main->theInstanceList = new SimulatedBlock* [MaxInstances];
+ ndbrequire(mainBlock != 0);
+ if (mainBlock->theInstanceList == 0) {
+ mainBlock->theInstanceList = new SimulatedBlock* [MaxInstances];
+ ndbrequire(mainBlock->theInstanceList != 0);
Uint32 i;
for (i = 0; i < MaxInstances; i++)
- main->theInstanceList[i] = 0;
- } else {
- ndbrequire(main->theInstanceList != 0);
- }
- ndbrequire(main->theInstanceList[theInstance] == 0);
- main->theInstanceList[theInstance] = this;
- main->theInstanceCount = theInstance + 1;
+ mainBlock->theInstanceList[i] = 0;
+ }
+ ndbrequire(theInstance < MaxInstances);
+ ndbrequire(mainBlock->theInstanceList[theInstance] == 0);
+ mainBlock->theInstanceList[theInstance] = this;
}
- theMainInstance = main;
+ theMainInstance = mainBlock;
c_fragmentIdCounter = 1;
c_fragSenderRunning = false;
@@ -172,7 +166,7 @@ SimulatedBlock::~SimulatedBlock()
if (theInstanceList != 0) {
Uint32 i;
- for (i = 0; i < theInstanceCount; i++)
+ for (i = 0; i < MaxInstances; i++)
delete theInstanceList[i];
delete [] theInstanceList;
}
@@ -3016,7 +3010,9 @@ bool
SimulatedBlock::debugOutOn()
{
SignalLoggerManager::LogMode mask = SignalLoggerManager::LogInOut;
- return globalSignalLoggers.logMatch(number(), mask);
+ return
+ globalData.testOn &&
+ globalSignalLoggers.logMatch(number(), mask);
}
const char*
=== modified file 'storage/ndb/src/kernel/vm/SimulatedBlock.hpp'
--- a/storage/ndb/src/kernel/vm/SimulatedBlock.hpp 2008-11-06 16:52:59 +0000
+++ b/storage/ndb/src/kernel/vm/SimulatedBlock.hpp 2008-11-15 15:43:59 +0000
@@ -145,19 +145,13 @@ public:
Uint32 instance() const {
return theInstance;
}
- Uint32 getWorkerCount() const {
- ndbrequire(theInstance == 0); // valid only on main instance
- ndbrequire(theInstanceCount >= 1);
- return theInstanceCount - 1;
- }
SimulatedBlock* getInstance(Uint32 instanceNumber) {
- ndbrequire(theInstance == 0);
+ ndbrequire(theInstance == 0); // valid only on main instance
if (instanceNumber == 0)
return this;
- if (instanceNumber < theInstanceCount) {
- ndbrequire(theInstanceList != 0);
+ ndbrequire(instanceNumber < MaxInstances);
+ if (theInstanceList != 0)
return theInstanceList[instanceNumber];
- }
return 0;
}
virtual void loadWorkers() {}
@@ -501,8 +495,7 @@ private:
* In MT LQH main instance is the LQH proxy and the others ("workers")
* are real LQHs run by multiple threads.
*/
- enum { MaxInstances = 1 + MAX_NDBMT_LQH_WORKERS };
- Uint32 theInstanceCount; // set in main
+ enum { MaxInstances = 1 + MAX_NDBMT_LQH_WORKERS + 1 }; // main+lqh+extra
SimulatedBlock** theInstanceList; // set in main, indexed by instance
SimulatedBlock* theMainInstance; // set in all
/*
=== modified file 'storage/ndb/src/kernel/vm/dummy_nonmt.cpp'
--- a/storage/ndb/src/kernel/vm/dummy_nonmt.cpp 2008-08-15 11:01:41 +0000
+++ b/storage/ndb/src/kernel/vm/dummy_nonmt.cpp 2008-11-15 15:43:59 +0000
@@ -18,3 +18,9 @@ add_lqh_worker_thr_map(Uint32, Uint32)
{
assert(false);
}
+
+void
+add_extra_worker_thr_map(Uint32, Uint32)
+{
+ assert(false);
+}
=== modified file 'storage/ndb/src/kernel/vm/mt.cpp'
--- a/storage/ndb/src/kernel/vm/mt.cpp 2008-11-10 10:04:46 +0000
+++ b/storage/ndb/src/kernel/vm/mt.cpp 2008-11-15 15:43:59 +0000
@@ -68,15 +68,15 @@ static const Uint32 MAX_SIGNALS_BEFORE_W
//#define NDB_MT_LOCK_TO_CPU
-#define MAX_BLOCK_INSTANCES (1 + MAX_NDBMT_LQH_WORKERS)
+#define MAX_BLOCK_INSTANCES (1 + MAX_NDBMT_LQH_WORKERS + 1) //main+lqh+extra
#define NUM_MAIN_THREADS 2 // except receiver
#define MAX_THREADS (NUM_MAIN_THREADS + MAX_NDBMT_LQH_THREADS + 1)
/* If this is too small it crashes before first signal. */
-#define MAX_INSTANCES_PER_THREAD 16
+#define MAX_INSTANCES_PER_THREAD (16 + 8 * MAX_NDBMT_LQH_THREADS)
-static Uint32 ndbmt_workers = 0;
-static Uint32 ndbmt_threads = 0;
+static Uint32 num_lqh_workers = 0;
+static Uint32 num_lqh_threads = 0;
static Uint32 num_threads = 0;
static Uint32 receiver_thread_no = 0;
@@ -2002,6 +2002,42 @@ check_queues_empty(void *data)
}
/*
+ * Map instance number to real instance on this node. Used in
+ * sendlocal/sendprioa to find right thread and in execute_signals
+ * to find right block instance. SignalHeader is not modified.
+ */
+
+static Uint8 g_map_instance[MAX_BLOCK_INSTANCES];
+
+static void
+map_instance_init()
+{
+ g_map_instance[0] = 0;
+ Uint32 ino;
+ for (ino = 1; ino < MAX_BLOCK_INSTANCES; ino++) {
+ if (!globalData.isNdbMtLqh) {
+ g_map_instance[ino] = 0;
+ } else {
+ assert(num_lqh_workers != 0);
+ if (ino <= MAX_NDBMT_LQH_WORKERS) {
+ g_map_instance[ino] = 1 + (ino - 1) % num_lqh_workers;
+ } else {
+ /* Extra workers are not mapped. */
+ g_map_instance[ino] = ino;
+ }
+ }
+ }
+}
+
+static inline Uint32
+map_instance(const SignalHeader *s)
+{
+ Uint32 ino = blockToInstance(s->theReceiversBlockNumber);
+ assert(ino < MAX_BLOCK_INSTANCES);
+ return g_map_instance[ino];
+}
+
+/*
* Execute at most MAX_SIGNALS signals from one job queue, updating local read
* state as appropriate.
*
@@ -2069,17 +2105,9 @@ execute_signals(thr_data *selfptr, thr_j
__builtin_prefetch (read_buffer->m_data + read_pos + 32, 0, 3);
#endif
Uint32 bno = blockToMain(s->theReceiversBlockNumber);
- SimulatedBlock* block = globalData.getBlock(bno);
-
- if (ndbmt_workers != 0) { // MT LQH
- Uint32 ino = blockToInstance(s->theReceiversBlockNumber);
- if (ino != 0) {
- // map instance key to real instance
- ino = 1 + (ino - 1) % ndbmt_workers;
- block = block->getInstance(ino);
- assert(block != 0);
- }
- }
+ Uint32 ino = map_instance(s);
+ SimulatedBlock* block = globalData.getBlock(bno, ino);
+ assert(block != 0);
Uint32 gsn = s->theVerId_signalNumber;
*watchDogCounter = 1;
@@ -2166,6 +2194,17 @@ struct thr_map_entry {
static struct thr_map_entry thr_map[NO_OF_BLOCKS][MAX_BLOCK_INSTANCES];
+static inline Uint32
+block2ThreadId(Uint32 block, Uint32 instance)
+{
+ assert(block >= MIN_BLOCK_NO && block <= MAX_BLOCK_NO);
+ Uint32 index = block - MIN_BLOCK_NO;
+ assert(instance < MAX_BLOCK_INSTANCES);
+ const thr_map_entry& entry = thr_map[index][instance];
+ assert(entry.thr_no < num_threads);
+ return entry.thr_no;
+}
+
void
add_thr_map(Uint32 main, Uint32 instance, Uint32 thr_no)
{
@@ -2243,19 +2282,17 @@ add_lqh_worker_thr_map(Uint32 block, Uin
{
assert(instance != 0);
Uint32 i = instance - 1;
- Uint32 thr_no = NUM_MAIN_THREADS + i % ndbmt_threads;
+ Uint32 thr_no = NUM_MAIN_THREADS + i % num_lqh_threads;
add_thr_map(block, instance, thr_no);
}
-static inline Uint32
-block2ThreadId(Uint32 block, Uint32 instance)
+/* Extra workers run`in proxy thread. */
+void
+add_extra_worker_thr_map(Uint32 block, Uint32 instance)
{
- assert(block >= MIN_BLOCK_NO && block <= MAX_BLOCK_NO);
- Uint32 index = block - MIN_BLOCK_NO;
- assert(instance < MAX_BLOCK_INSTANCES);
- const thr_map_entry& entry = thr_map[index][instance];
- assert(entry.thr_no < num_threads);
- return entry.thr_no;
+ assert(instance != 0);
+ Uint32 thr_no = block2ThreadId(block, 0);
+ add_thr_map(block, instance, thr_no);
}
static void reportSignalStats(Uint32 self, Uint32 a_count, Uint32 a_size,
@@ -2535,13 +2572,7 @@ sendlocal(Uint32 self, const SignalHeade
const Uint32 secPtr[3])
{
Uint32 block = blockToMain(s->theReceiversBlockNumber);
- Uint32 instance = 0;
-
- if (ndbmt_workers != 0) { // MT LQH
- instance = blockToInstance(s->theReceiversBlockNumber);
- if (instance != 0)
- instance = 1 + (instance - 1) % ndbmt_workers;
- }
+ Uint32 instance = map_instance(s);
/*
* Max number of signals to put into job buffer before flushing the buffer
@@ -2576,13 +2607,7 @@ sendprioa(Uint32 self, const SignalHeade
const Uint32 secPtr[3])
{
Uint32 block = blockToMain(s->theReceiversBlockNumber);
- Uint32 instance = 0;
-
- if (ndbmt_workers != 0) { // MT LQH
- instance = blockToInstance(s->theReceiversBlockNumber);
- if (instance != 0)
- instance = 1 + (instance - 1) % ndbmt_workers;
- }
+ Uint32 instance = map_instance(s);
Uint32 dst = block2ThreadId(block, instance);
struct thr_repository* rep = &g_thr_repository;
@@ -2684,7 +2709,7 @@ sendprioa_STOP_FOR_CRASH(Uint32 dst)
main = NDBCNTR;
else if (dst == 1)
main = DBLQH;
- else if (dst >= NUM_MAIN_THREADS && dst < NUM_MAIN_THREADS + ndbmt_threads)
+ else if (dst >= NUM_MAIN_THREADS && dst < NUM_MAIN_THREADS + num_lqh_threads)
{
main = DBLQH;
instance = dst - NUM_MAIN_THREADS + 1;
@@ -2879,14 +2904,15 @@ ThreadConfig::~ThreadConfig()
void
ThreadConfig::init(EmulatorData *emulatorData)
{
- ndbmt_workers = globalData.ndbMtLqhWorkers;
- ndbmt_threads = globalData.ndbMtLqhThreads;
- num_threads = NUM_MAIN_THREADS + ndbmt_threads + 1;
+ num_lqh_workers = globalData.ndbMtLqhWorkers;
+ num_lqh_threads = globalData.ndbMtLqhThreads;
+ num_threads = NUM_MAIN_THREADS + num_lqh_threads + 1;
assert(num_threads <= MAX_THREADS);
receiver_thread_no = num_threads - 1;
ndbout << "NDBMT: num_threads=" << num_threads << endl;
+ ::map_instance_init();
::rep_init(&g_thr_repository, num_threads, emulatorData->m_mem_manager);
}
@@ -3364,7 +3390,7 @@ FastScheduler::dumpSignalMemory(Uint32 t
siglen = 25; // Sanity check
memcpy(&signal.header, s, 4*siglen);
// instance number in trace file is confusing if not MT LQH
- if (ndbmt_workers == 0)
+ if (num_lqh_workers == 0)
signal.header.theReceiversBlockNumber &= NDBMT_BLOCK_MASK;
const Uint32 *posptr = reinterpret_cast<const Uint32 *>(s);
=== modified file 'storage/ndb/src/kernel/vm/mt.hpp'
--- a/storage/ndb/src/kernel/vm/mt.hpp 2008-10-08 19:09:05 +0000
+++ b/storage/ndb/src/kernel/vm/mt.hpp 2008-11-15 15:43:59 +0000
@@ -33,6 +33,7 @@ extern Uint32 receiverThreadId;
void add_thr_map(Uint32 block, Uint32 instance, Uint32 thr_no);
void add_main_thr_map();
void add_lqh_worker_thr_map(Uint32 block, Uint32 instance);
+void add_extra_worker_thr_map(Uint32 block, Uint32 instance);
void sendlocal(Uint32 self, const struct SignalHeader *s,
const Uint32 *data, const Uint32 secPtr[3]);
| Thread |
|---|
| • bzr commit into mysql-5.1 branch (pekka:3080) WL#4391 | Pekka Nousiainen | 15 Nov |