List:Commits« Previous MessageNext Message »
From:Pekka Nousiainen Date:November 15 2008 3:44pm
Subject:bzr commit into mysql-5.1 branch (pekka:3080) WL#4391
View as plain text  
#At file:///export/space/pekka/ndb/version/my51-wl4391/

 3080 Pekka Nousiainen	2008-11-15
      wl#4391 25a_pgman.diff
      DD - mt pgman: extra worker in proxy thread
modified:
  storage/ndb/src/kernel/blocks/LocalProxy.cpp
  storage/ndb/src/kernel/blocks/LocalProxy.hpp
  storage/ndb/src/kernel/vm/SimulatedBlock.cpp
  storage/ndb/src/kernel/vm/SimulatedBlock.hpp
  storage/ndb/src/kernel/vm/dummy_nonmt.cpp
  storage/ndb/src/kernel/vm/mt.cpp
  storage/ndb/src/kernel/vm/mt.hpp

=== modified file 'storage/ndb/src/kernel/blocks/LocalProxy.cpp'
--- a/storage/ndb/src/kernel/blocks/LocalProxy.cpp	2008-08-27 08:00:52 +0000
+++ b/storage/ndb/src/kernel/blocks/LocalProxy.cpp	2008-11-15 15:43:59 +0000
@@ -23,6 +23,8 @@ LocalProxy::LocalProxy(BlockNumber block
   BLOCK_CONSTRUCTOR(LocalProxy);
 
   ndbrequire(instance() == 0); // this is main block
+  c_lqhWorkers = 0;
+  c_extraWorkers = 0; // sub-class constructor can set
   c_workers = 0;
   Uint32 i;
   for (i = 0; i < MaxWorkers; i++)
@@ -162,7 +164,8 @@ LocalProxy::recvCONF(Signal* signal, SsP
   BlockReference ref = signal->getSendersBlockRef();
   ndbrequire(refToMain(ref) == number());
 
-  ss.m_worker = refToInstance(ref) - 1;
+  Uint32 ino = refToInstance(ref);
+  ss.m_worker = workerIndex(ino);
   ndbrequire(ref == workerRef(ss.m_worker));
   ndbrequire(ss.m_worker < c_workers);
   ndbrequire(ss.m_workerMask.get(ss.m_worker));
@@ -219,17 +222,24 @@ LocalProxy::lastReply(const SsParallel& 
 void
 LocalProxy::loadWorkers()
 {
-  c_workers = getLqhWorkers();
+  c_lqhWorkers = getLqhWorkers();
+  c_workers = c_lqhWorkers + c_extraWorkers;
 
   Uint32 i;
   for (i = 0; i < c_workers; i++) {
-    const Uint32 instanceNo = 1 + i;
+    jam();
+    Uint32 instanceNo = workerInstance(i);
+
     SimulatedBlock* worker = newWorker(instanceNo);
     ndbrequire(worker->instance() == instanceNo);
     ndbrequire(this->getInstance(instanceNo) == worker);
     c_worker[i] = worker;
 
-    add_lqh_worker_thr_map(number(), instanceNo);
+    if (i < c_lqhWorkers) {
+      add_lqh_worker_thr_map(number(), instanceNo);
+    } else {
+      add_extra_worker_thr_map(number(), instanceNo);
+    }
   }
 }
 

=== modified file 'storage/ndb/src/kernel/blocks/LocalProxy.hpp'
--- a/storage/ndb/src/kernel/blocks/LocalProxy.hpp	2008-08-27 08:00:52 +0000
+++ b/storage/ndb/src/kernel/blocks/LocalProxy.hpp	2008-11-15 15:43:59 +0000
@@ -33,10 +33,17 @@
  *
  * The LQH proxy is the LQH block seen by other nodes and blocks,
  * unless by-passed for efficiency.  Real LQH instances (workers)
- * run behind it.
+ * run behind it.  The instance number is 1 + worker index.
  *
- * There are also ACC,TUP,TUX,BACKUP,RESTORE proxies and workers.
- * All proxy classes are subclasses of LocalProxy.
+ * There are also proxies and workers for ACC, TUP, TUX, BACKUP,
+ * RESTORE, and PGMAN.  Proxy classes are subclasses of LocalProxy.
+ * Workers with same instance number (one from each class) run in
+ * same thread.
+ *
+ * After LQH workers there is an optional extra worker.  It runs
+ * in the thread of the main block (i.e. the proxy).  Its instance
+ * number is fixed as 1 + MaxLqhWorkers (currently 5) i.e. it skips
+ * over any unused LQH instance numbers.
  */
 
 class LocalProxy : public SimulatedBlock {
@@ -46,24 +53,64 @@ public:
   BLOCK_DEFINES(LocalProxy);
 
 protected:
-  enum { MaxWorkers = MAX_NDBMT_LQH_WORKERS };
-  typedef Bitmask<MaxWorkers> WorkerMask;
+  enum { MaxLqhWorkers = MAX_NDBMT_LQH_WORKERS };
+  enum { MaxExtraWorkers = 1 };
+  enum { MaxWorkers = MaxLqhWorkers + MaxExtraWorkers };
+  typedef Bitmask<(MaxWorkers+31)/32> WorkerMask;
+  Uint32 c_lqhWorkers;
+  Uint32 c_extraWorkers;
   Uint32 c_workers;
+  // no gaps - extra worker has index c_lqhWorkers (not MaxLqhWorkers)
   SimulatedBlock* c_worker[MaxWorkers];
 
   virtual SimulatedBlock* newWorker(Uint32 instanceNo) = 0;
   virtual void loadWorkers();
 
-  // worker index to worker instance
-  Uint32 workerInstance(Uint32 i) {
+  // get worker block by index (not by instance)
+
+  SimulatedBlock* workerBlock(Uint32 i) {
     ndbrequire(i < c_workers);
-    return 1 + i;
+    ndbrequire(c_worker[i] != 0);
+    return c_worker[i];
+  }
+
+  SimulatedBlock* extraWorkerBlock() {
+    return workerBlock(c_lqhWorkers);
   }
 
-  // worker index to worker ref
+  // get worker block reference by index (not by instance)
+
   BlockReference workerRef(Uint32 i) {
+    return numberToRef(number(), workerInstance(i), getOwnNodeId());
+  }
+
+  BlockReference extraWorkerRef() {
+    ndbrequire(c_workers == c_lqhWorkers + 1);
+    Uint32 i = c_lqhWorkers;
+    return workerRef(i);
+  }
+
+  // convert between worker index and worker instance
+
+  Uint32 workerInstance(Uint32 i) {
+    ndbrequire(i < c_workers);
+    Uint32 ino;
+    if (i < c_lqhWorkers)
+      ino = 1 + i;
+    else
+      ino = 1 + MaxLqhWorkers;
+    return ino;
+  }
+
+  Uint32 workerIndex(Uint32 ino) {
+    ndbrequire(ino != 0);
+    Uint32 i;
+    if (ino != 1 + MaxLqhWorkers)
+      i = ino - 1;
+    else
+      i = c_lqhWorkers;
     ndbrequire(i < c_workers);
-    return numberToRef(number(), 1 + i, getOwnNodeId());
+    return i;
   }
 
   // support routines and classes ("Ss" = signal state)

=== modified file 'storage/ndb/src/kernel/vm/SimulatedBlock.cpp'
--- a/storage/ndb/src/kernel/vm/SimulatedBlock.cpp	2008-10-20 13:23:01 +0000
+++ b/storage/ndb/src/kernel/vm/SimulatedBlock.cpp	2008-11-15 15:43:59 +0000
@@ -66,7 +66,6 @@ SimulatedBlock::SimulatedBlock(BlockNumb
     theNumber(blockNumber),
     theInstance(instanceNumber),
     theReference(numberToRef(blockNumber, instanceNumber, globalData.ownId)),
-    theInstanceCount(0),
     theInstanceList(0),
     theMainInstance(0),
     m_ctx(ctx),
@@ -89,31 +88,26 @@ SimulatedBlock::SimulatedBlock(BlockNumb
   m_jamBuffer = (EmulatedJamBuffer *)NdbThread_GetTlsKey(NDB_THREAD_TLS_JAM);
   NewVarRef = 0;
   
-  SimulatedBlock* main = globalData.getBlock(blockNumber);
+  SimulatedBlock* mainBlock = globalData.getBlock(blockNumber);
 
   if (theInstance == 0) {
-    ndbrequire(main == 0);
-    main = this;
-    globalData.setBlock(blockNumber, main);
-    main->theInstanceCount = 1;
+    ndbrequire(mainBlock == 0);
+    mainBlock = this;
+    globalData.setBlock(blockNumber, mainBlock);
   } else {
-    ndbrequire(main != 0);
-    ndbrequire(theInstance == main->theInstanceCount);
-    ndbrequire(theInstance < MaxInstances);
-    if (theInstance == 1) {
-      ndbrequire(main->theInstanceList == 0);
-      main->theInstanceList = new SimulatedBlock* [MaxInstances];
+    ndbrequire(mainBlock != 0);
+    if (mainBlock->theInstanceList == 0) {
+      mainBlock->theInstanceList = new SimulatedBlock* [MaxInstances];
+      ndbrequire(mainBlock->theInstanceList != 0);
       Uint32 i;
       for (i = 0; i < MaxInstances; i++)
-        main->theInstanceList[i] = 0;
-    } else {
-      ndbrequire(main->theInstanceList != 0);
-    }
-    ndbrequire(main->theInstanceList[theInstance] == 0);
-    main->theInstanceList[theInstance] = this;
-    main->theInstanceCount = theInstance + 1;
+        mainBlock->theInstanceList[i] = 0;
+    }
+    ndbrequire(theInstance < MaxInstances);
+    ndbrequire(mainBlock->theInstanceList[theInstance] == 0);
+    mainBlock->theInstanceList[theInstance] = this;
   }
-  theMainInstance = main;
+  theMainInstance = mainBlock;
 
   c_fragmentIdCounter = 1;
   c_fragSenderRunning = false;
@@ -172,7 +166,7 @@ SimulatedBlock::~SimulatedBlock()
 
   if (theInstanceList != 0) {
     Uint32 i;
-    for (i = 0; i < theInstanceCount; i++)
+    for (i = 0; i < MaxInstances; i++)
       delete theInstanceList[i];
     delete [] theInstanceList;
   }
@@ -3016,7 +3010,9 @@ bool
 SimulatedBlock::debugOutOn()
 {
   SignalLoggerManager::LogMode mask = SignalLoggerManager::LogInOut;
-  return globalSignalLoggers.logMatch(number(), mask);
+  return
+    globalData.testOn &&
+    globalSignalLoggers.logMatch(number(), mask);
 }
 
 const char*

=== modified file 'storage/ndb/src/kernel/vm/SimulatedBlock.hpp'
--- a/storage/ndb/src/kernel/vm/SimulatedBlock.hpp	2008-11-06 16:52:59 +0000
+++ b/storage/ndb/src/kernel/vm/SimulatedBlock.hpp	2008-11-15 15:43:59 +0000
@@ -145,19 +145,13 @@ public:
   Uint32 instance() const {
     return theInstance;
   }
-  Uint32 getWorkerCount() const {
-    ndbrequire(theInstance == 0); // valid only on main instance
-    ndbrequire(theInstanceCount >= 1);
-    return theInstanceCount - 1;
-  }
   SimulatedBlock* getInstance(Uint32 instanceNumber) {
-    ndbrequire(theInstance == 0);
+    ndbrequire(theInstance == 0); // valid only on main instance
     if (instanceNumber == 0)
       return this;
-    if (instanceNumber < theInstanceCount) {
-      ndbrequire(theInstanceList != 0);
+    ndbrequire(instanceNumber < MaxInstances);
+    if (theInstanceList != 0)
       return theInstanceList[instanceNumber];
-    }
     return 0;
   }
   virtual void loadWorkers() {}
@@ -501,8 +495,7 @@ private:
    * In MT LQH main instance is the LQH proxy and the others ("workers")
    * are real LQHs run by multiple threads.
    */
-  enum { MaxInstances = 1 + MAX_NDBMT_LQH_WORKERS };
-  Uint32 theInstanceCount;          // set in main
+  enum { MaxInstances = 1 + MAX_NDBMT_LQH_WORKERS + 1 }; // main+lqh+extra
   SimulatedBlock** theInstanceList; // set in main, indexed by instance
   SimulatedBlock* theMainInstance;  // set in all
   /*

=== modified file 'storage/ndb/src/kernel/vm/dummy_nonmt.cpp'
--- a/storage/ndb/src/kernel/vm/dummy_nonmt.cpp	2008-08-15 11:01:41 +0000
+++ b/storage/ndb/src/kernel/vm/dummy_nonmt.cpp	2008-11-15 15:43:59 +0000
@@ -18,3 +18,9 @@ add_lqh_worker_thr_map(Uint32, Uint32)
 {
   assert(false);
 }
+
+void
+add_extra_worker_thr_map(Uint32, Uint32)
+{
+  assert(false);
+}

=== modified file 'storage/ndb/src/kernel/vm/mt.cpp'
--- a/storage/ndb/src/kernel/vm/mt.cpp	2008-11-10 10:04:46 +0000
+++ b/storage/ndb/src/kernel/vm/mt.cpp	2008-11-15 15:43:59 +0000
@@ -68,15 +68,15 @@ static const Uint32 MAX_SIGNALS_BEFORE_W
 
 //#define NDB_MT_LOCK_TO_CPU
 
-#define MAX_BLOCK_INSTANCES (1 + MAX_NDBMT_LQH_WORKERS)
+#define MAX_BLOCK_INSTANCES (1 + MAX_NDBMT_LQH_WORKERS + 1) //main+lqh+extra
 #define NUM_MAIN_THREADS 2 // except receiver
 #define MAX_THREADS (NUM_MAIN_THREADS + MAX_NDBMT_LQH_THREADS + 1)
 
 /* If this is too small it crashes before first signal. */
-#define MAX_INSTANCES_PER_THREAD 16
+#define MAX_INSTANCES_PER_THREAD (16 + 8 * MAX_NDBMT_LQH_THREADS)
 
-static Uint32 ndbmt_workers = 0;
-static Uint32 ndbmt_threads = 0;
+static Uint32 num_lqh_workers = 0;
+static Uint32 num_lqh_threads = 0;
 static Uint32 num_threads = 0;
 static Uint32 receiver_thread_no = 0;
 
@@ -2002,6 +2002,42 @@ check_queues_empty(void *data)
 }
 
 /*
+ * Map instance number to real instance on this node.  Used in
+ * sendlocal/sendprioa to find right thread and in execute_signals
+ * to find right block instance.  SignalHeader is not modified.
+ */
+
+static Uint8 g_map_instance[MAX_BLOCK_INSTANCES];
+
+static void
+map_instance_init()
+{
+  g_map_instance[0] = 0;
+  Uint32 ino;
+  for (ino = 1; ino < MAX_BLOCK_INSTANCES; ino++) {
+    if (!globalData.isNdbMtLqh) {
+      g_map_instance[ino] = 0;
+    } else {
+      assert(num_lqh_workers != 0);
+      if (ino <= MAX_NDBMT_LQH_WORKERS) {
+        g_map_instance[ino] = 1 + (ino - 1) % num_lqh_workers;
+      } else {
+        /* Extra workers are not mapped. */
+        g_map_instance[ino] = ino;
+      }
+    }
+  }
+}
+
+static inline Uint32
+map_instance(const SignalHeader *s)
+{
+  Uint32 ino = blockToInstance(s->theReceiversBlockNumber);
+  assert(ino < MAX_BLOCK_INSTANCES);
+  return g_map_instance[ino];
+}
+
+/*
  * Execute at most MAX_SIGNALS signals from one job queue, updating local read
  * state as appropriate.
  *
@@ -2069,17 +2105,9 @@ execute_signals(thr_data *selfptr, thr_j
       __builtin_prefetch (read_buffer->m_data + read_pos + 32, 0, 3);
 #endif
     Uint32 bno = blockToMain(s->theReceiversBlockNumber);
-    SimulatedBlock* block = globalData.getBlock(bno);
-
-    if (ndbmt_workers != 0) { // MT LQH
-      Uint32 ino = blockToInstance(s->theReceiversBlockNumber);
-      if (ino != 0) {
-        // map instance key to real instance
-        ino = 1 + (ino - 1) % ndbmt_workers;
-        block = block->getInstance(ino);
-        assert(block != 0);
-      }
-    }
+    Uint32 ino = map_instance(s);
+    SimulatedBlock* block = globalData.getBlock(bno, ino);
+    assert(block != 0);
 
     Uint32 gsn = s->theVerId_signalNumber;
     *watchDogCounter = 1;
@@ -2166,6 +2194,17 @@ struct thr_map_entry {
 
 static struct thr_map_entry thr_map[NO_OF_BLOCKS][MAX_BLOCK_INSTANCES];
 
+static inline Uint32
+block2ThreadId(Uint32 block, Uint32 instance)
+{
+  assert(block >= MIN_BLOCK_NO && block <= MAX_BLOCK_NO);
+  Uint32 index = block - MIN_BLOCK_NO;
+  assert(instance < MAX_BLOCK_INSTANCES);
+  const thr_map_entry& entry = thr_map[index][instance];
+  assert(entry.thr_no < num_threads);
+  return entry.thr_no;
+}
+
 void
 add_thr_map(Uint32 main, Uint32 instance, Uint32 thr_no)
 {
@@ -2243,19 +2282,17 @@ add_lqh_worker_thr_map(Uint32 block, Uin
 {
   assert(instance != 0);
   Uint32 i = instance - 1;
-  Uint32 thr_no = NUM_MAIN_THREADS + i % ndbmt_threads;
+  Uint32 thr_no = NUM_MAIN_THREADS + i % num_lqh_threads;
   add_thr_map(block, instance, thr_no);
 }
 
-static inline Uint32
-block2ThreadId(Uint32 block, Uint32 instance)
+/* Extra workers run`in proxy thread. */
+void
+add_extra_worker_thr_map(Uint32 block, Uint32 instance)
 {
-  assert(block >= MIN_BLOCK_NO && block <= MAX_BLOCK_NO);
-  Uint32 index = block - MIN_BLOCK_NO;
-  assert(instance < MAX_BLOCK_INSTANCES);
-  const thr_map_entry& entry = thr_map[index][instance];
-  assert(entry.thr_no < num_threads);
-  return entry.thr_no;
+  assert(instance != 0);
+  Uint32 thr_no = block2ThreadId(block, 0);
+  add_thr_map(block, instance, thr_no);
 }
 
 static void reportSignalStats(Uint32 self, Uint32 a_count, Uint32 a_size,
@@ -2535,13 +2572,7 @@ sendlocal(Uint32 self, const SignalHeade
           const Uint32 secPtr[3])
 {
   Uint32 block = blockToMain(s->theReceiversBlockNumber);
-  Uint32 instance = 0;
-
-  if (ndbmt_workers != 0) { // MT LQH
-    instance = blockToInstance(s->theReceiversBlockNumber);
-    if (instance != 0)
-      instance = 1 + (instance - 1) % ndbmt_workers;
-  }
+  Uint32 instance = map_instance(s);
 
   /*
    * Max number of signals to put into job buffer before flushing the buffer
@@ -2576,13 +2607,7 @@ sendprioa(Uint32 self, const SignalHeade
           const Uint32 secPtr[3])
 {
   Uint32 block = blockToMain(s->theReceiversBlockNumber);
-  Uint32 instance = 0;
-
-  if (ndbmt_workers != 0) { // MT LQH
-    instance = blockToInstance(s->theReceiversBlockNumber);
-    if (instance != 0)
-      instance = 1 + (instance - 1) % ndbmt_workers;
-  }
+  Uint32 instance = map_instance(s);
 
   Uint32 dst = block2ThreadId(block, instance);
   struct thr_repository* rep = &g_thr_repository;
@@ -2684,7 +2709,7 @@ sendprioa_STOP_FOR_CRASH(Uint32 dst)
     main = NDBCNTR;
   else if (dst == 1)
     main = DBLQH;
-  else if (dst >= NUM_MAIN_THREADS && dst < NUM_MAIN_THREADS + ndbmt_threads)
+  else if (dst >= NUM_MAIN_THREADS && dst < NUM_MAIN_THREADS + num_lqh_threads)
   {
     main = DBLQH;
     instance = dst - NUM_MAIN_THREADS + 1;
@@ -2879,14 +2904,15 @@ ThreadConfig::~ThreadConfig()
 void
 ThreadConfig::init(EmulatorData *emulatorData)
 {
-  ndbmt_workers = globalData.ndbMtLqhWorkers;
-  ndbmt_threads = globalData.ndbMtLqhThreads;
-  num_threads = NUM_MAIN_THREADS + ndbmt_threads + 1;
+  num_lqh_workers = globalData.ndbMtLqhWorkers;
+  num_lqh_threads = globalData.ndbMtLqhThreads;
+  num_threads = NUM_MAIN_THREADS + num_lqh_threads + 1;
   assert(num_threads <= MAX_THREADS);
   receiver_thread_no = num_threads - 1;
 
   ndbout << "NDBMT: num_threads=" << num_threads << endl;
 
+  ::map_instance_init();
   ::rep_init(&g_thr_repository, num_threads, emulatorData->m_mem_manager);
 }
 
@@ -3364,7 +3390,7 @@ FastScheduler::dumpSignalMemory(Uint32 t
       siglen = 25;              // Sanity check
     memcpy(&signal.header, s, 4*siglen);
     // instance number in trace file is confusing if not MT LQH
-    if (ndbmt_workers == 0)
+    if (num_lqh_workers == 0)
       signal.header.theReceiversBlockNumber &= NDBMT_BLOCK_MASK;
 
     const Uint32 *posptr = reinterpret_cast<const Uint32 *>(s);

=== modified file 'storage/ndb/src/kernel/vm/mt.hpp'
--- a/storage/ndb/src/kernel/vm/mt.hpp	2008-10-08 19:09:05 +0000
+++ b/storage/ndb/src/kernel/vm/mt.hpp	2008-11-15 15:43:59 +0000
@@ -33,6 +33,7 @@ extern Uint32 receiverThreadId;
 void add_thr_map(Uint32 block, Uint32 instance, Uint32 thr_no);
 void add_main_thr_map();
 void add_lqh_worker_thr_map(Uint32 block, Uint32 instance);
+void add_extra_worker_thr_map(Uint32 block, Uint32 instance);
 
 void sendlocal(Uint32 self, const struct SignalHeader *s,
                const Uint32 *data, const Uint32 secPtr[3]);

Thread
bzr commit into mysql-5.1 branch (pekka:3080) WL#4391Pekka Nousiainen15 Nov