4113 Ole John Aske 2012-11-23
Fix failing ndb_bushy_joins.test under Valgrind
Extends the TimeBetweenEpoch to avoid failure due to
exceeding 'MaxBufferedEpocs > 100).
modified:
mysql-test/suite/ndb/t/ndb_bushy_joins.cnf
4112 Ole John Aske 2012-11-22
Fix for bug#15908684: Job threads may 'sleep' without pending messages being sent
This fix correct the calculation of the 'pending' argument to
update_sched_config() such that both previously failed 'pending_send',
and currently available unsent 'send_sum' together forms the
number of 'pending' signals.
modified:
storage/ndb/src/kernel/vm/mt.cpp
4111 Ole John Aske 2012-11-22
Fix for Bug#15907515 RECEIVER THREAD COULD BLOCK/BUSY WAIT WHILE HOLDING RECEIVER MUTEX
Note: This fix require the fix for bug 15907122 as 'baseline'.
This fix removes the waiting for more job buffers to become available
inside performReceive() (or actually: mt_checkDoJob() called from it).
Instead performReceive() will now return with a 'full' status
to the receive thread, which will unlock the receive mutex,
and start a conditional wait for more job buffers to become available.
Furthermore this fix also removes the check_job_buffer() *before*
performReceive() such that we will now doReceive() of any pending
TCP data into our local receiveBuffers even if the job buffers are full.
modified:
storage/ndb/include/transporter/TransporterRegistry.hpp
storage/ndb/src/common/transporter/TransporterRegistry.cpp
storage/ndb/src/kernel/vm/mt.cpp
4110 Ole John Aske 2012-11-22
Fix for Bug#15907122 INCORRECT HANDLING OF JOB-BUFFERS ALMOST FULL - 'SLEEPLOOP 10'
This fix introduce a 'thr_wait*' (Sort of a condition/mutex)
which is used to wait for job buffers to become available when
we are in an 'job buffers almost full' situation. This 'thr_wait'
will be signaled when a job buffer has been consumed & freed.
This fix is a collection of the following 5 sub patches:
******
Fix mixed up comment & naming wrt. whether 'm_recv_transporters' and
'm_has_data_transporters' is a bitmask of NodeId or Tranporters.
(It *is* NodeId bitmasks)
******
Add dump of job buffer utilization before we
crash due to 'job buffer full', and to 'sleeploop 10'
reports where we have to wait due to almost full job buffers.
******
Change the signature of the yield() function such that it can take a more
general '*check_callback()' function as arguments.
Change the existing functions currently used as 'callbacks'
to the new (relaxed) signature.
******
Refactor: Splitt out compute_free_buffers_in_queue()
from compute_max_signals_to_execute().
New function contains common code intended for reuse in later patches.
******
update_sched_config() will 'yield' the CPU and wait for more job buffers
to become available when it is about to run out of job buffers.
The yield() call will wait on a 'thr_wait' object, which may be sent
a 'wakeup()' when the waiting condition has been resolved (by another thread)
However, update_schec_config() used the 'thr_wait' object intended to be used
to wait for more *incomming signal* - What we actually have to wait for
are signals to be *consumed* by the destination thread. Luckily there is
also defined a 'max wait' of 1ms which currently will wakeup the thread .
This patch introduce a own 'thr_wait' object which which is signaled by the
consumer, and we now wait on this object.
Furthermore the patch also avoid a situation where a thread could end up
waiting for itself.
modified:
storage/ndb/include/transporter/TransporterRegistry.hpp
storage/ndb/src/common/transporter/TransporterRegistry.cpp
storage/ndb/src/kernel/vm/mt.cpp
4109 Mauritz Sundell 2012-11-22 [merge]
merge 7.1 -> 7.2
=== modified file 'mysql-test/suite/ndb/t/ndb_bushy_joins.cnf'
--- a/mysql-test/suite/ndb/t/ndb_bushy_joins.cnf 2012-11-13 10:27:06 +0000
+++ b/mysql-test/suite/ndb/t/ndb_bushy_joins.cnf 2012-11-23 11:34:35 +0000
@@ -15,6 +15,7 @@ ThreadConfig=ldm={count=4}
MaxNoOfConcurrentOperations=250000
LongMessageBuffer=64M
TransactionDeadlockDetectionTimeout=30000
+TimeBetweenEpochs=1000
[ENV]
# Need to always use ndbmtd when we want lots of partitions
=== modified file 'storage/ndb/include/transporter/TransporterRegistry.hpp'
--- a/storage/ndb/include/transporter/TransporterRegistry.hpp 2012-09-29 00:02:40 +0000
+++ b/storage/ndb/include/transporter/TransporterRegistry.hpp 2012-11-22 12:18:17 +0000
@@ -108,17 +108,27 @@ struct TransporterReceiveData
NodeBitmask m_transporters;
/**
- * Bitmask of transporters having data awaiting to be received
+ * Bitmask of nodes having data awaiting to be received
* from its transporter.
*/
NodeBitmask m_recv_transporters;
/**
- * Bitmask of transporters that has already received data buffered
+ * Bitmask of nodes that has already received data buffered
* inside its transporter. Possibly "carried over" from last
* performReceive
*/
NodeBitmask m_has_data_transporters;
+
+ /**
+ * Subset of m_has_data_transporters which we completed handling
+ * of in previous ::performReceive before we was interrupted due
+ * to lack of job buffers. Will skip these when we later retry
+ * ::performReceive in order to avoid startvation of non-handled
+ * transporters.
+ */
+ NodeBitmask m_handled_transporters;
+
#if defined(HAVE_EPOLL_CREATE)
int m_epoll_fd;
struct epoll_event *m_epoll_events;
@@ -579,7 +589,7 @@ public:
* Receiving
*/
Uint32 pollReceive(Uint32 timeOutMillis, TransporterReceiveHandle& mask);
- void performReceive(TransporterReceiveHandle&);
+ Uint32 performReceive(TransporterReceiveHandle&);
void update_connections(TransporterReceiveHandle&);
inline Uint32 pollReceive(Uint32 timeOutMillis) {
@@ -587,9 +597,9 @@ public:
return pollReceive(timeOutMillis, * receiveHandle);
}
- inline void performReceive() {
+ inline Uint32 performReceive() {
assert(receiveHandle != 0);
- performReceive(* receiveHandle);
+ return performReceive(* receiveHandle);
}
inline void update_connections() {
=== modified file 'storage/ndb/src/common/transporter/TransporterRegistry.cpp'
--- a/storage/ndb/src/common/transporter/TransporterRegistry.cpp 2012-09-29 00:02:40 +0000
+++ b/storage/ndb/src/common/transporter/TransporterRegistry.cpp 2012-11-22 12:18:17 +0000
@@ -94,6 +94,7 @@ TransporterReceiveData::TransporterRecei
*/
m_transporters.set(); // Handle all
m_transporters.clear(Uint32(0)); // Except wakeup socket...
+ m_handled_transporters.clear();
#if defined(HAVE_EPOLL_CREATE)
m_epoll_fd = -1;
@@ -1142,13 +1143,13 @@ TransporterRegistry::pollReceive(Uint32
{
for (int i = 0; i < num_socket_events; i++)
{
- const Uint32 trpid = recvdata.m_epoll_events[i].data.u32;
+ const Uint32 node_id = recvdata.m_epoll_events[i].data.u32;
/**
* check that it's assigned to "us"
*/
- assert(recvdata.m_transporters.get(trpid));
+ assert(recvdata.m_transporters.get(node_id));
- recvdata.m_recv_transporters.set(trpid);
+ recvdata.m_recv_transporters.set(node_id);
}
}
else if (num_socket_events < 0)
@@ -1319,14 +1320,15 @@ TransporterRegistry::poll_TCP(Uint32 tim
/**
* In multi-threaded cases, this must be protected by a global receive lock.
+ * In case we were unable to received due to job buffers being full.
+ * Returns 0 when receive succeeded from all Transporters having data,
+ * else 1.
*/
-void
+Uint32
TransporterRegistry::performReceive(TransporterReceiveHandle& recvdata)
{
assert((receiveHandle == &recvdata) || (receiveHandle == 0));
- bool hasReceived = false;
-
if (recvdata.m_recv_transporters.get(0))
{
assert(recvdata.m_transporters.get(0));
@@ -1380,7 +1382,7 @@ TransporterRegistry::performReceive(Tran
recvdata.m_recv_transporters.clear();
/**
- * Handle data either received above or pending from prev rounds.
+ * Unpack data either received above or pending from prev rounds.
*/
for(Uint32 id = recvdata.m_has_data_transporters.find_first();
id != BitmaskImpl::NotFound;
@@ -1395,9 +1397,10 @@ TransporterRegistry::performReceive(Tran
{
if (t->isConnected())
{
- if (hasReceived)
- recvdata.checkJobBuffer();
- hasReceived = true;
+ if (unlikely(recvdata.checkJobBuffer()))
+ return 1; // Full, can't unpack more
+ if (unlikely(recvdata.m_handled_transporters.get(id)))
+ continue; // Skip now to avoid startvation
Uint32 * ptr;
Uint32 sz = t->getReceiveData(&ptr);
Uint32 szUsed = unpack(recvdata, ptr, sz, id, ioStates[id]);
@@ -1413,6 +1416,7 @@ TransporterRegistry::performReceive(Tran
}
// If transporter still have data, make sure that it's remember to next time
recvdata.m_has_data_transporters.set(id, hasdata);
+ recvdata.m_handled_transporters.set(id, hasdata);
}
#endif
@@ -1428,9 +1432,11 @@ TransporterRegistry::performReceive(Tran
{
if(t->isConnected() && t->checkConnected())
{
- if (hasReceived)
- callbackObj->checkJobBuffer();
- hasReceived = true;
+ if (unlikely(recvdata.checkJobBuffer()))
+ return 1; // Full, can't unpack more
+ if (unlikely(recvdata.m_handled_transporters.get(nodeId)))
+ continue; // Skip now to avoid startvation
+
Uint32 * readPtr, * eodPtr;
t->getReceivePtr(&readPtr, &eodPtr);
callbackObj->transporter_recv_from(nodeId);
@@ -1438,6 +1444,7 @@ TransporterRegistry::performReceive(Tran
t->updateReceivePtr(newPtr);
}
}
+ recvdata.m_handled_transporters.set(nodeId);
}
#endif
#ifdef NDB_SHM_TRANSPORTER
@@ -1449,9 +1456,11 @@ TransporterRegistry::performReceive(Tran
if(is_connected(nodeId)){
if(t->isConnected() && t->checkConnected())
{
- if (hasReceived)
- recvdata.checkJobBuffer();
- hasReceived = true;
+ if (unlikely(recvdata.checkJobBuffer()))
+ return 1; // Full, can't unpack more
+ if (unlikely(recvdata.m_handled_transporters.get(nodeId)))
+ continue; // Skip now to avoid startvation
+
Uint32 * readPtr, * eodPtr;
t->getReceivePtr(&readPtr, &eodPtr);
recvdata.transporter_recv_from(nodeId);
@@ -1460,8 +1469,11 @@ TransporterRegistry::performReceive(Tran
t->updateReceivePtr(newPtr);
}
}
+ recvdata.m_handled_transporters.set(nodeId);
}
#endif
+ recvdata.m_handled_transporters.clear();
+ return 0;
}
/**
@@ -1776,6 +1788,7 @@ TransporterRegistry::report_disconnect(T
performStates[node_id] = DISCONNECTED;
recvdata.m_recv_transporters.clear(node_id);
recvdata.m_has_data_transporters.clear(node_id);
+ recvdata.m_handled_transporters.clear(node_id);
recvdata.reportDisconnect(node_id, errnum);
DBUG_VOID_RETURN;
}
=== modified file 'storage/ndb/src/kernel/vm/mt.cpp'
--- a/storage/ndb/src/kernel/vm/mt.cpp 2012-08-14 11:23:36 +0000
+++ b/storage/ndb/src/kernel/vm/mt.cpp 2012-11-22 13:20:37 +0000
@@ -38,6 +38,8 @@
#include "mt-asm.h"
#include "mt-lock.hpp"
+static void dumpJobQueues(void);
+
inline
SimulatedBlock*
GlobalData::mt_getBlock(BlockNumber blockNo, Uint32 instanceNo)
@@ -155,7 +157,7 @@ struct thr_wait
static inline
bool
yield(struct thr_wait* wait, const Uint32 nsec,
- bool (*check_callback)(struct thr_data *), struct thr_data *check_arg)
+ bool (*check_callback)(void *), void *check_arg)
{
volatile unsigned * val = &wait->m_futex_state;
#ifndef NDEBUG
@@ -224,7 +226,7 @@ struct thr_wait
static inline
bool
yield(struct thr_wait* wait, const Uint32 nsec,
- bool (*check_callback)(struct thr_data *), struct thr_data *check_arg)
+ bool (*check_callback)(void *), void *check_arg)
{
struct timespec end;
NdbCondition_ComputeAbsTime(&end, nsec/1000000);
@@ -585,12 +587,20 @@ struct thr_job_queue_head
unsigned m_read_index; // Read/written by consumer, read by producer
unsigned m_write_index; // Read/written by producer, read by consumer
+ /**
+ * Waiter object: In case job queue is full, the produced thread
+ * will 'yield' on this waiter object until the consumer thread
+ * has consumed (at least) a job buffer.
+ */
+ thr_wait m_waiter;
+
Uint32 used() const;
};
struct thr_job_queue
{
static const unsigned SIZE = 32;
+ static const unsigned SAFETY = 2;
struct thr_job_buffer* m_buffers[SIZE];
};
@@ -1386,7 +1396,7 @@ thr_send_threads::alert_send_thread(Node
extern "C"
bool
-check_available_send_data(struct thr_data *not_used)
+check_available_send_data(void *not_used)
{
(void)not_used;
return !g_send_threads->data_available();
@@ -1552,6 +1562,7 @@ void
job_buffer_full(struct thr_data* selfptr)
{
ndbout_c("job buffer full");
+ dumpJobQueues();
abort();
}
@@ -1560,6 +1571,7 @@ void
out_of_job_buffer(struct thr_data* selfptr)
{
ndbout_c("out of job buffer");
+ dumpJobQueues();
abort();
}
@@ -2031,40 +2043,81 @@ flush_jbb_write_state(thr_data *selfptr)
}
}
+static
+void
+dumpJobQueues(void)
+{
+ const struct thr_repository* rep = &g_thr_repository;
+ for (unsigned from = 0; from<num_threads; from++)
+ {
+ for (unsigned to = 0; to<num_threads; to++)
+ {
+ const thr_data_aligned *thr_align_ptr = rep->m_thread + to;
+ const struct thr_data *thrptr = &thr_align_ptr->m_thr_data;
+ const thr_job_queue_head *q_head = thrptr->m_in_queue_head + from;
+
+ unsigned used = q_head->used();
+ if (used > 0)
+ {
+ ndbout << " job buffer " << from << " --> " << to
+ << ", used:" << used
+ << endl;
+ }
+ }
+ }
+}
+
/**
- * Transporter will receive 1024 signals (MAX_RECEIVED_SIGNALS)
- * before running check_job_buffers
+ * Receive thread will unpack 1024 signals (MAX_RECEIVED_SIGNALS)
+ * from Transporters before running another check_recv_queue
+ *
+ * This function returns true if there is not space to unpack
+ * this amount of signals, else false.
*
- * This function returns 0 if there is space to receive this amount of
- * signals
- * else 1
+ * Also used as callback function from yield() to recheck
+ * 'full' condition before going to sleep.
*/
-static int
-check_job_buffers(struct thr_repository* rep, Uint32 recv_thread_id)
+static bool
+check_recv_queue(void *arg)
{
+ thr_job_queue_head *q_head = static_cast<thr_job_queue_head*>(arg);
+
const Uint32 minfree = (1024 + MIN_SIGNALS_PER_PAGE - 1)/MIN_SIGNALS_PER_PAGE;
- unsigned thr_no = first_receiver_thread_no + recv_thread_id;
- const thr_data_aligned *thr_align_ptr = rep->m_thread;
+ /**
+ * NOTE: m_read_index is read wo/ lock (and updated by different thread)
+ * but since the different thread can only consume
+ * signals this means that the value returned from this
+ * function is always conservative (i.e it can be better than
+ * returned value, if read-index has moved but we didnt see it)
+ */
+ const unsigned ri = q_head->m_read_index;
+ const unsigned wi = q_head->m_write_index;
+ const unsigned busy = (wi >= ri) ? wi - ri : (thr_job_queue::SIZE - ri) + wi;
+ return (1 + minfree + busy >= thr_job_queue::SIZE);
+}
+
+/**
+ * Check if any of the receive queues for the threads being served
+ * by this receive thread, are full.
+ * If full: Return 'Thr_data*' for (one of) the thread(s)
+ * which we have to wait for. (to consume from queue)
+ */
+static struct thr_data*
+get_congested_recv_queue(struct thr_repository* rep, Uint32 recv_thread_id)
+{
+ const unsigned thr_no = first_receiver_thread_no + recv_thread_id;
+ thr_data_aligned *thr_align_ptr = rep->m_thread;
+
for (unsigned i = 0; i<num_threads; i++, thr_align_ptr++)
{
- /**
- * NOTE: m_read_index is read wo/ lock (and updated by different thread)
- * but since the different thread can only consume
- * signals this means that the value returned from this
- * function is always conservative (i.e it can be better than
- * returned value, if read-index has moved but we didnt see it)
- */
- const struct thr_data *thrptr = &thr_align_ptr->m_thr_data;
- const thr_job_queue_head *q_head = thrptr->m_in_queue_head + thr_no;
- unsigned ri = q_head->m_read_index;
- unsigned wi = q_head->m_write_index;
- unsigned busy = (wi >= ri) ? wi - ri : (thr_job_queue::SIZE - ri) + wi;
- if (1 + minfree + busy >= thr_job_queue::SIZE)
+ struct thr_data *thrptr = &thr_align_ptr->m_thr_data;
+ thr_job_queue_head *q_head = thrptr->m_in_queue_head + thr_no;
+ if (check_recv_queue(q_head))
{
- return 1;
+ return thrptr;
}
}
- return 0;
+ return NULL;
}
/**
@@ -2086,6 +2139,29 @@ check_job_buffers(struct thr_repository*
*/
static
Uint32
+compute_free_buffers_in_queue(const thr_job_queue_head *q_head)
+{
+ /**
+ * NOTE: m_read_index is read wo/ lock (and updated by different thread)
+ * but since the different thread can only consume
+ * signals this means that the value returned from this
+ * function is always conservative (i.e it can be better than
+ * returned value, if read-index has moved but we didnt see it)
+ */
+ unsigned ri = q_head->m_read_index;
+ unsigned wi = q_head->m_write_index;
+ unsigned free = (wi < ri) ? ri - wi : (thr_job_queue::SIZE + ri) - wi;
+
+ assert(free <= thr_job_queue::SIZE);
+
+ if (free <= (1 + thr_job_queue::SAFETY))
+ return 0;
+ else
+ return free - (1 + thr_job_queue::SAFETY);
+}
+
+static
+Uint32
compute_max_signals_to_execute(Uint32 thr_no)
{
Uint32 minfree = thr_job_queue::SIZE;
@@ -2094,30 +2170,17 @@ compute_max_signals_to_execute(Uint32 th
for (unsigned i = 0; i<num_threads; i++, thr_align_ptr++)
{
- /**
- * NOTE: m_read_index is read wo/ lock (and updated by different thread)
- * but since the different thread can only consume
- * signals this means that the value returned from this
- * function is always conservative (i.e it can be better than
- * returned value, if read-index has moved but we didnt see it)
- */
const struct thr_data *thrptr = &thr_align_ptr->m_thr_data;
const thr_job_queue_head *q_head = thrptr->m_in_queue_head + thr_no;
- unsigned ri = q_head->m_read_index;
- unsigned wi = q_head->m_write_index;
- unsigned free = (wi < ri) ? ri - wi : (thr_job_queue::SIZE + ri) - wi;
-
- assert(free <= thr_job_queue::SIZE);
+ unsigned free = compute_free_buffers_in_queue(q_head);
if (free < minfree)
minfree = free;
}
-#define SAFETY 2
-
- if (minfree >= (1 + SAFETY))
+ if (minfree > 0)
{
- return (3 + (minfree - (1 + SAFETY)) * MIN_SIGNALS_PER_PAGE) / 4;
+ return (3 + (minfree) * MIN_SIGNALS_PER_PAGE) / 4;
}
else
{
@@ -2220,41 +2283,22 @@ int
mt_checkDoJob(Uint32 recv_thread_idx)
{
struct thr_repository* rep = &g_thr_repository;
- if (unlikely(check_job_buffers(rep, recv_thread_idx)))
- {
- do
- {
- /**
- * theoretically (or when we do single threaded by using ndbmtd with
- * all in same thread) we should execute signals here...to
- * prevent dead-lock, but...with current ndbmtd only TRPMAN runs in
- * this thread, and other thread is waiting for TRPMAN
- * except for QMGR open/close connection, but that is not
- * (i think) sufficient to create a deadlock
- */
-
- /** FIXME:
- * On a CMT chip where #CPU >= #NDB-threads sched_yield() is
- * effectively a NOOP as there will normally be an idle CPU available
- * to immediately resume thread execution.
- * On a Niagara chip this may severely impact performance as the CPUs
- * are virtualized by timemultiplexing the physical core.
- * The thread should really be 'parked' on
- * a condition to free its execution resources.
- */
-// usleep(a-few-usec); /* A micro-sleep would likely have been better... */
-#if defined HAVE_SCHED_YIELD
- sched_yield();
-#elif defined _WIN32
- SwitchToThread();
-#else
- NdbSleep_MilliSleep(0);
-#endif
-
- } while (check_job_buffers(rep, recv_thread_idx));
- }
- return 0;
+ /**
+ * Return '1' if we are not allowed to receive more signals
+ * into the job buffers from this 'recv_thread_idx'.
+ *
+ * NOTE:
+ * We should not loop-wait for buffers to become available
+ * here as we currently hold the receiver-lock. Furthermore
+ * waiting too long here could cause the receiver thread to be
+ * less responsive wrt. moving incoming (TCP) data from the
+ * TCPTransporters into the (local) receiveBuffers.
+ * The thread could also oversleep on its other tasks as
+ * handling open/close of connections, and catching
+ * its own shutdown events
+ */
+ return (get_congested_recv_queue(rep, recv_thread_idx) != NULL);
}
/**
@@ -3077,8 +3121,9 @@ read_jba_state(thr_data *selfptr)
/* Check all job queues, return true only if all are empty. */
static bool
-check_queues_empty(thr_data *selfptr)
+check_queues_empty(void *arg)
{
+ thr_data *selfptr = static_cast<thr_data *>(arg);
Uint32 thr_count = g_thr_repository.m_thread_count;
bool empty = read_jba_state(selfptr);
if (!empty)
@@ -3141,6 +3186,7 @@ execute_signals(thr_data *selfptr,
r->m_read_buffer = read_buffer;
r->m_read_pos = read_pos;
r->m_read_end = read_end;
+ wakeup(&h->m_waiter);
}
}
@@ -3607,13 +3653,33 @@ mt_receiver_thread_main(void *thr_arg)
has_received = false;
if (globalTransporterRegistry.pollReceive(1, recvdata))
{
- if (check_job_buffers(rep, recv_thread_idx) == 0)
+ watchDogCounter = 8;
+ lock(&rep->m_receive_lock[recv_thread_idx]);
+ const bool buffersFull = (globalTransporterRegistry.performReceive(recvdata) != 0);
+ unlock(&rep->m_receive_lock[recv_thread_idx]);
+ has_received = true;
+
+ if (buffersFull) /* Receive queues(s) are full */
{
- watchDogCounter = 8;
- lock(&rep->m_receive_lock[recv_thread_idx]);
- globalTransporterRegistry.performReceive(recvdata);
- unlock(&rep->m_receive_lock[recv_thread_idx]);
- has_received = true;
+ thr_data* waitthr = get_congested_recv_queue(rep, recv_thread_idx);
+ if (waitthr != NULL) /* Will wait for buffers to be freed */
+ {
+ /**
+ * Wait for thread 'waitthr' to consume some of the
+ * pending signals in m_in_queue previously received
+ * from this receive thread, 'thr_no'.
+ * Will recheck queue status with 'check_recv_queue' after latch
+ * has been set, and *before* going to sleep.
+ */
+ const Uint32 nano_wait = 1000*1000; /* -> 1 ms */
+ thr_job_queue_head *wait_queue = waitthr->m_in_queue_head + thr_no;
+
+ const bool waited = yield(&wait_queue->m_waiter,
+ nano_wait,
+ check_recv_queue,
+ wait_queue);
+ (void)waited;
+ }
}
}
selfptr->m_stat.m_loop_cnt++;
@@ -3644,28 +3710,46 @@ sendpacked(struct thr_data* thr_ptr, Sig
}
/**
- * check if out-queues of selfptr is full
- * return true is so
+ * Callback function used by yield() to recheck
+ * 'job queue full' condition before going to sleep.
+ *
+ * Check if the specified 'thr_job_queue_head' (arg)
+ * is still full, return true if so.
*/
static bool
-check_job_buffer_full(thr_data *selfptr)
+check_congested_job_queue(void *arg)
{
- Uint32 thr_no = selfptr->m_thr_no;
- Uint32 tmp = compute_max_signals_to_execute(thr_no);
-#if 0
- Uint32 perjb = tmp / g_thr_repository.m_thread_count;
+ const thr_job_queue_head *waitfor = static_cast<thr_job_queue_head*>(arg);
+ return (compute_free_buffers_in_queue(waitfor) == 0);
+}
+
+/**
+ * Check if any out-queues of selfptr is full.
+ * If full: Return 'Thr_data*' for (one of) the thread(s)
+ * which we have to wait for. (to consume from queue)
+ */
+static struct thr_data*
+get_congested_job_queue(const thr_data *selfptr)
+{
+ const Uint32 thr_no = selfptr->m_thr_no;
+ struct thr_repository* rep = &g_thr_repository;
+ struct thr_data_aligned *thr_align_ptr = rep->m_thread;
+ struct thr_data *waitfor = NULL;
- if (perjb == 0)
+ for (unsigned i = 0; i<num_threads; i++, thr_align_ptr++)
{
- return true;
- }
+ struct thr_data *thrptr = &thr_align_ptr->m_thr_data;
+ thr_job_queue_head *q_head = thrptr->m_in_queue_head + thr_no;
- return false;
-#else
- if (tmp < g_thr_repository.m_thread_count)
- return true;
- return false;
-#endif
+ if (compute_free_buffers_in_queue(q_head) == 0)
+ {
+ if (thrptr != selfptr) // Don't wait on myself (yet)
+ return thrptr;
+ else
+ waitfor = thrptr;
+ }
+ }
+ return waitfor; // Possibly 'thrptr == selfptr'
}
/**
@@ -3716,6 +3800,7 @@ loop:
*/
selfptr->m_max_signals_per_jb = 1;
ndbout_c("%u - sleeploop 10!!", selfptr->m_thr_no);
+ dumpJobQueues();
return true;
}
@@ -3725,8 +3810,47 @@ loop:
pending_send = do_send(selfptr, TRUE);
}
- const Uint32 wait = 1000000; /* 1 ms */
- yield(&selfptr->m_waiter, wait, check_job_buffer_full, selfptr);
+ struct thr_data* waitthr = get_congested_job_queue(selfptr);
+ if (waitthr == NULL) // Waiters resolved
+ {
+ goto loop;
+ }
+ else if (waitthr == selfptr) // Avoid self-wait
+ {
+ //ndbout << "update_sched_config"
+ // << ", thr_no=" << thr_no
+ // << ", in 'self wait' - proceeding slowly"
+ // << endl;
+ selfptr->m_max_signals_per_jb = 1; // Proceed slowly
+ return sleeploop > 0;
+ }
+ else if (false)
+ {
+ ndbout << "update_sched_config"
+ << ", thr_no: " << thr_no << " waitfor: " << waitthr->m_thr_no
+ << endl;
+ }
+ /**
+ * Wait for thread 'waitthr' to consume some of the
+ * pending signals in m_in_queue[].
+ * Will recheck queue status with 'check_recv_queue'
+ * after latch has been set, and *before* going to sleep.
+ */
+ const Uint32 nano_wait = 1000*1000; /* -> 1 ms */
+ thr_job_queue_head *wait_queue = waitthr->m_in_queue_head + thr_no;
+
+ const bool waited = yield(&wait_queue->m_waiter,
+ nano_wait,
+ check_congested_job_queue,
+ wait_queue);
+ if (waited)
+ {
+ sleeploop++;
+ //ndbout << "update_sched_config"
+ // << ", thr_no=" << thr_no
+ // << ", waited due to 'job_buffers_full'"
+ // << endl;
+ }
goto loop;
}
@@ -3807,7 +3931,7 @@ mt_job_thread_main(void *thr_arg)
else
{
/* No signals processed, prepare to sleep to wait for more */
- if (pending_send || send_sum > 0)
+ if ((pending_send + send_sum) > 0)
{
/* About to sleep, _must_ send now. */
pending_send = do_send(selfptr, TRUE);
@@ -3836,7 +3960,7 @@ mt_job_thread_main(void *thr_arg)
*/
if (sum >= selfptr->m_max_exec_signals)
{
- if (update_sched_config(selfptr, pending_send))
+ if (update_sched_config(selfptr, pending_send + send_sum))
{
/* Update current time after sleeping */
now = NdbTick_CurrentMillisecond();
@@ -4185,6 +4309,7 @@ thr_init(struct thr_repository* rep, str
{
selfptr->m_in_queue_head[i].m_read_index = 0;
selfptr->m_in_queue_head[i].m_write_index = 0;
+ selfptr->m_in_queue_head[i].m_waiter.init();
buffer = may_communicate(i,thr_no)
? seize_buffer(rep, thr_no, false) : NULL;
selfptr->m_in_queue[i].m_buffers[0] = buffer;
No bundle (reason: useless for push emails).
| Thread |
|---|
| • bzr push into mysql-5.5-cluster-7.2 branch (ole.john.aske:4109 to 4113) | Ole John Aske | 23 Nov |