List: Commits
From: Jonas Oreland
Date: January 25 2012 10:41am
Subject: bzr push into mysql-5.5-cluster-7.2 branch (jonas.oreland:3783 to 3785)
 3785 Jonas Oreland	2012-01-25 [merge]
      ndb - merge 71 to 72

    modified:
      storage/ndb/src/kernel/vm/mt.cpp
      storage/ndb/src/mgmsrv/ConfigInfo.cpp
 3784 mikael.ronstrom@stripped	2012-01-25
      add --no-static-linking to BUILD/build_mccge.sh. Patch from mikael

    modified:
      BUILD/build_mccge.sh
 3783 Martin Skold	2012-01-24 [merge]
      Merge

    modified:
      mysql-test/suite/ndb/r/ndb_condition_pushdown.result
      mysql-test/suite/ndb/t/ndb_condition_pushdown.test
      sql/ha_ndbcluster_cond.cc
      sql/item_cmpfunc.h
=== modified file 'BUILD/build_mccge.sh'
--- a/BUILD/build_mccge.sh	2011-06-30 15:46:53 +0000
+++ b/BUILD/build_mccge.sh	2012-01-25 09:10:25 +0000
@@ -856,6 +856,9 @@ parse_options()
     --static-linking)
       static_linking_flag="yes"
       ;;
+    --no-static-linking)
+      static_linking_flag="no"
+      ;;
     --strip)
       strip_flag="yes"
       ;;
@@ -1763,8 +1766,10 @@ set_static_link_configs()
     loc_static_link="--with-mysqld-ldflags=\"-all-static\""
     loc_static_link="$loc_static_link --with-client-ldflags=\"-all-static\""
   else
-    loc_static_link="--with-mysqld-ldflags=\"-static\""
-    loc_static_link="$loc_static_link --with-client-ldflags=\"-static\""
+    if test "x$static_linking_flag" != "xno" ; then
+      loc_static_link="--with-mysqld-ldflags=\"-static\""
+      loc_static_link="$loc_static_link --with-client-ldflags=\"-static\""
+    fi
   fi
   base_configs="$base_configs $loc_static_link"
 }

=== modified file 'storage/ndb/src/kernel/vm/mt.cpp'
--- a/storage/ndb/src/kernel/vm/mt.cpp	2012-01-24 06:20:13 +0000
+++ b/storage/ndb/src/kernel/vm/mt.cpp	2012-01-25 10:39:40 +0000
@@ -576,6 +576,15 @@ public:
     validate();
   }
 
+  /**
+   * Release all locally cached pages back to the memory manager if more
+   * than m_max_free are free, otherwise do nothing.
+   */
+  void release_chunk(Ndbd_mem_manager *mm, Uint32 rg) {
+    if (m_free > m_max_free)
+      release_all(mm, rg);
+  }
+
   void set_pool(thr_safe_pool<T> * pool) { m_global_pool = pool; }
 
 private:
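
The release_chunk() addition above caps how many free pages a thread keeps cached locally before handing them back in bulk. A minimal sketch of the same pattern, using standard C++ containers and a plain mutex instead of the NDB pool classes (the names GlobalPool/LocalPool are illustrative only, not NDB identifiers):

  // Give the whole local cache back to the shared pool only once it grows
  // past a threshold, so the shared pool's lock is taken once per chunk
  // instead of once per released page.
  #include <cstddef>
  #include <mutex>
  #include <vector>

  struct GlobalPool {
    std::mutex lock;
    std::vector<void*> pages;

    void release_list(std::vector<void*> &local) {
      std::lock_guard<std::mutex> g(lock);   // one lock round-trip per chunk
      pages.insert(pages.end(), local.begin(), local.end());
      local.clear();
    }
  };

  struct LocalPool {
    GlobalPool *global;
    std::vector<void*> free_pages;
    std::size_t max_free;

    LocalPool(GlobalPool *g, std::size_t max) : global(g), max_free(max) {}

    void release(void *page) { free_pages.push_back(page); }

    // Mirrors release_chunk(): hand everything back only when over the limit.
    void release_chunk() {
      if (free_pages.size() > max_free)
        global->release_list(free_pages);
    }
  };
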
@@ -1023,6 +1032,477 @@ struct thr_repository
   Uint32 stopped_threads;
 };
 
+/**
+ *  Class to handle send threads
+ *  ----------------------------
+ *  We can have up to 8 send threads.
+ *
+ *  This class handles the case where a block thread needs something sent:
+ *  it starts the send threads, runs their main loops and hands them the
+ *  nodes that have data waiting to be sent.
+ */
+#define is_send_thread(thr_no) (thr_no >= num_threads)
+
+struct thr_send_thread_instance
+{
+  thr_send_thread_instance() :
+               m_instance_no(0),
+               m_watchdog_counter(0),
+               m_awake(FALSE),
+               m_thread(NULL),
+               m_waiter_struct(),
+               m_send_buffer_pool(0, THR_FREE_BUF_MAX)
+  {}
+  Uint32 m_instance_no;
+  Uint32 m_watchdog_counter;
+  Uint32 m_awake;
+  NdbThread *m_thread;
+  thr_wait m_waiter_struct;
+  class thread_local_pool<thr_send_page> m_send_buffer_pool;
+};
+
+struct thr_send_nodes
+{
+  /* 0 means NULL */
+  Uint16 m_next;
+  Uint16 m_data_available;
+};
+
+class thr_send_threads
+{
+public:
+  /* Create send thread environment */
+  thr_send_threads();
+
+  /* Destroy send thread environment and ensure threads are stopped */
+  ~thr_send_threads();
+
+  /* A block thread has flushed data for a node and wants it sent */
+  void alert_send_thread(NodeId node);
+
+  /* Method used to run the send thread */
+  void run_send_thread(Uint32 instance_no);
+
+  /* Method to start the send threads */
+  void start_send_threads();
+
+  /* Check if there is at least one node to send to */
+  bool data_available()
+  {
+    rmb();
+    return m_first_node;
+  }
+
+  bool data_available(NodeId node)
+  {
+    struct thr_send_nodes *node_state;
+
+    node_state = &m_node_state[node];
+    return node_state->m_data_available;
+  }
+
+  /* Get send buffer pool for send thread */
+  thread_local_pool<thr_send_page>* get_send_buffer_pool(Uint32 thr_no)
+  {
+    return &m_send_threads[thr_no - num_threads].m_send_buffer_pool;
+  }
+
+private:
+  /* Insert a node into the list of nodes that have data available to send */
+  void insert_node(NodeId node);
+
+  /* Get a node from the list in order to send to it */
+  NodeId get_node();
+
+  /* Get a send thread which isn't awake currently */
+  struct thr_send_thread_instance* get_not_awake_send_thread();
+  /*
+   * Try to lock send for this node. If that is not successful, try another
+   * node that is ready for sending. The send thread mutex is unlocked as
+   * part of the call.
+   */
+  NodeId check_and_lock_send_node(NodeId node);
+
+  /* Perform the actual send to the node */
+  int perform_send(NodeId node, Uint32 instance_no);
+
+  /* Have threads been started */
+  Uint32 m_started_threads;
+
+  /* First node that has data to be sent */
+  Uint32 m_first_node;
+
+  /* Last node in list of nodes with data available for sending */
+  Uint32 m_last_node;
+
+  /* Data-available flag and next-node reference for each node in the cluster */
+  struct thr_send_nodes m_node_state[MAX_NODES];
+
+  /**
+   * Only a few compilers (gcc among them) allow zero-length arrays, so make
+   * sure the array below always has at least one element.
+   */
+#if MAX_NDBMT_SEND_THREADS == 0
+#define _MAX_SEND_THREADS 1
+#else
+#define _MAX_SEND_THREADS MAX_NDBMT_SEND_THREADS
+#endif
+
+  /* Data and state for the send threads */
+  struct thr_send_thread_instance m_send_threads[_MAX_SEND_THREADS];
+
+  /**
+   * Mutex protecting the linked list of nodes awaiting sending and also
+   * the m_awake flag of each send thread.
+   */
+  NdbMutex *send_thread_mutex;
+};
+
+
+/*
+ * The single instance of the thr_send_threads class. If this pointer is
+ * non-NULL we are using send threads; if it is NULL there are no send
+ * threads.
+ */
+static thr_send_threads *g_send_threads = NULL;
+
+extern "C"
+void *
+mt_send_thread_main(void *thr_arg)
+{
+  struct thr_send_thread_instance *this_send_thread =
+    (thr_send_thread_instance*)thr_arg;
+
+  Uint32 instance_no = this_send_thread->m_instance_no;
+  ndbout_c("Send thread : %u is started", instance_no);
+  g_send_threads->run_send_thread(instance_no);
+  return NULL;
+}
+
+thr_send_threads::thr_send_threads()
+{
+  struct thr_repository *rep = &g_thr_repository;
+
+  m_started_threads = FALSE;
+  m_first_node = 0;
+  m_last_node = 0;
+  for (Uint32 i = 0; i < NDB_ARRAY_SIZE(m_node_state); i++)
+  {
+    m_node_state[i].m_next = 0;
+    m_node_state[i].m_data_available = FALSE;
+  }
+  for (Uint32 i = 0; i < NDB_ARRAY_SIZE(m_send_threads); i++)
+  {
+    m_send_threads[i].m_waiter_struct.init();
+    m_send_threads[i].m_instance_no = i;
+    m_send_threads[i].m_send_buffer_pool.set_pool(&rep->m_sb_pool);
+  }
+  send_thread_mutex = NdbMutex_Create();
+}
+
+thr_send_threads::~thr_send_threads()
+{
+  if (!m_started_threads)
+    return;
+
+  for (Uint32 i = 0; i < globalData.ndbMtSendThreads; i++)
+  {
+    void *dummy_return_status;
+
+    /* Ensure thread is woken up to die */
+    wakeup(&(m_send_threads[i].m_waiter_struct));
+    NdbThread_WaitFor(m_send_threads[i].m_thread, &dummy_return_status);
+    NdbThread_Destroy(&(m_send_threads[i].m_thread));
+  }
+}
+
+void
+thr_send_threads::start_send_threads()
+{
+  for (Uint32 i = 0; i < globalData.ndbMtSendThreads; i++)
+  {
+    ndbout_c("Start send thread: %u", i);
+    m_send_threads[i].m_thread =
+      NdbThread_Create(mt_send_thread_main,
+                       (void **)&m_send_threads[i],
+                       1024*1024,
+                       "send thread", //ToDo add number
+                       NDB_THREAD_PRIO_MEAN);
+  }
+  m_started_threads = TRUE;
+}
+
+/* Called under mutex protection of send_thread_mutex */
+void
+thr_send_threads::insert_node(NodeId node)
+{
+  Uint8 last_node = m_last_node;
+  Uint8 first_node = m_first_node;
+  struct thr_send_nodes *last_node_state = &m_node_state[last_node];
+  struct thr_send_nodes *node_state = &m_node_state[node];
+
+  node_state->m_next = 0;
+  node_state->m_data_available = TRUE;
+  m_last_node = node;
+  if (first_node == 0)
+  {
+    m_first_node = node;
+    wmb(); /* Ensure send threads see the change to m_first_node != 0 */
+  }
+  else
+    last_node_state->m_next = node;
+}
+
+/* Called under mutex protection of send_thread_mutex */
+NodeId
+thr_send_threads::get_node()
+{
+  Uint32 first_node = m_first_node;
+  struct thr_send_nodes *node_state;
+  Uint32 next;
+
+  if (first_node)
+  {
+    node_state = &m_node_state[first_node];
+    next = node_state->m_next;
+
+    node_state->m_next = 0;
+    node_state->m_data_available = FALSE;
+
+    m_first_node = next;
+    if (next == 0)
+    {
+      m_last_node = 0;
+      wmb(); /* Ensure send threads see the change to m_first_node == 0 */
+    }
+    return (NodeId)first_node;
+  }
+  return 0;
+}
+
+struct thr_send_thread_instance*
+thr_send_threads::get_not_awake_send_thread()
+{
+  struct thr_send_thread_instance *used_send_thread;
+
+  for (Uint32 i = 0; i < globalData.ndbMtSendThreads; i++)
+  {
+    if (!m_send_threads[i].m_awake)
+    {
+      used_send_thread= &m_send_threads[i];
+      return used_send_thread;
+    }
+  }
+  return NULL;
+}
+
+void
+thr_send_threads::alert_send_thread(NodeId node)
+{
+  struct thr_send_thread_instance *used_send_thread = NULL;
+
+  NdbMutex_Lock(send_thread_mutex);
+  if (m_node_state[node].m_data_available)
+  {
+    /*
+     * The node is already flagged as having data that needs to be sent.
+     * This means that someone else has already ensured that a send thread
+     * was woken up if necessary, so we can rely on the send threads to
+     * take care of it from here.
+     *
+     * The buffers we have flushed are guaranteed to be read by a send
+     * thread, since this flag is always reset before the send buffers of
+     * the node are read.
+     *
+     * The thread that set this flag should also have ensured that a send
+     * thread is woken up if needed; there is no need to wake even more
+     * threads in this case since we piggyback on someone else's request.
+     */
+    assert(m_first_node);
+    NdbMutex_Unlock(send_thread_mutex);
+    return;
+  }
+  else
+    insert_node(node);
+  /*
+   * Search for a send thread that is asleep and, if there is one, wake it.
+   *
+   * If every send thread is already awake we don't need to wake anyone,
+   * since the threads check whether there are nodes available to send to
+   * before they go to sleep.
+   *
+   * The reason for looking for a sleeping thread is to make proper use of
+   * CPU resources and to use all the send thread CPUs at our disposal when
+   * necessary.
+   */
+  used_send_thread = get_not_awake_send_thread();
+
+  NdbMutex_Unlock(send_thread_mutex);
+
+  if (used_send_thread)
+  {
+    /*
+     * Wake the chosen sleeping send thread. This may be a spurious wakeup,
+     * but that is not a problem; what matters is that at least one send
+     * thread is woken up to handle our request. If someone already awake
+     * takes care of our request before we get around to waking anyone up,
+     * that is not a problem either.
+     */
+    wakeup(&(used_send_thread->m_waiter_struct));
+  }
+}
+
+extern "C"
+bool
+check_available_send_data(struct thr_data *not_used)
+{
+  (void)not_used;
+  return !g_send_threads->data_available();
+}
+
+NodeId
+thr_send_threads::check_and_lock_send_node(NodeId node)
+{
+  thr_repository::send_buffer * sb = g_thr_repository.m_send_buffers+node;
+  NodeId in_node = node;
+
+  /* We own send_thread_mutex when entering */
+  while (trylock(&sb->m_send_lock) != 0)
+  {
+    /*
+     * We failed to acquire the lock; most likely another send thread is
+     * busy sending to this node. Put it last in the list if there are
+     * other nodes to send data to.
+     */
+    insert_node(node);
+    node = get_node();
+    sb = g_thr_repository.m_send_buffers+node;
+    if (node == in_node)
+    {
+      /*
+       * There is no other node available to send to, so wait for this one
+       * to become free by taking the lock the normal, blocking way.
+       */
+      NdbMutex_Unlock(send_thread_mutex);
+      lock(&sb->m_send_lock);
+      return node;
+    }
+  }
+  /* We only get here when we own the send lock of the node */
+  NdbMutex_Unlock(send_thread_mutex);
+  /* We don't want to hold the send thread mutex while sending */
+  return node;
+}
+
+int
+thr_send_threads::perform_send(NodeId node, Uint32 instance_no)
+{
+  int res;
+  thr_repository::send_buffer * sb = g_thr_repository.m_send_buffers+node;
+  sb->m_send_thread = num_threads + instance_no;
+  res = globalTransporterRegistry.performSend(node);
+  sb->m_send_thread = NO_SEND_THREAD;
+  unlock(&sb->m_send_lock);
+  return res;
+}
+
+void
+thr_send_threads::run_send_thread(Uint32 instance_no)
+{
+  struct thr_send_thread_instance *this_send_thread =
+    &m_send_threads[instance_no];
+  const Uint32 thr_no = num_threads + instance_no;
+
+  {
+    /**
+     * Wait for thread object to be visible
+     */
+    while(this_send_thread->m_thread == 0)
+      NdbSleep_MilliSleep(30);
+  }
+
+  {
+    /**
+     * Print out information about the starting thread
+     *   (number, tid, name and the CPU it is locked to, if locked at all).
+     * Also perform the locking to a CPU.
+     */
+    BaseString tmp;
+    THRConfigApplier & conf = globalEmulatorData.theConfiguration->m_thr_config;
+    tmp.appfmt("thr: %u ", thr_no);
+    int tid = NdbThread_GetTid(this_send_thread->m_thread);
+    if (tid != -1)
+    {
+      tmp.appfmt("tid: %u ", tid);
+    }
+    conf.appendInfoSendThread(tmp, instance_no);
+    int res = conf.do_bind_send(this_send_thread->m_thread, instance_no);
+    if (res < 0)
+    {
+      tmp.appfmt("err: %d ", -res);
+    }
+    else if (res > 0)
+    {
+      tmp.appfmt("OK ");
+    }
+    printf("%s\n", tmp.c_str());
+    fflush(stdout);
+  }
+
+  /**
+   * register watchdog
+   */
+  globalEmulatorData.theWatchDog->
+    registerWatchedThread(&this_send_thread->m_watchdog_counter, thr_no);
+
+  NdbMutex_Lock(send_thread_mutex);
+  this_send_thread->m_awake = FALSE;
+  NdbMutex_Unlock(send_thread_mutex);
+
+  while (globalData.theRestartFlag != perform_stop)
+  {
+    this_send_thread->m_watchdog_counter = 1;
+
+    /* Yield for a maximum of 1ms */
+    const Uint32 wait = 1000000;
+    yield(&this_send_thread->m_waiter_struct, wait,
+          check_available_send_data, NULL);
+
+    NdbMutex_Lock(send_thread_mutex);
+    this_send_thread->m_awake = TRUE;
+
+    NodeId node;
+    while ((node = get_node()) != 0 &&
+           globalData.theRestartFlag != perform_stop)
+    {
+      this_send_thread->m_watchdog_counter = 2;
+
+      /* We enter this call holding the send thread mutex and come back
+       * with the mutex released, instead owning the send lock of the node
+       * that is returned.
+       */
+      node = check_and_lock_send_node(node);
+
+      int res = perform_send(node, instance_no);
+      /* We return with no spin locks or mutexes held */
+
+      /* Release chunk-wise to decrease pressure on the spin lock */
+      this_send_thread->m_watchdog_counter = 3;
+      this_send_thread->m_send_buffer_pool.
+        release_chunk(g_thr_repository.m_mm, RG_TRANSPORTER_BUFFERS);
+
+      NdbMutex_Lock(send_thread_mutex);
+      if (res && !data_available(node))
+        insert_node(node);
+    }
+
+    /* No data to send, prepare to sleep */
+    this_send_thread->m_awake = FALSE;
+    NdbMutex_Unlock(send_thread_mutex);
+  }
+
+  globalEmulatorData.theWatchDog->unregisterWatchedThread(thr_no);
+}
+
 #if 0
 static
 Uint32
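
The list of nodes with pending data, manipulated by insert_node()/get_node() above, is an index-linked FIFO in which index 0 doubles as "no node" (real node ids start at 1). Only the cheap emptiness probe in data_available() runs without send_thread_mutex, which is why updates of m_first_node are paired with wmb()/rmb(). A minimal, self-contained sketch of that structure (illustrative only, not the NDB code; the per-node m_data_available flag is left out):

  #include <cassert>

  struct NodeFifo {
    static const unsigned MAX_NODES = 256;
    unsigned short next[MAX_NODES];  // next[i] == 0 means "last in list"
    unsigned head, tail;             // head == 0 means "list empty"

    NodeFifo() : head(0), tail(0) {
      for (unsigned i = 0; i < MAX_NODES; i++)
        next[i] = 0;
    }

    void insert(unsigned node) {     // caller holds the mutex
      next[node] = 0;
      if (head == 0)
        head = node;                 // the real code issues wmb() here
      else
        next[tail] = node;
      tail = node;
    }

    unsigned get() {                 // caller holds the mutex; 0 == nothing to do
      unsigned node = head;
      if (node != 0) {
        head = next[node];
        if (head == 0)
          tail = 0;                  // the real code issues wmb() here as well
      }
      return node;
    }

    bool maybe_non_empty() const {   // lock-free probe; the real code does rmb()
      return head != 0;
    }
  };

  int main() {
    NodeFifo f;
    f.insert(3);
    f.insert(7);
    assert(f.get() == 3 && f.get() == 7 && f.get() == 0);
    return 0;
  }
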
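
The wake-up protocol between alert_send_thread() and run_send_thread() above (set the data flag first, only wake a thread that looks asleep, re-check for work before going to sleep, tolerate spurious wakeups) can be sketched with portable std::thread primitives instead of NDB's thr_wait/wakeup()/yield(). The names below (SendWaiter, alert, wait_for_work) are illustrative, not NDB identifiers:

  #include <chrono>
  #include <condition_variable>
  #include <mutex>

  struct SendWaiter {
    std::mutex m;
    std::condition_variable cv;
    bool awake;
    bool has_work;                       // stands in for data_available()

    SendWaiter() : awake(false), has_work(false) {}

    // Producer side (block thread): queue work, wake a worker only if one
    // looks asleep; otherwise piggyback on the already-awake worker.
    void alert() {
      std::lock_guard<std::mutex> g(m);
      has_work = true;
      if (!awake)
        cv.notify_one();
    }

    // Worker side (send thread): sleep for at most 1 ms, but never go to
    // sleep if work is already queued -- this mirrors the data_available()
    // check done by the yield() predicate, so a wakeup cannot be lost.
    bool wait_for_work() {
      std::unique_lock<std::mutex> lk(m);
      awake = false;
      cv.wait_for(lk, std::chrono::milliseconds(1), [this] { return has_work; });
      awake = true;
      bool got_work = has_work;
      has_work = false;
      return got_work;
    }
  };
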
@@ -1642,6 +2122,14 @@ trp_callback::reportSendLen(NodeId nodeI
   Signal &signal = * new (&signalT) Signal(0);
   memset(&signal.header, 0, sizeof(signal.header));
 
+  if (g_send_threads)
+  {
+    /**
+     * TODO: Implement this also when using send threads!!
+     */
+    return;
+  }
+
   signal.header.theLength = 3;
   signal.header.theSendersSignalId = 0;
   signal.header.theSendersBlockRef = numberToRef(0, globalData.ownId);
@@ -1928,8 +2416,16 @@ trp_callback::bytes_sent(NodeId node, Ui
   thr_repository::send_buffer * sb = g_thr_repository.m_send_buffers+node;
   Uint32 thr_no = sb->m_send_thread;
   assert(thr_no != NO_SEND_THREAD);
-  return ::bytes_sent(&g_thr_repository.m_thread[thr_no].m_send_buffer_pool,
-                      sb, bytes);
+  if (!is_send_thread(thr_no))
+  {
+    return ::bytes_sent(&g_thr_repository.m_thread[thr_no].m_send_buffer_pool,
+                        sb, bytes);
+  }
+  else
+  {
+    return ::bytes_sent(g_send_threads->get_send_buffer_pool(thr_no),
+                        sb, bytes);
+  }
 }
 
 bool
@@ -1940,6 +2436,7 @@ trp_callback::has_data_to_send(NodeId no
   thr_repository::send_buffer * sb = g_thr_repository.m_send_buffers + node;
   Uint32 thr_no = sb->m_send_thread;
   assert(thr_no != NO_SEND_THREAD);
+  assert(!is_send_thread(thr_no));
   assert((sb->m_bytes > 0) == (sb->m_buffer.m_first_page != 0));
   if (sb->m_bytes > 0 || sb->m_force_send)
     return true;
@@ -2146,6 +2643,11 @@ do_send(struct thr_data* selfptr, bool m
 
     flush_send_buffer(selfptr, node);
 
+    if (g_send_threads)
+    {
+      g_send_threads->alert_send_thread(node);
+      continue;
+    }
     thr_repository::send_buffer * sb = rep->m_send_buffers + node;
 
     /**
@@ -2230,7 +2732,8 @@ mt_send_handle::getWritePtr(NodeId node,
   {
     // TODO: maybe dont always flush on page-boundary ???
     flush_send_buffer(m_selfptr, node);
-    try_send(m_selfptr, node);
+    if (!g_send_threads)
+      try_send(m_selfptr, node);
   }
 
   if ((p = m_selfptr->m_send_buffer_pool.seize(g_thr_repository.m_mm,
@@ -3619,16 +4122,26 @@ ThreadConfig::ipControlLoop(NdbThread* p
    */
   setcpuaffinity(rep);
 
+  if (globalData.ndbMtSendThreads)
+  {
+    g_send_threads = new thr_send_threads();
+  }
+
   /**
    * assign nodes to receiver threads
    */
   assign_receiver_threads();
 
+  /* Start the send thread(s) */
+  if (g_send_threads)
+  {
+    g_send_threads->start_send_threads();
+  }
+
   /*
    * Start threads for all execution threads, except for the receiver
    * thread, which runs in the main thread.
    */
-
   for (thr_no = 0; thr_no < num_threads; thr_no++)
   {
     rep->m_thread[thr_no].m_time = NdbTick_CurrentMillisecond();
@@ -3677,6 +4190,12 @@ ThreadConfig::ipControlLoop(NdbThread* p
     NdbThread_WaitFor(rep->m_thread[thr_no].m_thread, &dummy_return_status);
     NdbThread_Destroy(&(rep->m_thread[thr_no].m_thread));
   }
+
+  /* Delete the send threads; this includes waiting for them to shut down */
+  if (g_send_threads)
+  {
+    delete g_send_threads;
+  }
 }
 
 int

=== modified file 'storage/ndb/src/mgmsrv/ConfigInfo.cpp'
--- a/storage/ndb/src/mgmsrv/ConfigInfo.cpp	2012-01-24 06:20:13 +0000
+++ b/storage/ndb/src/mgmsrv/ConfigInfo.cpp	2012-01-25 10:39:40 +0000
@@ -1120,7 +1120,7 @@ const ConfigInfo::ParamInfo ConfigInfo::
     ConfigInfo::CI_INT,
     "4",
     "4",
-    "4"
+    STR_VALUE(NDB_MAX_LOG_PARTS)
   },
 
   {

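The ConfigInfo change above replaces the hard-coded maximum of "4" for this parameter with STR_VALUE(NDB_MAX_LOG_PARTS), so the configured maximum now tracks the compile-time constant. STR_VALUE is presumably the usual two-level stringification idiom; a minimal sketch of how such a macro turns the constant's value (not its name) into the string the parameter table needs (the value of NDB_MAX_LOG_PARTS below is hypothetical, for illustration only):

  #include <cstdio>

  #define STRINGIFY(x) #x
  #define STR_VALUE(x) STRINGIFY(x)   // extra level so the argument is expanded first

  #define NDB_MAX_LOG_PARTS 32        // hypothetical value for this example

  int main() {
    printf("%s\n", STR_VALUE(NDB_MAX_LOG_PARTS));   // prints "32"
    return 0;
  }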