4114 Ole John Aske 2012-11-23
Revert of previous unintended pushes
modified:
mysql-test/suite/ndb/t/ndb_bushy_joins.cnf
storage/ndb/include/transporter/TransporterRegistry.hpp
storage/ndb/src/common/transporter/TransporterRegistry.cpp
storage/ndb/src/kernel/vm/mt.cpp
4113 Ole John Aske 2012-11-23
Fix failing ndb_bushy_joins.test under Valgrind
Extends the TimeBetweenEpoch to avoid failure due to
exceeding 'MaxBufferedEpocs > 100).
modified:
mysql-test/suite/ndb/t/ndb_bushy_joins.cnf
=== modified file 'mysql-test/suite/ndb/t/ndb_bushy_joins.cnf'
--- a/mysql-test/suite/ndb/t/ndb_bushy_joins.cnf 2012-11-23 11:34:35 +0000
+++ b/mysql-test/suite/ndb/t/ndb_bushy_joins.cnf 2012-11-23 11:46:55 +0000
@@ -15,7 +15,6 @@ ThreadConfig=ldm={count=4}
MaxNoOfConcurrentOperations=250000
LongMessageBuffer=64M
TransactionDeadlockDetectionTimeout=30000
-TimeBetweenEpochs=1000
[ENV]
# Need to always use ndbmtd when we want lots of partitions
=== modified file 'storage/ndb/include/transporter/TransporterRegistry.hpp'
--- a/storage/ndb/include/transporter/TransporterRegistry.hpp 2012-11-22 12:18:17 +0000
+++ b/storage/ndb/include/transporter/TransporterRegistry.hpp 2012-11-23 11:46:55 +0000
@@ -108,27 +108,17 @@ struct TransporterReceiveData
NodeBitmask m_transporters;
/**
- * Bitmask of nodes having data awaiting to be received
+ * Bitmask of transporters having data awaiting to be received
* from its transporter.
*/
NodeBitmask m_recv_transporters;
/**
- * Bitmask of nodes that has already received data buffered
+ * Bitmask of transporters that has already received data buffered
* inside its transporter. Possibly "carried over" from last
* performReceive
*/
NodeBitmask m_has_data_transporters;
-
- /**
- * Subset of m_has_data_transporters which we completed handling
- * of in previous ::performReceive before we was interrupted due
- * to lack of job buffers. Will skip these when we later retry
- * ::performReceive in order to avoid startvation of non-handled
- * transporters.
- */
- NodeBitmask m_handled_transporters;
-
#if defined(HAVE_EPOLL_CREATE)
int m_epoll_fd;
struct epoll_event *m_epoll_events;
@@ -589,7 +579,7 @@ public:
* Receiving
*/
Uint32 pollReceive(Uint32 timeOutMillis, TransporterReceiveHandle& mask);
- Uint32 performReceive(TransporterReceiveHandle&);
+ void performReceive(TransporterReceiveHandle&);
void update_connections(TransporterReceiveHandle&);
inline Uint32 pollReceive(Uint32 timeOutMillis) {
@@ -597,9 +587,9 @@ public:
return pollReceive(timeOutMillis, * receiveHandle);
}
- inline Uint32 performReceive() {
+ inline void performReceive() {
assert(receiveHandle != 0);
- return performReceive(* receiveHandle);
+ performReceive(* receiveHandle);
}
inline void update_connections() {
=== modified file 'storage/ndb/src/common/transporter/TransporterRegistry.cpp'
--- a/storage/ndb/src/common/transporter/TransporterRegistry.cpp 2012-11-22 12:18:17 +0000
+++ b/storage/ndb/src/common/transporter/TransporterRegistry.cpp 2012-11-23 11:46:55 +0000
@@ -94,7 +94,6 @@ TransporterReceiveData::TransporterRecei
*/
m_transporters.set(); // Handle all
m_transporters.clear(Uint32(0)); // Except wakeup socket...
- m_handled_transporters.clear();
#if defined(HAVE_EPOLL_CREATE)
m_epoll_fd = -1;
@@ -1143,13 +1142,13 @@ TransporterRegistry::pollReceive(Uint32
{
for (int i = 0; i < num_socket_events; i++)
{
- const Uint32 node_id = recvdata.m_epoll_events[i].data.u32;
+ const Uint32 trpid = recvdata.m_epoll_events[i].data.u32;
/**
* check that it's assigned to "us"
*/
- assert(recvdata.m_transporters.get(node_id));
+ assert(recvdata.m_transporters.get(trpid));
- recvdata.m_recv_transporters.set(node_id);
+ recvdata.m_recv_transporters.set(trpid);
}
}
else if (num_socket_events < 0)
@@ -1320,15 +1319,14 @@ TransporterRegistry::poll_TCP(Uint32 tim
/**
* In multi-threaded cases, this must be protected by a global receive lock.
- * In case we were unable to received due to job buffers being full.
- * Returns 0 when receive succeeded from all Transporters having data,
- * else 1.
*/
-Uint32
+void
TransporterRegistry::performReceive(TransporterReceiveHandle& recvdata)
{
assert((receiveHandle == &recvdata) || (receiveHandle == 0));
+ bool hasReceived = false;
+
if (recvdata.m_recv_transporters.get(0))
{
assert(recvdata.m_transporters.get(0));
@@ -1382,7 +1380,7 @@ TransporterRegistry::performReceive(Tran
recvdata.m_recv_transporters.clear();
/**
- * Unpack data either received above or pending from prev rounds.
+ * Handle data either received above or pending from prev rounds.
*/
for(Uint32 id = recvdata.m_has_data_transporters.find_first();
id != BitmaskImpl::NotFound;
@@ -1397,10 +1395,9 @@ TransporterRegistry::performReceive(Tran
{
if (t->isConnected())
{
- if (unlikely(recvdata.checkJobBuffer()))
- return 1; // Full, can't unpack more
- if (unlikely(recvdata.m_handled_transporters.get(id)))
- continue; // Skip now to avoid startvation
+ if (hasReceived)
+ recvdata.checkJobBuffer();
+ hasReceived = true;
Uint32 * ptr;
Uint32 sz = t->getReceiveData(&ptr);
Uint32 szUsed = unpack(recvdata, ptr, sz, id, ioStates[id]);
@@ -1416,7 +1413,6 @@ TransporterRegistry::performReceive(Tran
}
// If transporter still have data, make sure that it's remember to next time
recvdata.m_has_data_transporters.set(id, hasdata);
- recvdata.m_handled_transporters.set(id, hasdata);
}
#endif
@@ -1432,11 +1428,9 @@ TransporterRegistry::performReceive(Tran
{
if(t->isConnected() && t->checkConnected())
{
- if (unlikely(recvdata.checkJobBuffer()))
- return 1; // Full, can't unpack more
- if (unlikely(recvdata.m_handled_transporters.get(nodeId)))
- continue; // Skip now to avoid startvation
-
+ if (hasReceived)
+ callbackObj->checkJobBuffer();
+ hasReceived = true;
Uint32 * readPtr, * eodPtr;
t->getReceivePtr(&readPtr, &eodPtr);
callbackObj->transporter_recv_from(nodeId);
@@ -1444,7 +1438,6 @@ TransporterRegistry::performReceive(Tran
t->updateReceivePtr(newPtr);
}
}
- recvdata.m_handled_transporters.set(nodeId);
}
#endif
#ifdef NDB_SHM_TRANSPORTER
@@ -1456,11 +1449,9 @@ TransporterRegistry::performReceive(Tran
if(is_connected(nodeId)){
if(t->isConnected() && t->checkConnected())
{
- if (unlikely(recvdata.checkJobBuffer()))
- return 1; // Full, can't unpack more
- if (unlikely(recvdata.m_handled_transporters.get(nodeId)))
- continue; // Skip now to avoid startvation
-
+ if (hasReceived)
+ recvdata.checkJobBuffer();
+ hasReceived = true;
Uint32 * readPtr, * eodPtr;
t->getReceivePtr(&readPtr, &eodPtr);
recvdata.transporter_recv_from(nodeId);
@@ -1469,11 +1460,8 @@ TransporterRegistry::performReceive(Tran
t->updateReceivePtr(newPtr);
}
}
- recvdata.m_handled_transporters.set(nodeId);
}
#endif
- recvdata.m_handled_transporters.clear();
- return 0;
}
/**
@@ -1788,7 +1776,6 @@ TransporterRegistry::report_disconnect(T
performStates[node_id] = DISCONNECTED;
recvdata.m_recv_transporters.clear(node_id);
recvdata.m_has_data_transporters.clear(node_id);
- recvdata.m_handled_transporters.clear(node_id);
recvdata.reportDisconnect(node_id, errnum);
DBUG_VOID_RETURN;
}
=== modified file 'storage/ndb/src/kernel/vm/mt.cpp'
--- a/storage/ndb/src/kernel/vm/mt.cpp 2012-11-22 13:20:37 +0000
+++ b/storage/ndb/src/kernel/vm/mt.cpp 2012-11-23 11:46:55 +0000
@@ -38,8 +38,6 @@
#include "mt-asm.h"
#include "mt-lock.hpp"
-static void dumpJobQueues(void);
-
inline
SimulatedBlock*
GlobalData::mt_getBlock(BlockNumber blockNo, Uint32 instanceNo)
@@ -157,7 +155,7 @@ struct thr_wait
static inline
bool
yield(struct thr_wait* wait, const Uint32 nsec,
- bool (*check_callback)(void *), void *check_arg)
+ bool (*check_callback)(struct thr_data *), struct thr_data *check_arg)
{
volatile unsigned * val = &wait->m_futex_state;
#ifndef NDEBUG
@@ -226,7 +224,7 @@ struct thr_wait
static inline
bool
yield(struct thr_wait* wait, const Uint32 nsec,
- bool (*check_callback)(void *), void *check_arg)
+ bool (*check_callback)(struct thr_data *), struct thr_data *check_arg)
{
struct timespec end;
NdbCondition_ComputeAbsTime(&end, nsec/1000000);
@@ -587,20 +585,12 @@ struct thr_job_queue_head
unsigned m_read_index; // Read/written by consumer, read by producer
unsigned m_write_index; // Read/written by producer, read by consumer
- /**
- * Waiter object: In case job queue is full, the produced thread
- * will 'yield' on this waiter object until the consumer thread
- * has consumed (at least) a job buffer.
- */
- thr_wait m_waiter;
-
Uint32 used() const;
};
struct thr_job_queue
{
static const unsigned SIZE = 32;
- static const unsigned SAFETY = 2;
struct thr_job_buffer* m_buffers[SIZE];
};
@@ -1396,7 +1386,7 @@ thr_send_threads::alert_send_thread(Node
extern "C"
bool
-check_available_send_data(void *not_used)
+check_available_send_data(struct thr_data *not_used)
{
(void)not_used;
return !g_send_threads->data_available();
@@ -1562,7 +1552,6 @@ void
job_buffer_full(struct thr_data* selfptr)
{
ndbout_c("job buffer full");
- dumpJobQueues();
abort();
}
@@ -1571,7 +1560,6 @@ void
out_of_job_buffer(struct thr_data* selfptr)
{
ndbout_c("out of job buffer");
- dumpJobQueues();
abort();
}
@@ -2043,81 +2031,40 @@ flush_jbb_write_state(thr_data *selfptr)
}
}
-static
-void
-dumpJobQueues(void)
-{
- const struct thr_repository* rep = &g_thr_repository;
- for (unsigned from = 0; from<num_threads; from++)
- {
- for (unsigned to = 0; to<num_threads; to++)
- {
- const thr_data_aligned *thr_align_ptr = rep->m_thread + to;
- const struct thr_data *thrptr = &thr_align_ptr->m_thr_data;
- const thr_job_queue_head *q_head = thrptr->m_in_queue_head + from;
-
- unsigned used = q_head->used();
- if (used > 0)
- {
- ndbout << " job buffer " << from << " --> " << to
- << ", used:" << used
- << endl;
- }
- }
- }
-}
-
/**
- * Receive thread will unpack 1024 signals (MAX_RECEIVED_SIGNALS)
- * from Transporters before running another check_recv_queue
- *
- * This function returns true if there is not space to unpack
- * this amount of signals, else false.
+ * Transporter will receive 1024 signals (MAX_RECEIVED_SIGNALS)
+ * before running check_job_buffers
*
- * Also used as callback function from yield() to recheck
- * 'full' condition before going to sleep.
+ * This function returns 0 if there is space to receive this amount of
+ * signals
+ * else 1
*/
-static bool
-check_recv_queue(void *arg)
+static int
+check_job_buffers(struct thr_repository* rep, Uint32 recv_thread_id)
{
- thr_job_queue_head *q_head = static_cast<thr_job_queue_head*>(arg);
-
const Uint32 minfree = (1024 + MIN_SIGNALS_PER_PAGE - 1)/MIN_SIGNALS_PER_PAGE;
- /**
- * NOTE: m_read_index is read wo/ lock (and updated by different thread)
- * but since the different thread can only consume
- * signals this means that the value returned from this
- * function is always conservative (i.e it can be better than
- * returned value, if read-index has moved but we didnt see it)
- */
- const unsigned ri = q_head->m_read_index;
- const unsigned wi = q_head->m_write_index;
- const unsigned busy = (wi >= ri) ? wi - ri : (thr_job_queue::SIZE - ri) + wi;
- return (1 + minfree + busy >= thr_job_queue::SIZE);
-}
-
-/**
- * Check if any of the receive queues for the threads being served
- * by this receive thread, are full.
- * If full: Return 'Thr_data*' for (one of) the thread(s)
- * which we have to wait for. (to consume from queue)
- */
-static struct thr_data*
-get_congested_recv_queue(struct thr_repository* rep, Uint32 recv_thread_id)
-{
- const unsigned thr_no = first_receiver_thread_no + recv_thread_id;
- thr_data_aligned *thr_align_ptr = rep->m_thread;
-
+ unsigned thr_no = first_receiver_thread_no + recv_thread_id;
+ const thr_data_aligned *thr_align_ptr = rep->m_thread;
for (unsigned i = 0; i<num_threads; i++, thr_align_ptr++)
{
- struct thr_data *thrptr = &thr_align_ptr->m_thr_data;
- thr_job_queue_head *q_head = thrptr->m_in_queue_head + thr_no;
- if (check_recv_queue(q_head))
+ /**
+ * NOTE: m_read_index is read wo/ lock (and updated by different thread)
+ * but since the different thread can only consume
+ * signals this means that the value returned from this
+ * function is always conservative (i.e it can be better than
+ * returned value, if read-index has moved but we didnt see it)
+ */
+ const struct thr_data *thrptr = &thr_align_ptr->m_thr_data;
+ const thr_job_queue_head *q_head = thrptr->m_in_queue_head + thr_no;
+ unsigned ri = q_head->m_read_index;
+ unsigned wi = q_head->m_write_index;
+ unsigned busy = (wi >= ri) ? wi - ri : (thr_job_queue::SIZE - ri) + wi;
+ if (1 + minfree + busy >= thr_job_queue::SIZE)
{
- return thrptr;
+ return 1;
}
}
- return NULL;
+ return 0;
}
/**
@@ -2139,29 +2086,6 @@ get_congested_recv_queue(struct thr_repo
*/
static
Uint32
-compute_free_buffers_in_queue(const thr_job_queue_head *q_head)
-{
- /**
- * NOTE: m_read_index is read wo/ lock (and updated by different thread)
- * but since the different thread can only consume
- * signals this means that the value returned from this
- * function is always conservative (i.e it can be better than
- * returned value, if read-index has moved but we didnt see it)
- */
- unsigned ri = q_head->m_read_index;
- unsigned wi = q_head->m_write_index;
- unsigned free = (wi < ri) ? ri - wi : (thr_job_queue::SIZE + ri) - wi;
-
- assert(free <= thr_job_queue::SIZE);
-
- if (free <= (1 + thr_job_queue::SAFETY))
- return 0;
- else
- return free - (1 + thr_job_queue::SAFETY);
-}
-
-static
-Uint32
compute_max_signals_to_execute(Uint32 thr_no)
{
Uint32 minfree = thr_job_queue::SIZE;
@@ -2170,17 +2094,30 @@ compute_max_signals_to_execute(Uint32 th
for (unsigned i = 0; i<num_threads; i++, thr_align_ptr++)
{
+ /**
+ * NOTE: m_read_index is read wo/ lock (and updated by different thread)
+ * but since the different thread can only consume
+ * signals this means that the value returned from this
+ * function is always conservative (i.e it can be better than
+ * returned value, if read-index has moved but we didnt see it)
+ */
const struct thr_data *thrptr = &thr_align_ptr->m_thr_data;
const thr_job_queue_head *q_head = thrptr->m_in_queue_head + thr_no;
- unsigned free = compute_free_buffers_in_queue(q_head);
+ unsigned ri = q_head->m_read_index;
+ unsigned wi = q_head->m_write_index;
+ unsigned free = (wi < ri) ? ri - wi : (thr_job_queue::SIZE + ri) - wi;
+
+ assert(free <= thr_job_queue::SIZE);
if (free < minfree)
minfree = free;
}
- if (minfree > 0)
+#define SAFETY 2
+
+ if (minfree >= (1 + SAFETY))
{
- return (3 + (minfree) * MIN_SIGNALS_PER_PAGE) / 4;
+ return (3 + (minfree - (1 + SAFETY)) * MIN_SIGNALS_PER_PAGE) / 4;
}
else
{
@@ -2283,22 +2220,41 @@ int
mt_checkDoJob(Uint32 recv_thread_idx)
{
struct thr_repository* rep = &g_thr_repository;
+ if (unlikely(check_job_buffers(rep, recv_thread_idx)))
+ {
+ do
+ {
+ /**
+ * theoretically (or when we do single threaded by using ndbmtd with
+ * all in same thread) we should execute signals here...to
+ * prevent dead-lock, but...with current ndbmtd only TRPMAN runs in
+ * this thread, and other thread is waiting for TRPMAN
+ * except for QMGR open/close connection, but that is not
+ * (i think) sufficient to create a deadlock
+ */
- /**
- * Return '1' if we are not allowed to receive more signals
- * into the job buffers from this 'recv_thread_idx'.
- *
- * NOTE:
- * We should not loop-wait for buffers to become available
- * here as we currently hold the receiver-lock. Furthermore
- * waiting too long here could cause the receiver thread to be
- * less responsive wrt. moving incoming (TCP) data from the
- * TCPTransporters into the (local) receiveBuffers.
- * The thread could also oversleep on its other tasks as
- * handling open/close of connections, and catching
- * its own shutdown events
- */
- return (get_congested_recv_queue(rep, recv_thread_idx) != NULL);
+ /** FIXME:
+ * On a CMT chip where #CPU >= #NDB-threads sched_yield() is
+ * effectively a NOOP as there will normally be an idle CPU available
+ * to immediately resume thread execution.
+ * On a Niagara chip this may severely impact performance as the CPUs
+ * are virtualized by timemultiplexing the physical core.
+ * The thread should really be 'parked' on
+ * a condition to free its execution resources.
+ */
+// usleep(a-few-usec); /* A micro-sleep would likely have been better... */
+#if defined HAVE_SCHED_YIELD
+ sched_yield();
+#elif defined _WIN32
+ SwitchToThread();
+#else
+ NdbSleep_MilliSleep(0);
+#endif
+
+ } while (check_job_buffers(rep, recv_thread_idx));
+ }
+
+ return 0;
}
/**
@@ -3121,9 +3077,8 @@ read_jba_state(thr_data *selfptr)
/* Check all job queues, return true only if all are empty. */
static bool
-check_queues_empty(void *arg)
+check_queues_empty(thr_data *selfptr)
{
- thr_data *selfptr = static_cast<thr_data *>(arg);
Uint32 thr_count = g_thr_repository.m_thread_count;
bool empty = read_jba_state(selfptr);
if (!empty)
@@ -3186,7 +3141,6 @@ execute_signals(thr_data *selfptr,
r->m_read_buffer = read_buffer;
r->m_read_pos = read_pos;
r->m_read_end = read_end;
- wakeup(&h->m_waiter);
}
}
@@ -3653,33 +3607,13 @@ mt_receiver_thread_main(void *thr_arg)
has_received = false;
if (globalTransporterRegistry.pollReceive(1, recvdata))
{
- watchDogCounter = 8;
- lock(&rep->m_receive_lock[recv_thread_idx]);
- const bool buffersFull = (globalTransporterRegistry.performReceive(recvdata) != 0);
- unlock(&rep->m_receive_lock[recv_thread_idx]);
- has_received = true;
-
- if (buffersFull) /* Receive queues(s) are full */
+ if (check_job_buffers(rep, recv_thread_idx) == 0)
{
- thr_data* waitthr = get_congested_recv_queue(rep, recv_thread_idx);
- if (waitthr != NULL) /* Will wait for buffers to be freed */
- {
- /**
- * Wait for thread 'waitthr' to consume some of the
- * pending signals in m_in_queue previously received
- * from this receive thread, 'thr_no'.
- * Will recheck queue status with 'check_recv_queue' after latch
- * has been set, and *before* going to sleep.
- */
- const Uint32 nano_wait = 1000*1000; /* -> 1 ms */
- thr_job_queue_head *wait_queue = waitthr->m_in_queue_head + thr_no;
-
- const bool waited = yield(&wait_queue->m_waiter,
- nano_wait,
- check_recv_queue,
- wait_queue);
- (void)waited;
- }
+ watchDogCounter = 8;
+ lock(&rep->m_receive_lock[recv_thread_idx]);
+ globalTransporterRegistry.performReceive(recvdata);
+ unlock(&rep->m_receive_lock[recv_thread_idx]);
+ has_received = true;
}
}
selfptr->m_stat.m_loop_cnt++;
@@ -3710,46 +3644,28 @@ sendpacked(struct thr_data* thr_ptr, Sig
}
/**
- * Callback function used by yield() to recheck
- * 'job queue full' condition before going to sleep.
- *
- * Check if the specified 'thr_job_queue_head' (arg)
- * is still full, return true if so.
+ * check if out-queues of selfptr is full
+ * return true is so
*/
static bool
-check_congested_job_queue(void *arg)
+check_job_buffer_full(thr_data *selfptr)
{
- const thr_job_queue_head *waitfor = static_cast<thr_job_queue_head*>(arg);
- return (compute_free_buffers_in_queue(waitfor) == 0);
-}
-
-/**
- * Check if any out-queues of selfptr is full.
- * If full: Return 'Thr_data*' for (one of) the thread(s)
- * which we have to wait for. (to consume from queue)
- */
-static struct thr_data*
-get_congested_job_queue(const thr_data *selfptr)
-{
- const Uint32 thr_no = selfptr->m_thr_no;
- struct thr_repository* rep = &g_thr_repository;
- struct thr_data_aligned *thr_align_ptr = rep->m_thread;
- struct thr_data *waitfor = NULL;
+ Uint32 thr_no = selfptr->m_thr_no;
+ Uint32 tmp = compute_max_signals_to_execute(thr_no);
+#if 0
+ Uint32 perjb = tmp / g_thr_repository.m_thread_count;
- for (unsigned i = 0; i<num_threads; i++, thr_align_ptr++)
+ if (perjb == 0)
{
- struct thr_data *thrptr = &thr_align_ptr->m_thr_data;
- thr_job_queue_head *q_head = thrptr->m_in_queue_head + thr_no;
-
- if (compute_free_buffers_in_queue(q_head) == 0)
- {
- if (thrptr != selfptr) // Don't wait on myself (yet)
- return thrptr;
- else
- waitfor = thrptr;
- }
+ return true;
}
- return waitfor; // Possibly 'thrptr == selfptr'
+
+ return false;
+#else
+ if (tmp < g_thr_repository.m_thread_count)
+ return true;
+ return false;
+#endif
}
/**
@@ -3800,7 +3716,6 @@ loop:
*/
selfptr->m_max_signals_per_jb = 1;
ndbout_c("%u - sleeploop 10!!", selfptr->m_thr_no);
- dumpJobQueues();
return true;
}
@@ -3810,47 +3725,8 @@ loop:
pending_send = do_send(selfptr, TRUE);
}
- struct thr_data* waitthr = get_congested_job_queue(selfptr);
- if (waitthr == NULL) // Waiters resolved
- {
- goto loop;
- }
- else if (waitthr == selfptr) // Avoid self-wait
- {
- //ndbout << "update_sched_config"
- // << ", thr_no=" << thr_no
- // << ", in 'self wait' - proceeding slowly"
- // << endl;
- selfptr->m_max_signals_per_jb = 1; // Proceed slowly
- return sleeploop > 0;
- }
- else if (false)
- {
- ndbout << "update_sched_config"
- << ", thr_no: " << thr_no << " waitfor: " << waitthr->m_thr_no
- << endl;
- }
- /**
- * Wait for thread 'waitthr' to consume some of the
- * pending signals in m_in_queue[].
- * Will recheck queue status with 'check_recv_queue'
- * after latch has been set, and *before* going to sleep.
- */
- const Uint32 nano_wait = 1000*1000; /* -> 1 ms */
- thr_job_queue_head *wait_queue = waitthr->m_in_queue_head + thr_no;
-
- const bool waited = yield(&wait_queue->m_waiter,
- nano_wait,
- check_congested_job_queue,
- wait_queue);
- if (waited)
- {
- sleeploop++;
- //ndbout << "update_sched_config"
- // << ", thr_no=" << thr_no
- // << ", waited due to 'job_buffers_full'"
- // << endl;
- }
+ const Uint32 wait = 1000000; /* 1 ms */
+ yield(&selfptr->m_waiter, wait, check_job_buffer_full, selfptr);
goto loop;
}
@@ -3931,7 +3807,7 @@ mt_job_thread_main(void *thr_arg)
else
{
/* No signals processed, prepare to sleep to wait for more */
- if ((pending_send + send_sum) > 0)
+ if (pending_send || send_sum > 0)
{
/* About to sleep, _must_ send now. */
pending_send = do_send(selfptr, TRUE);
@@ -3960,7 +3836,7 @@ mt_job_thread_main(void *thr_arg)
*/
if (sum >= selfptr->m_max_exec_signals)
{
- if (update_sched_config(selfptr, pending_send + send_sum))
+ if (update_sched_config(selfptr, pending_send))
{
/* Update current time after sleeping */
now = NdbTick_CurrentMillisecond();
@@ -4309,7 +4185,6 @@ thr_init(struct thr_repository* rep, str
{
selfptr->m_in_queue_head[i].m_read_index = 0;
selfptr->m_in_queue_head[i].m_write_index = 0;
- selfptr->m_in_queue_head[i].m_waiter.init();
buffer = may_communicate(i,thr_no)
? seize_buffer(rep, thr_no, false) : NULL;
selfptr->m_in_queue[i].m_buffers[0] = buffer;
No bundle (reason: useless for push emails).
| Thread |
|---|
| • bzr push into mysql-5.5-cluster-7.2 branch (ole.john.aske:4113 to 4114) | Ole John Aske | 23 Nov |