From: Jonas Oreland Date: March 20 2012 4:05pm Subject: bzr push into mysql-5.1-telco-7.0 branch (jonas.oreland:4892 to 4894) List-Archive: http://lists.mysql.com/commits/143253 Message-Id: <20120320160526.91DF555C8EA@perch.localdomain> MIME-Version: 1.0 Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit 4894 Jonas Oreland 2012-03-20 ndb - add unit test program of mt.cpp's m_force_send algorithm added: storage/ndb/src/kernel/vm/mt-send-t.cpp modified: storage/ndb/src/kernel/vm/Makefile.am 4893 Jonas Oreland 2012-03-20 ndb - move spinlocks into own .hpp-file added: storage/ndb/src/kernel/vm/mt-lock.hpp modified: storage/ndb/src/kernel/vm/mt.cpp 4892 Martin Skold 2012-03-19 Bug#13830980 MYSQL COMPLAINS OF NOT SUPPORTING ALTER ONLINE EVEN WHEN VALID OPTIONS ARE USED: Added support of adding columns with a default value defined and moved check that it currently has to be null to ndbapi modified: mysql-test/suite/ndb/r/ndb_alter_table_online.result mysql-test/suite/ndb/t/ndb_alter_table_online.test sql/ha_ndbcluster.cc storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp === modified file 'storage/ndb/src/kernel/vm/Makefile.am' --- a/storage/ndb/src/kernel/vm/Makefile.am 2012-01-23 08:20:12 +0000 +++ b/storage/ndb/src/kernel/vm/Makefile.am 2012-03-20 13:50:15 +0000 @@ -144,3 +144,12 @@ mt_thr_config_t_LDADD = \ $(top_builddir)/mysys/libmysyslt.la noinst_PROGRAMS = mt_thr_config-t CountingPool-t DynArr256-t + +mt_send_t_CXXFLAGS = -DTAP_TEST +mt_send_t_SOURCES = mt-send-t.cpp +mt_send_t_LDFLAGS = @ndb_bin_am_ldflags@ +mt_send_t_LDADD = $(test_ldadd) + +if BUILD_NDBMTD +noinst_PROGRAMS += mt-send-t +endif === added file 'storage/ndb/src/kernel/vm/mt-lock.hpp' --- a/storage/ndb/src/kernel/vm/mt-lock.hpp 1970-01-01 00:00:00 +0000 +++ b/storage/ndb/src/kernel/vm/mt-lock.hpp 2012-03-20 13:23:29 +0000 @@ -0,0 +1,160 @@ +/* Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved. + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#ifndef MT_LOCK_HPP +#define MT_LOCK_HPP + +#include +#include "mt-asm.h" +#include + +struct mt_lock_stat +{ + const void * m_ptr; + char * m_name; + Uint32 m_contended_count; + Uint32 m_spin_count; +}; + +static void register_lock(const void * ptr, const char * name); +static mt_lock_stat * lookup_lock(const void * ptr); + +#ifdef NDB_HAVE_XCNG +template +struct thr_spin_lock +{ + thr_spin_lock(const char * name = 0) + { + m_lock = 0; + register_lock(this, name); + } + + union { + volatile Uint32 m_lock; + char pad[SZ]; + }; +}; + +static +ATTRIBUTE_NOINLINE +void +lock_slow(void * sl, volatile unsigned * val) +{ + mt_lock_stat* s = lookup_lock(sl); // lookup before owning lock + +loop: + Uint32 spins = 0; + do { + spins++; + cpu_pause(); + } while (* val == 1); + + if (unlikely(xcng(val, 1) != 0)) + goto loop; + + if (s) + { + s->m_spin_count += spins; + Uint32 count = ++s->m_contended_count; + Uint32 freq = (count > 10000 ? 5000 : (count > 20 ? 200 : 1)); + + if ((count % freq) == 0) + printf("%s waiting for lock, contentions: %u spins: %u\n", + s->m_name, count, s->m_spin_count); + } +} + +template +static +inline +void +lock(struct thr_spin_lock* sl) +{ + volatile unsigned* val = &sl->m_lock; + if (likely(xcng(val, 1) == 0)) + return; + + lock_slow(sl, val); +} + +template +static +inline +void +unlock(struct thr_spin_lock* sl) +{ + /** + * Memory barrier here, to make sure all of our stores are visible before + * the lock release is. + */ + mb(); + sl->m_lock = 0; +} + +template +static +inline +int +trylock(struct thr_spin_lock* sl) +{ + volatile unsigned* val = &sl->m_lock; + return xcng(val, 1); +} +#else +#define thr_spin_lock thr_mutex +#endif + +template +struct thr_mutex +{ + thr_mutex(const char * name = 0) { + NdbMutex_Init(&m_mutex); + register_lock(this, name); + } + + union { + NdbMutex m_mutex; + char pad[SZ]; + }; +}; + +template +static +inline +void +lock(struct thr_mutex* sl) +{ + NdbMutex_Lock(&sl->m_mutex); +} + +template +static +inline +void +unlock(struct thr_mutex* sl) +{ + NdbMutex_Unlock(&sl->m_mutex); +} + +template +static +inline +int +trylock(struct thr_mutex * sl) +{ + return NdbMutex_Trylock(&sl->m_mutex); +} + +#endif === added file 'storage/ndb/src/kernel/vm/mt-send-t.cpp' --- a/storage/ndb/src/kernel/vm/mt-send-t.cpp 1970-01-01 00:00:00 +0000 +++ b/storage/ndb/src/kernel/vm/mt-send-t.cpp 2012-03-20 13:50:15 +0000 @@ -0,0 +1,489 @@ +#include "mt-asm.h" +#include "mt-lock.hpp" +#include +#include +#include +#include +#include +#include + +#define BUGGY_VERSION 0 + +/** + * DO_SYSCALL inside critical section + * (the equivalent of writev(socket) + */ +#define DO_SYSCALL 1 + +/** + * This is a unit test of the send code for mt.cpp + * specifically the code that manages which thread will send + * (write gathering) + * + * Each thread is a producer of Signals + * Each signal has a destination remote node (transporter) + * Each thread will after having produced a set of signals + * check if it should send them on socket. + * If it decides that it should, it consumes all the signals + * produced by all threads. + * + * In this unit test, we don't send signals, but the producing part + * will only be to increment a counter. + * + ****************************************************************** + * + * To use this program seriously... + * + * you should set BUGGY_VERSION to 1 + * and experiment with values on cnt_* + * until you find a variant which crashes (abort) + * + * The values compiled-in makes it crash on a single socket + * Intel(R) Core(TM) i5-2400 CPU @ 3.10GHz release compiled! + * (i never managed to get it debug compiled) + */ +#define MAX_THREADS 256 +#define MAX_TRANSPORTERS 256 + +/** + * global variables + */ +static unsigned cnt_threads = 64; +static unsigned cnt_transporters = 8; + +/** + * outer loops + * start/stop threads + */ +static unsigned cnt_seconds = 180; + +/** + * no of signals produced before calling consume + */ +static unsigned cnt_signals_before_consume = 4; + +/** + * no of signals produced in one inner loop + */ +static unsigned cnt_signals_per_inner_loop = 4; + +/** + * no inner loops per outer loop + * + * after each inner loop + * threads will be stalled and result verified + */ +static unsigned cnt_inner_loops = 5000; + +typedef Bitmask<(MAX_TRANSPORTERS+31)/32> TransporterMask; + +struct Producer +{ + Producer() { + bzero(val, sizeof(val)); + pendingcount = 0; + } + + void init() {} + + /** + * values produced... + */ + unsigned val[MAX_TRANSPORTERS]; + + /** + * mask/array to keep track of which transporter we have produce values to + */ + TransporterMask pendingmask; + unsigned pendingcount; + unsigned char pendinglist[MAX_TRANSPORTERS]; + + /** + * produce a value + * + * This is the equivalent of mt_send_remote() + */ + void produce(unsigned D); + + /** + * consume values (from all threads) + * for transporters that we have produced a value to + * + * This is the equivalent to do_send + */ + void consume(); +}; + +struct Thread +{ + Thread() { thread= 0; } + + void init() { p.init(); } + + NdbThread * thread; + Producer p; +}; + +/** + * This is the consumer of values for *one* transporter + */ +struct Consumer +{ + Consumer() { + m_force_send = 0; bzero(val, sizeof(val)); + } + + void init() {} + + struct thr_spin_lock<8> m_send_lock; + unsigned m_force_send; + unsigned val[MAX_THREADS]; + + /** + * consume values from all threads to this transporter + */ + void consume(unsigned D); +}; + +struct Consumer_pad +{ + Consumer c; + char pad[NDB_CL_PADSZ(sizeof(Consumer))]; +}; + +struct Thread_pad +{ + Thread t; + char pad[NDB_CL_PADSZ(sizeof(Thread))]; +}; + +/** + * Thread repository + * and an instance of it + */ +static +struct Rep +{ + Thread_pad t[MAX_THREADS]; + Consumer_pad c[MAX_TRANSPORTERS]; + + /** + * This menthod is called when all threads are stalled + * so it's safe to read values without locks + */ + void validate() { + for (unsigned ic = 0; ic < cnt_transporters; ic++) + { + for (unsigned it = 0; it < cnt_threads; it++) + { + if (c[ic].c.val[it] != t[it].t.p.val[ic]) + { + printf("Detected bug!!!\n"); + printf("ic: %u it: %u c[ic].c.val[it]: %u t[it].t.p.val[ic]: %u\n", + ic, it, c[ic].c.val[it], t[it].t.p.val[ic]); + abort(); + } + } + } + } + + void init() { + for (unsigned i = 0; i < cnt_threads; i++) + t[i].t.init(); + + for (unsigned i = 0; i < cnt_transporters; i++) + c[i].c.init(); + } +} rep; + +static +struct Test +{ + Test() { + waiting_start = 0; + waiting_stop = 0; + mutex = 0; + cond = 0; + } + + void init() { + mutex = NdbMutex_Create(); + cond = NdbCondition_Create(); + } + + unsigned waiting_start; + unsigned waiting_stop; + NdbMutex* mutex; + NdbCondition* cond; + + void wait_started(); + void wait_completed(); +} test; + +void* +thread_main(void * _t) +{ + unsigned seed = + (unsigned)NdbTick_CurrentNanosecond() + + (unsigned)(unsigned long long)_t; + + Thread * self = (Thread*) _t; + for (unsigned i = 0; i < cnt_inner_loops; i++) + { + test.wait_started(); + for (unsigned j = 0; j < cnt_signals_per_inner_loop;) + { + for (unsigned k = 0; k < cnt_signals_before_consume; k++) + { + /** + * Produce a signal to destination D + */ + unsigned D = rand_r(&seed) % cnt_transporters; + self->p.produce(D); + } + + j += cnt_signals_before_consume; + + /** + * This is the equivalent of do_send() + */ + self->p.consume(); + } + test.wait_completed(); + } + return 0; +} + +static +bool +match(const char * arg, const char * val, unsigned * valptr) +{ + if (strncmp(arg, val, strlen(val)) == 0) + { + * valptr = atoi(arg + strlen(val)); + return true; + } + return false; +} + +int +main(int argc, char ** argv) +{ + plan(1); + ndb_init(); + test.init(); + rep.init(); + + if (argc == 1) + { + printf("No arguments supplied...\n" + "assuming we're being run from MTR or similar.\n" + "decreasing loop counts to ridiculously small values...\n"); + cnt_seconds = 10; + cnt_inner_loops = 3000; + cnt_threads = 4; + } + else + { + printf("Arguments supplied...\n"); + for (int i = 1; i < argc; i++) + { + if (match(argv[i], "cnt_seconds=", &cnt_seconds)) + continue; + else if (match(argv[i], "cnt_threads=", &cnt_threads)) + continue; + else if (match(argv[i], "cnt_transporters=", &cnt_transporters)) + continue; + else if (match(argv[i], "cnt_inner_loops=", &cnt_inner_loops)) + continue; + else if (match(argv[i], "cnt_signals_before_consume=", + &cnt_signals_before_consume)) + continue; + else if (match(argv[i], "cnt_signals_per_inner_loop", + &cnt_signals_per_inner_loop)) + continue; + else + { + printf("ignoreing unknown argument: %s\n", argv[i]); + } + } + } + + printf("%s" + " cnt_seconds=%u" + " cnt_threads=%u" + " cnt_transporters=%u" + " cnt_inner_loops=%u" + " cnt_signals_before_consume=%u" + " cnt_signals_per_inner_loop=%u\n", + argv[0], + cnt_seconds, + cnt_threads, + cnt_transporters, + cnt_inner_loops, + cnt_signals_before_consume, + cnt_signals_per_inner_loop); + + Uint32 loop = 0; + Uint64 start = NdbTick_CurrentMillisecond() / 1000; + while (start + cnt_seconds > (NdbTick_CurrentMillisecond() / 1000)) + { + printf("%u ", loop++); fflush(stdout); + if ((loop < 100 && (loop % 25) == 0) || + (loop >= 100 && (loop % 20) == 0)) + printf("\n"); + + for (unsigned t = 0; t < cnt_threads; t++) + { + rep.t[t].t.thread = NdbThread_Create(thread_main, + (void**)&rep.t[t].t, + 1024*1024, + "execute thread", + NDB_THREAD_PRIO_MEAN); + } + + for (unsigned t = 0; t < cnt_threads; t++) + { + void * ret; + NdbThread_WaitFor(rep.t[t].t.thread, &ret); + } + } + printf("\n"); fflush(stdout); + + ok(true, "ok"); + return 0; +} + +inline +void +Producer::produce(unsigned D) +{ + if (!pendingmask.get(D)) + { + pendingmask.set(D); + pendinglist[pendingcount] = D; + pendingcount++; + } + val[D]++; +} + +inline +void +Producer::consume() +{ + unsigned count = pendingcount; + pendingmask.clear(); + pendingcount = 0; + + for (unsigned i = 0; i < count; i++) + { + unsigned D = pendinglist[i]; + rep.c[D].c.consume(D); + } +} + +inline +void +Consumer::consume(unsigned D) +{ + /** + * This is the equivalent of do_send(must_send = 1) + */ + m_force_send = 1; + + do + { + if (trylock(&m_send_lock) != 0) + { + /* Other thread will send for us as we set m_force_send. */ + return; + } + + /** + * Now clear the flag, and start sending all data available to this node. + * + * Put a memory barrier here, so that if another thread tries to grab + * the send lock but fails due to us holding it here, we either + * 1) Will see m_force_send[nodeId] set to 1 at the end of the loop, or + * 2) We clear here the flag just set by the other thread, but then we + * will (thanks to mb()) be able to see and send all of the data already + * in the first send iteration. + */ + m_force_send = 0; + mb(); + + /** + * This is the equivalent of link_thread_send_buffers + */ + for (unsigned i = 0; i < cnt_threads; i++) + { + val[i] = rep.t[i].t.p.val[D]; + } + + /** + * Do a syscall...which could have affect on barriers...etc + */ + if (DO_SYSCALL) + { + NdbTick_CurrentMillisecond(); + } + + unlock(&m_send_lock); + +#if BUGGY_VERSION +#else + mb(); +#endif + } + while (m_force_send != 0); +} + + +void +Test::wait_started() +{ + NdbMutex_Lock(mutex); + if (waiting_start + 1 == cnt_threads) + { + waiting_stop = 0; + } + waiting_start++; + assert(waiting_start <= cnt_threads); + while (waiting_start < cnt_threads) + NdbCondition_Wait(cond, mutex); + + NdbCondition_Broadcast(cond); + NdbMutex_Unlock(mutex); +} + +void +Test::wait_completed() +{ + NdbMutex_Lock(mutex); + if (waiting_stop + 1 == cnt_threads) + { + rep.validate(); + waiting_start = 0; + } + waiting_stop++; + assert(waiting_stop <= cnt_threads); + while (waiting_stop < cnt_threads) + NdbCondition_Wait(cond, mutex); + + NdbCondition_Broadcast(cond); + NdbMutex_Unlock(mutex); +} + +static +void +register_lock(const void * ptr, const char * name) +{ + return; +} + +static +mt_lock_stat * +lookup_lock(const void * ptr) +{ + return 0; +} === modified file 'storage/ndb/src/kernel/vm/mt.cpp' --- a/storage/ndb/src/kernel/vm/mt.cpp 2012-02-20 09:03:01 +0000 +++ b/storage/ndb/src/kernel/vm/mt.cpp 2012-03-20 13:23:29 +0000 @@ -36,6 +36,7 @@ #include #include "mt-asm.h" +#include "mt-lock.hpp" inline SimulatedBlock* @@ -96,16 +97,6 @@ static Uint32 first_receiver_thread_no = /* max signal is 32 words, 7 for signal header and 25 datawords */ #define MIN_SIGNALS_PER_PAGE (thr_job_buffer::SIZE / 32) -struct mt_lock_stat -{ - const void * m_ptr; - char * m_name; - Uint32 m_contended_count; - Uint32 m_spin_count; -}; -static void register_lock(const void * ptr, const char * name); -static mt_lock_stat * lookup_lock(const void * ptr); - #if defined(HAVE_LINUX_FUTEX) && defined(NDB_HAVE_XCNG) #define USE_FUTEX #endif @@ -274,132 +265,6 @@ wakeup(struct thr_wait* wait) #endif -#ifdef NDB_HAVE_XCNG -template -struct thr_spin_lock -{ - thr_spin_lock(const char * name = 0) - { - m_lock = 0; - register_lock(this, name); - } - - union { - volatile Uint32 m_lock; - char pad[SZ]; - }; -}; - -static -ATTRIBUTE_NOINLINE -void -lock_slow(void * sl, volatile unsigned * val) -{ - mt_lock_stat* s = lookup_lock(sl); // lookup before owning lock - -loop: - Uint32 spins = 0; - do { - spins++; - cpu_pause(); - } while (* val == 1); - - if (unlikely(xcng(val, 1) != 0)) - goto loop; - - if (s) - { - s->m_spin_count += spins; - Uint32 count = ++s->m_contended_count; - Uint32 freq = (count > 10000 ? 5000 : (count > 20 ? 200 : 1)); - - if ((count % freq) == 0) - printf("%s waiting for lock, contentions: %u spins: %u\n", - s->m_name, count, s->m_spin_count); - } -} - -template -static -inline -void -lock(struct thr_spin_lock* sl) -{ - volatile unsigned* val = &sl->m_lock; - if (likely(xcng(val, 1) == 0)) - return; - - lock_slow(sl, val); -} - -template -static -inline -void -unlock(struct thr_spin_lock* sl) -{ - /** - * Memory barrier here, to make sure all of our stores are visible before - * the lock release is. - */ - mb(); - sl->m_lock = 0; -} - -template -static -inline -int -trylock(struct thr_spin_lock* sl) -{ - volatile unsigned* val = &sl->m_lock; - return xcng(val, 1); -} -#else -#define thr_spin_lock thr_mutex -#endif - -template -struct thr_mutex -{ - thr_mutex(const char * name = 0) { - NdbMutex_Init(&m_mutex); - register_lock(this, name); - } - - union { - NdbMutex m_mutex; - char pad[SZ]; - }; -}; - -template -static -inline -void -lock(struct thr_mutex* sl) -{ - NdbMutex_Lock(&sl->m_mutex); -} - -template -static -inline -void -unlock(struct thr_mutex* sl) -{ - NdbMutex_Unlock(&sl->m_mutex); -} - -template -static -inline -int -trylock(struct thr_mutex * sl) -{ - return NdbMutex_Trylock(&sl->m_mutex); -} - /** * thr_safe_pool */ No bundle (reason: useless for push emails).