List:Commits« Previous MessageNext Message »
From:Jonas Oreland Date:November 7 2008 1:30pm
Subject:bzr push into mysql-5.1 branch (jonas:3054 to 3055)
View as plain text  
 3055 Jonas Oreland	2008-11-07
      ndbmtd - add lock-to-cpu and realtime settings
modified:
  storage/ndb/src/kernel/main.cpp
  storage/ndb/src/kernel/vm/Configuration.cpp
  storage/ndb/src/kernel/vm/Configuration.hpp
  storage/ndb/src/kernel/vm/ThreadConfig.cpp
  storage/ndb/src/kernel/vm/ThreadConfig.hpp
  storage/ndb/src/kernel/vm/mt.cpp
  storage/ndb/src/mgmsrv/ConfigInfo.cpp

 3054 Jonas Oreland	2008-11-07 [merge]
      merge 64-main
modified:
  BUILD/SETUP.sh
  mysql-test/suite/ndb/t/ndb_dbug_lock.test
  sql/mysqld.cc
  storage/ndb/include/ndbinfo.h
  storage/ndb/src/common/debugger/EventLogger.cpp
  storage/ndb/src/common/logger/FileLogHandler.cpp
  storage/ndb/src/common/transporter/TransporterRegistry.cpp
  storage/ndb/src/cw/cpcd/APIService.cpp
  storage/ndb/src/cw/cpcd/Process.cpp
  storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp
  storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
  storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp
  storage/ndb/src/kernel/vm/ndbd_malloc_impl.cpp
  storage/ndb/src/mgmsrv/Services.cpp
  storage/ndb/src/ndbapi/ObjectMap.cpp
  support-files/compiler_warnings.supp

=== modified file 'storage/ndb/src/common/util/Bitmask.cpp'
--- a/storage/ndb/src/common/util/Bitmask.cpp	2008-11-06 16:29:57 +0000
+++ b/storage/ndb/src/common/util/Bitmask.cpp	2008-11-07 08:37:04 +0000
@@ -129,7 +129,7 @@ BitmaskImpl::parseMask(unsigned size, Ui
     if (list[i].empty())
       continue;
     unsigned num = 0;
-    char * delim = strchr(list[i].c_str(), '-');
+    char * delim = (char*)strchr(list[i].c_str(), '-');
     unsigned first = 0;
     unsigned last = 0;
     if (delim == 0)

=== modified file 'storage/ndb/src/kernel/main.cpp'
--- a/storage/ndb/src/kernel/main.cpp	2008-11-06 10:17:49 +0000
+++ b/storage/ndb/src/kernel/main.cpp	2008-11-07 13:23:15 +0000
@@ -661,7 +661,7 @@ int main(int argc, char** argv)
     NdbThread *pThis = NdbThread_CreateObject(0);
     Uint32 inx = globalEmulatorData.theConfiguration->addThread(pThis,
                                                                 MainThread);
-    globalEmulatorData.theThreadConfig->ipControlLoop(inx);
+    globalEmulatorData.theThreadConfig->ipControlLoop(pThis, inx);
     globalEmulatorData.theConfiguration->removeThreadId(inx);
   }
   NdbShutdown(NST_Normal);

=== modified file 'storage/ndb/src/kernel/vm/Configuration.cpp'
--- a/storage/ndb/src/kernel/vm/Configuration.cpp	2008-11-06 16:29:57 +0000
+++ b/storage/ndb/src/kernel/vm/Configuration.cpp	2008-11-07 13:23:15 +0000
@@ -471,15 +471,18 @@ Configuration::setupConfiguration(){
 	      "RealtimeScheduler missing");
   }
 
-  if(iter.get(CFG_DB_EXECUTE_LOCK_CPU, &_executeLockCPU)){
-    ERROR_SET(fatal, NDBD_EXIT_INVALID_CONFIG, "Invalid configuration fetched", 
-	      "LockExecuteThreadToCPU missing");
+  const char * mask;
+  if(iter.get(CFG_DB_EXECUTE_LOCK_CPU, &mask) == 0)
+  {
+    if (_executeLockCPU.parseMask(mask) < 0)
+    {
+      _executeLockCPU.clear();
+    }
   }
 
-  if(iter.get(CFG_DB_MAINT_LOCK_CPU, &_maintLockCPU)){
-    ERROR_SET(fatal, NDBD_EXIT_INVALID_CONFIG, "Invalid configuration fetched", 
-	      "LockMaintThreadsToCPU missing");
-  }
+  _maintLockCPU = NO_LOCK_CPU;
+  iter.get(CFG_DB_MAINT_LOCK_CPU, &_maintLockCPU);
+
   if(iter.get(CFG_DB_WATCHDOG_INTERVAL_INITIAL, &_timeBetweenWatchDogCheckInitial)){
     ERROR_SET(fatal, NDBD_EXIT_INVALID_CONFIG, "Invalid configuration fetched", 
 	      "TimeBetweenWatchDogCheckInitial missing");
@@ -570,16 +573,38 @@ Configuration::realtimeScheduler(bool re
 Uint32
 Configuration::executeLockCPU() const
 {
-  return _executeLockCPU;
+  unsigned res = _executeLockCPU.find(0);
+  if (res == _executeLockCPU.NotFound)
+    return NO_LOCK_CPU;
+  else
+    return res;
 }
 
 void
 Configuration::executeLockCPU(Uint32 value)
 {
-  Uint32 old_value = _executeLockCPU;
-  _executeLockCPU = value;
-  if (value != old_value)
+  if (value >= NDB_CPU_MASK_SZ)
+  {
+    value = NO_LOCK_CPU;
+  }
+
+  bool changed = false;
+  if (value == NO_LOCK_CPU)
+  {
+    changed = _executeLockCPU.count() > 0;
+    _executeLockCPU.clear();
+  }
+  else
+  {
+    changed = _executeLockCPU.get(value) == false;
+    _executeLockCPU.clear();
+    _executeLockCPU.set(value);
+  }
+
+  if (changed)
+  {
     setAllLockCPU(TRUE);
+  }
 }
 
 Uint32
@@ -1083,7 +1108,7 @@ Configuration::setLockCPU(NdbThread * pT
       (!exec_thread && type == MainThread))
     return 0;
   if (type == MainThread)
-    cpu_id = _executeLockCPU;
+    cpu_id = executeLockCPU();
   else
     cpu_id = _maintLockCPU;
   if (!init ||
@@ -1121,7 +1146,14 @@ Configuration::addThread(struct NdbThrea
   threadInfo[i].type = type;
   NdbMutex_Unlock(threadIdMutex);
   setRealtimeScheduler(pThread, type, _realtimeScheduler, TRUE);
-  setLockCPU(pThread, type, (type == MainThread), TRUE);
+  if (type != MainThread)
+  {
+    /**
+     * main threads are set in ThreadConfig::ipControlLoop
+     * as it's handled differently with mt
+     */
+    setLockCPU(pThread, type, (type == MainThread), TRUE);
+  }
   return i;
 }
 

=== modified file 'storage/ndb/src/kernel/vm/Configuration.hpp'
--- a/storage/ndb/src/kernel/vm/Configuration.hpp	2008-11-06 10:17:49 +0000
+++ b/storage/ndb/src/kernel/vm/Configuration.hpp	2008-11-07 13:23:15 +0000
@@ -21,6 +21,7 @@
 #include <ndb_types.h>
 #include <NdbMutex.h>
 #include <NdbThread.h>
+#include <Bitmask.hpp>
 
 enum ThreadTypes
 {
@@ -34,6 +35,7 @@ enum ThreadTypes
 
 #define MAX_NDB_THREADS 256
 #define NO_LOCK_CPU 65535
+#define NDB_CPU_MASK_SZ 256
 
 struct ThreadInfo
 {
@@ -72,6 +74,9 @@ public:
 
   Uint32 executeLockCPU() const;
   void executeLockCPU(Uint32 value);
+  const Bitmask<NDB_CPU_MASK_SZ/32> & getExecuteCpuMask() const {
+    return _executeLockCPU;
+  }
 
   Uint32 maintLockCPU() const;
   void maintLockCPU(Uint32 value);
@@ -140,7 +145,7 @@ private:
   Uint32 _schedulerExecutionTimer;
   Uint32 _schedulerSpinTimer;
   Uint32 _realtimeScheduler;
-  Uint32 _executeLockCPU;
+  Bitmask<NDB_CPU_MASK_SZ/32> _executeLockCPU;
   Uint32 _maintLockCPU;
   Uint32 _timeBetweenWatchDogCheckInitial;
 

=== modified file 'storage/ndb/src/kernel/vm/ThreadConfig.cpp'
--- a/storage/ndb/src/kernel/vm/ThreadConfig.cpp	2008-01-01 12:45:11 +0000
+++ b/storage/ndb/src/kernel/vm/ThreadConfig.cpp	2008-11-07 13:23:15 +0000
@@ -103,8 +103,10 @@ ThreadConfig::scanTimeQueue()
 // The timeout value in this call is calculated as (10 ms - laptime)
 // This would make ndb use less cpu while improving response time.
 //--------------------------------------------------------------------
-void ThreadConfig::ipControlLoop(Uint32 thread_index)
+void ThreadConfig::ipControlLoop(NdbThread*, Uint32 thread_index)
 {
+  globalEmulatorData.theConfiguration->setAllLockCPU(true);
+
   Uint32 execute_loop_constant =
         globalEmulatorData.theConfiguration->schedulerExecutionTimer();
   Uint32 min_spin_time = 

=== modified file 'storage/ndb/src/kernel/vm/ThreadConfig.hpp'
--- a/storage/ndb/src/kernel/vm/ThreadConfig.hpp	2008-01-01 12:45:11 +0000
+++ b/storage/ndb/src/kernel/vm/ThreadConfig.hpp	2008-11-07 13:23:15 +0000
@@ -29,7 +29,7 @@ public:
   ~ThreadConfig();
   void init(EmulatorData *emulatorData);
 
-  void ipControlLoop(Uint32 thread_index);
+  void ipControlLoop(NdbThread*, Uint32 thread_index);
 
   int doStart(NodeState::StartLevel startLevel);
 private:

=== modified file 'storage/ndb/src/kernel/vm/mt.cpp'
--- a/storage/ndb/src/kernel/vm/mt.cpp	2008-11-03 14:03:15 +0000
+++ b/storage/ndb/src/kernel/vm/mt.cpp	2008-11-07 13:23:15 +0000
@@ -39,6 +39,7 @@
 #include <DebuggerNames.hpp>
 #include <signaldata/StopForCrash.hpp>
 #include "TransporterCallbackKernel.hpp"
+#include <NdbSleep.h>
 
 #include "mt-asm.h"
 
@@ -724,6 +725,9 @@ struct thr_data
   BlockNumber m_instance_list[MAX_INSTANCES_PER_THREAD];
 
   SectionSegmentPool::Cache m_sectionPoolCache;
+
+  Uint32 m_cpu;
+  NdbThread* m_thread;
 };
 
 struct mt_send_handle  : public TransporterSendBufferHandle
@@ -2294,48 +2298,40 @@ init_thread(thr_data *selfptr)
   selfptr->m_waiter.init();
   selfptr->m_jam.theEmulatedJamIndex = 0;
   selfptr->m_jam.theEmulatedJamBlockNumber = 0;
-  memset(selfptr->m_jam.theEmulatedJam, 0, sizeof(selfptr->m_jam.theEmulatedJam));
+  bzero(selfptr->m_jam.theEmulatedJam, sizeof(selfptr->m_jam.theEmulatedJam));
   NdbThread_SetTlsKey(NDB_THREAD_TLS_JAM, &selfptr->m_jam);
+  NdbThread_SetTlsKey(NDB_THREAD_TLS_THREAD, selfptr);
 
   unsigned thr_no = selfptr->m_thr_no;
   globalEmulatorData.theWatchDog->registerWatchedThread(&selfptr->m_watchdog_counter,
                                                         thr_no);
-  BaseString tmp;
+  {
+    while(selfptr->m_thread == 0)
+      NdbSleep_MilliSleep(30);
+  }
 
-  NdbThread_SetTlsKey(NDB_THREAD_TLS_THREAD, selfptr);
+  BaseString tmp;
   tmp.appfmt("thr: %u ", thr_no);
-#ifdef SYS_gettid
-  tmp.appfmt("tid: %u ", (unsigned)syscall(SYS_gettid));
-#endif
 
-#if 0
-#if defined SYS_gettid && defined HAVE_SCHED_SETAFFINITY
-  bool lock_to_cpu = false;
-  if (lock_to_cpu)
-  {
-    uint cpu_no = 1;
-    if (!(thr_no <= 1 || thr_no == num_threads - 1))
-    {
-      cpu_no = (thr_no & 3);
-    }
-    tmp.appfmt("cpu: %u ", cpu_no);
-    {
-      unsigned tid = (unsigned)syscall(SYS_gettid);
-      cpu_set_t mask;
-      CPU_ZERO(&mask);
-      CPU_SET(cpu_no, &mask);
-      if (sched_setaffinity(tid, sizeof(mask), &mask) == 0)
-      {
-        tmp.appfmt("OK ");
-      }
-      else
-      {
-        tmp.appfmt("err: %u ", errno);
-      }
+  int tid = NdbThread_GetTid(selfptr->m_thread);
+  if (tid != -1)
+  {
+    tmp.appfmt("tid: %u ", (unsigned)syscall(SYS_gettid));
+  }
+
+  if (selfptr->m_cpu != NO_LOCK_CPU)
+  {
+    tmp.appfmt("cpu: %u ", selfptr->m_cpu);
+    int res = NdbThread_LockCPU(selfptr->m_thread, selfptr->m_cpu);
+    if (res == 0)
+    {
+      tmp.appfmt("OK ");
+    }
+    else
+    {
+      tmp.appfmt("err: %u ", res);
     }
   }
-#endif
-#endif
   
   for (Uint32 i = 0; i < selfptr->m_instance_count; i++) 
   {
@@ -2790,6 +2786,9 @@ thr_init(struct thr_repository* rep, str
     selfptr->m_instance_list[i] = 0;
 
   bzero(&selfptr->m_send_buffers, sizeof(selfptr->m_send_buffers));
+
+  selfptr->m_thread = 0;
+  selfptr->m_cpu = NO_LOCK_CPU;
 }
 
 /* Have to do this after init of all m_in_queues is done. */
@@ -2885,11 +2884,106 @@ ThreadConfig::init(EmulatorData *emulato
   ::rep_init(&g_thr_repository, num_threads, emulatorData->m_mem_manager);
 }
 
-void ThreadConfig::ipControlLoop(Uint32 thread_index)
+static
+void
+setcpuaffinity(struct thr_repository* rep)
+{
+  Bitmask<NDB_CPU_MASK_SZ/32> mask =
+    globalEmulatorData.theConfiguration->getExecuteCpuMask();
+
+
+  bool mtlqh = globalData.ndbMtLqhThreads > 0;
+  unsigned cnt = mask.count();
+  if (cnt == 0)
+  {
+    return;
+  }
+  else if (cnt >= num_threads)
+  {
+    unsigned cpu = mask.find(0);
+    for (unsigned thr_no = 0; thr_no < num_threads; thr_no++)
+    {
+      rep->m_thread[thr_no].m_cpu = cpu;
+      cpu = mask.find(cpu + 1);
+    }
+  }
+  else if (cnt == 1)
+  {
+    unsigned cpu = mask.find(0);
+    for (unsigned thr_no = 0; thr_no < num_threads; thr_no++)
+    {
+      rep->m_thread[thr_no].m_cpu = cpu;
+    }
+  }
+  else if (mtlqh)
+  {
+    if (cnt > globalData.ndbMtLqhThreads)
+    {
+      /**
+       * let each LQH have it's on CPU and rest share...
+       */
+      // LQH threads start with 2
+      unsigned cpu = mask.find(0);
+      for (unsigned thr_no = 2; thr_no < num_threads - 1; thr_no++)
+      {
+        rep->m_thread[thr_no].m_cpu = cpu;
+        mask.clear(cpu);
+        cpu = mask.find(cpu + 1);
+      }
+
+      cpu = mask.find(0);
+      rep->m_thread[0].m_cpu = cpu; // TC
+      rep->m_thread[1].m_cpu = cpu; // backup/suma
+      if ((cpu = mask.find(cpu + 1)) == mask.NotFound)
+      {
+        cpu = mask.find(0);
+      }
+      rep->m_thread[receiver_thread_no].m_cpu = cpu; // receiver
+    }
+    else
+    {
+      // put receiver, tc, backup/suma in 1 thread, and round robin LQH for rest
+      unsigned cpu = mask.find(0);
+      rep->m_thread[0].m_cpu = cpu; // TC
+      rep->m_thread[1].m_cpu = cpu; // backup/suma
+      rep->m_thread[receiver_thread_no].m_cpu = cpu; // receiver
+      mask.clear(cpu);
+      cpu = mask.find(0);
+      for (unsigned thr_no = 2; thr_no < num_threads - 1; thr_no++)
+      {
+        rep->m_thread[thr_no].m_cpu = cpu;
+        cpu = mask.find(cpu + 1);
+        if (cpu == mask.NotFound)
+        {
+          cpu = mask.find(0);
+        }
+      }
+    }
+  }
+  else
+  {
+    /**
+     * mt-classic and cnt > 1
+     */
+    assert(num_threads == 3);
+    unsigned cpu = mask.find(0);
+    rep->m_thread[1].m_cpu = cpu; // LQH
+    cpu = mask.find(cpu + 1);
+    rep->m_thread[0].m_cpu = cpu;
+    rep->m_thread[2].m_cpu = cpu;
+  }
+}
+
+void
+ThreadConfig::ipControlLoop(NdbThread* pThis, Uint32 thread_index)
 {
   unsigned int thr_no;
   struct thr_repository* rep = &g_thr_repository;
-  NdbThread *threads[MAX_THREADS];
+
+  /**
+   * assign threads to CPU's
+   */
+  setcpuaffinity(rep);
 
   /*
    * Start threads for all execution threads, except for the receiver
@@ -2901,19 +2995,22 @@ void ThreadConfig::ipControlLoop(Uint32 
 
     if (thr_no == receiver_thread_no)
       continue;                 // Will run in the main thread.
+
     /*
      * The NdbThread_Create() takes void **, but that is cast to void * when
      * passed to the thread function. Which is kind of strange ...
      */
-    threads[thr_no] = NdbThread_Create(mt_job_thread_main,
-                                           (void **)(rep->m_thread + thr_no),
-                                           1024*1024,
-                                           "execute thread", //ToDo add number
-                                           NDB_THREAD_PRIO_MEAN);
-    assert(threads[thr_no] != NULL);
+    rep->m_thread[thr_no].m_thread =
+      NdbThread_Create(mt_job_thread_main,
+                       (void **)(rep->m_thread + thr_no),
+                       1024*1024,
+                       "execute thread", //ToDo add number
+                       NDB_THREAD_PRIO_MEAN);
+    assert(rep->m_thread[thr_no].m_thread != NULL);
   }
 
   /* Now run the main loop for thread 0 directly. */
+  rep->m_thread[receiver_thread_no].m_thread = pThis;
   mt_receiver_thread_main(&(rep->m_thread[receiver_thread_no]));
 
   /* Wait for all threads to shutdown. */
@@ -2922,8 +3019,8 @@ void ThreadConfig::ipControlLoop(Uint32 
     if (thr_no == receiver_thread_no)
       continue;
     void *dummy_return_status;
-    NdbThread_WaitFor(threads[thr_no], &dummy_return_status);
-    NdbThread_Destroy(&(threads[thr_no]));
+    NdbThread_WaitFor(rep->m_thread[thr_no].m_thread, &dummy_return_status);
+    NdbThread_Destroy(&(rep->m_thread[thr_no].m_thread));
   }
 }
 

=== modified file 'storage/ndb/src/mgmsrv/ConfigInfo.cpp'
--- a/storage/ndb/src/mgmsrv/ConfigInfo.cpp	2008-11-03 08:36:33 +0000
+++ b/storage/ndb/src/mgmsrv/ConfigInfo.cpp	2008-11-07 13:23:15 +0000
@@ -622,11 +622,11 @@ const ConfigInfo::ParamInfo ConfigInfo::
     CFG_DB_EXECUTE_LOCK_CPU,
     "LockExecuteThreadToCPU",
     DB_TOKEN,
-    "CPU ID indicating which CPU will run the execution thread",
+    "CPU list indicating which CPU will run the execution thread(s)",
     ConfigInfo::CI_USED,
     true,
-    ConfigInfo::CI_INT,
-    "65535",
+    ConfigInfo::CI_STRING,
+    UNDEFINED,
     "0",
     "65535" },
 
@@ -638,7 +638,7 @@ const ConfigInfo::ParamInfo ConfigInfo::
     ConfigInfo::CI_USED,
     true,
     ConfigInfo::CI_INT,
-    "65535",
+    UNDEFINED,
     "0",
     "65535" },
 

Thread
bzr push into mysql-5.1 branch (jonas:3054 to 3055) Jonas Oreland7 Nov