4894 Jonas Oreland 2012-03-20
ndb - add unit test program of mt.cpp's m_force_send algorithm
added:
storage/ndb/src/kernel/vm/mt-send-t.cpp
modified:
storage/ndb/src/kernel/vm/Makefile.am
4893 Jonas Oreland 2012-03-20
ndb - move spinlocks into own .hpp-file
added:
storage/ndb/src/kernel/vm/mt-lock.hpp
modified:
storage/ndb/src/kernel/vm/mt.cpp
4892 Martin Skold 2012-03-19
Bug#13830980 MYSQL COMPLAINS OF NOT SUPPORTING ALTER ONLINE EVEN WHEN VALID OPTIONS ARE USED: Added support of adding columns with a default value defined and moved check that it currently has to be null to ndbapi
modified:
mysql-test/suite/ndb/r/ndb_alter_table_online.result
mysql-test/suite/ndb/t/ndb_alter_table_online.test
sql/ha_ndbcluster.cc
storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp
=== modified file 'storage/ndb/src/kernel/vm/Makefile.am'
--- a/storage/ndb/src/kernel/vm/Makefile.am 2012-01-23 08:20:12 +0000
+++ b/storage/ndb/src/kernel/vm/Makefile.am 2012-03-20 13:50:15 +0000
@@ -144,3 +144,12 @@ mt_thr_config_t_LDADD = \
$(top_builddir)/mysys/libmysyslt.la
noinst_PROGRAMS = mt_thr_config-t CountingPool-t DynArr256-t
+
+mt_send_t_CXXFLAGS = -DTAP_TEST
+mt_send_t_SOURCES = mt-send-t.cpp
+mt_send_t_LDFLAGS = @ndb_bin_am_ldflags@
+mt_send_t_LDADD = $(test_ldadd)
+
+if BUILD_NDBMTD
+noinst_PROGRAMS += mt-send-t
+endif
=== added file 'storage/ndb/src/kernel/vm/mt-lock.hpp'
--- a/storage/ndb/src/kernel/vm/mt-lock.hpp 1970-01-01 00:00:00 +0000
+++ b/storage/ndb/src/kernel/vm/mt-lock.hpp 2012-03-20 13:23:29 +0000
@@ -0,0 +1,160 @@
+/* Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+#ifndef MT_LOCK_HPP
+#define MT_LOCK_HPP
+
+#include <ndb_global.h>
+#include "mt-asm.h"
+#include <NdbMutex.h>
+
+struct mt_lock_stat
+{
+ const void * m_ptr;
+ char * m_name;
+ Uint32 m_contended_count;
+ Uint32 m_spin_count;
+};
+
+static void register_lock(const void * ptr, const char * name);
+static mt_lock_stat * lookup_lock(const void * ptr);
+
+#ifdef NDB_HAVE_XCNG
+template <unsigned SZ>
+struct thr_spin_lock
+{
+ thr_spin_lock(const char * name = 0)
+ {
+ m_lock = 0;
+ register_lock(this, name);
+ }
+
+ union {
+ volatile Uint32 m_lock;
+ char pad[SZ];
+ };
+};
+
+static
+ATTRIBUTE_NOINLINE
+void
+lock_slow(void * sl, volatile unsigned * val)
+{
+ mt_lock_stat* s = lookup_lock(sl); // lookup before owning lock
+
+loop:
+ Uint32 spins = 0;
+ do {
+ spins++;
+ cpu_pause();
+ } while (* val == 1);
+
+ if (unlikely(xcng(val, 1) != 0))
+ goto loop;
+
+ if (s)
+ {
+ s->m_spin_count += spins;
+ Uint32 count = ++s->m_contended_count;
+ Uint32 freq = (count > 10000 ? 5000 : (count > 20 ? 200 : 1));
+
+ if ((count % freq) == 0)
+ printf("%s waiting for lock, contentions: %u spins: %u\n",
+ s->m_name, count, s->m_spin_count);
+ }
+}
+
+template <unsigned SZ>
+static
+inline
+void
+lock(struct thr_spin_lock<SZ>* sl)
+{
+ volatile unsigned* val = &sl->m_lock;
+ if (likely(xcng(val, 1) == 0))
+ return;
+
+ lock_slow(sl, val);
+}
+
+template <unsigned SZ>
+static
+inline
+void
+unlock(struct thr_spin_lock<SZ>* sl)
+{
+ /**
+ * Memory barrier here, to make sure all of our stores are visible before
+ * the lock release is.
+ */
+ mb();
+ sl->m_lock = 0;
+}
+
+template <unsigned SZ>
+static
+inline
+int
+trylock(struct thr_spin_lock<SZ>* sl)
+{
+ volatile unsigned* val = &sl->m_lock;
+ return xcng(val, 1);
+}
+#else
+#define thr_spin_lock thr_mutex
+#endif
+
+template <unsigned SZ>
+struct thr_mutex
+{
+ thr_mutex(const char * name = 0) {
+ NdbMutex_Init(&m_mutex);
+ register_lock(this, name);
+ }
+
+ union {
+ NdbMutex m_mutex;
+ char pad[SZ];
+ };
+};
+
+template <unsigned SZ>
+static
+inline
+void
+lock(struct thr_mutex<SZ>* sl)
+{
+ NdbMutex_Lock(&sl->m_mutex);
+}
+
+template <unsigned SZ>
+static
+inline
+void
+unlock(struct thr_mutex<SZ>* sl)
+{
+ NdbMutex_Unlock(&sl->m_mutex);
+}
+
+template <unsigned SZ>
+static
+inline
+int
+trylock(struct thr_mutex<SZ> * sl)
+{
+ return NdbMutex_Trylock(&sl->m_mutex);
+}
+
+#endif
=== added file 'storage/ndb/src/kernel/vm/mt-send-t.cpp'
--- a/storage/ndb/src/kernel/vm/mt-send-t.cpp 1970-01-01 00:00:00 +0000
+++ b/storage/ndb/src/kernel/vm/mt-send-t.cpp 2012-03-20 13:50:15 +0000
@@ -0,0 +1,489 @@
+#include "mt-asm.h"
+#include "mt-lock.hpp"
+#include <NdbTick.h>
+#include <NdbMutex.h>
+#include <NdbThread.h>
+#include <NdbCondition.h>
+#include <NdbTap.hpp>
+#include <Bitmask.hpp>
+
+#define BUGGY_VERSION 0
+
+/**
+ * DO_SYSCALL inside critical section
+ * (the equivalent of writev(socket)
+ */
+#define DO_SYSCALL 1
+
+/**
+ * This is a unit test of the send code for mt.cpp
+ * specifically the code that manages which thread will send
+ * (write gathering)
+ *
+ * Each thread is a producer of Signals
+ * Each signal has a destination remote node (transporter)
+ * Each thread will after having produced a set of signals
+ * check if it should send them on socket.
+ * If it decides that it should, it consumes all the signals
+ * produced by all threads.
+ *
+ * In this unit test, we don't send signals, but the producing part
+ * will only be to increment a counter.
+ *
+ ******************************************************************
+ *
+ * To use this program seriously...
+ *
+ * you should set BUGGY_VERSION to 1
+ * and experiment with values on cnt_*
+ * until you find a variant which crashes (abort)
+ *
+ * The values compiled-in makes it crash on a single socket
+ * Intel(R) Core(TM) i5-2400 CPU @ 3.10GHz release compiled!
+ * (i never managed to get it debug compiled)
+ */
+#define MAX_THREADS 256
+#define MAX_TRANSPORTERS 256
+
+/**
+ * global variables
+ */
+static unsigned cnt_threads = 64;
+static unsigned cnt_transporters = 8;
+
+/**
+ * outer loops
+ * start/stop threads
+ */
+static unsigned cnt_seconds = 180;
+
+/**
+ * no of signals produced before calling consume
+ */
+static unsigned cnt_signals_before_consume = 4;
+
+/**
+ * no of signals produced in one inner loop
+ */
+static unsigned cnt_signals_per_inner_loop = 4;
+
+/**
+ * no inner loops per outer loop
+ *
+ * after each inner loop
+ * threads will be stalled and result verified
+ */
+static unsigned cnt_inner_loops = 5000;
+
+typedef Bitmask<(MAX_TRANSPORTERS+31)/32> TransporterMask;
+
+struct Producer
+{
+ Producer() {
+ bzero(val, sizeof(val));
+ pendingcount = 0;
+ }
+
+ void init() {}
+
+ /**
+ * values produced...
+ */
+ unsigned val[MAX_TRANSPORTERS];
+
+ /**
+ * mask/array to keep track of which transporter we have produce values to
+ */
+ TransporterMask pendingmask;
+ unsigned pendingcount;
+ unsigned char pendinglist[MAX_TRANSPORTERS];
+
+ /**
+ * produce a value
+ *
+ * This is the equivalent of mt_send_remote()
+ */
+ void produce(unsigned D);
+
+ /**
+ * consume values (from all threads)
+ * for transporters that we have produced a value to
+ *
+ * This is the equivalent to do_send
+ */
+ void consume();
+};
+
+struct Thread
+{
+ Thread() { thread= 0; }
+
+ void init() { p.init(); }
+
+ NdbThread * thread;
+ Producer p;
+};
+
+/**
+ * This is the consumer of values for *one* transporter
+ */
+struct Consumer
+{
+ Consumer() {
+ m_force_send = 0; bzero(val, sizeof(val));
+ }
+
+ void init() {}
+
+ struct thr_spin_lock<8> m_send_lock;
+ unsigned m_force_send;
+ unsigned val[MAX_THREADS];
+
+ /**
+ * consume values from all threads to this transporter
+ */
+ void consume(unsigned D);
+};
+
+struct Consumer_pad
+{
+ Consumer c;
+ char pad[NDB_CL_PADSZ(sizeof(Consumer))];
+};
+
+struct Thread_pad
+{
+ Thread t;
+ char pad[NDB_CL_PADSZ(sizeof(Thread))];
+};
+
+/**
+ * Thread repository
+ * and an instance of it
+ */
+static
+struct Rep
+{
+ Thread_pad t[MAX_THREADS];
+ Consumer_pad c[MAX_TRANSPORTERS];
+
+ /**
+ * This menthod is called when all threads are stalled
+ * so it's safe to read values without locks
+ */
+ void validate() {
+ for (unsigned ic = 0; ic < cnt_transporters; ic++)
+ {
+ for (unsigned it = 0; it < cnt_threads; it++)
+ {
+ if (c[ic].c.val[it] != t[it].t.p.val[ic])
+ {
+ printf("Detected bug!!!\n");
+ printf("ic: %u it: %u c[ic].c.val[it]: %u t[it].t.p.val[ic]: %u\n",
+ ic, it, c[ic].c.val[it], t[it].t.p.val[ic]);
+ abort();
+ }
+ }
+ }
+ }
+
+ void init() {
+ for (unsigned i = 0; i < cnt_threads; i++)
+ t[i].t.init();
+
+ for (unsigned i = 0; i < cnt_transporters; i++)
+ c[i].c.init();
+ }
+} rep;
+
+static
+struct Test
+{
+ Test() {
+ waiting_start = 0;
+ waiting_stop = 0;
+ mutex = 0;
+ cond = 0;
+ }
+
+ void init() {
+ mutex = NdbMutex_Create();
+ cond = NdbCondition_Create();
+ }
+
+ unsigned waiting_start;
+ unsigned waiting_stop;
+ NdbMutex* mutex;
+ NdbCondition* cond;
+
+ void wait_started();
+ void wait_completed();
+} test;
+
+void*
+thread_main(void * _t)
+{
+ unsigned seed =
+ (unsigned)NdbTick_CurrentNanosecond() +
+ (unsigned)(unsigned long long)_t;
+
+ Thread * self = (Thread*) _t;
+ for (unsigned i = 0; i < cnt_inner_loops; i++)
+ {
+ test.wait_started();
+ for (unsigned j = 0; j < cnt_signals_per_inner_loop;)
+ {
+ for (unsigned k = 0; k < cnt_signals_before_consume; k++)
+ {
+ /**
+ * Produce a signal to destination D
+ */
+ unsigned D = rand_r(&seed) % cnt_transporters;
+ self->p.produce(D);
+ }
+
+ j += cnt_signals_before_consume;
+
+ /**
+ * This is the equivalent of do_send()
+ */
+ self->p.consume();
+ }
+ test.wait_completed();
+ }
+ return 0;
+}
+
+static
+bool
+match(const char * arg, const char * val, unsigned * valptr)
+{
+ if (strncmp(arg, val, strlen(val)) == 0)
+ {
+ * valptr = atoi(arg + strlen(val));
+ return true;
+ }
+ return false;
+}
+
+int
+main(int argc, char ** argv)
+{
+ plan(1);
+ ndb_init();
+ test.init();
+ rep.init();
+
+ if (argc == 1)
+ {
+ printf("No arguments supplied...\n"
+ "assuming we're being run from MTR or similar.\n"
+ "decreasing loop counts to ridiculously small values...\n");
+ cnt_seconds = 10;
+ cnt_inner_loops = 3000;
+ cnt_threads = 4;
+ }
+ else
+ {
+ printf("Arguments supplied...\n");
+ for (int i = 1; i < argc; i++)
+ {
+ if (match(argv[i], "cnt_seconds=", &cnt_seconds))
+ continue;
+ else if (match(argv[i], "cnt_threads=", &cnt_threads))
+ continue;
+ else if (match(argv[i], "cnt_transporters=", &cnt_transporters))
+ continue;
+ else if (match(argv[i], "cnt_inner_loops=", &cnt_inner_loops))
+ continue;
+ else if (match(argv[i], "cnt_signals_before_consume=",
+ &cnt_signals_before_consume))
+ continue;
+ else if (match(argv[i], "cnt_signals_per_inner_loop",
+ &cnt_signals_per_inner_loop))
+ continue;
+ else
+ {
+ printf("ignoreing unknown argument: %s\n", argv[i]);
+ }
+ }
+ }
+
+ printf("%s"
+ " cnt_seconds=%u"
+ " cnt_threads=%u"
+ " cnt_transporters=%u"
+ " cnt_inner_loops=%u"
+ " cnt_signals_before_consume=%u"
+ " cnt_signals_per_inner_loop=%u\n",
+ argv[0],
+ cnt_seconds,
+ cnt_threads,
+ cnt_transporters,
+ cnt_inner_loops,
+ cnt_signals_before_consume,
+ cnt_signals_per_inner_loop);
+
+ Uint32 loop = 0;
+ Uint64 start = NdbTick_CurrentMillisecond() / 1000;
+ while (start + cnt_seconds > (NdbTick_CurrentMillisecond() / 1000))
+ {
+ printf("%u ", loop++); fflush(stdout);
+ if ((loop < 100 && (loop % 25) == 0) ||
+ (loop >= 100 && (loop % 20) == 0))
+ printf("\n");
+
+ for (unsigned t = 0; t < cnt_threads; t++)
+ {
+ rep.t[t].t.thread = NdbThread_Create(thread_main,
+ (void**)&rep.t[t].t,
+ 1024*1024,
+ "execute thread",
+ NDB_THREAD_PRIO_MEAN);
+ }
+
+ for (unsigned t = 0; t < cnt_threads; t++)
+ {
+ void * ret;
+ NdbThread_WaitFor(rep.t[t].t.thread, &ret);
+ }
+ }
+ printf("\n"); fflush(stdout);
+
+ ok(true, "ok");
+ return 0;
+}
+
+inline
+void
+Producer::produce(unsigned D)
+{
+ if (!pendingmask.get(D))
+ {
+ pendingmask.set(D);
+ pendinglist[pendingcount] = D;
+ pendingcount++;
+ }
+ val[D]++;
+}
+
+inline
+void
+Producer::consume()
+{
+ unsigned count = pendingcount;
+ pendingmask.clear();
+ pendingcount = 0;
+
+ for (unsigned i = 0; i < count; i++)
+ {
+ unsigned D = pendinglist[i];
+ rep.c[D].c.consume(D);
+ }
+}
+
+inline
+void
+Consumer::consume(unsigned D)
+{
+ /**
+ * This is the equivalent of do_send(must_send = 1)
+ */
+ m_force_send = 1;
+
+ do
+ {
+ if (trylock(&m_send_lock) != 0)
+ {
+ /* Other thread will send for us as we set m_force_send. */
+ return;
+ }
+
+ /**
+ * Now clear the flag, and start sending all data available to this node.
+ *
+ * Put a memory barrier here, so that if another thread tries to grab
+ * the send lock but fails due to us holding it here, we either
+ * 1) Will see m_force_send[nodeId] set to 1 at the end of the loop, or
+ * 2) We clear here the flag just set by the other thread, but then we
+ * will (thanks to mb()) be able to see and send all of the data already
+ * in the first send iteration.
+ */
+ m_force_send = 0;
+ mb();
+
+ /**
+ * This is the equivalent of link_thread_send_buffers
+ */
+ for (unsigned i = 0; i < cnt_threads; i++)
+ {
+ val[i] = rep.t[i].t.p.val[D];
+ }
+
+ /**
+ * Do a syscall...which could have affect on barriers...etc
+ */
+ if (DO_SYSCALL)
+ {
+ NdbTick_CurrentMillisecond();
+ }
+
+ unlock(&m_send_lock);
+
+#if BUGGY_VERSION
+#else
+ mb();
+#endif
+ }
+ while (m_force_send != 0);
+}
+
+
+void
+Test::wait_started()
+{
+ NdbMutex_Lock(mutex);
+ if (waiting_start + 1 == cnt_threads)
+ {
+ waiting_stop = 0;
+ }
+ waiting_start++;
+ assert(waiting_start <= cnt_threads);
+ while (waiting_start < cnt_threads)
+ NdbCondition_Wait(cond, mutex);
+
+ NdbCondition_Broadcast(cond);
+ NdbMutex_Unlock(mutex);
+}
+
+void
+Test::wait_completed()
+{
+ NdbMutex_Lock(mutex);
+ if (waiting_stop + 1 == cnt_threads)
+ {
+ rep.validate();
+ waiting_start = 0;
+ }
+ waiting_stop++;
+ assert(waiting_stop <= cnt_threads);
+ while (waiting_stop < cnt_threads)
+ NdbCondition_Wait(cond, mutex);
+
+ NdbCondition_Broadcast(cond);
+ NdbMutex_Unlock(mutex);
+}
+
+static
+void
+register_lock(const void * ptr, const char * name)
+{
+ return;
+}
+
+static
+mt_lock_stat *
+lookup_lock(const void * ptr)
+{
+ return 0;
+}
=== modified file 'storage/ndb/src/kernel/vm/mt.cpp'
--- a/storage/ndb/src/kernel/vm/mt.cpp 2012-02-20 09:03:01 +0000
+++ b/storage/ndb/src/kernel/vm/mt.cpp 2012-03-20 13:23:29 +0000
@@ -36,6 +36,7 @@
#include <portlib/ndb_prefetch.h>
#include "mt-asm.h"
+#include "mt-lock.hpp"
inline
SimulatedBlock*
@@ -96,16 +97,6 @@ static Uint32 first_receiver_thread_no =
/* max signal is 32 words, 7 for signal header and 25 datawords */
#define MIN_SIGNALS_PER_PAGE (thr_job_buffer::SIZE / 32)
-struct mt_lock_stat
-{
- const void * m_ptr;
- char * m_name;
- Uint32 m_contended_count;
- Uint32 m_spin_count;
-};
-static void register_lock(const void * ptr, const char * name);
-static mt_lock_stat * lookup_lock(const void * ptr);
-
#if defined(HAVE_LINUX_FUTEX) && defined(NDB_HAVE_XCNG)
#define USE_FUTEX
#endif
@@ -274,132 +265,6 @@ wakeup(struct thr_wait* wait)
#endif
-#ifdef NDB_HAVE_XCNG
-template <unsigned SZ>
-struct thr_spin_lock
-{
- thr_spin_lock(const char * name = 0)
- {
- m_lock = 0;
- register_lock(this, name);
- }
-
- union {
- volatile Uint32 m_lock;
- char pad[SZ];
- };
-};
-
-static
-ATTRIBUTE_NOINLINE
-void
-lock_slow(void * sl, volatile unsigned * val)
-{
- mt_lock_stat* s = lookup_lock(sl); // lookup before owning lock
-
-loop:
- Uint32 spins = 0;
- do {
- spins++;
- cpu_pause();
- } while (* val == 1);
-
- if (unlikely(xcng(val, 1) != 0))
- goto loop;
-
- if (s)
- {
- s->m_spin_count += spins;
- Uint32 count = ++s->m_contended_count;
- Uint32 freq = (count > 10000 ? 5000 : (count > 20 ? 200 : 1));
-
- if ((count % freq) == 0)
- printf("%s waiting for lock, contentions: %u spins: %u\n",
- s->m_name, count, s->m_spin_count);
- }
-}
-
-template <unsigned SZ>
-static
-inline
-void
-lock(struct thr_spin_lock<SZ>* sl)
-{
- volatile unsigned* val = &sl->m_lock;
- if (likely(xcng(val, 1) == 0))
- return;
-
- lock_slow(sl, val);
-}
-
-template <unsigned SZ>
-static
-inline
-void
-unlock(struct thr_spin_lock<SZ>* sl)
-{
- /**
- * Memory barrier here, to make sure all of our stores are visible before
- * the lock release is.
- */
- mb();
- sl->m_lock = 0;
-}
-
-template <unsigned SZ>
-static
-inline
-int
-trylock(struct thr_spin_lock<SZ>* sl)
-{
- volatile unsigned* val = &sl->m_lock;
- return xcng(val, 1);
-}
-#else
-#define thr_spin_lock thr_mutex
-#endif
-
-template <unsigned SZ>
-struct thr_mutex
-{
- thr_mutex(const char * name = 0) {
- NdbMutex_Init(&m_mutex);
- register_lock(this, name);
- }
-
- union {
- NdbMutex m_mutex;
- char pad[SZ];
- };
-};
-
-template <unsigned SZ>
-static
-inline
-void
-lock(struct thr_mutex<SZ>* sl)
-{
- NdbMutex_Lock(&sl->m_mutex);
-}
-
-template <unsigned SZ>
-static
-inline
-void
-unlock(struct thr_mutex<SZ>* sl)
-{
- NdbMutex_Unlock(&sl->m_mutex);
-}
-
-template <unsigned SZ>
-static
-inline
-int
-trylock(struct thr_mutex<SZ> * sl)
-{
- return NdbMutex_Trylock(&sl->m_mutex);
-}
-
/**
* thr_safe_pool
*/
No bundle (reason: useless for push emails).
| Thread |
|---|
| • bzr push into mysql-5.1-telco-7.0 branch (jonas.oreland:4892 to 4894) | Jonas Oreland | 20 Mar |