From: Jonas Oreland Date: March 21 2012 9:04am Subject: bzr push into mysql-5.1-telco-7.0 branch (jonas.oreland:4894 to 4895) List-Archive: http://lists.mysql.com/commits/143255 Message-Id: <20120321090426.66D9455C8EA@perch.localdomain> MIME-Version: 1.0 Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit 4895 Jonas Oreland 2012-03-21 ndb - add forceSend to mt-send-t modified: storage/ndb/src/kernel/vm/mt-send-t.cpp 4894 Jonas Oreland 2012-03-20 ndb - add unit test program of mt.cpp's m_force_send algorithm added: storage/ndb/src/kernel/vm/mt-send-t.cpp modified: storage/ndb/src/kernel/vm/Makefile.am === modified file 'storage/ndb/src/kernel/vm/mt-send-t.cpp' --- a/storage/ndb/src/kernel/vm/mt-send-t.cpp 2012-03-20 13:50:15 +0000 +++ b/storage/ndb/src/kernel/vm/mt-send-t.cpp 2012-03-21 08:59:28 +0000 @@ -75,6 +75,11 @@ static unsigned cnt_signals_per_inner_lo */ static unsigned cnt_inner_loops = 5000; +/** + * pct of do_send that are using forceSend() + */ +static unsigned pct_force = 15; + typedef Bitmask<(MAX_TRANSPORTERS+31)/32> TransporterMask; struct Producer @@ -109,9 +114,10 @@ struct Producer * consume values (from all threads) * for transporters that we have produced a value to * - * This is the equivalent to do_send + * This is the equivalent to do_send and if force=true + * this is the equivalent of forceSend() */ - void consume(); + void consume(bool force); }; struct Thread @@ -143,6 +149,11 @@ struct Consumer * consume values from all threads to this transporter */ void consume(unsigned D); + + /** + * force_consume + */ + void forceConsume(unsigned D); }; struct Consumer_pad @@ -247,7 +258,8 @@ thread_main(void * _t) /** * This is the equivalent of do_send() */ - self->p.consume(); + int force = (rand_r(&seed) % 100) < pct_force; + self->p.consume(force); } test.wait_completed(); } @@ -299,9 +311,12 @@ main(int argc, char ** argv) else if (match(argv[i], "cnt_signals_before_consume=", &cnt_signals_before_consume)) continue; - else if (match(argv[i], "cnt_signals_per_inner_loop", + else if (match(argv[i], "cnt_signals_per_inner_loop=", &cnt_signals_per_inner_loop)) continue; + else if (match(argv[i], "pct_force=", + &pct_force)) + continue; else { printf("ignoreing unknown argument: %s\n", argv[i]); @@ -315,14 +330,17 @@ main(int argc, char ** argv) " cnt_transporters=%u" " cnt_inner_loops=%u" " cnt_signals_before_consume=%u" - " cnt_signals_per_inner_loop=%u\n", + " cnt_signals_per_inner_loop=%u" + " pct_force=%u" + "\n", argv[0], cnt_seconds, cnt_threads, cnt_transporters, cnt_inner_loops, cnt_signals_before_consume, - cnt_signals_per_inner_loop); + cnt_signals_per_inner_loop, + pct_force); Uint32 loop = 0; Uint64 start = NdbTick_CurrentMillisecond() / 1000; @@ -369,7 +387,7 @@ Producer::produce(unsigned D) inline void -Producer::consume() +Producer::consume(bool force) { unsigned count = pendingcount; pendingmask.clear(); @@ -378,7 +396,10 @@ Producer::consume() for (unsigned i = 0; i < count; i++) { unsigned D = pendinglist[i]; - rep.c[D].c.consume(D); + if (force) + rep.c[D].c.forceConsume(D); + else + rep.c[D].c.consume(D); } } @@ -438,6 +459,49 @@ Consumer::consume(unsigned D) while (m_force_send != 0); } +inline +void +Consumer::forceConsume(unsigned D) +{ + /** + * This is the equivalent of forceSend() + */ + + do + { + /** + * NOTE: since we unconditionally lock m_send_lock + * we don't need a mb() after the clearing of m_force_send here. + */ + m_force_send = 0; + + lock(&m_send_lock); + + /** + * This is the equivalent of link_thread_send_buffers + */ + for (unsigned i = 0; i < cnt_threads; i++) + { + val[i] = rep.t[i].t.p.val[D]; + } + + /** + * Do a syscall...which could have affect on barriers...etc + */ + if (DO_SYSCALL) + { + NdbTick_CurrentMillisecond(); + } + + unlock(&m_send_lock); + +#if BUGGY_VERSION +#else + mb(); +#endif + } + while (m_force_send != 0); +} void Test::wait_started() No bundle (reason: useless for push emails).