List:Commits« Previous MessageNext Message »
From:Jonas Oreland Date:March 21 2012 11:22am
Subject:bzr push into mysql-5.1-telco-7.1 branch (jonas.oreland:4491 to 4492)
View as plain text  
 4492 Jonas Oreland	2012-03-21 [merge]
      ndb - merge 70 to 71

    added:
      storage/ndb/src/kernel/vm/mt-lock.hpp
      storage/ndb/src/kernel/vm/mt-send-t.cpp
    modified:
      storage/ndb/src/kernel/vm/Makefile.am
      storage/ndb/src/kernel/vm/mt.cpp
 4491 Martin Skold	2012-03-19 [merge]
      Merge from 7.0

    modified:
      mysql-test/suite/ndb/r/ndb_alter_table_online.result
      mysql-test/suite/ndb/t/ndb_alter_table_online.test
      sql/ha_ndbcluster.cc
      storage/ndb/include/kernel/signaldata/ApiVersion.hpp
      storage/ndb/include/kernel/signaldata/FireTrigOrd.hpp
      storage/ndb/include/kernel/signaldata/LqhKey.hpp
      storage/ndb/include/kernel/trigger_definitions.h
      storage/ndb/src/common/debugger/signaldata/ApiVersion.cpp
      storage/ndb/src/kernel/blocks/dbtc/Dbtc.hpp
      storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp
      storage/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp
      storage/ndb/src/mgmsrv/MgmtSrvr.cpp
      storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp
      storage/ndb/test/ndbapi/testIndex.cpp
=== modified file 'storage/ndb/src/kernel/vm/Makefile.am'
--- a/storage/ndb/src/kernel/vm/Makefile.am	2012-01-23 08:20:12 +0000
+++ b/storage/ndb/src/kernel/vm/Makefile.am	2012-03-20 13:50:15 +0000
@@ -144,3 +144,12 @@ mt_thr_config_t_LDADD = \
 	$(top_builddir)/mysys/libmysyslt.la
 
 noinst_PROGRAMS = mt_thr_config-t CountingPool-t DynArr256-t
+
+mt_send_t_CXXFLAGS = -DTAP_TEST
+mt_send_t_SOURCES = mt-send-t.cpp
+mt_send_t_LDFLAGS = @ndb_bin_am_ldflags@
+mt_send_t_LDADD = $(test_ldadd)
+
+if BUILD_NDBMTD
+noinst_PROGRAMS += mt-send-t
+endif

=== added file 'storage/ndb/src/kernel/vm/mt-lock.hpp'
--- a/storage/ndb/src/kernel/vm/mt-lock.hpp	1970-01-01 00:00:00 +0000
+++ b/storage/ndb/src/kernel/vm/mt-lock.hpp	2012-03-20 13:23:29 +0000
@@ -0,0 +1,160 @@
+/* Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
+
+#ifndef MT_LOCK_HPP
+#define MT_LOCK_HPP
+
+#include <ndb_global.h>
+#include "mt-asm.h"
+#include <NdbMutex.h>
+
+struct mt_lock_stat
+{
+  const void * m_ptr;
+  char * m_name;
+  Uint32 m_contended_count;
+  Uint32 m_spin_count;
+};
+
+static void register_lock(const void * ptr, const char * name);
+static mt_lock_stat * lookup_lock(const void * ptr);
+
+#ifdef NDB_HAVE_XCNG
+template <unsigned SZ>
+struct thr_spin_lock
+{
+  thr_spin_lock(const char * name = 0)
+  {
+    m_lock = 0;
+    register_lock(this, name);
+  }
+
+  union {
+    volatile Uint32 m_lock;
+    char pad[SZ];
+  };
+};
+
+static
+ATTRIBUTE_NOINLINE
+void
+lock_slow(void * sl, volatile unsigned * val)
+{
+  mt_lock_stat* s = lookup_lock(sl); // lookup before owning lock
+
+loop:
+  Uint32 spins = 0;
+  do {
+    spins++;
+    cpu_pause();
+  } while (* val == 1);
+
+  if (unlikely(xcng(val, 1) != 0))
+    goto loop;
+
+  if (s)
+  {
+    s->m_spin_count += spins;
+    Uint32 count = ++s->m_contended_count;
+    Uint32 freq = (count > 10000 ? 5000 : (count > 20 ? 200 : 1));
+
+    if ((count % freq) == 0)
+      printf("%s waiting for lock, contentions: %u spins: %u\n",
+             s->m_name, count, s->m_spin_count);
+  }
+}
+
+template <unsigned SZ>
+static
+inline
+void
+lock(struct thr_spin_lock<SZ>* sl)
+{
+  volatile unsigned* val = &sl->m_lock;
+  if (likely(xcng(val, 1) == 0))
+    return;
+
+  lock_slow(sl, val);
+}
+
+template <unsigned SZ>
+static
+inline
+void
+unlock(struct thr_spin_lock<SZ>* sl)
+{
+  /**
+   * Memory barrier here, to make sure all of our stores are visible before
+   * the lock release is.
+   */
+  mb();
+  sl->m_lock = 0;
+}
+
+template <unsigned SZ>
+static
+inline
+int
+trylock(struct thr_spin_lock<SZ>* sl)
+{
+  volatile unsigned* val = &sl->m_lock;
+  return xcng(val, 1);
+}
+#else
+#define thr_spin_lock thr_mutex
+#endif
+
+template <unsigned SZ>
+struct thr_mutex
+{
+  thr_mutex(const char * name = 0) {
+    NdbMutex_Init(&m_mutex);
+    register_lock(this, name);
+  }
+
+  union {
+    NdbMutex m_mutex;
+    char pad[SZ];
+  };
+};
+
+template <unsigned SZ>
+static
+inline
+void
+lock(struct thr_mutex<SZ>* sl)
+{
+  NdbMutex_Lock(&sl->m_mutex);
+}
+
+template <unsigned SZ>
+static
+inline
+void
+unlock(struct thr_mutex<SZ>* sl)
+{
+  NdbMutex_Unlock(&sl->m_mutex);
+}
+
+template <unsigned SZ>
+static
+inline
+int
+trylock(struct thr_mutex<SZ> * sl)
+{
+  return NdbMutex_Trylock(&sl->m_mutex);
+}
+
+#endif

=== added file 'storage/ndb/src/kernel/vm/mt-send-t.cpp'
--- a/storage/ndb/src/kernel/vm/mt-send-t.cpp	1970-01-01 00:00:00 +0000
+++ b/storage/ndb/src/kernel/vm/mt-send-t.cpp	2012-03-21 10:04:27 +0000
@@ -0,0 +1,553 @@
+#include "mt-asm.h"
+#include "mt-lock.hpp"
+#include <NdbTick.h>
+#include <NdbMutex.h>
+#include <NdbThread.h>
+#include <NdbCondition.h>
+#include <NdbTap.hpp>
+#include <Bitmask.hpp>
+
+#define BUGGY_VERSION 0
+
+/**
+ * DO_SYSCALL inside critical section
+ *  (the equivalent of writev(socket)
+ */
+#define DO_SYSCALL 1
+
+/**
+ * This is a unit test of the send code for mt.cpp
+ *   specifically the code that manages which thread will send
+ *   (write gathering)
+ *
+ * Each thread is a producer of Signals
+ * Each signal has a destination remote node (transporter)
+ * Each thread will after having produced a set of signals
+ *   check if it should send them on socket.
+ *   If it decides that it should, it consumes all the signals
+ *   produced by all threads.
+ *
+ * In this unit test, we don't send signals, but the producing part
+ *   will only be to increment a counter.
+ *
+ ******************************************************************
+ *
+ * To use this program seriously...
+ *
+ *   you should set BUGGY_VERSION to 1
+ *   and experiment with values on cnt_*
+ *   until you find a variant which crashes (abort)
+ *
+ * The values compiled-in makes it crash on a single socket
+ *   Intel(R) Core(TM) i5-2400 CPU @ 3.10GHz release compiled!
+ *   (i never managed to get it debug compiled)
+ */
+#define MAX_THREADS 256
+#define MAX_TRANSPORTERS 256
+
+/**
+ * global variables
+ */
+static unsigned cnt_threads = 64;
+static unsigned cnt_transporters = 8;
+
+/**
+ * outer loops
+ *   start/stop threads
+ */
+static unsigned cnt_seconds = 180;
+
+/**
+ * no of signals produced before calling consume
+ */
+static unsigned cnt_signals_before_consume = 4;
+
+/**
+ * no of signals produced in one inner loop
+ */
+static unsigned cnt_signals_per_inner_loop = 4;
+
+/**
+ * no inner loops per outer loop
+ *
+ *   after each inner loop
+ *   threads will be stalled and result verified
+ */
+static unsigned cnt_inner_loops = 5000;
+
+/**
+ * pct of do_send that are using forceSend()
+ */
+static unsigned pct_force = 15;
+
+typedef Bitmask<(MAX_TRANSPORTERS+31)/32> TransporterMask;
+
+struct Producer
+{
+  Producer() {
+    bzero(val, sizeof(val));
+    pendingcount = 0;
+  }
+
+  void init() {}
+
+  /**
+   * values produced...
+   */
+  unsigned val[MAX_TRANSPORTERS];
+
+  /**
+   * mask/array to keep track of which transporter we have produce values to
+   */
+  TransporterMask pendingmask;
+  unsigned pendingcount;
+  unsigned char pendinglist[MAX_TRANSPORTERS];
+
+  /**
+   * produce a value
+   *
+   * This is the equivalent of mt_send_remote()
+   */
+  void produce(unsigned D);
+
+  /**
+   * consume values (from all threads)
+   *   for transporters that we have produced a value to
+   *
+   * This is the equivalent to do_send and if force=true
+   *   this is the equivalent of forceSend()
+   */
+  void consume(bool force);
+};
+
+struct Thread
+{
+  Thread() { thread= 0; }
+
+  void init() { p.init(); }
+
+  NdbThread * thread;
+  Producer p;
+};
+
+/**
+ * This is the consumer of values for *one* transporter
+ */
+struct Consumer
+{
+  Consumer() {
+    m_force_send = 0; bzero(val, sizeof(val));
+  }
+
+  void init() {}
+
+  struct thr_spin_lock<8> m_send_lock;
+  unsigned m_force_send;
+  unsigned val[MAX_THREADS];
+
+  /**
+   * consume values from all threads to this transporter
+   */
+  void consume(unsigned D);
+
+  /**
+   * force_consume
+   */
+  void forceConsume(unsigned D);
+};
+
+struct Consumer_pad
+{
+  Consumer c;
+  char pad[NDB_CL_PADSZ(sizeof(Consumer))];
+};
+
+struct Thread_pad
+{
+  Thread t;
+  char pad[NDB_CL_PADSZ(sizeof(Thread))];
+};
+
+/**
+ * Thread repository
+ *   and an instance of it
+ */
+static
+struct Rep
+{
+  Thread_pad t[MAX_THREADS];
+  Consumer_pad c[MAX_TRANSPORTERS];
+
+  /**
+   * This menthod is called when all threads are stalled
+   *   so it's safe to read values without locks
+   */
+  void validate() {
+    for (unsigned ic = 0; ic < cnt_transporters; ic++)
+    {
+      for (unsigned it = 0; it < cnt_threads; it++)
+      {
+        if (c[ic].c.val[it] != t[it].t.p.val[ic])
+        {
+          printf("Detected bug!!!\n");
+          printf("ic: %u it: %u c[ic].c.val[it]: %u t[it].t.p.val[ic]: %u\n",
+                 ic, it, c[ic].c.val[it], t[it].t.p.val[ic]);
+          abort();
+        }
+      }
+    }
+  }
+
+  void init() {
+    for (unsigned i = 0; i < cnt_threads; i++)
+      t[i].t.init();
+
+    for (unsigned i = 0; i < cnt_transporters; i++)
+      c[i].c.init();
+  }
+} rep;
+
+static
+struct Test
+{
+  Test() {
+    waiting_start = 0;
+    waiting_stop = 0;
+    mutex = 0;
+    cond = 0;
+  }
+
+  void init() {
+    mutex = NdbMutex_Create();
+    cond = NdbCondition_Create();
+  }
+
+  unsigned waiting_start;
+  unsigned waiting_stop;
+  NdbMutex* mutex;
+  NdbCondition* cond;
+
+  void wait_started();
+  void wait_completed();
+} test;
+
+void*
+thread_main(void * _t)
+{
+  unsigned seed =
+    (unsigned)NdbTick_CurrentNanosecond() +
+    (unsigned)(unsigned long long)_t;
+
+  Thread * self = (Thread*) _t;
+  for (unsigned i = 0; i < cnt_inner_loops; i++)
+  {
+    test.wait_started();
+    for (unsigned j = 0; j < cnt_signals_per_inner_loop;)
+    {
+      for (unsigned k = 0; k < cnt_signals_before_consume; k++)
+      {
+        /**
+         * Produce a signal to destination D
+         */
+        unsigned D = rand_r(&seed) % cnt_transporters;
+        self->p.produce(D);
+      }
+
+      j += cnt_signals_before_consume;
+
+      /**
+       * This is the equivalent of do_send()
+       */
+      bool force = unsigned(rand_r(&seed) % 100) < pct_force;
+      self->p.consume(force);
+    }
+    test.wait_completed();
+  }
+  return 0;
+}
+
+static
+bool
+match(const char * arg, const char * val, unsigned * valptr)
+{
+  if (strncmp(arg, val, strlen(val)) == 0)
+  {
+    * valptr = atoi(arg + strlen(val));
+    return true;
+  }
+  return false;
+}
+
+int
+main(int argc, char ** argv)
+{
+  plan(1);
+  ndb_init();
+  test.init();
+  rep.init();
+
+  if (argc == 1)
+  {
+    printf("No arguments supplied...\n"
+           "assuming we're being run from MTR or similar.\n"
+           "decreasing loop counts to ridiculously small values...\n");
+    cnt_seconds = 10;
+    cnt_inner_loops = 3000;
+    cnt_threads = 4;
+  }
+  else
+  {
+    printf("Arguments supplied...\n");
+    for (int i = 1; i < argc; i++)
+    {
+      if (match(argv[i], "cnt_seconds=", &cnt_seconds))
+        continue;
+      else if (match(argv[i], "cnt_threads=", &cnt_threads))
+        continue;
+      else if (match(argv[i], "cnt_transporters=", &cnt_transporters))
+        continue;
+      else if (match(argv[i], "cnt_inner_loops=", &cnt_inner_loops))
+        continue;
+      else if (match(argv[i], "cnt_signals_before_consume=",
+                     &cnt_signals_before_consume))
+        continue;
+      else if (match(argv[i], "cnt_signals_per_inner_loop=",
+                     &cnt_signals_per_inner_loop))
+        continue;
+      else if (match(argv[i], "pct_force=",
+                     &pct_force))
+        continue;
+      else
+      {
+        printf("ignoreing unknown argument: %s\n", argv[i]);
+      }
+    }
+  }
+
+  printf("%s"
+         " cnt_seconds=%u"
+         " cnt_threads=%u"
+         " cnt_transporters=%u"
+         " cnt_inner_loops=%u"
+         " cnt_signals_before_consume=%u"
+         " cnt_signals_per_inner_loop=%u"
+         " pct_force=%u"
+         "\n",
+         argv[0],
+         cnt_seconds,
+         cnt_threads,
+         cnt_transporters,
+         cnt_inner_loops,
+         cnt_signals_before_consume,
+         cnt_signals_per_inner_loop,
+         pct_force);
+
+  Uint32 loop = 0;
+  Uint64 start = NdbTick_CurrentMillisecond() / 1000;
+  while (start + cnt_seconds > (NdbTick_CurrentMillisecond() / 1000))
+  {
+    printf("%u ", loop++); fflush(stdout);
+    if ((loop < 100 && (loop % 25) == 0) ||
+        (loop >= 100 && (loop % 20) == 0))
+      printf("\n");
+
+    for (unsigned t = 0; t < cnt_threads; t++)
+    {
+      rep.t[t].t.thread = NdbThread_Create(thread_main,
+                                           (void**)&rep.t[t].t,
+                                           1024*1024,
+                                           "execute thread",
+                                           NDB_THREAD_PRIO_MEAN);
+    }
+
+    for (unsigned t = 0; t < cnt_threads; t++)
+    {
+      void * ret;
+      NdbThread_WaitFor(rep.t[t].t.thread, &ret);
+    }
+  }
+  printf("\n"); fflush(stdout);
+
+  ok(true, "ok");
+  return 0;
+}
+
+inline
+void
+Producer::produce(unsigned D)
+{
+  if (!pendingmask.get(D))
+  {
+    pendingmask.set(D);
+    pendinglist[pendingcount] = D;
+    pendingcount++;
+  }
+  val[D]++;
+}
+
+inline
+void
+Producer::consume(bool force)
+{
+  unsigned count = pendingcount;
+  pendingmask.clear();
+  pendingcount = 0;
+
+  for (unsigned i = 0; i < count; i++)
+  {
+    unsigned D = pendinglist[i];
+    if (force)
+      rep.c[D].c.forceConsume(D);
+    else
+      rep.c[D].c.consume(D);
+  }
+}
+
+inline
+void
+Consumer::consume(unsigned D)
+{
+  /**
+   * This is the equivalent of do_send(must_send = 1)
+   */
+  m_force_send = 1;
+
+  do
+  {
+    if (trylock(&m_send_lock) != 0)
+    {
+      /* Other thread will send for us as we set m_force_send. */
+      return;
+    }
+
+    /**
+     * Now clear the flag, and start sending all data available to this node.
+     *
+     * Put a memory barrier here, so that if another thread tries to grab
+     * the send lock but fails due to us holding it here, we either
+     * 1) Will see m_force_send[nodeId] set to 1 at the end of the loop, or
+     * 2) We clear here the flag just set by the other thread, but then we
+     * will (thanks to mb()) be able to see and send all of the data already
+     * in the first send iteration.
+     */
+    m_force_send = 0;
+    mb();
+
+    /**
+     * This is the equivalent of link_thread_send_buffers
+     */
+    for (unsigned i = 0; i < cnt_threads; i++)
+    {
+      val[i] = rep.t[i].t.p.val[D];
+    }
+
+    /**
+     * Do a syscall...which could have affect on barriers...etc
+     */
+    if (DO_SYSCALL)
+    {
+      NdbTick_CurrentMillisecond();
+    }
+
+    unlock(&m_send_lock);
+
+#if BUGGY_VERSION
+#else
+    mb();
+#endif
+  }
+  while (m_force_send != 0);
+}
+
+inline
+void
+Consumer::forceConsume(unsigned D)
+{
+  /**
+   * This is the equivalent of forceSend()
+   */
+
+  do
+  {
+    /**
+     * NOTE: since we unconditionally lock m_send_lock
+     *   we don't need a mb() after the clearing of m_force_send here.
+     */
+    m_force_send = 0;
+
+    lock(&m_send_lock);
+
+    /**
+     * This is the equivalent of link_thread_send_buffers
+     */
+    for (unsigned i = 0; i < cnt_threads; i++)
+    {
+      val[i] = rep.t[i].t.p.val[D];
+    }
+
+    /**
+     * Do a syscall...which could have affect on barriers...etc
+     */
+    if (DO_SYSCALL)
+    {
+      NdbTick_CurrentMillisecond();
+    }
+
+    unlock(&m_send_lock);
+
+#if BUGGY_VERSION
+#else
+    mb();
+#endif
+  }
+  while (m_force_send != 0);
+}
+
+void
+Test::wait_started()
+{
+  NdbMutex_Lock(mutex);
+  if (waiting_start + 1 == cnt_threads)
+  {
+    waiting_stop = 0;
+  }
+  waiting_start++;
+  assert(waiting_start <= cnt_threads);
+  while (waiting_start < cnt_threads)
+    NdbCondition_Wait(cond, mutex);
+
+  NdbCondition_Broadcast(cond);
+  NdbMutex_Unlock(mutex);
+}
+
+void
+Test::wait_completed()
+{
+  NdbMutex_Lock(mutex);
+  if (waiting_stop + 1 == cnt_threads)
+  {
+    rep.validate();
+    waiting_start = 0;
+  }
+  waiting_stop++;
+  assert(waiting_stop <= cnt_threads);
+  while (waiting_stop < cnt_threads)
+    NdbCondition_Wait(cond, mutex);
+
+  NdbCondition_Broadcast(cond);
+  NdbMutex_Unlock(mutex);
+}
+
+static
+void
+register_lock(const void * ptr, const char * name)
+{
+  return;
+}
+
+static
+mt_lock_stat *
+lookup_lock(const void * ptr)
+{
+  return 0;
+}

=== modified file 'storage/ndb/src/kernel/vm/mt.cpp'
--- a/storage/ndb/src/kernel/vm/mt.cpp	2012-02-20 09:13:16 +0000
+++ b/storage/ndb/src/kernel/vm/mt.cpp	2012-03-21 11:22:10 +0000
@@ -36,6 +36,7 @@
 #include <portlib/ndb_prefetch.h>
 
 #include "mt-asm.h"
+#include "mt-lock.hpp"
 
 inline
 SimulatedBlock*
@@ -96,16 +97,6 @@ static Uint32 first_receiver_thread_no =
 /* max signal is 32 words, 7 for signal header and 25 datawords */
 #define MIN_SIGNALS_PER_PAGE (thr_job_buffer::SIZE / 32)
 
-struct mt_lock_stat
-{
-  const void * m_ptr;
-  char * m_name;
-  Uint32 m_contended_count;
-  Uint32 m_spin_count;
-};
-static void register_lock(const void * ptr, const char * name);
-static mt_lock_stat * lookup_lock(const void * ptr);
-
 #if defined(HAVE_LINUX_FUTEX) && defined(NDB_HAVE_XCNG)
 #define USE_FUTEX
 #endif
@@ -274,132 +265,6 @@ wakeup(struct thr_wait* wait)
 
 #endif
 
-#ifdef NDB_HAVE_XCNG
-template <unsigned SZ>
-struct thr_spin_lock
-{
-  thr_spin_lock(const char * name = 0)
-  {
-    m_lock = 0;
-    register_lock(this, name);
-  }
-
-  union {
-    volatile Uint32 m_lock;
-    char pad[SZ];
-  };
-};
-
-static
-ATTRIBUTE_NOINLINE
-void
-lock_slow(void * sl, volatile unsigned * val)
-{
-  mt_lock_stat* s = lookup_lock(sl); // lookup before owning lock
-
-loop:
-  Uint32 spins = 0;
-  do {
-    spins++;
-    cpu_pause();
-  } while (* val == 1);
-
-  if (unlikely(xcng(val, 1) != 0))
-    goto loop;
-
-  if (s)
-  {
-    s->m_spin_count += spins;
-    Uint32 count = ++s->m_contended_count;
-    Uint32 freq = (count > 10000 ? 5000 : (count > 20 ? 200 : 1));
-
-    if ((count % freq) == 0)
-      printf("%s waiting for lock, contentions: %u spins: %u\n",
-             s->m_name, count, s->m_spin_count);
-  }
-}
-
-template <unsigned SZ>
-static
-inline
-void
-lock(struct thr_spin_lock<SZ>* sl)
-{
-  volatile unsigned* val = &sl->m_lock;
-  if (likely(xcng(val, 1) == 0))
-    return;
-
-  lock_slow(sl, val);
-}
-
-template <unsigned SZ>
-static
-inline
-void
-unlock(struct thr_spin_lock<SZ>* sl)
-{
-  /**
-   * Memory barrier here, to make sure all of our stores are visible before
-   * the lock release is.
-   */
-  mb();
-  sl->m_lock = 0;
-}
-
-template <unsigned SZ>
-static
-inline
-int
-trylock(struct thr_spin_lock<SZ>* sl)
-{
-  volatile unsigned* val = &sl->m_lock;
-  return xcng(val, 1);
-}
-#else
-#define thr_spin_lock thr_mutex
-#endif
-
-template <unsigned SZ>
-struct thr_mutex
-{
-  thr_mutex(const char * name = 0) {
-    NdbMutex_Init(&m_mutex);
-    register_lock(this, name);
-  }
-
-  union {
-    NdbMutex m_mutex;
-    char pad[SZ];
-  };
-};
-
-template <unsigned SZ>
-static
-inline
-void
-lock(struct thr_mutex<SZ>* sl)
-{
-  NdbMutex_Lock(&sl->m_mutex);
-}
-
-template <unsigned SZ>
-static
-inline
-void
-unlock(struct thr_mutex<SZ>* sl)
-{
-  NdbMutex_Unlock(&sl->m_mutex);
-}
-
-template <unsigned SZ>
-static
-inline
-int
-trylock(struct thr_mutex<SZ> * sl)
-{
-  return NdbMutex_Trylock(&sl->m_mutex);
-}
-
 /**
  * thr_safe_pool
  */
@@ -2719,6 +2584,13 @@ pack_send_buffer(thr_data *selfptr, Uint
    * After having locked/unlock m_send_lock
    *   "protocol" dictates that we must check the m_force_send
    */
+
+  /**
+   * We need a memory barrier here to prevent race between clearing lock
+   *   and reading of m_force_send.
+   *   CPU can reorder the load to before the clear of the lock
+   */
+  mb();
   if (sb->m_force_send)
   {
     try_send(selfptr, node);
@@ -2787,7 +2659,13 @@ mt_send_handle::forceSend(NodeId nodeId)
 
   do
   {
+    /**
+     * NOTE: we don't need a memory barrier after clearing
+     *       m_force_send here as we unconditionally lock m_send_lock
+     *       hence there is no way that our data can be "unsent"
+     */
     sb->m_force_send = 0;
+
     lock(&sb->m_send_lock);
     sb->m_send_thread = selfptr->m_thr_no;
     globalTransporterRegistry.performSend(nodeId);
@@ -2799,6 +2677,12 @@ mt_send_handle::forceSend(NodeId nodeId)
      */
     selfptr->m_send_buffer_pool.release_global(rep->m_mm,
                                                RG_TRANSPORTER_BUFFERS);
+    /**
+     * We need a memory barrier here to prevent race between clearing lock
+     *   and reading of m_force_send.
+     *   CPU can reorder the load to before the clear of the lock
+     */
+    mb();
   } while (sb->m_force_send);
 
   return true;
@@ -2821,6 +2705,16 @@ try_send(thr_data * selfptr, Uint32 node
       return;
     }
 
+    /**
+     * Now clear the flag, and start sending all data available to this node.
+     *
+     * Put a memory barrier here, so that if another thread tries to grab
+     * the send lock but fails due to us holding it here, we either
+     * 1) Will see m_force_send[nodeId] set to 1 at the end of the loop, or
+     * 2) We clear here the flag just set by the other thread, but then we
+     * will (thanks to mb()) be able to see and send all of the data already
+     * in the first send iteration.
+     */
     sb->m_force_send = 0;
     mb();
 
@@ -2834,6 +2728,13 @@ try_send(thr_data * selfptr, Uint32 node
      */
     selfptr->m_send_buffer_pool.release_global(rep->m_mm,
                                                RG_TRANSPORTER_BUFFERS);
+
+    /**
+     * We need a memory barrier here to prevent race between clearing lock
+     *   and reading of m_force_send.
+     *   CPU can reorder the load to before the clear of the lock
+     */
+    mb();
   } while (sb->m_force_send);
 }
 
@@ -2966,6 +2867,13 @@ do_send(struct thr_data* selfptr, bool m
       {
         register_pending_send(selfptr, node);
       }
+
+      /**
+       * We need a memory barrier here to prevent race between clearing lock
+       *   and reading of m_force_send.
+       *   CPU can reorder the load to before the clear of the lock
+       */
+      mb();
       if (sb->m_force_send)
       {
         /**

No bundle (reason: useless for push emails).
Thread
bzr push into mysql-5.1-telco-7.1 branch (jonas.oreland:4491 to 4492) Jonas Oreland21 Mar