From: Ole John Aske Date: November 23 2012 11:47am Subject: bzr push into mysql-5.5-cluster-7.2 branch (ole.john.aske:4113 to 4114) List-Archive: http://lists.mysql.com/commits/145381 Message-Id: <20121123114721.15121.92955.4114@fimafeng09.no.oracle.com> MIME-Version: 1.0 Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit 4114 Ole John Aske 2012-11-23 Revert of previous unintended pushes modified: mysql-test/suite/ndb/t/ndb_bushy_joins.cnf storage/ndb/include/transporter/TransporterRegistry.hpp storage/ndb/src/common/transporter/TransporterRegistry.cpp storage/ndb/src/kernel/vm/mt.cpp 4113 Ole John Aske 2012-11-23 Fix failing ndb_bushy_joins.test under Valgrind Extends the TimeBetweenEpoch to avoid failure due to exceeding 'MaxBufferedEpocs > 100). modified: mysql-test/suite/ndb/t/ndb_bushy_joins.cnf === modified file 'mysql-test/suite/ndb/t/ndb_bushy_joins.cnf' --- a/mysql-test/suite/ndb/t/ndb_bushy_joins.cnf 2012-11-23 11:34:35 +0000 +++ b/mysql-test/suite/ndb/t/ndb_bushy_joins.cnf 2012-11-23 11:46:55 +0000 @@ -15,7 +15,6 @@ ThreadConfig=ldm={count=4} MaxNoOfConcurrentOperations=250000 LongMessageBuffer=64M TransactionDeadlockDetectionTimeout=30000 -TimeBetweenEpochs=1000 [ENV] # Need to always use ndbmtd when we want lots of partitions === modified file 'storage/ndb/include/transporter/TransporterRegistry.hpp' --- a/storage/ndb/include/transporter/TransporterRegistry.hpp 2012-11-22 12:18:17 +0000 +++ b/storage/ndb/include/transporter/TransporterRegistry.hpp 2012-11-23 11:46:55 +0000 @@ -108,27 +108,17 @@ struct TransporterReceiveData NodeBitmask m_transporters; /** - * Bitmask of nodes having data awaiting to be received + * Bitmask of transporters having data awaiting to be received * from its transporter. */ NodeBitmask m_recv_transporters; /** - * Bitmask of nodes that has already received data buffered + * Bitmask of transporters that has already received data buffered * inside its transporter. Possibly "carried over" from last * performReceive */ NodeBitmask m_has_data_transporters; - - /** - * Subset of m_has_data_transporters which we completed handling - * of in previous ::performReceive before we was interrupted due - * to lack of job buffers. Will skip these when we later retry - * ::performReceive in order to avoid startvation of non-handled - * transporters. - */ - NodeBitmask m_handled_transporters; - #if defined(HAVE_EPOLL_CREATE) int m_epoll_fd; struct epoll_event *m_epoll_events; @@ -589,7 +579,7 @@ public: * Receiving */ Uint32 pollReceive(Uint32 timeOutMillis, TransporterReceiveHandle& mask); - Uint32 performReceive(TransporterReceiveHandle&); + void performReceive(TransporterReceiveHandle&); void update_connections(TransporterReceiveHandle&); inline Uint32 pollReceive(Uint32 timeOutMillis) { @@ -597,9 +587,9 @@ public: return pollReceive(timeOutMillis, * receiveHandle); } - inline Uint32 performReceive() { + inline void performReceive() { assert(receiveHandle != 0); - return performReceive(* receiveHandle); + performReceive(* receiveHandle); } inline void update_connections() { === modified file 'storage/ndb/src/common/transporter/TransporterRegistry.cpp' --- a/storage/ndb/src/common/transporter/TransporterRegistry.cpp 2012-11-22 12:18:17 +0000 +++ b/storage/ndb/src/common/transporter/TransporterRegistry.cpp 2012-11-23 11:46:55 +0000 @@ -94,7 +94,6 @@ TransporterReceiveData::TransporterRecei */ m_transporters.set(); // Handle all m_transporters.clear(Uint32(0)); // Except wakeup socket... - m_handled_transporters.clear(); #if defined(HAVE_EPOLL_CREATE) m_epoll_fd = -1; @@ -1143,13 +1142,13 @@ TransporterRegistry::pollReceive(Uint32 { for (int i = 0; i < num_socket_events; i++) { - const Uint32 node_id = recvdata.m_epoll_events[i].data.u32; + const Uint32 trpid = recvdata.m_epoll_events[i].data.u32; /** * check that it's assigned to "us" */ - assert(recvdata.m_transporters.get(node_id)); + assert(recvdata.m_transporters.get(trpid)); - recvdata.m_recv_transporters.set(node_id); + recvdata.m_recv_transporters.set(trpid); } } else if (num_socket_events < 0) @@ -1320,15 +1319,14 @@ TransporterRegistry::poll_TCP(Uint32 tim /** * In multi-threaded cases, this must be protected by a global receive lock. - * In case we were unable to received due to job buffers being full. - * Returns 0 when receive succeeded from all Transporters having data, - * else 1. */ -Uint32 +void TransporterRegistry::performReceive(TransporterReceiveHandle& recvdata) { assert((receiveHandle == &recvdata) || (receiveHandle == 0)); + bool hasReceived = false; + if (recvdata.m_recv_transporters.get(0)) { assert(recvdata.m_transporters.get(0)); @@ -1382,7 +1380,7 @@ TransporterRegistry::performReceive(Tran recvdata.m_recv_transporters.clear(); /** - * Unpack data either received above or pending from prev rounds. + * Handle data either received above or pending from prev rounds. */ for(Uint32 id = recvdata.m_has_data_transporters.find_first(); id != BitmaskImpl::NotFound; @@ -1397,10 +1395,9 @@ TransporterRegistry::performReceive(Tran { if (t->isConnected()) { - if (unlikely(recvdata.checkJobBuffer())) - return 1; // Full, can't unpack more - if (unlikely(recvdata.m_handled_transporters.get(id))) - continue; // Skip now to avoid startvation + if (hasReceived) + recvdata.checkJobBuffer(); + hasReceived = true; Uint32 * ptr; Uint32 sz = t->getReceiveData(&ptr); Uint32 szUsed = unpack(recvdata, ptr, sz, id, ioStates[id]); @@ -1416,7 +1413,6 @@ TransporterRegistry::performReceive(Tran } // If transporter still have data, make sure that it's remember to next time recvdata.m_has_data_transporters.set(id, hasdata); - recvdata.m_handled_transporters.set(id, hasdata); } #endif @@ -1432,11 +1428,9 @@ TransporterRegistry::performReceive(Tran { if(t->isConnected() && t->checkConnected()) { - if (unlikely(recvdata.checkJobBuffer())) - return 1; // Full, can't unpack more - if (unlikely(recvdata.m_handled_transporters.get(nodeId))) - continue; // Skip now to avoid startvation - + if (hasReceived) + callbackObj->checkJobBuffer(); + hasReceived = true; Uint32 * readPtr, * eodPtr; t->getReceivePtr(&readPtr, &eodPtr); callbackObj->transporter_recv_from(nodeId); @@ -1444,7 +1438,6 @@ TransporterRegistry::performReceive(Tran t->updateReceivePtr(newPtr); } } - recvdata.m_handled_transporters.set(nodeId); } #endif #ifdef NDB_SHM_TRANSPORTER @@ -1456,11 +1449,9 @@ TransporterRegistry::performReceive(Tran if(is_connected(nodeId)){ if(t->isConnected() && t->checkConnected()) { - if (unlikely(recvdata.checkJobBuffer())) - return 1; // Full, can't unpack more - if (unlikely(recvdata.m_handled_transporters.get(nodeId))) - continue; // Skip now to avoid startvation - + if (hasReceived) + recvdata.checkJobBuffer(); + hasReceived = true; Uint32 * readPtr, * eodPtr; t->getReceivePtr(&readPtr, &eodPtr); recvdata.transporter_recv_from(nodeId); @@ -1469,11 +1460,8 @@ TransporterRegistry::performReceive(Tran t->updateReceivePtr(newPtr); } } - recvdata.m_handled_transporters.set(nodeId); } #endif - recvdata.m_handled_transporters.clear(); - return 0; } /** @@ -1788,7 +1776,6 @@ TransporterRegistry::report_disconnect(T performStates[node_id] = DISCONNECTED; recvdata.m_recv_transporters.clear(node_id); recvdata.m_has_data_transporters.clear(node_id); - recvdata.m_handled_transporters.clear(node_id); recvdata.reportDisconnect(node_id, errnum); DBUG_VOID_RETURN; } === modified file 'storage/ndb/src/kernel/vm/mt.cpp' --- a/storage/ndb/src/kernel/vm/mt.cpp 2012-11-22 13:20:37 +0000 +++ b/storage/ndb/src/kernel/vm/mt.cpp 2012-11-23 11:46:55 +0000 @@ -38,8 +38,6 @@ #include "mt-asm.h" #include "mt-lock.hpp" -static void dumpJobQueues(void); - inline SimulatedBlock* GlobalData::mt_getBlock(BlockNumber blockNo, Uint32 instanceNo) @@ -157,7 +155,7 @@ struct thr_wait static inline bool yield(struct thr_wait* wait, const Uint32 nsec, - bool (*check_callback)(void *), void *check_arg) + bool (*check_callback)(struct thr_data *), struct thr_data *check_arg) { volatile unsigned * val = &wait->m_futex_state; #ifndef NDEBUG @@ -226,7 +224,7 @@ struct thr_wait static inline bool yield(struct thr_wait* wait, const Uint32 nsec, - bool (*check_callback)(void *), void *check_arg) + bool (*check_callback)(struct thr_data *), struct thr_data *check_arg) { struct timespec end; NdbCondition_ComputeAbsTime(&end, nsec/1000000); @@ -587,20 +585,12 @@ struct thr_job_queue_head unsigned m_read_index; // Read/written by consumer, read by producer unsigned m_write_index; // Read/written by producer, read by consumer - /** - * Waiter object: In case job queue is full, the produced thread - * will 'yield' on this waiter object until the consumer thread - * has consumed (at least) a job buffer. - */ - thr_wait m_waiter; - Uint32 used() const; }; struct thr_job_queue { static const unsigned SIZE = 32; - static const unsigned SAFETY = 2; struct thr_job_buffer* m_buffers[SIZE]; }; @@ -1396,7 +1386,7 @@ thr_send_threads::alert_send_thread(Node extern "C" bool -check_available_send_data(void *not_used) +check_available_send_data(struct thr_data *not_used) { (void)not_used; return !g_send_threads->data_available(); @@ -1562,7 +1552,6 @@ void job_buffer_full(struct thr_data* selfptr) { ndbout_c("job buffer full"); - dumpJobQueues(); abort(); } @@ -1571,7 +1560,6 @@ void out_of_job_buffer(struct thr_data* selfptr) { ndbout_c("out of job buffer"); - dumpJobQueues(); abort(); } @@ -2043,81 +2031,40 @@ flush_jbb_write_state(thr_data *selfptr) } } -static -void -dumpJobQueues(void) -{ - const struct thr_repository* rep = &g_thr_repository; - for (unsigned from = 0; fromm_thread + to; - const struct thr_data *thrptr = &thr_align_ptr->m_thr_data; - const thr_job_queue_head *q_head = thrptr->m_in_queue_head + from; - - unsigned used = q_head->used(); - if (used > 0) - { - ndbout << " job buffer " << from << " --> " << to - << ", used:" << used - << endl; - } - } - } -} - /** - * Receive thread will unpack 1024 signals (MAX_RECEIVED_SIGNALS) - * from Transporters before running another check_recv_queue - * - * This function returns true if there is not space to unpack - * this amount of signals, else false. + * Transporter will receive 1024 signals (MAX_RECEIVED_SIGNALS) + * before running check_job_buffers * - * Also used as callback function from yield() to recheck - * 'full' condition before going to sleep. + * This function returns 0 if there is space to receive this amount of + * signals + * else 1 */ -static bool -check_recv_queue(void *arg) +static int +check_job_buffers(struct thr_repository* rep, Uint32 recv_thread_id) { - thr_job_queue_head *q_head = static_cast(arg); - const Uint32 minfree = (1024 + MIN_SIGNALS_PER_PAGE - 1)/MIN_SIGNALS_PER_PAGE; - /** - * NOTE: m_read_index is read wo/ lock (and updated by different thread) - * but since the different thread can only consume - * signals this means that the value returned from this - * function is always conservative (i.e it can be better than - * returned value, if read-index has moved but we didnt see it) - */ - const unsigned ri = q_head->m_read_index; - const unsigned wi = q_head->m_write_index; - const unsigned busy = (wi >= ri) ? wi - ri : (thr_job_queue::SIZE - ri) + wi; - return (1 + minfree + busy >= thr_job_queue::SIZE); -} - -/** - * Check if any of the receive queues for the threads being served - * by this receive thread, are full. - * If full: Return 'Thr_data*' for (one of) the thread(s) - * which we have to wait for. (to consume from queue) - */ -static struct thr_data* -get_congested_recv_queue(struct thr_repository* rep, Uint32 recv_thread_id) -{ - const unsigned thr_no = first_receiver_thread_no + recv_thread_id; - thr_data_aligned *thr_align_ptr = rep->m_thread; - + unsigned thr_no = first_receiver_thread_no + recv_thread_id; + const thr_data_aligned *thr_align_ptr = rep->m_thread; for (unsigned i = 0; im_thr_data; - thr_job_queue_head *q_head = thrptr->m_in_queue_head + thr_no; - if (check_recv_queue(q_head)) + /** + * NOTE: m_read_index is read wo/ lock (and updated by different thread) + * but since the different thread can only consume + * signals this means that the value returned from this + * function is always conservative (i.e it can be better than + * returned value, if read-index has moved but we didnt see it) + */ + const struct thr_data *thrptr = &thr_align_ptr->m_thr_data; + const thr_job_queue_head *q_head = thrptr->m_in_queue_head + thr_no; + unsigned ri = q_head->m_read_index; + unsigned wi = q_head->m_write_index; + unsigned busy = (wi >= ri) ? wi - ri : (thr_job_queue::SIZE - ri) + wi; + if (1 + minfree + busy >= thr_job_queue::SIZE) { - return thrptr; + return 1; } } - return NULL; + return 0; } /** @@ -2139,29 +2086,6 @@ get_congested_recv_queue(struct thr_repo */ static Uint32 -compute_free_buffers_in_queue(const thr_job_queue_head *q_head) -{ - /** - * NOTE: m_read_index is read wo/ lock (and updated by different thread) - * but since the different thread can only consume - * signals this means that the value returned from this - * function is always conservative (i.e it can be better than - * returned value, if read-index has moved but we didnt see it) - */ - unsigned ri = q_head->m_read_index; - unsigned wi = q_head->m_write_index; - unsigned free = (wi < ri) ? ri - wi : (thr_job_queue::SIZE + ri) - wi; - - assert(free <= thr_job_queue::SIZE); - - if (free <= (1 + thr_job_queue::SAFETY)) - return 0; - else - return free - (1 + thr_job_queue::SAFETY); -} - -static -Uint32 compute_max_signals_to_execute(Uint32 thr_no) { Uint32 minfree = thr_job_queue::SIZE; @@ -2170,17 +2094,30 @@ compute_max_signals_to_execute(Uint32 th for (unsigned i = 0; im_thr_data; const thr_job_queue_head *q_head = thrptr->m_in_queue_head + thr_no; - unsigned free = compute_free_buffers_in_queue(q_head); + unsigned ri = q_head->m_read_index; + unsigned wi = q_head->m_write_index; + unsigned free = (wi < ri) ? ri - wi : (thr_job_queue::SIZE + ri) - wi; + + assert(free <= thr_job_queue::SIZE); if (free < minfree) minfree = free; } - if (minfree > 0) +#define SAFETY 2 + + if (minfree >= (1 + SAFETY)) { - return (3 + (minfree) * MIN_SIGNALS_PER_PAGE) / 4; + return (3 + (minfree - (1 + SAFETY)) * MIN_SIGNALS_PER_PAGE) / 4; } else { @@ -2283,22 +2220,41 @@ int mt_checkDoJob(Uint32 recv_thread_idx) { struct thr_repository* rep = &g_thr_repository; + if (unlikely(check_job_buffers(rep, recv_thread_idx))) + { + do + { + /** + * theoretically (or when we do single threaded by using ndbmtd with + * all in same thread) we should execute signals here...to + * prevent dead-lock, but...with current ndbmtd only TRPMAN runs in + * this thread, and other thread is waiting for TRPMAN + * except for QMGR open/close connection, but that is not + * (i think) sufficient to create a deadlock + */ - /** - * Return '1' if we are not allowed to receive more signals - * into the job buffers from this 'recv_thread_idx'. - * - * NOTE: - * We should not loop-wait for buffers to become available - * here as we currently hold the receiver-lock. Furthermore - * waiting too long here could cause the receiver thread to be - * less responsive wrt. moving incoming (TCP) data from the - * TCPTransporters into the (local) receiveBuffers. - * The thread could also oversleep on its other tasks as - * handling open/close of connections, and catching - * its own shutdown events - */ - return (get_congested_recv_queue(rep, recv_thread_idx) != NULL); + /** FIXME: + * On a CMT chip where #CPU >= #NDB-threads sched_yield() is + * effectively a NOOP as there will normally be an idle CPU available + * to immediately resume thread execution. + * On a Niagara chip this may severely impact performance as the CPUs + * are virtualized by timemultiplexing the physical core. + * The thread should really be 'parked' on + * a condition to free its execution resources. + */ +// usleep(a-few-usec); /* A micro-sleep would likely have been better... */ +#if defined HAVE_SCHED_YIELD + sched_yield(); +#elif defined _WIN32 + SwitchToThread(); +#else + NdbSleep_MilliSleep(0); +#endif + + } while (check_job_buffers(rep, recv_thread_idx)); + } + + return 0; } /** @@ -3121,9 +3077,8 @@ read_jba_state(thr_data *selfptr) /* Check all job queues, return true only if all are empty. */ static bool -check_queues_empty(void *arg) +check_queues_empty(thr_data *selfptr) { - thr_data *selfptr = static_cast(arg); Uint32 thr_count = g_thr_repository.m_thread_count; bool empty = read_jba_state(selfptr); if (!empty) @@ -3186,7 +3141,6 @@ execute_signals(thr_data *selfptr, r->m_read_buffer = read_buffer; r->m_read_pos = read_pos; r->m_read_end = read_end; - wakeup(&h->m_waiter); } } @@ -3653,33 +3607,13 @@ mt_receiver_thread_main(void *thr_arg) has_received = false; if (globalTransporterRegistry.pollReceive(1, recvdata)) { - watchDogCounter = 8; - lock(&rep->m_receive_lock[recv_thread_idx]); - const bool buffersFull = (globalTransporterRegistry.performReceive(recvdata) != 0); - unlock(&rep->m_receive_lock[recv_thread_idx]); - has_received = true; - - if (buffersFull) /* Receive queues(s) are full */ + if (check_job_buffers(rep, recv_thread_idx) == 0) { - thr_data* waitthr = get_congested_recv_queue(rep, recv_thread_idx); - if (waitthr != NULL) /* Will wait for buffers to be freed */ - { - /** - * Wait for thread 'waitthr' to consume some of the - * pending signals in m_in_queue previously received - * from this receive thread, 'thr_no'. - * Will recheck queue status with 'check_recv_queue' after latch - * has been set, and *before* going to sleep. - */ - const Uint32 nano_wait = 1000*1000; /* -> 1 ms */ - thr_job_queue_head *wait_queue = waitthr->m_in_queue_head + thr_no; - - const bool waited = yield(&wait_queue->m_waiter, - nano_wait, - check_recv_queue, - wait_queue); - (void)waited; - } + watchDogCounter = 8; + lock(&rep->m_receive_lock[recv_thread_idx]); + globalTransporterRegistry.performReceive(recvdata); + unlock(&rep->m_receive_lock[recv_thread_idx]); + has_received = true; } } selfptr->m_stat.m_loop_cnt++; @@ -3710,46 +3644,28 @@ sendpacked(struct thr_data* thr_ptr, Sig } /** - * Callback function used by yield() to recheck - * 'job queue full' condition before going to sleep. - * - * Check if the specified 'thr_job_queue_head' (arg) - * is still full, return true if so. + * check if out-queues of selfptr is full + * return true is so */ static bool -check_congested_job_queue(void *arg) +check_job_buffer_full(thr_data *selfptr) { - const thr_job_queue_head *waitfor = static_cast(arg); - return (compute_free_buffers_in_queue(waitfor) == 0); -} - -/** - * Check if any out-queues of selfptr is full. - * If full: Return 'Thr_data*' for (one of) the thread(s) - * which we have to wait for. (to consume from queue) - */ -static struct thr_data* -get_congested_job_queue(const thr_data *selfptr) -{ - const Uint32 thr_no = selfptr->m_thr_no; - struct thr_repository* rep = &g_thr_repository; - struct thr_data_aligned *thr_align_ptr = rep->m_thread; - struct thr_data *waitfor = NULL; + Uint32 thr_no = selfptr->m_thr_no; + Uint32 tmp = compute_max_signals_to_execute(thr_no); +#if 0 + Uint32 perjb = tmp / g_thr_repository.m_thread_count; - for (unsigned i = 0; im_thr_data; - thr_job_queue_head *q_head = thrptr->m_in_queue_head + thr_no; - - if (compute_free_buffers_in_queue(q_head) == 0) - { - if (thrptr != selfptr) // Don't wait on myself (yet) - return thrptr; - else - waitfor = thrptr; - } + return true; } - return waitfor; // Possibly 'thrptr == selfptr' + + return false; +#else + if (tmp < g_thr_repository.m_thread_count) + return true; + return false; +#endif } /** @@ -3800,7 +3716,6 @@ loop: */ selfptr->m_max_signals_per_jb = 1; ndbout_c("%u - sleeploop 10!!", selfptr->m_thr_no); - dumpJobQueues(); return true; } @@ -3810,47 +3725,8 @@ loop: pending_send = do_send(selfptr, TRUE); } - struct thr_data* waitthr = get_congested_job_queue(selfptr); - if (waitthr == NULL) // Waiters resolved - { - goto loop; - } - else if (waitthr == selfptr) // Avoid self-wait - { - //ndbout << "update_sched_config" - // << ", thr_no=" << thr_no - // << ", in 'self wait' - proceeding slowly" - // << endl; - selfptr->m_max_signals_per_jb = 1; // Proceed slowly - return sleeploop > 0; - } - else if (false) - { - ndbout << "update_sched_config" - << ", thr_no: " << thr_no << " waitfor: " << waitthr->m_thr_no - << endl; - } - /** - * Wait for thread 'waitthr' to consume some of the - * pending signals in m_in_queue[]. - * Will recheck queue status with 'check_recv_queue' - * after latch has been set, and *before* going to sleep. - */ - const Uint32 nano_wait = 1000*1000; /* -> 1 ms */ - thr_job_queue_head *wait_queue = waitthr->m_in_queue_head + thr_no; - - const bool waited = yield(&wait_queue->m_waiter, - nano_wait, - check_congested_job_queue, - wait_queue); - if (waited) - { - sleeploop++; - //ndbout << "update_sched_config" - // << ", thr_no=" << thr_no - // << ", waited due to 'job_buffers_full'" - // << endl; - } + const Uint32 wait = 1000000; /* 1 ms */ + yield(&selfptr->m_waiter, wait, check_job_buffer_full, selfptr); goto loop; } @@ -3931,7 +3807,7 @@ mt_job_thread_main(void *thr_arg) else { /* No signals processed, prepare to sleep to wait for more */ - if ((pending_send + send_sum) > 0) + if (pending_send || send_sum > 0) { /* About to sleep, _must_ send now. */ pending_send = do_send(selfptr, TRUE); @@ -3960,7 +3836,7 @@ mt_job_thread_main(void *thr_arg) */ if (sum >= selfptr->m_max_exec_signals) { - if (update_sched_config(selfptr, pending_send + send_sum)) + if (update_sched_config(selfptr, pending_send)) { /* Update current time after sleeping */ now = NdbTick_CurrentMillisecond(); @@ -4309,7 +4185,6 @@ thr_init(struct thr_repository* rep, str { selfptr->m_in_queue_head[i].m_read_index = 0; selfptr->m_in_queue_head[i].m_write_index = 0; - selfptr->m_in_queue_head[i].m_waiter.init(); buffer = may_communicate(i,thr_no) ? seize_buffer(rep, thr_no, false) : NULL; selfptr->m_in_queue[i].m_buffers[0] = buffer; No bundle (reason: useless for push emails).