From: John David Duncan Date: May 31 2012 6:59pm Subject: bzr push into mysql-5.5-cluster-7.2 branch (john.duncan:3929 to 3930) List-Archive: http://lists.mysql.com/commits/144059 Message-Id: <201205311859.q4VIxM5L013206@acsmt357.oracle.com> MIME-Version: 1.0 Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit 3930 John David Duncan 2012-05-31 7.2-atc: Apply Jonas' ATC patch to 7.2 modified: storage/ndb/include/transporter/TransporterCallback.hpp storage/ndb/include/transporter/TransporterRegistry.hpp storage/ndb/src/common/transporter/Packer.cpp storage/ndb/src/common/transporter/TransporterRegistry.cpp storage/ndb/src/kernel/vm/TransporterCallback.cpp storage/ndb/src/kernel/vm/TransporterCallbackKernel.hpp storage/ndb/src/ndbapi/ClusterMgr.cpp storage/ndb/src/ndbapi/NdbScanOperation.cpp storage/ndb/src/ndbapi/NdbTransaction.cpp storage/ndb/src/ndbapi/Ndbif.cpp storage/ndb/src/ndbapi/TransporterFacade.cpp storage/ndb/src/ndbapi/TransporterFacade.hpp storage/ndb/src/ndbapi/ndb_cluster_connection.cpp storage/ndb/src/ndbapi/trp_buffer.cpp storage/ndb/src/ndbapi/trp_buffer.hpp storage/ndb/src/ndbapi/trp_client.cpp storage/ndb/src/ndbapi/trp_client.hpp 3929 Frazer Clement 2012-05-31 Bug#14083116 (Bad error handling in LQH corrupts transid hash) Testcase verifying fix. modified: mysql-test/suite/ndb/r/ndb_join_pushdown_default.result mysql-test/suite/ndb/t/ndb_join_pushdown.inc === modified file 'storage/ndb/include/transporter/TransporterCallback.hpp' --- a/storage/ndb/include/transporter/TransporterCallback.hpp 2012-01-17 08:33:59 +0000 +++ b/storage/ndb/include/transporter/TransporterCallback.hpp 2012-05-31 18:26:14 +0000 @@ -46,8 +46,10 @@ public: * * The method may either execute the signal immediately (NDB API), or * queue it for later execution (kernel). 
+ * + * @returns true if no more signals should be delivered */ - virtual void deliver_signal(SignalHeader * const header, + virtual bool deliver_signal(SignalHeader * const header, Uint8 prio, Uint32 * const signalData, LinearSectionPtr ptr[3]) = 0; === modified file 'storage/ndb/include/transporter/TransporterRegistry.hpp' --- a/storage/ndb/include/transporter/TransporterRegistry.hpp 2012-01-30 15:12:41 +0000 +++ b/storage/ndb/include/transporter/TransporterRegistry.hpp 2012-05-31 18:26:14 +0000 @@ -445,13 +445,15 @@ private: Uint32 * readPtr, Uint32 bufferSize, NodeId remoteNodeId, - IOState state); + IOState state, + bool & stopReceiving); Uint32 * unpack(TransporterReceiveHandle&, Uint32 * readPtr, Uint32 * eodPtr, NodeId remoteNodeId, - IOState state); + IOState state, + bool & stopReceiving); static Uint32 unpack_length_words(const Uint32 *readPtr, Uint32 maxWords); /** @@ -488,6 +490,7 @@ private: void updateWritePtr(TransporterSendBufferHandle *handle, NodeId node, Uint32 lenBytes, Uint32 prio); +public: /** * TransporterSendBufferHandle implementation. 
* === modified file 'storage/ndb/src/common/transporter/Packer.cpp' --- a/storage/ndb/src/common/transporter/Packer.cpp 2012-02-23 15:41:31 +0000 +++ b/storage/ndb/src/common/transporter/Packer.cpp 2012-05-31 18:26:14 +0000 @@ -33,16 +33,21 @@ TransporterRegistry::unpack(TransporterR Uint32 * readPtr, Uint32 sizeOfData, NodeId remoteNodeId, - IOState state) { + IOState state, + bool & stopReceiving) +{ + assert(stopReceiving == false); SignalHeader signalHeader; LinearSectionPtr ptr[3]; Uint32 usedData = 0; Uint32 loop_count = 0; + bool doStopReceiving = false; if(state == NoHalt || state == HaltOutput){ while ((sizeOfData >= 4 + sizeof(Protocol6)) && - (loop_count < MAX_RECEIVED_SIGNALS)) { + (loop_count < MAX_RECEIVED_SIGNALS) && + doStopReceiving == false) { Uint32 word1 = readPtr[0]; Uint32 word2 = readPtr[1]; Uint32 word3 = readPtr[2]; @@ -113,19 +118,21 @@ TransporterRegistry::unpack(TransporterR sectionData += sz; } - recvHandle.deliver_signal(&signalHeader, prio, signalData, ptr); + doStopReceiving = recvHandle.deliver_signal(&signalHeader, prio, signalData, ptr); readPtr += messageLen32; sizeOfData -= messageLenBytes; usedData += messageLenBytes; }//while - + + stopReceiving = doStopReceiving; return usedData; } else { /** state = HaltIO || state == HaltInput */ while ((sizeOfData >= 4 + sizeof(Protocol6)) && - (loop_count < MAX_RECEIVED_SIGNALS)) { + (loop_count < MAX_RECEIVED_SIGNALS) && + doStopReceiving == false) { Uint32 word1 = readPtr[0]; Uint32 word2 = readPtr[1]; Uint32 word3 = readPtr[2]; @@ -199,7 +206,7 @@ TransporterRegistry::unpack(TransporterR sectionData += sz; } - recvHandle.deliver_signal(&signalHeader, prio, signalData, ptr); + doStopReceiving = recvHandle.deliver_signal(&signalHeader, prio, signalData, ptr); } else { DEBUG("prepareReceive(...) 
- Discarding message to block: " << rBlockNum << " from Node: " << remoteNodeId); @@ -210,7 +217,7 @@ TransporterRegistry::unpack(TransporterR usedData += messageLenBytes; }//while - + stopReceiving = doStopReceiving; return usedData; }//if } @@ -220,12 +227,15 @@ TransporterRegistry::unpack(TransporterR Uint32 * readPtr, Uint32 * eodPtr, NodeId remoteNodeId, - IOState state) { + IOState state, + bool & stopReceiving) { + assert(stopReceiving == false); SignalHeader signalHeader; LinearSectionPtr ptr[3]; Uint32 loop_count = 0; + bool doStopReceiving = false; if(state == NoHalt || state == HaltOutput){ - while ((readPtr < eodPtr) && (loop_count < MAX_RECEIVED_SIGNALS)) { + while ((readPtr < eodPtr) && (loop_count < MAX_RECEIVED_SIGNALS) && doStopReceiving == false) { Uint32 word1 = readPtr[0]; Uint32 word2 = readPtr[1]; Uint32 word3 = readPtr[2]; @@ -291,14 +301,14 @@ TransporterRegistry::unpack(TransporterR sectionData += sz; } - recvHandle.deliver_signal(&signalHeader, prio, signalData, ptr); + doStopReceiving = recvHandle.deliver_signal(&signalHeader, prio, signalData, ptr); readPtr += messageLen32; }//while } else { /** state = HaltIO || state == HaltInput */ - while ((readPtr < eodPtr) && (loop_count < MAX_RECEIVED_SIGNALS)) { + while ((readPtr < eodPtr) && (loop_count < MAX_RECEIVED_SIGNALS) && doStopReceiving == false) { Uint32 word1 = readPtr[0]; Uint32 word2 = readPtr[1]; Uint32 word3 = readPtr[2]; @@ -368,7 +378,7 @@ TransporterRegistry::unpack(TransporterR sectionData += sz; } - recvHandle.deliver_signal(&signalHeader, prio, signalData, ptr); + doStopReceiving = recvHandle.deliver_signal(&signalHeader, prio, signalData, ptr); } else { DEBUG("prepareReceive(...) 
- Discarding message to block: " << rBlockNum << " from Node: " << remoteNodeId); @@ -377,6 +387,7 @@ TransporterRegistry::unpack(TransporterR readPtr += messageLen32; }//while }//if + stopReceiving = doStopReceiving; return readPtr; } === modified file 'storage/ndb/src/common/transporter/TransporterRegistry.cpp' --- a/storage/ndb/src/common/transporter/TransporterRegistry.cpp 2012-03-14 10:31:02 +0000 +++ b/storage/ndb/src/common/transporter/TransporterRegistry.cpp 2012-05-31 18:26:14 +0000 @@ -1333,6 +1333,7 @@ TransporterRegistry::performReceive(Tran assert((receiveHandle == &recvdata) || (receiveHandle == 0)); bool hasReceived = false; + bool stopReceiving = false; if (recvdata.m_has_data_transporters.get(0)) { @@ -1357,7 +1358,7 @@ TransporterRegistry::performReceive(Tran #ifdef NDB_TCP_TRANSPORTER for(Uint32 id = recvdata.m_has_data_transporters.find_first(); - id != BitmaskImpl::NotFound; + id != BitmaskImpl::NotFound && !stopReceiving; id = recvdata.m_has_data_transporters.find_next(id + 1)) { bool hasdata = false; @@ -1376,7 +1377,7 @@ TransporterRegistry::performReceive(Tran Uint32 * ptr; Uint32 sz = t->getReceiveData(&ptr); recvdata.transporter_recv_from(id); - Uint32 szUsed = unpack(recvdata, ptr, sz, id, ioStates[id]); + Uint32 szUsed = unpack(recvdata, ptr, sz, id, ioStates[id], stopReceiving); t->updateReceiveDataPtr(szUsed); hasdata = t->hasReceiveData(); } @@ -1389,7 +1390,7 @@ TransporterRegistry::performReceive(Tran #ifdef NDB_SCI_TRANSPORTER //performReceive //do prepareReceive on the SCI transporters (prepareReceive(t,,,,)) - for (int i=0; igetRemoteNodeId(); @@ -1404,14 +1405,14 @@ TransporterRegistry::performReceive(Tran Uint32 * readPtr, * eodPtr; t->getReceivePtr(&readPtr, &eodPtr); callbackObj->transporter_recv_from(nodeId); - Uint32 *newPtr = unpack(readPtr, eodPtr, nodeId, ioStates[nodeId]); + Uint32 *newPtr = unpack(readPtr, eodPtr, nodeId, ioStates[nodeId], stopReceiving); t->updateReceivePtr(newPtr); } } } #endif #ifdef 
NDB_SHM_TRANSPORTER - for (int i=0; igetRemoteNodeId(); @@ -1426,7 +1427,8 @@ TransporterRegistry::performReceive(Tran t->getReceivePtr(&readPtr, &eodPtr); recvdata.transporter_recv_from(nodeId); Uint32 *newPtr = unpack(recvdata, - readPtr, eodPtr, nodeId, ioStates[nodeId]); + readPtr, eodPtr, nodeId, ioStates[nodeId], + stopReceiving); t->updateReceivePtr(newPtr); } } === modified file 'storage/ndb/src/kernel/vm/TransporterCallback.cpp' --- a/storage/ndb/src/kernel/vm/TransporterCallback.cpp 2012-01-24 06:20:13 +0000 +++ b/storage/ndb/src/kernel/vm/TransporterCallback.cpp 2012-05-31 18:26:14 +0000 @@ -133,7 +133,7 @@ void mt_init_receiver_cache(){} void mt_set_section_chunk_size(){} #endif -void +bool TransporterReceiveHandleKernel::deliver_signal(SignalHeader * const header, Uint8 prio, Uint32 * const theData, @@ -198,7 +198,7 @@ TransporterReceiveHandleKernel::deliver_ header, theData, secPtrI); #endif - return; + return false; } /** @@ -234,6 +234,7 @@ TransporterReceiveHandleKernel::deliver_ sendprioa(m_thr_no /* self */, header, theData, NULL); #endif + return false; } NdbOut & === modified file 'storage/ndb/src/kernel/vm/TransporterCallbackKernel.hpp' --- a/storage/ndb/src/kernel/vm/TransporterCallbackKernel.hpp 2012-01-23 20:23:08 +0000 +++ b/storage/ndb/src/kernel/vm/TransporterCallbackKernel.hpp 2012-05-31 18:26:14 +0000 @@ -44,7 +44,7 @@ public: #endif /* TransporterCallback interface. 
*/ - void deliver_signal(SignalHeader * const header, + bool deliver_signal(SignalHeader * const header, Uint8 prio, Uint32 * const signalData, LinearSectionPtr ptr[3]); === modified file 'storage/ndb/src/ndbapi/ClusterMgr.cpp' --- a/storage/ndb/src/ndbapi/ClusterMgr.cpp 2011-07-04 16:30:34 +0000 +++ b/storage/ndb/src/ndbapi/ClusterMgr.cpp 2012-05-31 18:26:14 +0000 @@ -217,12 +217,12 @@ ClusterMgr::doStop( ){ void ClusterMgr::forceHB() { - theFacade.lock_mutex(); + theFacade.lock_poll_mutex(); if(waitingForHB) { - NdbCondition_WaitTimeout(waitForHBCond, theFacade.theMutexPtr, 1000); - theFacade.unlock_mutex(); + NdbCondition_WaitTimeout(waitForHBCond, theFacade.thePollMutex, 1000); + theFacade.unlock_poll_mutex(); return; } @@ -243,7 +243,7 @@ ClusterMgr::forceHB() } } waitForHBFromNodes.bitAND(ndb_nodes); - theFacade.unlock_mutex(); + theFacade.unlock_poll_mutex(); #ifdef DEBUG_REG char buf[128]; @@ -273,18 +273,19 @@ ClusterMgr::forceHB() #endif raw_sendSignal(&signal, nodeId); } + flush_send_buffers(); unlock(); } /* Wait for nodes to reply - if any heartbeats was sent */ - theFacade.lock_mutex(); + theFacade.lock_poll_mutex(); if (!waitForHBFromNodes.isclear()) - NdbCondition_WaitTimeout(waitForHBCond, theFacade.theMutexPtr, 1000); + NdbCondition_WaitTimeout(waitForHBCond, theFacade.thePollMutex, 1000); waitingForHB= false; #ifdef DEBUG_REG ndbout << "Still waiting for HB from " << waitForHBFromNodes.getText(buf) << endl; #endif - theFacade.unlock_mutex(); + theFacade.unlock_poll_mutex(); } void @@ -298,12 +299,14 @@ ClusterMgr::startup() lock(); theFacade.doConnect(nodeId); + flush_send_buffers(); unlock(); for (Uint32 i = 0; i<3000; i++) { lock(); theFacade.theTransporterRegistry->update_connections(); + flush_send_buffers(); unlock(); if (theNode.is_connected()) break; @@ -345,9 +348,9 @@ ClusterMgr::threadMain() { /* Sleep at 100ms between each heartbeat check */ NDB_TICKS before = now; - for (Uint32 i = 0; i<10; i++) + for (Uint32 i = 0; i<5; i++) { - 
NdbSleep_MilliSleep(10); + NdbSleep_MilliSleep(20); { Guard g(clusterMgrThreadMutex); /** @@ -381,7 +384,7 @@ ClusterMgr::threadMain() nodeFailRep->noOfNodes = 0; NodeBitmask::clear(nodeFailRep->theNodes); - trp_client::lock(); + lock(); for (int i = 1; i < MAX_NODES; i++){ /** * Send register request (heartbeat) to all available nodes @@ -444,12 +447,16 @@ ClusterMgr::threadMain() NodeBitmask::set(nodeFailRep->theNodes, nodeId); } } + flush_send_buffers(); + unlock(); if (nodeFailRep->noOfNodes) { + lock(); raw_sendSignal(&nodeFail_signal, getOwnNodeId()); + flush_send_buffers(); + unlock(); } - trp_client::unlock(); } } @@ -530,7 +537,7 @@ ClusterMgr::trp_deliver_signal(const Ndb tSignal.theReceiversBlockNumber= refToBlock(ref); tSignal.theVerId_signalNumber= GSN_SUB_GCP_COMPLETE_ACK; tSignal.theSendersBlockRef = API_CLUSTERMGR; - safe_sendSignal(&tSignal, aNodeId); + safe_noflush_sendSignal(&tSignal, aNodeId); } break; } @@ -1435,7 +1442,7 @@ ArbitMgr::sendSignalToQmgr(ArbitSignal& { m_clusterMgr.lock(); m_clusterMgr.raw_sendSignal(&signal, aSignal.data.sender); + m_clusterMgr.flush_send_buffers(); m_clusterMgr.unlock(); } } - === modified file 'storage/ndb/src/ndbapi/NdbScanOperation.cpp' --- a/storage/ndb/src/ndbapi/NdbScanOperation.cpp 2011-11-16 08:17:17 +0000 +++ b/storage/ndb/src/ndbapi/NdbScanOperation.cpp 2012-05-31 18:26:14 +0000 @@ -1669,7 +1669,6 @@ NdbScanOperation::executeCursor(int node * Call finaliseScanOldApi() for old style scans before * proceeding */ - bool locked = false; NdbImpl* theImpl = theNdb->theImpl; int res = 0; @@ -1680,9 +1679,7 @@ NdbScanOperation::executeCursor(int node } { - locked = true; NdbTransaction * tCon = theNdbCon; - theImpl->lock(); Uint32 seq = tCon->theNodeSequence; @@ -1733,9 +1730,6 @@ done: m_api_receivers_count = theParallelism; } - if (locked) - theImpl->unlock(); - return res; } === modified file 'storage/ndb/src/ndbapi/NdbTransaction.cpp' --- a/storage/ndb/src/ndbapi/NdbTransaction.cpp 2011-11-16 08:17:17 
+0000 +++ b/storage/ndb/src/ndbapi/NdbTransaction.cpp 2012-05-31 18:26:14 +0000 @@ -733,27 +733,6 @@ NdbTransaction::executeAsynchPrepare(Ndb releaseCompletedQueries(); } - NdbScanOperation* tcOp = m_theFirstScanOperation; - if (tcOp != 0){ - // Execute any cursor operations - while (tcOp != NULL) { - int tReturnCode; - tReturnCode = tcOp->executeCursor(theDBnode); - if (tReturnCode == -1) { - DBUG_VOID_RETURN; - }//if - tcOp->postExecuteRelease(); // Release unneeded resources - // outside TP mutex - tcOp = (NdbScanOperation*)tcOp->next(); - } // while - m_theLastScanOperation->next(m_firstExecutedScanOp); - m_firstExecutedScanOp = m_theFirstScanOperation; - // Discard cursor operations, since these are also - // in the complete operations list we do not need - // to release them. - m_theFirstScanOperation = m_theLastScanOperation = NULL; - } - bool tTransactionIsStarted = theTransactionIsStarted; NdbOperation* tLastOp = theLastOpInList; Ndb* tNdb = theNdb; @@ -1013,6 +992,7 @@ NdbTransaction::sendTC_HBREP() // Send tNdb->theImpl->lock(); const int res = tNdb->theImpl->sendSignal(tSignal,theDBnode); + tNdb->theImpl->flush_send_buffers(); tNdb->theImpl->unlock(); tNdb->releaseSignal(tSignal); @@ -1040,6 +1020,26 @@ NdbTransaction::doSend() This method assumes that at least one operation or query have been defined. This is ensured by the caller of this routine (=execute). 
*/ + NdbScanOperation* tcOp = m_theFirstScanOperation; + if (tcOp != 0){ + // Execute any cursor operations + while (tcOp != NULL) { + int tReturnCode; + tReturnCode = tcOp->executeCursor(theDBnode); + if (tReturnCode == -1) { + goto fail; + }//if + tcOp->postExecuteRelease(); // Release unneeded resources + // outside TP mutex + tcOp = (NdbScanOperation*)tcOp->next(); + } // while + m_theLastScanOperation->next(m_firstExecutedScanOp); + m_firstExecutedScanOp = m_theFirstScanOperation; + // Discard cursor operations, since these are also + // in the complete operations list we do not need + // to release them. + m_theFirstScanOperation = m_theLastScanOperation = NULL; + } switch(theSendStatus){ case sendOperations: { === modified file 'storage/ndb/src/ndbapi/Ndbif.cpp' --- a/storage/ndb/src/ndbapi/Ndbif.cpp 2012-02-23 15:41:31 +0000 +++ b/storage/ndb/src/ndbapi/Ndbif.cpp 2012-05-31 18:26:14 +0000 @@ -73,7 +73,7 @@ Ndb::init(int aMaxNoOfTransactions) }//if theInitState = StartingInit; TransporterFacade * theFacade = theImpl->m_transporter_facade; - theEventBuffer->m_mutex = theFacade->theMutexPtr; + theEventBuffer->m_mutex = theImpl->m_mutex; const Uint32 tRef = theImpl->open(theFacade); @@ -92,9 +92,9 @@ Ndb::init(int aMaxNoOfTransactions) } /* Init cached min node version */ - theFacade->lock_mutex(); + theFacade->lock_poll_mutex(); theCachedMinDbNodeVersion = theFacade->getMinDbNodeVersion(); - theFacade->unlock_mutex(); + theFacade->unlock_poll_mutex(); theDictionary->setTransporter(this, theFacade); @@ -1465,7 +1465,7 @@ NdbTransaction::sendTC_COMMIT_ACK(NdbImp Uint32 * dataPtr = aSignal->getDataPtrSend(); dataPtr[0] = transId1; dataPtr[1] = transId2; - impl->safe_sendSignal(aSignal, refToNode(aTCRef)); + impl->safe_noflush_sendSignal(aSignal, refToNode(aTCRef)); } int @@ -1503,6 +1503,7 @@ NdbImpl::send_event_report(bool has_lock done: if (!has_lock) { + flush_send_buffers(); unlock(); } return ret; === modified file 
'storage/ndb/src/ndbapi/TransporterFacade.cpp' --- a/storage/ndb/src/ndbapi/TransporterFacade.cpp 2012-02-23 15:41:31 +0000 +++ b/storage/ndb/src/ndbapi/TransporterFacade.cpp 2012-05-31 18:26:14 +0000 @@ -59,6 +59,9 @@ static int indexToNumber(int index) #define TRP_DEBUG(t) #endif +#define DBG_POLL 0 +#define dbg(x,y) if (DBG_POLL) printf("%llu : " x "\n", NdbTick_CurrentNanosecond() / 1000, y) + /***************************************************************************** * Call back functions *****************************************************************************/ @@ -206,7 +209,7 @@ TRACE_GSN(Uint32 gsn) /** * The execute function : Handle received signal */ -void +bool TransporterFacade::deliver_signal(SignalHeader * const header, Uint8 prio, Uint32 * const theData, LinearSectionPtr ptr[3]) @@ -224,11 +227,13 @@ TransporterFacade::deliver_signal(Signal } #endif + bool last_message = false; if (tRecBlockNo >= MIN_API_BLOCK_NO) { trp_client * clnt = m_threads.get(tRecBlockNo); if (clnt != 0) { + last_message = m_poll_owner->m_poll.lock_client(clnt); /** * Handle received signal immediately to avoid any unnecessary * copying of data, allocation of memory and other things. 
Copying @@ -285,13 +290,13 @@ TransporterFacade::deliver_signal(Signal NdbApiSignal tmpSignal(*header); NdbApiSignal * tSignal = &tmpSignal; tSignal->setDataPtr(tDataPtr); + last_message = m_poll_owner->m_poll.lock_client(clnt); clnt->trp_deliver_signal(tSignal, 0); } } } } } - return; } else if (tRecBlockNo >= MIN_API_FIXED_BLOCK_NO && tRecBlockNo <= MAX_API_FIXED_BLOCK_NO) @@ -303,6 +308,7 @@ TransporterFacade::deliver_signal(Signal NdbApiSignal tmpSignal(*header); NdbApiSignal * tSignal = &tmpSignal; tSignal->setDataPtr(theData); + last_message = m_poll_owner->m_poll.lock_client(clnt); clnt->trp_deliver_signal(tSignal, ptr); }//if } @@ -317,6 +323,7 @@ TransporterFacade::deliver_signal(Signal abort(); } } + return last_message; } // These symbols are needed, but not used in the API @@ -351,7 +358,7 @@ TransporterFacade::start_instance(NodeId (void)signal(SIGPIPE, SIG_IGN); #endif - theTransporterRegistry = new TransporterRegistry(this, this); + theTransporterRegistry = new TransporterRegistry(this, this, false); if (theTransporterRegistry == NULL) return -1; @@ -446,6 +453,41 @@ runSendRequest_C(void * me) return 0; } +inline +void +link_buffer(TFBuffer* dst, const TFBuffer * src) +{ + assert(dst); + assert(src); + assert(src->m_head); + assert(src->m_tail); + TFBufferGuard g0(* dst); + TFBufferGuard g1(* src); + if (dst->m_head == 0) + { + dst->m_head = src->m_head; + } + else + { + dst->m_tail->m_next = src->m_head; + } + dst->m_tail = src->m_tail; + dst->m_bytes_in_buffer += src->m_bytes_in_buffer; +} + +static const Uint32 SEND_THREAD_NO = 0; + +void +TransporterFacade::wakeup_send_thread(Uint32 node) +{ + Guard g(m_send_thread_mutex); + if (m_send_thread_nodes.get(SEND_THREAD_NO) == false) + { + NdbCondition_Signal(m_send_thread_cond); + } + m_send_thread_nodes.set(SEND_THREAD_NO); +} + void TransporterFacade::threadMainSend(void) { theTransporterRegistry->startSending(); @@ -456,15 +498,61 @@ void TransporterFacade::threadMainSend(v 
m_socket_server.startServer(); - while(!theStopReceive) { - NdbSleep_MilliSleep(sendThreadWaitMillisec); - NdbMutex_Lock(theMutexPtr); - if (sendPerformedLastInterval == 0) { - theTransporterRegistry->performSend(); - } - sendPerformedLastInterval = 0; - NdbMutex_Unlock(theMutexPtr); + NdbMutex_Lock(m_send_thread_mutex); + while(!theStopReceive) + { + if (m_send_thread_nodes.get(SEND_THREAD_NO) == false) + { + NdbCondition_WaitTimeout(m_send_thread_cond, + m_send_thread_mutex, + sendThreadWaitMillisec); + } + m_send_thread_nodes.clear(SEND_THREAD_NO); + NdbMutex_Unlock(m_send_thread_mutex); + bool all_empty = true; + do + { + all_empty = true; + for (Uint32 i = 0; im_mutex); + if (b->m_sending) + { + /** + * sender does stuff when clearing m_sending + */ + } + else if (b->m_buffer.m_bytes_in_buffer > 0 || + b->m_out_buffer.m_bytes_in_buffer > 0) + { + /** + * Copy all data from m_buffer to m_out_buffer + */ + TFBuffer copy = b->m_buffer; + bzero(&b->m_buffer, sizeof(b->m_buffer)); + b->m_sending = true; + NdbMutex_Unlock(&b->m_mutex); + if (copy.m_bytes_in_buffer > 0) + { + link_buffer(&b->m_out_buffer, &copy); + } + theTransporterRegistry->performSend(i); + NdbMutex_Lock(&b->m_mutex); + b->m_sending = false; + if (b->m_buffer.m_bytes_in_buffer > 0 || + b->m_out_buffer.m_bytes_in_buffer > 0) + { + all_empty = false; + } + } + NdbMutex_Unlock(&b->m_mutex); + } + } while (!theStopReceive && all_empty == false); + + NdbMutex_Lock(m_send_thread_mutex); } + NdbMutex_Unlock(m_send_thread_mutex); theTransporterRegistry->stopSending(); m_socket_server.stopServer(); @@ -499,6 +587,7 @@ void TransporterFacade::threadMainReceiv { theClusterMgr->lock(); theTransporterRegistry->update_connections(); + theClusterMgr->flush_send_buffers(); theClusterMgr->unlock(); NdbSleep_MilliSleep(100); }//while @@ -510,10 +599,9 @@ void TransporterFacade::threadMainReceiv and returns to caller. It will quickly come back here if not all data was received for the worker thread. 
*/ -void TransporterFacade::external_poll(Uint32 wait_time) +void +TransporterFacade::external_poll(trp_client* clnt, Uint32 wait_time) { - NdbMutex_Unlock(theMutexPtr); - #ifdef NDB_SHM_TRANSPORTER /* If shared memory transporters are used we need to set our sigmask @@ -529,10 +617,11 @@ void TransporterFacade::external_poll(Ui NdbThread_set_shm_sigmask(TRUE); #endif - NdbMutex_Lock(theMutexPtr); if (res > 0) { + m_poll_cnt++; theTransporterRegistry->performReceive(); + m_poll_cnt++; } } @@ -552,11 +641,22 @@ TransporterFacade::TransporterFacade(Glo theSendThread(NULL), theReceiveThread(NULL), m_fragmented_signal_id(0), - m_globalDictCache(cache) + m_globalDictCache(cache), + m_send_buffer("sendbufferpool") { DBUG_ENTER("TransporterFacade::TransporterFacade"); - theMutexPtr = NdbMutex_CreateWithName("TTFM"); + thePollMutex = NdbMutex_CreateWithName("PollMutex"); sendPerformedLastInterval = 0; + m_open_close_mutex = NdbMutex_Create(); + for (Uint32 i = 0; iallocate_send_buffers(total_send_buffer, - extra_send_buffer); + if (!m_send_buffer.inited()) + { + Uint32 total_send_buffer = 0; + iter.get(CFG_TOTAL_SEND_BUFFER_MEMORY, &total_send_buffer); + + if (total_send_buffer == 0) + { + total_send_buffer = theTransporterRegistry->get_total_max_send_buffer(); + } + + Uint64 extra_send_buffer = 0; + iter.get(CFG_EXTRA_SEND_BUFFER_MEMORY, &extra_send_buffer); + + total_send_buffer += extra_send_buffer; + if (!m_send_buffer.init(total_send_buffer)) + { + ndbout << "Unable to allocate " + << total_send_buffer + << " bytes of memory for send buffers!!" 
<< endl; + return false; + } + } Uint32 auto_reconnect=1; iter.get(CFG_AUTO_RECONNECT, &auto_reconnect); @@ -686,15 +803,52 @@ TransporterFacade::for_each(trp_client* const NdbApiSignal* aSignal, const LinearSectionPtr ptr[3]) { + trp_client * woken[16]; + Uint32 cnt_woken = 0; Uint32 sz = m_threads.m_statusNext.size(); for (Uint32 i = 0; i < sz ; i ++) { trp_client * clnt = m_threads.m_objectExecute[i]; if (clnt != 0 && clnt != sender) { - clnt->trp_deliver_signal(aSignal, ptr); + bool res = m_poll_owner->m_poll.check_if_locked(clnt); + if (res) + { + clnt->trp_deliver_signal(aSignal, ptr); + } + else + { + NdbMutex_Lock(clnt->m_mutex); + int save = clnt->m_poll.m_waiting; + clnt->trp_deliver_signal(aSignal, ptr); + if (save != clnt->m_poll.m_waiting && + clnt->m_poll.m_waiting == trp_client::PollQueue::PQ_WOKEN) + { + woken[cnt_woken++] = clnt; + if (cnt_woken == NDB_ARRAY_SIZE(woken)) + { + lock_poll_mutex(); + remove_from_poll_queue(woken, cnt_woken); + unlock_poll_mutex(); + unlock_and_signal(woken, cnt_woken); + cnt_woken = 0; + } + } + else + { + NdbMutex_Unlock(clnt->m_mutex); + } + } } } + + if (cnt_woken != 0) + { + lock_poll_mutex(); + remove_from_poll_queue(woken, cnt_woken); + unlock_poll_mutex(); + unlock_and_signal(woken, cnt_woken); + } } void @@ -732,7 +886,8 @@ TransporterFacade::close_clnt(trp_client int ret = -1; if (clnt) { - NdbMutex_Lock(theMutexPtr); + Guard g(m_open_close_mutex); + if (DBG_POLL) ndbout_c("close(%p)", clnt); if (m_threads.get(clnt->m_blockNo) == clnt) { m_threads.close(clnt->m_blockNo); @@ -742,7 +897,20 @@ TransporterFacade::close_clnt(trp_client { assert(0); } - NdbMutex_Unlock(theMutexPtr); + + /** + * Wait for any ongoing poll + */ + clnt->lock(); + Uint32 start_val = m_poll_cnt; + clnt->unlock(); + Uint32 curr_val; + do + { + clnt->lock(); + curr_val = m_poll_cnt; + clnt->unlock(); + } while (start_val == curr_val && ((start_val & 1) == 1)); } return ret; } @@ -751,7 +919,8 @@ Uint32 
TransporterFacade::open_clnt(trp_client * clnt, int blockNo) { DBUG_ENTER("TransporterFacade::open"); - Guard g(theMutexPtr); + Guard g(m_open_close_mutex); + if (DBG_POLL) ndbout_c("open(%p)", clnt); int r= m_threads.open(clnt); if (r < 0) { @@ -785,10 +954,13 @@ TransporterFacade::~TransporterFacade() DBUG_ENTER("TransporterFacade::~TransporterFacade"); delete theClusterMgr; - NdbMutex_Lock(theMutexPtr); + NdbMutex_Lock(thePollMutex); delete theTransporterRegistry; - NdbMutex_Unlock(theMutexPtr); - NdbMutex_Destroy(theMutexPtr); + NdbMutex_Unlock(thePollMutex); + NdbMutex_Destroy(thePollMutex); + NdbMutex_Destroy(m_open_close_mutex); + NdbMutex_Destroy(m_send_thread_mutex); + NdbCondition_Destroy(m_send_thread_cond); #ifdef API_TRACE signalLogger.setOutputStream(0); #endif @@ -821,6 +993,7 @@ TransporterFacade::calculateSendLimit() // adaptive algorithm. //------------------------------------------------- void TransporterFacade::forceSend(Uint32 block_number) { +#if 0 checkCounter--; m_threads.m_statusNext[numberToIndex(block_number)] = ThreadData::ACTIVE; sendPerformedLastInterval = 1; @@ -828,6 +1001,7 @@ void TransporterFacade::forceSend(Uint32 calculateSendLimit(); } theTransporterRegistry->forceSendCheck(0); +#endif } //------------------------------------------------- @@ -835,6 +1009,7 @@ void TransporterFacade::forceSend(Uint32 //------------------------------------------------- int TransporterFacade::checkForceSend(Uint32 block_number) { +#if 0 m_threads.m_statusNext[numberToIndex(block_number)] = ThreadData::ACTIVE; //------------------------------------------------- // This code is an adaptive algorithm to discover when @@ -856,6 +1031,9 @@ TransporterFacade::checkForceSend(Uint32 calculateSendLimit(); } return did_send; +#else + return 0; +#endif } @@ -863,7 +1041,8 @@ TransporterFacade::checkForceSend(Uint32 * SEND SIGNAL METHODS *****************************************************************************/ int -TransporterFacade::sendSignal(const 
NdbApiSignal * aSignal, NodeId aNode) +TransporterFacade::sendSignal(trp_client* clnt, + const NdbApiSignal * aSignal, NodeId aNode) { const Uint32* tDataPtr = aSignal->getConstDataPtrSend(); Uint32 Tlen = aSignal->theLength; @@ -881,7 +1060,8 @@ TransporterFacade::sendSignal(const NdbA } #endif if ((Tlen != 0) && (Tlen <= 25) && (TBno != 0)) { - SendStatus ss = theTransporterRegistry->prepareSend(aSignal, + SendStatus ss = theTransporterRegistry->prepareSend(clnt, + aSignal, 1, // JBB tDataPtr, aNode, @@ -1129,7 +1309,8 @@ public: * boundaries to simplify reassembly in the kernel. */ int -TransporterFacade::sendFragmentedSignal(const NdbApiSignal* inputSignal, +TransporterFacade::sendFragmentedSignal(trp_client* clnt, + const NdbApiSignal* inputSignal, NodeId aNode, const GenericSectionPtr ptr[3], Uint32 secs) @@ -1144,7 +1325,7 @@ TransporterFacade::sendFragmentedSignal( /* If there's no need to fragment, send normally */ if (totalSectionLength <= CHUNK_SZ) - return sendSignal(aSignal, aNode, ptr, secs); + return sendSignal(clnt, aSignal, aNode, ptr, secs); // TODO : Consider tracing fragment signals? 
#ifdef API_TRACE @@ -1263,7 +1444,8 @@ TransporterFacade::sendFragmentedSignal( // do prepare send { SendStatus ss = theTransporterRegistry->prepareSend - (&tmp_signal, + (clnt, + &tmp_signal, 1, /*JBB*/ tmp_signal_data, aNode, @@ -1318,7 +1500,8 @@ TransporterFacade::sendFragmentedSignal( int ret; { SendStatus ss = theTransporterRegistry->prepareSend - (aSignal, + (clnt, + aSignal, 1/*JBB*/, aSignal->getConstDataPtrSend(), aNode, @@ -1338,7 +1521,8 @@ TransporterFacade::sendFragmentedSignal( } int -TransporterFacade::sendFragmentedSignal(const NdbApiSignal* aSignal, +TransporterFacade::sendFragmentedSignal(trp_client* clnt, + const NdbApiSignal* aSignal, NodeId aNode, const LinearSectionPtr ptr[3], Uint32 secs) @@ -1364,12 +1548,13 @@ TransporterFacade::sendFragmentedSignal( tmpPtr[2].sz= linCopy[2].sz; tmpPtr[2].sectionIter= &two; - return sendFragmentedSignal(aSignal, aNode, tmpPtr, secs); + return sendFragmentedSignal(clnt, aSignal, aNode, tmpPtr, secs); } int -TransporterFacade::sendSignal(const NdbApiSignal* aSignal, NodeId aNode, +TransporterFacade::sendSignal(trp_client* clnt, + const NdbApiSignal* aSignal, NodeId aNode, const LinearSectionPtr ptr[3], Uint32 secs) { Uint32 save = aSignal->m_noOfSections; @@ -1386,7 +1571,8 @@ TransporterFacade::sendSignal(const NdbA } #endif SendStatus ss = theTransporterRegistry->prepareSend - (aSignal, + (clnt, + aSignal, 1, // JBB aSignal->getConstDataPtrSend(), aNode, @@ -1402,7 +1588,8 @@ TransporterFacade::sendSignal(const NdbA } int -TransporterFacade::sendSignal(const NdbApiSignal* aSignal, NodeId aNode, +TransporterFacade::sendSignal(trp_client* clnt, + const NdbApiSignal* aSignal, NodeId aNode, const GenericSectionPtr ptr[3], Uint32 secs) { Uint32 save = aSignal->m_noOfSections; @@ -1421,7 +1608,8 @@ TransporterFacade::sendSignal(const NdbA } #endif SendStatus ss = theTransporterRegistry->prepareSend - (aSignal, + (clnt, + aSignal, 1, // JBB aSignal->getConstDataPtrSend(), aNode, @@ -1566,21 +1754,25 @@ 
TransporterFacade::get_active_ndb_object return m_threads.m_use_cnt; } - void TransporterFacade::start_poll(trp_client* clnt) { - lock_mutex(); - clnt->m_poll.m_locked = true; + assert(clnt->m_poll.m_locked == true); + assert(clnt->m_poll.m_poll_owner == false); + assert(clnt->m_poll.m_poll_queue == false); + assert(clnt->m_poll.m_waiting == trp_client::PollQueue::PQ_IDLE); } void TransporterFacade::do_poll(trp_client* clnt, Uint32 wait_time) { - clnt->m_poll.m_waiting = true; + dbg("do_poll(%p)", clnt); + clnt->m_poll.m_waiting = trp_client::PollQueue::PQ_WAITING; assert(clnt->m_poll.m_locked == true); - trp_client* owner = m_poll_owner; - if (owner != NULL && owner != clnt) + assert(clnt->m_poll.m_poll_owner == false); + assert(clnt->m_poll.m_poll_queue == false); + lock_poll_mutex(); + if (m_poll_owner != NULL) { /* We didn't get hold of the poll "right". We will sleep on a @@ -1591,82 +1783,371 @@ TransporterFacade::do_poll(trp_client* c queue if it hasn't happened already. It is usually already out of the queue but at time-out it could be that the object is still there. 
*/ - assert(clnt->m_poll.m_poll_owner == false); add_to_poll_queue(clnt); - NdbCondition_WaitTimeout(clnt->m_poll.m_condition, theMutexPtr, - wait_time); - if (clnt != m_poll_owner && clnt->m_poll.m_waiting) + unlock_poll_mutex(); + dbg("cond_wait(%p)", clnt); + NdbCondition_WaitTimeout(clnt->m_poll.m_condition, + clnt->m_mutex, + wait_time); + + switch(clnt->m_poll.m_waiting) { + case trp_client::PollQueue::PQ_WOKEN: + dbg("%p - PQ_WOKEN", clnt); + // we have already been taken out of poll queue + assert(clnt->m_poll.m_poll_queue == false); + + /** + * clear m_poll_owner + * it can be that we were proposed as poll owner + * and later woken by another thread that became poll owner + */ + clnt->m_poll.m_poll_owner = false; + clnt->m_poll.m_waiting = trp_client::PollQueue::PQ_IDLE; + return; + case trp_client::PollQueue::PQ_IDLE: + dbg("%p - PQ_IDLE", clnt); + assert(false); // should not happen!! + // ...treat as timeout...fall-through + case trp_client::PollQueue::PQ_WAITING: + dbg("%p - PQ_WAITING", clnt); + break; + } + + lock_poll_mutex(); + if (clnt->m_poll.m_poll_owner == false) { + /** + * We got timeout...hopefully rare... + */ + assert(clnt->m_poll.m_poll_queue == true); remove_from_poll_queue(clnt); + unlock_poll_mutex(); + clnt->m_poll.m_waiting = trp_client::PollQueue::PQ_IDLE; + dbg("%p - PQ_WAITING poll_owner == false => return", clnt); + return; + } + else if (m_poll_owner != 0) + { + /** + * We were proposed as new poll owner...but someone else beat us too it + * break out...and retry the whole thing... + */ + clnt->m_poll.m_poll_owner = false; + assert(clnt->m_poll.m_poll_queue == false); + unlock_poll_mutex(); + clnt->m_poll.m_waiting = trp_client::PollQueue::PQ_IDLE; + dbg("%p - PQ_WAITING m_poll_owner != 0 => return", clnt); + return; } + + /** + * We were proposed as new poll owner, and was first to wakeup + */ + dbg("%p - PQ_WAITING => new poll_owner", clnt); } - else - { - /* - We got the poll "right" and we poll until data is received. 
After - receiving data we will check if all data is received, if not we - poll again. + m_poll_owner = clnt; + unlock_poll_mutex(); + + /** + * We have the poll "right" and we poll until data is received. After + * receiving data we will check if all data is received, + * if not we poll again. */ - assert(owner == clnt || clnt->m_poll.m_poll_owner == false); - m_poll_owner = clnt; - clnt->m_poll.m_poll_owner = true; - external_poll(wait_time); + clnt->m_poll.m_poll_owner = true; + clnt->m_poll.start_poll(clnt); + dbg("%p->external_poll", clnt); + external_poll(clnt, wait_time); + +#ifndef NDEBUG + { + Uint32 cnt = clnt->m_poll.m_locked_cnt; + assert(cnt >= 1); + assert(cnt <= NDB_ARRAY_SIZE(clnt->m_poll.m_locked_clients)); + assert(clnt->m_poll.m_locked_clients[0] == clnt); + // no duplicates + if (DBG_POLL) printf("after external_poll: cnt: %u ", cnt); + for (Uint32 i = 0; i < cnt; i++) + { + trp_client * tmp = clnt->m_poll.m_locked_clients[i]; + if (DBG_POLL) printf("%p(%u) ", tmp, tmp->m_poll.m_waiting); + for (Uint32 j = i + 1; j < cnt; j++) + { + assert(tmp != clnt->m_poll.m_locked_clients[j]); + } + } + if (DBG_POLL) printf("\n"); + + for (Uint32 i = 1; i < cnt; i++) + { + trp_client * tmp = clnt->m_poll.m_locked_clients[i]; + if (tmp->m_poll.m_locked == true) + { + assert(tmp->m_poll.m_waiting != trp_client::PollQueue::PQ_IDLE); + } + else + { + assert(tmp->m_poll.m_poll_owner == false); + assert(tmp->m_poll.m_poll_queue == false); + assert(tmp->m_poll.m_waiting == trp_client::PollQueue::PQ_IDLE); + } + } } -} +#endif -void -TransporterFacade::wakeup(trp_client* clnt) -{ - if (clnt->m_poll.m_waiting) + /** + * we're finished polling + */ + clnt->m_poll.m_poll_owner = false; + clnt->m_poll.m_waiting = trp_client::PollQueue::PQ_IDLE; + + /** + * count woken clients + * and put them to the left in array + */ + Uint32 cnt = clnt->m_poll.m_locked_cnt - 1; // skip self + Uint32 cnt_woken = 0; + trp_client ** arr = clnt->m_poll.m_locked_clients + 1; // skip self + 
for (Uint32 i = 0; i < cnt; i++) { - clnt->m_poll.m_waiting = false; - if (m_poll_owner != clnt) + trp_client * tmp = arr[i]; + if (tmp->m_poll.m_waiting == trp_client::PollQueue::PQ_WOKEN) { - remove_from_poll_queue(clnt); - NdbCondition_Signal(clnt->m_poll.m_condition); + arr[i] = arr[cnt_woken]; + arr[cnt_woken] = tmp; + cnt_woken++; + } + } + + if (DBG_POLL) + { + Uint32 cnt = clnt->m_poll.m_locked_cnt; + printf("after sort: cnt: %u ", cnt); + for (Uint32 i = 0; i < cnt; i++) + { + trp_client * tmp = clnt->m_poll.m_locked_clients[i]; + printf("%p(%u) ", tmp, tmp->m_poll.m_waiting); } + printf("\n"); } + + lock_poll_mutex(); + + /** + * now remove all woken from poll queue + * note: poll mutex held + */ + remove_from_poll_queue(arr, cnt_woken); + + /** + * take last client in poll queue and try lock it + */ + bool new_owner_locked = true; + trp_client * new_owner = remove_last_from_poll_queue(); + assert(new_owner != clnt); + if (new_owner != 0) + { + dbg("0 new_owner: %p", new_owner); + /** + * Note: we can only try lock here, to prevent potential deadlock + * given that we acquire mutex in different order when starting to poll + */ + if (NdbMutex_Trylock(new_owner->m_mutex) != 0) + { + /** + * If we fail to try lock...we put him back into poll-queue + */ + new_owner_locked = false; + add_to_poll_queue(new_owner); + dbg("try-lock failed %p", new_owner); + } + } + + /** + * clear poll owner variable and unlock + */ + m_poll_owner = 0; + unlock_poll_mutex(); + + if (new_owner && new_owner_locked) + { + /** + * Propose a poll owner + * Wakeup a client, that will race to become poll-owner + * I.e we don't assign m_poll_owner but let the wakeing up thread + * do this itself, if it is first + */ + dbg("wake new_owner(%p)", new_owner); +#ifndef NDEBUG + for (Uint32 i = 0; i <= cnt; i++) + { + assert(clnt->m_poll.m_locked_clients[i] != new_owner); + } +#endif + assert(new_owner->m_poll.m_waiting == trp_client::PollQueue::PQ_WAITING); + new_owner->m_poll.m_poll_owner = 
true; + NdbCondition_Signal(new_owner->m_poll.m_condition); + NdbMutex_Unlock(new_owner->m_mutex); + } + + /** + * Now wake all the woken clients + */ + unlock_and_signal(arr, cnt_woken); + + /** + * And unlock the rest that we delivered messages to + */ + for (Uint32 i = cnt_woken; i < cnt; i++) + { + dbg("unlock (%p)", arr[i]); + NdbMutex_Unlock(arr[i]->m_mutex); + } + + /** + * If we failed to propose new poll owner above, then we retry it here + */ + if (new_owner_locked == false) + { + dbg("new_owner_locked == false", 0); + trp_client * new_owner; + while (true) + { + new_owner = 0; + lock_poll_mutex(); + if (m_poll_owner != 0) + { + /** + * new poll owner already appointed...no need to do anything + */ + break; + } + + new_owner = remove_last_from_poll_queue(); + if (new_owner == 0) + { + /** + * poll queue empty...no need to do anything + */ + break; + } + + if (NdbMutex_Trylock(new_owner->m_mutex) == 0) + { + /** + * We locked a client that we will propose as poll owner + */ + break; + } + + /** + * Failed to lock new owner, but him back on queue, and retry + */ + add_to_poll_queue(new_owner); + unlock_poll_mutex(); + } + + unlock_poll_mutex(); + + if (new_owner) + { + /** + * Propose a poll owner + */ + assert(new_owner->m_poll.m_waiting == trp_client::PollQueue::PQ_WAITING); + new_owner->m_poll.m_poll_owner = true; + NdbCondition_Signal(new_owner->m_poll.m_condition); + NdbMutex_Unlock(new_owner->m_mutex); + } + } + + clnt->m_poll.m_locked_cnt = 0; + dbg("%p->do_poll return", clnt); } void -TransporterFacade::complete_poll(trp_client* clnt) +TransporterFacade::wakeup(trp_client* clnt) { - clnt->m_poll.m_waiting = false; - if (!clnt->m_poll.m_locked) - { - assert(clnt->m_poll.m_poll_owner == false); - return; + switch(clnt->m_poll.m_waiting) { + case trp_client::PollQueue::PQ_WAITING: + dbg("TransporterFacade::wakeup(%p) PQ_WAITING => PQ_WOKEN", clnt); + clnt->m_poll.m_waiting = trp_client::PollQueue::PQ_WOKEN; + break; + case 
trp_client::PollQueue::PQ_WOKEN: + dbg("TransporterFacade::wakeup(%p) PQ_WOKEN", clnt); + break; + case trp_client::PollQueue::PQ_IDLE: + dbg("TransporterFacade::wakeup(%p) PQ_IDLE", clnt); + break; } +} - /* - When completing the poll for this thread we must return the poll - ownership if we own it. We will give it to the last thread that - came here (the most recent) which is likely to be the one also - last to complete. We will remove that thread from the conditional - wait queue and set him as the new owner of the poll "right". - We will wait however with the signal until we have unlocked the - mutex for performance reasons. - See Stevens book on Unix NetworkProgramming: The Sockets Networking - API Volume 1 Third Edition on page 703-704 for a discussion on this - subject. - */ - trp_client* new_owner = 0; - if (m_poll_owner == clnt) +void +TransporterFacade::unlock_and_signal(trp_client ** arr, Uint32 cnt) +{ + for (Uint32 i = 0; i < cnt; i++) { - assert(clnt->m_poll.m_poll_owner == true); - m_poll_owner = new_owner = remove_last_from_poll_queue(); + NdbCondition_Signal(arr[i]->m_poll.m_condition); + NdbMutex_Unlock(arr[i]->m_mutex); } - if (new_owner) +} + +void +TransporterFacade::complete_poll(trp_client* clnt) +{ + assert(clnt->m_poll.m_poll_owner == false); + assert(clnt->m_poll.m_poll_queue == false); + assert(clnt->m_poll.m_waiting == trp_client::PollQueue::PQ_IDLE); + clnt->flush_send_buffers(); +} + +inline +void +trp_client::PollQueue::start_poll(trp_client* self) +{ + assert(m_waiting == PQ_WAITING); + assert(m_locked); + assert(m_poll_owner); + assert(m_locked_cnt == 0); + assert(&self->m_poll == this); + m_locked_cnt = 1; + m_locked_clients[0] = self; +} + +inline +bool +trp_client::PollQueue::check_if_locked(const trp_client* clnt) const +{ + for (Uint32 i = 0; im_poll.m_poll_owner == false); - assert(new_owner->m_poll.m_locked == true); - assert(new_owner->m_poll.m_waiting == true); - NdbCondition_Signal(new_owner->m_poll.m_condition); - 
new_owner->m_poll.m_poll_owner = true; + if (m_locked_clients[i] == clnt) // already locked + return true; } - clnt->m_poll.m_locked = false; - clnt->m_poll.m_poll_owner = false; - unlock_mutex(); + return false; +} + +inline +bool +trp_client::PollQueue::lock_client (trp_client* clnt) +{ + assert(m_locked_cnt <= NDB_ARRAY_SIZE(m_locked_clients)); + if (check_if_locked(clnt)) + return false; + + dbg("lock_client(%p)", clnt); + + /** + * API_PACKED contains a number of messages, + * so even if we signal "last_message"... + * there will(might) be a few more + * + * TODO: Check that 3 is correct number!! + */ + const Uint32 MAX_MESSAGES_IN_API_PACKED = 3; + + assert(m_locked_cnt < NDB_ARRAY_SIZE(m_locked_clients)); + NdbMutex_Lock(clnt->m_mutex); + m_locked_clients[m_locked_cnt++] = clnt; + return m_locked_cnt + MAX_MESSAGES_IN_API_PACKED >= NDB_ARRAY_SIZE(m_locked_clients); } void @@ -1677,7 +2158,9 @@ TransporterFacade::add_to_poll_queue(trp assert(clnt->m_poll.m_next == 0); assert(clnt->m_poll.m_locked == true); assert(clnt->m_poll.m_poll_owner == false); + assert(clnt->m_poll.m_poll_queue == false); + clnt->m_poll.m_poll_queue = true; if (m_poll_queue_head == 0) { assert(m_poll_queue_tail == 0); @@ -1694,12 +2177,26 @@ TransporterFacade::add_to_poll_queue(trp } void +TransporterFacade::remove_from_poll_queue(trp_client** arr, Uint32 cnt) +{ + for (Uint32 i = 0; i< cnt; i++) + { + if (arr[i]->m_poll.m_poll_queue) + { + remove_from_poll_queue(arr[i]); + } + } +} + +void TransporterFacade::remove_from_poll_queue(trp_client* clnt) { assert(clnt != 0); assert(clnt->m_poll.m_locked == true); assert(clnt->m_poll.m_poll_owner == false); + assert(clnt->m_poll.m_poll_queue == true); + clnt->m_poll.m_poll_queue = false; if (clnt->m_poll.m_prev != 0) { clnt->m_poll.m_prev->m_poll.m_next = clnt->m_poll.m_next; @@ -1758,6 +2255,174 @@ SignalSectionIterator::getNextWords(Uint return NULL; } +void +TransporterFacade::flush_send_buffer(Uint32 node, const TFBuffer * sb) +{ + Guard 
g(&m_send_buffers[node].m_mutex); + assert(node < NDB_ARRAY_SIZE(m_send_buffers)); + link_buffer(&m_send_buffers[node].m_buffer, sb); +} + +void +TransporterFacade::flush_and_send_buffer(Uint32 node, const TFBuffer * sb) +{ + assert(node < NDB_ARRAY_SIZE(m_send_buffers)); + struct TFSendBuffer * b = m_send_buffers + node; + bool wake = false; + NdbMutex_Lock(&b->m_mutex); + link_buffer(&b->m_buffer, sb); + + if (b->m_sending == true) + { + /** + * Sender will check if here is data, and wake send-thread + * if needed + */ + } + else + { + b->m_sending = true; + + /** + * Copy all data from m_buffer to m_out_buffer + */ + TFBuffer copy = b->m_buffer; + bzero(&b->m_buffer, sizeof(b->m_buffer)); + NdbMutex_Unlock(&b->m_mutex); + link_buffer(&b->m_out_buffer, &copy); + theTransporterRegistry->performSend(node); + NdbMutex_Lock(&b->m_mutex); + b->m_sending = false; + if (b->m_buffer.m_bytes_in_buffer > 0 || + b->m_out_buffer.m_bytes_in_buffer > 0) + { + wake = true; + } + } + NdbMutex_Unlock(&b->m_mutex); + + if (wake) + { + wakeup_send_thread(node); + } +} + +Uint32 +TransporterFacade::get_bytes_to_send_iovec(NodeId node, struct iovec *dst, + Uint32 max) +{ + if (max == 0) + { + return 0; + } + + Uint32 count = 0; + TFBuffer *b = &m_send_buffers[node].m_out_buffer; + TFBufferGuard g0(* b); + TFPage *page = b->m_head; + while (page != NULL && count < max) + { + dst[count].iov_base = page->m_data+page->m_start; + dst[count].iov_len = page->m_bytes; + assert(page->m_start + page->m_bytes <= page->max_data_bytes()); + page = page->m_next; + count++; + } + + return count; +} + +Uint32 +TransporterFacade::bytes_sent(NodeId node, Uint32 bytes) +{ + TFBuffer *b = &m_send_buffers[node].m_out_buffer; + TFBufferGuard g0(* b); + Uint32 used_bytes = b->m_bytes_in_buffer; + + if (bytes == 0) + { + return used_bytes; + } + + assert(used_bytes >= bytes); + used_bytes -= bytes; + b->m_bytes_in_buffer = used_bytes; + + TFPage *page = b->m_head; + TFPage *prev = 0; + while (bytes && bytes >= 
page->m_bytes) + { + prev = page; + bytes -= page->m_bytes; + page = page->m_next; + } + + if (used_bytes == 0) + { + m_send_buffer.release(b->m_head, b->m_tail); + b->m_head = 0; + b->m_tail = 0; + } + else + { + if (prev) + { + m_send_buffer.release(b->m_head, prev); + } + + page->m_start += bytes; + page->m_bytes -= bytes; + assert(page->m_start + page->m_bytes <= page->max_data_bytes()); + b->m_head = page; + } + + return used_bytes; +} + +bool +TransporterFacade::has_data_to_send(NodeId node) +{ + /** + * Not used... + */ + abort(); +} + +void +TransporterFacade::reset_send_buffer(NodeId node, bool should_be_empty) +{ + // Make sure that buffer is already empty if the "should_be_empty" + // flag is set. This is done to quickly catch any stray signals + // written to the send buffer while not being connected + TFBuffer *b0 = &m_send_buffers[node].m_buffer; + TFBuffer *b1 = &m_send_buffers[node].m_out_buffer; + bool has_data_to_send = + (b0->m_head != NULL && b0->m_head->m_bytes) || + (b1->m_head != NULL && b1->m_head->m_bytes); + + if (should_be_empty && !has_data_to_send) + return; + assert(!should_be_empty); + + { + TFBuffer *b = &m_send_buffers[node].m_buffer; + if (b->m_head != 0) + { + m_send_buffer.release(b->m_head, b->m_tail); + } + bzero(b, sizeof(* b)); + } + + { + TFBuffer *b = &m_send_buffers[node].m_out_buffer; + if (b->m_head != 0) + { + m_send_buffer.release(b->m_head, b->m_tail); + } + bzero(b, sizeof(* b)); + } +} + #ifdef UNIT_TEST // Unit test code starts @@ -2035,6 +2700,7 @@ TransporterFacade::ext_update_connection { theClusterMgr->lock(); theTransporterRegistry->update_connections(); + theClusterMgr->flush_send_buffers(); theClusterMgr->unlock(); } @@ -2074,11 +2740,11 @@ TransporterFacade::setupWakeup() { /* Ask TransporterRegistry to setup wakeup sockets */ bool rc; - lock_mutex(); + lock_poll_mutex(); { rc = theTransporterRegistry->setup_wakeup_socket(); } - unlock_mutex(); + unlock_poll_mutex(); return rc; } === modified file 
'storage/ndb/src/ndbapi/TransporterFacade.hpp' --- a/storage/ndb/src/ndbapi/TransporterFacade.hpp 2012-02-23 15:41:31 +0000 +++ b/storage/ndb/src/ndbapi/TransporterFacade.hpp 2012-05-31 18:26:14 +0000 @@ -26,6 +26,7 @@ #include "DictCache.hpp" #include #include +#include "trp_buffer.hpp" class ClusterMgr; class ArbitMgr; @@ -81,14 +82,14 @@ public: // Only sends to nodes which are alive private: - int sendSignal(const NdbApiSignal * signal, NodeId nodeId); - int sendSignal(const NdbApiSignal*, NodeId, + int sendSignal(trp_client*, const NdbApiSignal *, NodeId nodeId); + int sendSignal(trp_client*, const NdbApiSignal*, NodeId, const LinearSectionPtr ptr[3], Uint32 secs); - int sendSignal(const NdbApiSignal*, NodeId, + int sendSignal(trp_client*, const NdbApiSignal*, NodeId, const GenericSectionPtr ptr[3], Uint32 secs); - int sendFragmentedSignal(const NdbApiSignal*, NodeId, + int sendFragmentedSignal(trp_client*, const NdbApiSignal*, NodeId, const LinearSectionPtr ptr[3], Uint32 secs); - int sendFragmentedSignal(const NdbApiSignal*, NodeId, + int sendFragmentedSignal(trp_client*, const NdbApiSignal*, NodeId, const GenericSectionPtr ptr[3], Uint32 secs); public: @@ -129,8 +130,8 @@ public: void for_each(trp_client* clnt, const NdbApiSignal* aSignal, const LinearSectionPtr ptr[3]); - void lock_mutex(); - void unlock_mutex(); + void lock_poll_mutex(); + void unlock_poll_mutex(); // Improving the API performance void forceSend(Uint32 block_number); @@ -159,7 +160,10 @@ public: void complete_poll(trp_client*); void wakeup(trp_client*); - void external_poll(Uint32 wait_time); + void external_poll(trp_client* clnt, Uint32 wait_time); + + void remove_from_poll_queue(trp_client**, Uint32 cnt); + void unlock_and_signal(trp_client**, Uint32 cnt); trp_client* get_poll_owner(bool) const { return m_poll_owner;} trp_client* remove_last_from_poll_queue(); @@ -177,7 +181,7 @@ public: int get_auto_reconnect() const; /* TransporterCallback interface. 
*/ - void deliver_signal(SignalHeader * const header, + bool deliver_signal(SignalHeader * const header, Uint8 prio, Uint32 * const signalData, LinearSectionPtr ptr[3]); @@ -189,22 +193,7 @@ public: void reportError(NodeId nodeId, TransporterError errorCode, const char *info = 0); void transporter_recv_from(NodeId node); - Uint32 get_bytes_to_send_iovec(NodeId node, struct iovec *dst, Uint32 max) - { - return theTransporterRegistry->get_bytes_to_send_iovec(node, dst, max); - } - Uint32 bytes_sent(NodeId node, Uint32 bytes) - { - return theTransporterRegistry->bytes_sent(node, bytes); - } - bool has_data_to_send(NodeId node) - { - return theTransporterRegistry->has_data_to_send(node); - } - void reset_send_buffer(NodeId node, bool should_be_empty) - { - theTransporterRegistry->reset_send_buffer(node, should_be_empty); - } + /** * Wakeup * @@ -300,24 +289,67 @@ private: Uint32 m_fragmented_signal_id; public: - NdbMutex* theMutexPtr; + volatile Uint32 m_poll_cnt; + NdbMutex* m_open_close_mutex; + NdbMutex* thePollMutex; public: GlobalDictCache *m_globalDictCache; + +public: + /** + * Add a send buffer to out-buffer + */ + void flush_send_buffer(Uint32 node, const TFBuffer* buffer); + void flush_and_send_buffer(Uint32 node, const TFBuffer* buffer); + + /** + * Allocate a send buffer + */ + TFPage *alloc_sb_page() { return m_send_buffer.try_alloc(1);} + + Uint32 get_bytes_to_send_iovec(NodeId node, struct iovec *dst, Uint32 max); + Uint32 bytes_sent(NodeId node, Uint32 bytes); + bool has_data_to_send(NodeId node); + void reset_send_buffer(NodeId node, bool should_be_empty); + +private: + TFMTPool m_send_buffer; + struct TFSendBuffer + { + TFSendBuffer() { m_sending = false; } + NdbMutex m_mutex; + bool m_sending; + + /** + * This is data that have been "scheduled" to be sent + */ + TFBuffer m_buffer; + + /** + * This is data that is being sent + */ + TFBuffer m_out_buffer; + } m_send_buffers[MAX_NODES]; + + void wakeup_send_thread(Uint32 node); + NdbMutex * 
m_send_thread_mutex; + NdbCondition * m_send_thread_cond; + NodeBitmask m_send_thread_nodes; }; inline void -TransporterFacade::lock_mutex() +TransporterFacade::lock_poll_mutex() { - NdbMutex_Lock(theMutexPtr); + NdbMutex_Lock(thePollMutex); } inline void -TransporterFacade::unlock_mutex() +TransporterFacade::unlock_poll_mutex() { - NdbMutex_Unlock(theMutexPtr); + NdbMutex_Unlock(thePollMutex); } #include "ClusterMgr.hpp" === modified file 'storage/ndb/src/ndbapi/ndb_cluster_connection.cpp' --- a/storage/ndb/src/ndbapi/ndb_cluster_connection.cpp 2012-03-05 13:04:02 +0000 +++ b/storage/ndb/src/ndbapi/ndb_cluster_connection.cpp 2012-05-31 18:26:14 +0000 @@ -204,13 +204,13 @@ Ndb_cluster_connection_impl::get_next_al while ((id = get_next_node(iter))) { - tp->lock_mutex(); + tp->lock_poll_mutex(); if (tp->get_node_alive(id) != 0) { - tp->unlock_mutex(); + tp->unlock_poll_mutex(); return id; } - tp->unlock_mutex(); + tp->unlock_poll_mutex(); } return 0; } @@ -235,7 +235,7 @@ Ndb_cluster_connection::max_nodegroup() return 0; Bitmask ng; - tp->lock_mutex(); + tp->lock_poll_mutex(); for(unsigned i= 0; i < no_db_nodes(); i++) { //************************************************ @@ -245,7 +245,7 @@ Ndb_cluster_connection::max_nodegroup() if (n.is_confirmed() && n.m_state.nodeGroup <= MAX_NDB_NODES) ng.set(n.m_state.nodeGroup); } - tp->unlock_mutex(); + tp->unlock_poll_mutex(); if (ng.isclear()) return 0; @@ -267,7 +267,7 @@ int Ndb_cluster_connection::get_no_ready return -1; unsigned int foundAliveNode = 0; - tp->lock_mutex(); + tp->lock_poll_mutex(); for(unsigned i= 0; i < no_db_nodes(); i++) { //************************************************ @@ -277,7 +277,7 @@ int Ndb_cluster_connection::get_no_ready foundAliveNode++; } } - tp->unlock_mutex(); + tp->unlock_poll_mutex(); return foundAliveNode; } === modified file 'storage/ndb/src/ndbapi/trp_buffer.cpp' --- a/storage/ndb/src/ndbapi/trp_buffer.cpp 2011-02-02 00:40:07 +0000 +++ b/storage/ndb/src/ndbapi/trp_buffer.cpp 
2012-05-31 18:26:14 +0000 @@ -44,6 +44,11 @@ TFPool::~TFPool() free (m_alloc_ptr); } +TFMTPool::TFMTPool(const char * name) +{ + NdbMutex_InitWithName(&m_mutex, name); +} + void TFBuffer::validate() const { @@ -52,8 +57,8 @@ TFBuffer::validate() const assert(m_head == m_tail); if (m_head) { - assert(m_head->m_bytes < m_head->m_size); // Full pages should be release - assert(m_head->m_bytes == m_head->m_start); + assert(m_head->m_start < m_head->m_size); // Full pages should be release + assert(m_head->m_bytes == 0); } return; } @@ -67,10 +72,10 @@ TFBuffer::validate() const while (p) { assert(p->m_bytes <= p->m_size); - assert(p->m_start <= p->m_bytes); - assert((p->m_start & 3) == 0); - assert(p->m_bytes - p->m_start > 0); - assert(p->m_bytes - p->m_start <= (int)m_bytes_in_buffer); + assert(p->m_start <= p->m_size); + assert((p->m_bytes & 3) == 0); + assert(p->m_start + p->m_bytes <= p->m_size); + assert(p->m_bytes <= (int)m_bytes_in_buffer); assert(p->m_next != p); if (p == m_tail) { @@ -80,7 +85,7 @@ TFBuffer::validate() const { assert(p->m_next != 0); } - sum += p->m_bytes - p->m_start; + sum += p->m_bytes; p = p->m_next; } assert(sum == m_bytes_in_buffer); === modified file 'storage/ndb/src/ndbapi/trp_buffer.hpp' --- a/storage/ndb/src/ndbapi/trp_buffer.hpp 2011-02-02 00:40:07 +0000 +++ b/storage/ndb/src/ndbapi/trp_buffer.hpp 2012-05-31 18:26:14 +0000 @@ -20,6 +20,7 @@ #include #include // struct iovec +#include struct TFPage { @@ -98,9 +99,9 @@ struct TFSentinel struct TFBuffer { TFBuffer() { m_bytes_in_buffer = 0; m_head = m_tail = 0;} - Uint32 m_bytes_in_buffer; struct TFPage * m_head; struct TFPage * m_tail; + Uint32 m_bytes_in_buffer; void validate() const; }; @@ -139,6 +140,37 @@ public: void release_list(TFPage*); }; +class TFMTPool : private TFPool +{ + NdbMutex m_mutex; +public: + TFMTPool(const char * name = 0); + + bool init(size_t total_memory, size_t page_sz = 32768) { + return TFPool::init(total_memory, page_sz); + } + bool inited() const { + 
return TFPool::inited(); + } + + TFPage* try_alloc(Uint32 N) { + Guard g(&m_mutex); + return TFPool::try_alloc(N); + } + + void release(TFPage* first, TFPage* last) { + Guard g(&m_mutex); + TFPool::release(first, last); + } + + void release_list(TFPage* head) { + TFPage * tail = head; + while (tail->m_next != 0) + tail = tail->m_next; + release(head, tail); + } +}; + inline TFPage * TFPool::try_alloc(Uint32 n) === modified file 'storage/ndb/src/ndbapi/trp_client.cpp' --- a/storage/ndb/src/ndbapi/trp_client.cpp 2011-09-08 06:22:07 +0000 +++ b/storage/ndb/src/ndbapi/trp_client.cpp 2012-05-31 18:26:14 +0000 @@ -21,12 +21,10 @@ trp_client::trp_client() : m_blockNo(~Uint32(0)), m_facade(0) { - m_poll.m_waiting = false; - m_poll.m_locked = false; - m_poll.m_poll_owner = false; - m_poll.m_next = 0; - m_poll.m_prev = 0; - m_poll.m_condition = NdbCondition_Create(); + m_mutex = NdbMutex_Create(); + + m_send_nodes_cnt = 0; + m_send_buffers = new TFBuffer[MAX_NODES]; } trp_client::~trp_client() @@ -35,13 +33,14 @@ trp_client::~trp_client() * require that trp_client user * doesnt destroy object when holding any locks */ - assert(m_poll.m_locked == 0); - assert(m_poll.m_poll_owner == false); - assert(m_poll.m_next == 0); - assert(m_poll.m_prev == 0); + m_poll.assert_destroy(); close(); NdbCondition_Destroy(m_poll.m_condition); + NdbMutex_Destroy(m_mutex); + + assert(m_send_nodes_cnt == 0); + delete [] m_send_buffers; } Uint32 @@ -86,6 +85,9 @@ trp_client::close() void trp_client::start_poll() { + NdbMutex_Lock(m_mutex); + assert(m_poll.m_locked == false); + m_poll.m_locked = true; m_facade->start_poll(this); } @@ -98,30 +100,163 @@ trp_client::do_poll(Uint32 to) void trp_client::complete_poll() { - m_facade->complete_poll(this); + if (m_poll.m_locked == true) + { + m_facade->complete_poll(this); + m_poll.m_locked = false; + NdbMutex_Unlock(m_mutex); + } } int trp_client::do_forceSend(int val) { - int did_send = 1; + /** + * since force send is disabled in this "version" + * set 
forceSend=1 always... + */ + val = 1; + if (val == 0) { - did_send = m_facade->checkForceSend(m_blockNo); + flush_send_buffers(); + return 0; } else if (val == 1) { - m_facade->forceSend(m_blockNo); + for (Uint32 i = 0; i < m_send_nodes_cnt; i++) + { + Uint32 n = m_send_nodes_list[i]; + TFBuffer* b = m_send_buffers + n; + TFBufferGuard g0(* b); + m_facade->flush_and_send_buffer(n, b); + bzero(b, sizeof(* b)); + } + m_send_nodes_cnt = 0; + m_send_nodes_mask.clear(); + return 1; } - return did_send; + return 0; } int -trp_client::safe_sendSignal(const NdbApiSignal* signal, Uint32 nodeId) +trp_client::safe_noflush_sendSignal(const NdbApiSignal* signal, Uint32 nodeId) { return m_facade->m_poll_owner->raw_sendSignal(signal, nodeId); } +int +trp_client::safe_sendSignal(const NdbApiSignal* signal, Uint32 nodeId) +{ + int res; + if ((res = safe_noflush_sendSignal(signal, nodeId)) != -1) + { + m_facade->m_poll_owner->flush_send_buffers(); + } + return res; +} + +Uint32 * +trp_client::getWritePtr(NodeId node, Uint32 lenBytes, Uint32 prio, + Uint32 max_use) +{ + TFBuffer* b = m_send_buffers+node; + TFBufferGuard g0(* b); + bool found = m_send_nodes_mask.get(node); + if (likely(found)) + { + TFPage * page = b->m_tail; + assert(page != 0); + if (page->m_bytes + page->m_start + lenBytes <= page->max_data_bytes()) + { + return (Uint32 *)(page->m_data + page->m_start + page->m_bytes); + } + } + else + { + Uint32 cnt = m_send_nodes_cnt; + m_send_nodes_mask.set(node); + m_send_nodes_list[cnt] = node; + m_send_nodes_cnt = cnt + 1; + } + + TFPage* page = m_facade->alloc_sb_page(); + if (likely(page != 0)) + { + page->init(); + + if (b->m_tail == NULL) + { + assert(!found); + b->m_head = page; + b->m_tail = page; + } + else + { + assert(found); + assert(b->m_head != NULL); + b->m_tail->m_next = page; + b->m_tail = page; + } + return (Uint32 *)(page->m_data); + } + + if (b->m_tail == 0) + { + assert(!found); + m_send_nodes_mask.clear(node); + m_send_nodes_cnt--; + } + else + { + 
assert(found); + } + + return NULL; +} + +Uint32 +trp_client::updateWritePtr(NodeId node, Uint32 lenBytes, Uint32 prio) +{ + TFBuffer* b = m_send_buffers+node; + TFBufferGuard g0(* b); + assert(m_send_nodes_mask.get(node)); + assert(b->m_head != 0); + assert(b->m_tail != 0); + + TFPage *page = b->m_tail; + assert(page->m_bytes + lenBytes <= page->max_data_bytes()); + page->m_bytes += lenBytes; + b->m_bytes_in_buffer += lenBytes; + return b->m_bytes_in_buffer; +} + +void +trp_client::flush_send_buffers() +{ + assert(m_poll.m_locked); + Uint32 cnt = m_send_nodes_cnt; + for (Uint32 i = 0; iflush_send_buffer(node, b); + bzero(b, sizeof(* b)); + } + + m_send_nodes_cnt = 0; + m_send_nodes_mask.clear(); +} + +bool +trp_client::forceSend(NodeId node) +{ + do_forceSend(); + return true; +} + #include "NdbImpl.hpp" PollGuard::PollGuard(NdbImpl& impl) === modified file 'storage/ndb/src/ndbapi/trp_client.hpp' --- a/storage/ndb/src/ndbapi/trp_client.hpp 2011-09-09 12:29:43 +0000 +++ b/storage/ndb/src/ndbapi/trp_client.hpp 2012-05-31 18:26:14 +0000 @@ -20,13 +20,17 @@ #include #include +#include +#include struct trp_node; class NdbApiSignal; struct LinearSectionPtr; struct GenericSectionPtr; +struct TFBuffer; +class trp_client; -class trp_client +class trp_client : TransporterSendBufferHandle { friend class TransporterFacade; public: @@ -46,6 +50,7 @@ public: void complete_poll(); void wakeup(); + void flush_send_buffers(); int do_forceSend(int val = 1); int raw_sendSignal(const NdbApiSignal*, Uint32 nodeId); @@ -71,10 +76,29 @@ public: /** * This sendSignal variant can be called on any trp_client * but perform the send on the trp_client-object that - * is currently receiving + * is currently receiving (m_poll_owner) + * + * This variant does flush thread-local send-buffer */ int safe_sendSignal(const NdbApiSignal*, Uint32 nodeId); + /** + * This sendSignal variant can be called on any trp_client + * but perform the send on the trp_client-object that + * is currently receiving 
(m_poll_owner) + * + * This variant does not flush thread-local send-buffer + */ + int safe_noflush_sendSignal(const NdbApiSignal*, Uint32 nodeId); +private: + /** + * TransporterSendBufferHandle interface + */ + virtual Uint32 *getWritePtr(NodeId node, Uint32 lenBytes, Uint32 prio, + Uint32 max_use); + virtual Uint32 updateWritePtr(NodeId node, Uint32 lenBytes, Uint32 prio); + virtual bool forceSend(NodeId node); + private: Uint32 m_blockNo; TransporterFacade * m_facade; @@ -82,15 +106,41 @@ private: /** * This is used for polling */ +public: + NdbMutex* m_mutex; // thread local mutex... +private: struct PollQueue { + PollQueue(); + void assert_destroy() const; + bool m_locked; bool m_poll_owner; - bool m_waiting; + bool m_poll_queue; + enum { PQ_WOKEN, PQ_IDLE, PQ_WAITING } m_waiting; trp_client *m_prev; trp_client *m_next; NdbCondition * m_condition; + + /** + * This is called by poll owner + * before doing external poll + */ + void start_poll(trp_client* self); + + bool lock_client(trp_client*); + bool check_if_locked(const trp_client*) const; + Uint32 m_locked_cnt; + trp_client * m_locked_clients[16]; } m_poll; + + /** + * This is used for sending + */ + Uint32 m_send_nodes_cnt; + Uint8 m_send_nodes_list[MAX_NODES]; + NodeBitmask m_send_nodes_mask; + TFBuffer* m_send_buffers; }; class PollGuard @@ -115,7 +165,7 @@ inline void trp_client::lock() { - NdbMutex_Lock(m_facade->theMutexPtr); + NdbMutex_Lock(m_mutex); assert(m_poll.m_locked == false); m_poll.m_locked = true; } @@ -124,9 +174,10 @@ inline void trp_client::unlock() { + assert(m_send_nodes_mask.isclear()); // Nothing unsent when unlocking... 
assert(m_poll.m_locked == true); m_poll.m_locked = false; - NdbMutex_Unlock(m_facade->theMutexPtr); + NdbMutex_Unlock(m_mutex); } inline @@ -141,7 +192,7 @@ int trp_client::raw_sendSignal(const NdbApiSignal * signal, Uint32 nodeId) { assert(m_poll.m_locked); - return m_facade->sendSignal(signal, nodeId); + return m_facade->sendSignal(this, signal, nodeId); } inline @@ -150,7 +201,7 @@ trp_client::raw_sendSignal(const NdbApiS const LinearSectionPtr ptr[3], Uint32 secs) { assert(m_poll.m_locked); - return m_facade->sendSignal(signal, nodeId, ptr, secs); + return m_facade->sendSignal(this, signal, nodeId, ptr, secs); } inline @@ -159,7 +210,7 @@ trp_client::raw_sendSignal(const NdbApiS const GenericSectionPtr ptr[3], Uint32 secs) { assert(m_poll.m_locked); - return m_facade->sendSignal(signal, nodeId, ptr, secs); + return m_facade->sendSignal(this, signal, nodeId, ptr, secs); } inline @@ -168,7 +219,7 @@ trp_client::raw_sendFragmentedSignal(con const LinearSectionPtr ptr[3], Uint32 secs) { assert(m_poll.m_locked); - return m_facade->sendFragmentedSignal(signal, nodeId, ptr, secs); + return m_facade->sendFragmentedSignal(this, signal, nodeId, ptr, secs); } inline @@ -178,7 +229,33 @@ trp_client::raw_sendFragmentedSignal(con Uint32 secs) { assert(m_poll.m_locked); - return m_facade->sendFragmentedSignal(signal, nodeId, ptr, secs); + return m_facade->sendFragmentedSignal(this, signal, nodeId, ptr, secs); +} + +inline +trp_client::PollQueue::PollQueue() +{ + m_waiting = PQ_IDLE; + m_locked = false; + m_poll_owner = false; + m_poll_queue = false; + m_next = 0; + m_prev = 0; + m_condition = NdbCondition_Create(); + m_locked_cnt = 0; +} + +inline +void +trp_client::PollQueue::assert_destroy() const +{ + assert(m_waiting == PQ_IDLE); + assert(m_locked == false); + assert(m_poll_owner == false); + assert(m_poll_queue == false); + assert(m_next == 0); + assert(m_prev == 0); + assert(m_locked_cnt == 0); } #endif No bundle (reason: useless for push emails).