From: Ole John Aske Date: November 23 2012 11:41am Subject: bzr push into mysql-5.5-cluster-7.2 branch (ole.john.aske:4109 to 4113) List-Archive: http://lists.mysql.com/commits/145382 Message-Id: <20121123114159.14559.78474.4113@fimafeng09.no.oracle.com> MIME-Version: 1.0 Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit 4113 Ole John Aske 2012-11-23 Fix failing ndb_bushy_joins.test under Valgrind Extends the TimeBetweenEpoch to avoid failure due to exceeding 'MaxBufferedEpocs > 100). modified: mysql-test/suite/ndb/t/ndb_bushy_joins.cnf 4112 Ole John Aske 2012-11-22 Fix for bug#15908684: Job threads may 'sleep' without pending messages being sent This fix correct the calculation of the 'pending' argument to update_sched_config() such that both previously failed 'pending_send', and currently available unsent 'send_sum' together forms the number of 'pending' signals. modified: storage/ndb/src/kernel/vm/mt.cpp 4111 Ole John Aske 2012-11-22 Fix for Bug#15907515 RECEIVER THREAD COULD BLOCK/BUSY WAIT WHILE HOLDING RECEIVER MUTEX Note: This fix require the fix for bug 15907122 as 'baseline'. This fix removes the waiting for more job buffers to become available inside performReceive() (or actually: mt_checkDoJob() called from it). Instead performReceive() will now return with a 'full' status to the receive thread, which will unlock the receive mutex, and start a conditional wait for more job buffers to become available. Furthermore this fix also removes the check_job_buffer() *before* performReceive() such that we will now doReceive() of any pending TCP data into our local receiveBuffers even if the job buffers are full. modified: storage/ndb/include/transporter/TransporterRegistry.hpp storage/ndb/src/common/transporter/TransporterRegistry.cpp storage/ndb/src/kernel/vm/mt.cpp 4110 Ole John Aske 2012-11-22 Fix for Bug#15907122 INCORRECT HANDLING OF JOB-BUFFERS ALMOST FULL - 'SLEEPLOOP 10' This fix introduce a 'thr_wait*' (Sort of a condition/mutex) which is used to wait for job buffers to become available when we are in an 'job buffers almost full' situation. This 'thr_wait' will be signaled when a job buffer has been consumed & freed. This fix is a collection of the following 5 sub patches: ****** Fix mixed up comment & naming wrt. whether 'm_recv_transporters' and 'm_has_data_transporters' is a bitmask of NodeId or Tranporters. (It *is* NodeId bitmasks) ****** Add dump of job buffer utilization before we crash due to 'job buffer full', and to 'sleeploop 10' reports where we have to wait due to almost full job buffers. ****** Change the signature of the yield() function such that it can take a more general '*check_callback()' function as arguments. Change the existing functions currently used as 'callbacks' to the new (relaxed) signature. ****** Refactor: Splitt out compute_free_buffers_in_queue() from compute_max_signals_to_execute(). New function contains common code intended for reuse in later patches. ****** update_sched_config() will 'yield' the CPU and wait for more job buffers to become available when it is about to run out of job buffers. The yield() call will wait on a 'thr_wait' object, which may be sent a 'wakeup()' when the waiting condition has been resolved (by another thread) However, update_schec_config() used the 'thr_wait' object intended to be used to wait for more *incomming signal* - What we actually have to wait for are signals to be *consumed* by the destination thread. Luckily there is also defined a 'max wait' of 1ms which currently will wakeup the thread . This patch introduce a own 'thr_wait' object which which is signaled by the consumer, and we now wait on this object. Furthermore the patch also avoid a situation where a thread could end up waiting for itself. modified: storage/ndb/include/transporter/TransporterRegistry.hpp storage/ndb/src/common/transporter/TransporterRegistry.cpp storage/ndb/src/kernel/vm/mt.cpp 4109 Mauritz Sundell 2012-11-22 [merge] merge 7.1 -> 7.2 === modified file 'mysql-test/suite/ndb/t/ndb_bushy_joins.cnf' --- a/mysql-test/suite/ndb/t/ndb_bushy_joins.cnf 2012-11-13 10:27:06 +0000 +++ b/mysql-test/suite/ndb/t/ndb_bushy_joins.cnf 2012-11-23 11:34:35 +0000 @@ -15,6 +15,7 @@ ThreadConfig=ldm={count=4} MaxNoOfConcurrentOperations=250000 LongMessageBuffer=64M TransactionDeadlockDetectionTimeout=30000 +TimeBetweenEpochs=1000 [ENV] # Need to always use ndbmtd when we want lots of partitions === modified file 'storage/ndb/include/transporter/TransporterRegistry.hpp' --- a/storage/ndb/include/transporter/TransporterRegistry.hpp 2012-09-29 00:02:40 +0000 +++ b/storage/ndb/include/transporter/TransporterRegistry.hpp 2012-11-22 12:18:17 +0000 @@ -108,17 +108,27 @@ struct TransporterReceiveData NodeBitmask m_transporters; /** - * Bitmask of transporters having data awaiting to be received + * Bitmask of nodes having data awaiting to be received * from its transporter. */ NodeBitmask m_recv_transporters; /** - * Bitmask of transporters that has already received data buffered + * Bitmask of nodes that has already received data buffered * inside its transporter. Possibly "carried over" from last * performReceive */ NodeBitmask m_has_data_transporters; + + /** + * Subset of m_has_data_transporters which we completed handling + * of in previous ::performReceive before we was interrupted due + * to lack of job buffers. Will skip these when we later retry + * ::performReceive in order to avoid startvation of non-handled + * transporters. + */ + NodeBitmask m_handled_transporters; + #if defined(HAVE_EPOLL_CREATE) int m_epoll_fd; struct epoll_event *m_epoll_events; @@ -579,7 +589,7 @@ public: * Receiving */ Uint32 pollReceive(Uint32 timeOutMillis, TransporterReceiveHandle& mask); - void performReceive(TransporterReceiveHandle&); + Uint32 performReceive(TransporterReceiveHandle&); void update_connections(TransporterReceiveHandle&); inline Uint32 pollReceive(Uint32 timeOutMillis) { @@ -587,9 +597,9 @@ public: return pollReceive(timeOutMillis, * receiveHandle); } - inline void performReceive() { + inline Uint32 performReceive() { assert(receiveHandle != 0); - performReceive(* receiveHandle); + return performReceive(* receiveHandle); } inline void update_connections() { === modified file 'storage/ndb/src/common/transporter/TransporterRegistry.cpp' --- a/storage/ndb/src/common/transporter/TransporterRegistry.cpp 2012-09-29 00:02:40 +0000 +++ b/storage/ndb/src/common/transporter/TransporterRegistry.cpp 2012-11-22 12:18:17 +0000 @@ -94,6 +94,7 @@ TransporterReceiveData::TransporterRecei */ m_transporters.set(); // Handle all m_transporters.clear(Uint32(0)); // Except wakeup socket... + m_handled_transporters.clear(); #if defined(HAVE_EPOLL_CREATE) m_epoll_fd = -1; @@ -1142,13 +1143,13 @@ TransporterRegistry::pollReceive(Uint32 { for (int i = 0; i < num_socket_events; i++) { - const Uint32 trpid = recvdata.m_epoll_events[i].data.u32; + const Uint32 node_id = recvdata.m_epoll_events[i].data.u32; /** * check that it's assigned to "us" */ - assert(recvdata.m_transporters.get(trpid)); + assert(recvdata.m_transporters.get(node_id)); - recvdata.m_recv_transporters.set(trpid); + recvdata.m_recv_transporters.set(node_id); } } else if (num_socket_events < 0) @@ -1319,14 +1320,15 @@ TransporterRegistry::poll_TCP(Uint32 tim /** * In multi-threaded cases, this must be protected by a global receive lock. + * In case we were unable to received due to job buffers being full. + * Returns 0 when receive succeeded from all Transporters having data, + * else 1. */ -void +Uint32 TransporterRegistry::performReceive(TransporterReceiveHandle& recvdata) { assert((receiveHandle == &recvdata) || (receiveHandle == 0)); - bool hasReceived = false; - if (recvdata.m_recv_transporters.get(0)) { assert(recvdata.m_transporters.get(0)); @@ -1380,7 +1382,7 @@ TransporterRegistry::performReceive(Tran recvdata.m_recv_transporters.clear(); /** - * Handle data either received above or pending from prev rounds. + * Unpack data either received above or pending from prev rounds. */ for(Uint32 id = recvdata.m_has_data_transporters.find_first(); id != BitmaskImpl::NotFound; @@ -1395,9 +1397,10 @@ TransporterRegistry::performReceive(Tran { if (t->isConnected()) { - if (hasReceived) - recvdata.checkJobBuffer(); - hasReceived = true; + if (unlikely(recvdata.checkJobBuffer())) + return 1; // Full, can't unpack more + if (unlikely(recvdata.m_handled_transporters.get(id))) + continue; // Skip now to avoid startvation Uint32 * ptr; Uint32 sz = t->getReceiveData(&ptr); Uint32 szUsed = unpack(recvdata, ptr, sz, id, ioStates[id]); @@ -1413,6 +1416,7 @@ TransporterRegistry::performReceive(Tran } // If transporter still have data, make sure that it's remember to next time recvdata.m_has_data_transporters.set(id, hasdata); + recvdata.m_handled_transporters.set(id, hasdata); } #endif @@ -1428,9 +1432,11 @@ TransporterRegistry::performReceive(Tran { if(t->isConnected() && t->checkConnected()) { - if (hasReceived) - callbackObj->checkJobBuffer(); - hasReceived = true; + if (unlikely(recvdata.checkJobBuffer())) + return 1; // Full, can't unpack more + if (unlikely(recvdata.m_handled_transporters.get(nodeId))) + continue; // Skip now to avoid startvation + Uint32 * readPtr, * eodPtr; t->getReceivePtr(&readPtr, &eodPtr); callbackObj->transporter_recv_from(nodeId); @@ -1438,6 +1444,7 @@ TransporterRegistry::performReceive(Tran t->updateReceivePtr(newPtr); } } + recvdata.m_handled_transporters.set(nodeId); } #endif #ifdef NDB_SHM_TRANSPORTER @@ -1449,9 +1456,11 @@ TransporterRegistry::performReceive(Tran if(is_connected(nodeId)){ if(t->isConnected() && t->checkConnected()) { - if (hasReceived) - recvdata.checkJobBuffer(); - hasReceived = true; + if (unlikely(recvdata.checkJobBuffer())) + return 1; // Full, can't unpack more + if (unlikely(recvdata.m_handled_transporters.get(nodeId))) + continue; // Skip now to avoid startvation + Uint32 * readPtr, * eodPtr; t->getReceivePtr(&readPtr, &eodPtr); recvdata.transporter_recv_from(nodeId); @@ -1460,8 +1469,11 @@ TransporterRegistry::performReceive(Tran t->updateReceivePtr(newPtr); } } + recvdata.m_handled_transporters.set(nodeId); } #endif + recvdata.m_handled_transporters.clear(); + return 0; } /** @@ -1776,6 +1788,7 @@ TransporterRegistry::report_disconnect(T performStates[node_id] = DISCONNECTED; recvdata.m_recv_transporters.clear(node_id); recvdata.m_has_data_transporters.clear(node_id); + recvdata.m_handled_transporters.clear(node_id); recvdata.reportDisconnect(node_id, errnum); DBUG_VOID_RETURN; } === modified file 'storage/ndb/src/kernel/vm/mt.cpp' --- a/storage/ndb/src/kernel/vm/mt.cpp 2012-08-14 11:23:36 +0000 +++ b/storage/ndb/src/kernel/vm/mt.cpp 2012-11-22 13:20:37 +0000 @@ -38,6 +38,8 @@ #include "mt-asm.h" #include "mt-lock.hpp" +static void dumpJobQueues(void); + inline SimulatedBlock* GlobalData::mt_getBlock(BlockNumber blockNo, Uint32 instanceNo) @@ -155,7 +157,7 @@ struct thr_wait static inline bool yield(struct thr_wait* wait, const Uint32 nsec, - bool (*check_callback)(struct thr_data *), struct thr_data *check_arg) + bool (*check_callback)(void *), void *check_arg) { volatile unsigned * val = &wait->m_futex_state; #ifndef NDEBUG @@ -224,7 +226,7 @@ struct thr_wait static inline bool yield(struct thr_wait* wait, const Uint32 nsec, - bool (*check_callback)(struct thr_data *), struct thr_data *check_arg) + bool (*check_callback)(void *), void *check_arg) { struct timespec end; NdbCondition_ComputeAbsTime(&end, nsec/1000000); @@ -585,12 +587,20 @@ struct thr_job_queue_head unsigned m_read_index; // Read/written by consumer, read by producer unsigned m_write_index; // Read/written by producer, read by consumer + /** + * Waiter object: In case job queue is full, the produced thread + * will 'yield' on this waiter object until the consumer thread + * has consumed (at least) a job buffer. + */ + thr_wait m_waiter; + Uint32 used() const; }; struct thr_job_queue { static const unsigned SIZE = 32; + static const unsigned SAFETY = 2; struct thr_job_buffer* m_buffers[SIZE]; }; @@ -1386,7 +1396,7 @@ thr_send_threads::alert_send_thread(Node extern "C" bool -check_available_send_data(struct thr_data *not_used) +check_available_send_data(void *not_used) { (void)not_used; return !g_send_threads->data_available(); @@ -1552,6 +1562,7 @@ void job_buffer_full(struct thr_data* selfptr) { ndbout_c("job buffer full"); + dumpJobQueues(); abort(); } @@ -1560,6 +1571,7 @@ void out_of_job_buffer(struct thr_data* selfptr) { ndbout_c("out of job buffer"); + dumpJobQueues(); abort(); } @@ -2031,40 +2043,81 @@ flush_jbb_write_state(thr_data *selfptr) } } +static +void +dumpJobQueues(void) +{ + const struct thr_repository* rep = &g_thr_repository; + for (unsigned from = 0; fromm_thread + to; + const struct thr_data *thrptr = &thr_align_ptr->m_thr_data; + const thr_job_queue_head *q_head = thrptr->m_in_queue_head + from; + + unsigned used = q_head->used(); + if (used > 0) + { + ndbout << " job buffer " << from << " --> " << to + << ", used:" << used + << endl; + } + } + } +} + /** - * Transporter will receive 1024 signals (MAX_RECEIVED_SIGNALS) - * before running check_job_buffers + * Receive thread will unpack 1024 signals (MAX_RECEIVED_SIGNALS) + * from Transporters before running another check_recv_queue + * + * This function returns true if there is not space to unpack + * this amount of signals, else false. * - * This function returns 0 if there is space to receive this amount of - * signals - * else 1 + * Also used as callback function from yield() to recheck + * 'full' condition before going to sleep. */ -static int -check_job_buffers(struct thr_repository* rep, Uint32 recv_thread_id) +static bool +check_recv_queue(void *arg) { + thr_job_queue_head *q_head = static_cast(arg); + const Uint32 minfree = (1024 + MIN_SIGNALS_PER_PAGE - 1)/MIN_SIGNALS_PER_PAGE; - unsigned thr_no = first_receiver_thread_no + recv_thread_id; - const thr_data_aligned *thr_align_ptr = rep->m_thread; + /** + * NOTE: m_read_index is read wo/ lock (and updated by different thread) + * but since the different thread can only consume + * signals this means that the value returned from this + * function is always conservative (i.e it can be better than + * returned value, if read-index has moved but we didnt see it) + */ + const unsigned ri = q_head->m_read_index; + const unsigned wi = q_head->m_write_index; + const unsigned busy = (wi >= ri) ? wi - ri : (thr_job_queue::SIZE - ri) + wi; + return (1 + minfree + busy >= thr_job_queue::SIZE); +} + +/** + * Check if any of the receive queues for the threads being served + * by this receive thread, are full. + * If full: Return 'Thr_data*' for (one of) the thread(s) + * which we have to wait for. (to consume from queue) + */ +static struct thr_data* +get_congested_recv_queue(struct thr_repository* rep, Uint32 recv_thread_id) +{ + const unsigned thr_no = first_receiver_thread_no + recv_thread_id; + thr_data_aligned *thr_align_ptr = rep->m_thread; + for (unsigned i = 0; im_thr_data; - const thr_job_queue_head *q_head = thrptr->m_in_queue_head + thr_no; - unsigned ri = q_head->m_read_index; - unsigned wi = q_head->m_write_index; - unsigned busy = (wi >= ri) ? wi - ri : (thr_job_queue::SIZE - ri) + wi; - if (1 + minfree + busy >= thr_job_queue::SIZE) + struct thr_data *thrptr = &thr_align_ptr->m_thr_data; + thr_job_queue_head *q_head = thrptr->m_in_queue_head + thr_no; + if (check_recv_queue(q_head)) { - return 1; + return thrptr; } } - return 0; + return NULL; } /** @@ -2086,6 +2139,29 @@ check_job_buffers(struct thr_repository* */ static Uint32 +compute_free_buffers_in_queue(const thr_job_queue_head *q_head) +{ + /** + * NOTE: m_read_index is read wo/ lock (and updated by different thread) + * but since the different thread can only consume + * signals this means that the value returned from this + * function is always conservative (i.e it can be better than + * returned value, if read-index has moved but we didnt see it) + */ + unsigned ri = q_head->m_read_index; + unsigned wi = q_head->m_write_index; + unsigned free = (wi < ri) ? ri - wi : (thr_job_queue::SIZE + ri) - wi; + + assert(free <= thr_job_queue::SIZE); + + if (free <= (1 + thr_job_queue::SAFETY)) + return 0; + else + return free - (1 + thr_job_queue::SAFETY); +} + +static +Uint32 compute_max_signals_to_execute(Uint32 thr_no) { Uint32 minfree = thr_job_queue::SIZE; @@ -2094,30 +2170,17 @@ compute_max_signals_to_execute(Uint32 th for (unsigned i = 0; im_thr_data; const thr_job_queue_head *q_head = thrptr->m_in_queue_head + thr_no; - unsigned ri = q_head->m_read_index; - unsigned wi = q_head->m_write_index; - unsigned free = (wi < ri) ? ri - wi : (thr_job_queue::SIZE + ri) - wi; - - assert(free <= thr_job_queue::SIZE); + unsigned free = compute_free_buffers_in_queue(q_head); if (free < minfree) minfree = free; } -#define SAFETY 2 - - if (minfree >= (1 + SAFETY)) + if (minfree > 0) { - return (3 + (minfree - (1 + SAFETY)) * MIN_SIGNALS_PER_PAGE) / 4; + return (3 + (minfree) * MIN_SIGNALS_PER_PAGE) / 4; } else { @@ -2220,41 +2283,22 @@ int mt_checkDoJob(Uint32 recv_thread_idx) { struct thr_repository* rep = &g_thr_repository; - if (unlikely(check_job_buffers(rep, recv_thread_idx))) - { - do - { - /** - * theoretically (or when we do single threaded by using ndbmtd with - * all in same thread) we should execute signals here...to - * prevent dead-lock, but...with current ndbmtd only TRPMAN runs in - * this thread, and other thread is waiting for TRPMAN - * except for QMGR open/close connection, but that is not - * (i think) sufficient to create a deadlock - */ - - /** FIXME: - * On a CMT chip where #CPU >= #NDB-threads sched_yield() is - * effectively a NOOP as there will normally be an idle CPU available - * to immediately resume thread execution. - * On a Niagara chip this may severely impact performance as the CPUs - * are virtualized by timemultiplexing the physical core. - * The thread should really be 'parked' on - * a condition to free its execution resources. - */ -// usleep(a-few-usec); /* A micro-sleep would likely have been better... */ -#if defined HAVE_SCHED_YIELD - sched_yield(); -#elif defined _WIN32 - SwitchToThread(); -#else - NdbSleep_MilliSleep(0); -#endif - - } while (check_job_buffers(rep, recv_thread_idx)); - } - return 0; + /** + * Return '1' if we are not allowed to receive more signals + * into the job buffers from this 'recv_thread_idx'. + * + * NOTE: + * We should not loop-wait for buffers to become available + * here as we currently hold the receiver-lock. Furthermore + * waiting too long here could cause the receiver thread to be + * less responsive wrt. moving incoming (TCP) data from the + * TCPTransporters into the (local) receiveBuffers. + * The thread could also oversleep on its other tasks as + * handling open/close of connections, and catching + * its own shutdown events + */ + return (get_congested_recv_queue(rep, recv_thread_idx) != NULL); } /** @@ -3077,8 +3121,9 @@ read_jba_state(thr_data *selfptr) /* Check all job queues, return true only if all are empty. */ static bool -check_queues_empty(thr_data *selfptr) +check_queues_empty(void *arg) { + thr_data *selfptr = static_cast(arg); Uint32 thr_count = g_thr_repository.m_thread_count; bool empty = read_jba_state(selfptr); if (!empty) @@ -3141,6 +3186,7 @@ execute_signals(thr_data *selfptr, r->m_read_buffer = read_buffer; r->m_read_pos = read_pos; r->m_read_end = read_end; + wakeup(&h->m_waiter); } } @@ -3607,13 +3653,33 @@ mt_receiver_thread_main(void *thr_arg) has_received = false; if (globalTransporterRegistry.pollReceive(1, recvdata)) { - if (check_job_buffers(rep, recv_thread_idx) == 0) + watchDogCounter = 8; + lock(&rep->m_receive_lock[recv_thread_idx]); + const bool buffersFull = (globalTransporterRegistry.performReceive(recvdata) != 0); + unlock(&rep->m_receive_lock[recv_thread_idx]); + has_received = true; + + if (buffersFull) /* Receive queues(s) are full */ { - watchDogCounter = 8; - lock(&rep->m_receive_lock[recv_thread_idx]); - globalTransporterRegistry.performReceive(recvdata); - unlock(&rep->m_receive_lock[recv_thread_idx]); - has_received = true; + thr_data* waitthr = get_congested_recv_queue(rep, recv_thread_idx); + if (waitthr != NULL) /* Will wait for buffers to be freed */ + { + /** + * Wait for thread 'waitthr' to consume some of the + * pending signals in m_in_queue previously received + * from this receive thread, 'thr_no'. + * Will recheck queue status with 'check_recv_queue' after latch + * has been set, and *before* going to sleep. + */ + const Uint32 nano_wait = 1000*1000; /* -> 1 ms */ + thr_job_queue_head *wait_queue = waitthr->m_in_queue_head + thr_no; + + const bool waited = yield(&wait_queue->m_waiter, + nano_wait, + check_recv_queue, + wait_queue); + (void)waited; + } } } selfptr->m_stat.m_loop_cnt++; @@ -3644,28 +3710,46 @@ sendpacked(struct thr_data* thr_ptr, Sig } /** - * check if out-queues of selfptr is full - * return true is so + * Callback function used by yield() to recheck + * 'job queue full' condition before going to sleep. + * + * Check if the specified 'thr_job_queue_head' (arg) + * is still full, return true if so. */ static bool -check_job_buffer_full(thr_data *selfptr) +check_congested_job_queue(void *arg) { - Uint32 thr_no = selfptr->m_thr_no; - Uint32 tmp = compute_max_signals_to_execute(thr_no); -#if 0 - Uint32 perjb = tmp / g_thr_repository.m_thread_count; + const thr_job_queue_head *waitfor = static_cast(arg); + return (compute_free_buffers_in_queue(waitfor) == 0); +} + +/** + * Check if any out-queues of selfptr is full. + * If full: Return 'Thr_data*' for (one of) the thread(s) + * which we have to wait for. (to consume from queue) + */ +static struct thr_data* +get_congested_job_queue(const thr_data *selfptr) +{ + const Uint32 thr_no = selfptr->m_thr_no; + struct thr_repository* rep = &g_thr_repository; + struct thr_data_aligned *thr_align_ptr = rep->m_thread; + struct thr_data *waitfor = NULL; - if (perjb == 0) + for (unsigned i = 0; im_thr_data; + thr_job_queue_head *q_head = thrptr->m_in_queue_head + thr_no; - return false; -#else - if (tmp < g_thr_repository.m_thread_count) - return true; - return false; -#endif + if (compute_free_buffers_in_queue(q_head) == 0) + { + if (thrptr != selfptr) // Don't wait on myself (yet) + return thrptr; + else + waitfor = thrptr; + } + } + return waitfor; // Possibly 'thrptr == selfptr' } /** @@ -3716,6 +3800,7 @@ loop: */ selfptr->m_max_signals_per_jb = 1; ndbout_c("%u - sleeploop 10!!", selfptr->m_thr_no); + dumpJobQueues(); return true; } @@ -3725,8 +3810,47 @@ loop: pending_send = do_send(selfptr, TRUE); } - const Uint32 wait = 1000000; /* 1 ms */ - yield(&selfptr->m_waiter, wait, check_job_buffer_full, selfptr); + struct thr_data* waitthr = get_congested_job_queue(selfptr); + if (waitthr == NULL) // Waiters resolved + { + goto loop; + } + else if (waitthr == selfptr) // Avoid self-wait + { + //ndbout << "update_sched_config" + // << ", thr_no=" << thr_no + // << ", in 'self wait' - proceeding slowly" + // << endl; + selfptr->m_max_signals_per_jb = 1; // Proceed slowly + return sleeploop > 0; + } + else if (false) + { + ndbout << "update_sched_config" + << ", thr_no: " << thr_no << " waitfor: " << waitthr->m_thr_no + << endl; + } + /** + * Wait for thread 'waitthr' to consume some of the + * pending signals in m_in_queue[]. + * Will recheck queue status with 'check_recv_queue' + * after latch has been set, and *before* going to sleep. + */ + const Uint32 nano_wait = 1000*1000; /* -> 1 ms */ + thr_job_queue_head *wait_queue = waitthr->m_in_queue_head + thr_no; + + const bool waited = yield(&wait_queue->m_waiter, + nano_wait, + check_congested_job_queue, + wait_queue); + if (waited) + { + sleeploop++; + //ndbout << "update_sched_config" + // << ", thr_no=" << thr_no + // << ", waited due to 'job_buffers_full'" + // << endl; + } goto loop; } @@ -3807,7 +3931,7 @@ mt_job_thread_main(void *thr_arg) else { /* No signals processed, prepare to sleep to wait for more */ - if (pending_send || send_sum > 0) + if ((pending_send + send_sum) > 0) { /* About to sleep, _must_ send now. */ pending_send = do_send(selfptr, TRUE); @@ -3836,7 +3960,7 @@ mt_job_thread_main(void *thr_arg) */ if (sum >= selfptr->m_max_exec_signals) { - if (update_sched_config(selfptr, pending_send)) + if (update_sched_config(selfptr, pending_send + send_sum)) { /* Update current time after sleeping */ now = NdbTick_CurrentMillisecond(); @@ -4185,6 +4309,7 @@ thr_init(struct thr_repository* rep, str { selfptr->m_in_queue_head[i].m_read_index = 0; selfptr->m_in_queue_head[i].m_write_index = 0; + selfptr->m_in_queue_head[i].m_waiter.init(); buffer = may_communicate(i,thr_no) ? seize_buffer(rep, thr_no, false) : NULL; selfptr->m_in_queue[i].m_buffers[0] = buffer; No bundle (reason: useless for push emails).