4895 Jonas Oreland 2012-03-21
ndb - add forceSend to mt-send-t
modified:
storage/ndb/src/kernel/vm/mt-send-t.cpp
4894 Jonas Oreland 2012-03-20
ndb - add unit test program of mt.cpp's m_force_send algorithm
added:
storage/ndb/src/kernel/vm/mt-send-t.cpp
modified:
storage/ndb/src/kernel/vm/Makefile.am
=== modified file 'storage/ndb/src/kernel/vm/mt-send-t.cpp'
--- a/storage/ndb/src/kernel/vm/mt-send-t.cpp 2012-03-20 13:50:15 +0000
+++ b/storage/ndb/src/kernel/vm/mt-send-t.cpp 2012-03-21 08:59:28 +0000
@@ -75,6 +75,11 @@ static unsigned cnt_signals_per_inner_lo
*/
static unsigned cnt_inner_loops = 5000;
+/**
+ * pct of do_send that are using forceSend()
+ */
+static unsigned pct_force = 15;
+
typedef Bitmask<(MAX_TRANSPORTERS+31)/32> TransporterMask;
struct Producer
@@ -109,9 +114,10 @@ struct Producer
* consume values (from all threads)
* for transporters that we have produced a value to
*
- * This is the equivalent to do_send
+ * This is the equivalent to do_send and if force=true
+ * this is the equivalent of forceSend()
*/
- void consume();
+ void consume(bool force);
};
struct Thread
@@ -143,6 +149,11 @@ struct Consumer
* consume values from all threads to this transporter
*/
void consume(unsigned D);
+
+ /**
+ * force_consume
+ */
+ void forceConsume(unsigned D);
};
struct Consumer_pad
@@ -247,7 +258,8 @@ thread_main(void * _t)
/**
* This is the equivalent of do_send()
*/
- self->p.consume();
+ int force = (rand_r(&seed) % 100) < pct_force;
+ self->p.consume(force);
}
test.wait_completed();
}
@@ -299,9 +311,12 @@ main(int argc, char ** argv)
else if (match(argv[i], "cnt_signals_before_consume=",
&cnt_signals_before_consume))
continue;
- else if (match(argv[i], "cnt_signals_per_inner_loop",
+ else if (match(argv[i], "cnt_signals_per_inner_loop=",
&cnt_signals_per_inner_loop))
continue;
+ else if (match(argv[i], "pct_force=",
+ &pct_force))
+ continue;
else
{
printf("ignoreing unknown argument: %s\n", argv[i]);
@@ -315,14 +330,17 @@ main(int argc, char ** argv)
" cnt_transporters=%u"
" cnt_inner_loops=%u"
" cnt_signals_before_consume=%u"
- " cnt_signals_per_inner_loop=%u\n",
+ " cnt_signals_per_inner_loop=%u"
+ " pct_force=%u"
+ "\n",
argv[0],
cnt_seconds,
cnt_threads,
cnt_transporters,
cnt_inner_loops,
cnt_signals_before_consume,
- cnt_signals_per_inner_loop);
+ cnt_signals_per_inner_loop,
+ pct_force);
Uint32 loop = 0;
Uint64 start = NdbTick_CurrentMillisecond() / 1000;
@@ -369,7 +387,7 @@ Producer::produce(unsigned D)
inline
void
-Producer::consume()
+Producer::consume(bool force)
{
unsigned count = pendingcount;
pendingmask.clear();
@@ -378,7 +396,10 @@ Producer::consume()
for (unsigned i = 0; i < count; i++)
{
unsigned D = pendinglist[i];
- rep.c[D].c.consume(D);
+ if (force)
+ rep.c[D].c.forceConsume(D);
+ else
+ rep.c[D].c.consume(D);
}
}
@@ -438,6 +459,49 @@ Consumer::consume(unsigned D)
while (m_force_send != 0);
}
+inline
+void
+Consumer::forceConsume(unsigned D)
+{
+ /**
+ * This is the equivalent of forceSend()
+ */
+
+ do
+ {
+ /**
+ * NOTE: since we unconditionally lock m_send_lock
+ * we don't need a mb() after the clearing of m_force_send here.
+ */
+ m_force_send = 0;
+
+ lock(&m_send_lock);
+
+ /**
+ * This is the equivalent of link_thread_send_buffers
+ */
+ for (unsigned i = 0; i < cnt_threads; i++)
+ {
+ val[i] = rep.t[i].t.p.val[D];
+ }
+
+ /**
+ * Do a syscall...which could have affect on barriers...etc
+ */
+ if (DO_SYSCALL)
+ {
+ NdbTick_CurrentMillisecond();
+ }
+
+ unlock(&m_send_lock);
+
+#if BUGGY_VERSION
+#else
+ mb();
+#endif
+ }
+ while (m_force_send != 0);
+}
void
Test::wait_started()
No bundle (reason: useless for push emails).
| Thread |
|---|
| • bzr push into mysql-5.1-telco-7.0 branch (jonas.oreland:4894 to 4895) | Jonas Oreland | 21 Mar |