3930 John David Duncan 2012-05-31
7.2-atc: Apply Jonas' ATC patch to 7.2
modified:
storage/ndb/include/transporter/TransporterCallback.hpp
storage/ndb/include/transporter/TransporterRegistry.hpp
storage/ndb/src/common/transporter/Packer.cpp
storage/ndb/src/common/transporter/TransporterRegistry.cpp
storage/ndb/src/kernel/vm/TransporterCallback.cpp
storage/ndb/src/kernel/vm/TransporterCallbackKernel.hpp
storage/ndb/src/ndbapi/ClusterMgr.cpp
storage/ndb/src/ndbapi/NdbScanOperation.cpp
storage/ndb/src/ndbapi/NdbTransaction.cpp
storage/ndb/src/ndbapi/Ndbif.cpp
storage/ndb/src/ndbapi/TransporterFacade.cpp
storage/ndb/src/ndbapi/TransporterFacade.hpp
storage/ndb/src/ndbapi/ndb_cluster_connection.cpp
storage/ndb/src/ndbapi/trp_buffer.cpp
storage/ndb/src/ndbapi/trp_buffer.hpp
storage/ndb/src/ndbapi/trp_client.cpp
storage/ndb/src/ndbapi/trp_client.hpp
3929 Frazer Clement 2012-05-31
Bug#14083116 (Bad error handling in LQH corrupts transid hash)
Testcase verifying fix.
modified:
mysql-test/suite/ndb/r/ndb_join_pushdown_default.result
mysql-test/suite/ndb/t/ndb_join_pushdown.inc
=== modified file 'storage/ndb/include/transporter/TransporterCallback.hpp'
--- a/storage/ndb/include/transporter/TransporterCallback.hpp 2012-01-17 08:33:59 +0000
+++ b/storage/ndb/include/transporter/TransporterCallback.hpp 2012-05-31 18:26:14 +0000
@@ -46,8 +46,10 @@ public:
*
* The method may either execute the signal immediately (NDB API), or
* queue it for later execution (kernel).
+ *
+ * @returns true if no more signals should be delivered
*/
- virtual void deliver_signal(SignalHeader * const header,
+ virtual bool deliver_signal(SignalHeader * const header,
Uint8 prio,
Uint32 * const signalData,
LinearSectionPtr ptr[3]) = 0;
=== modified file 'storage/ndb/include/transporter/TransporterRegistry.hpp'
--- a/storage/ndb/include/transporter/TransporterRegistry.hpp 2012-01-30 15:12:41 +0000
+++ b/storage/ndb/include/transporter/TransporterRegistry.hpp 2012-05-31 18:26:14 +0000
@@ -445,13 +445,15 @@ private:
Uint32 * readPtr,
Uint32 bufferSize,
NodeId remoteNodeId,
- IOState state);
+ IOState state,
+ bool & stopReceiving);
Uint32 * unpack(TransporterReceiveHandle&,
Uint32 * readPtr,
Uint32 * eodPtr,
NodeId remoteNodeId,
- IOState state);
+ IOState state,
+ bool & stopReceiving);
static Uint32 unpack_length_words(const Uint32 *readPtr, Uint32 maxWords);
/**
@@ -488,6 +490,7 @@ private:
void updateWritePtr(TransporterSendBufferHandle *handle,
NodeId node, Uint32 lenBytes, Uint32 prio);
+public:
/**
* TransporterSendBufferHandle implementation.
*
=== modified file 'storage/ndb/src/common/transporter/Packer.cpp'
--- a/storage/ndb/src/common/transporter/Packer.cpp 2012-02-23 15:41:31 +0000
+++ b/storage/ndb/src/common/transporter/Packer.cpp 2012-05-31 18:26:14 +0000
@@ -33,16 +33,21 @@ TransporterRegistry::unpack(TransporterR
Uint32 * readPtr,
Uint32 sizeOfData,
NodeId remoteNodeId,
- IOState state) {
+ IOState state,
+ bool & stopReceiving)
+{
+ assert(stopReceiving == false);
SignalHeader signalHeader;
LinearSectionPtr ptr[3];
Uint32 usedData = 0;
Uint32 loop_count = 0;
+ bool doStopReceiving = false;
if(state == NoHalt || state == HaltOutput){
while ((sizeOfData >= 4 + sizeof(Protocol6)) &&
- (loop_count < MAX_RECEIVED_SIGNALS)) {
+ (loop_count < MAX_RECEIVED_SIGNALS) &&
+ doStopReceiving == false) {
Uint32 word1 = readPtr[0];
Uint32 word2 = readPtr[1];
Uint32 word3 = readPtr[2];
@@ -113,19 +118,21 @@ TransporterRegistry::unpack(TransporterR
sectionData += sz;
}
- recvHandle.deliver_signal(&signalHeader, prio, signalData, ptr);
+ doStopReceiving = recvHandle.deliver_signal(&signalHeader, prio, signalData, ptr);
readPtr += messageLen32;
sizeOfData -= messageLenBytes;
usedData += messageLenBytes;
}//while
-
+
+ stopReceiving = doStopReceiving;
return usedData;
} else {
/** state = HaltIO || state == HaltInput */
while ((sizeOfData >= 4 + sizeof(Protocol6)) &&
- (loop_count < MAX_RECEIVED_SIGNALS)) {
+ (loop_count < MAX_RECEIVED_SIGNALS) &&
+ doStopReceiving == false) {
Uint32 word1 = readPtr[0];
Uint32 word2 = readPtr[1];
Uint32 word3 = readPtr[2];
@@ -199,7 +206,7 @@ TransporterRegistry::unpack(TransporterR
sectionData += sz;
}
- recvHandle.deliver_signal(&signalHeader, prio, signalData, ptr);
+ doStopReceiving = recvHandle.deliver_signal(&signalHeader, prio, signalData, ptr);
} else {
DEBUG("prepareReceive(...) - Discarding message to block: "
<< rBlockNum << " from Node: " << remoteNodeId);
@@ -210,7 +217,7 @@ TransporterRegistry::unpack(TransporterR
usedData += messageLenBytes;
}//while
-
+ stopReceiving = doStopReceiving;
return usedData;
}//if
}
@@ -220,12 +227,15 @@ TransporterRegistry::unpack(TransporterR
Uint32 * readPtr,
Uint32 * eodPtr,
NodeId remoteNodeId,
- IOState state) {
+ IOState state,
+ bool & stopReceiving) {
+ assert(stopReceiving == false);
SignalHeader signalHeader;
LinearSectionPtr ptr[3];
Uint32 loop_count = 0;
+ bool doStopReceiving = false;
if(state == NoHalt || state == HaltOutput){
- while ((readPtr < eodPtr) && (loop_count < MAX_RECEIVED_SIGNALS)) {
+ while ((readPtr < eodPtr) && (loop_count < MAX_RECEIVED_SIGNALS) && doStopReceiving == false) {
Uint32 word1 = readPtr[0];
Uint32 word2 = readPtr[1];
Uint32 word3 = readPtr[2];
@@ -291,14 +301,14 @@ TransporterRegistry::unpack(TransporterR
sectionData += sz;
}
- recvHandle.deliver_signal(&signalHeader, prio, signalData, ptr);
+ doStopReceiving = recvHandle.deliver_signal(&signalHeader, prio, signalData, ptr);
readPtr += messageLen32;
}//while
} else {
/** state = HaltIO || state == HaltInput */
- while ((readPtr < eodPtr) && (loop_count < MAX_RECEIVED_SIGNALS)) {
+ while ((readPtr < eodPtr) && (loop_count < MAX_RECEIVED_SIGNALS) && doStopReceiving == false) {
Uint32 word1 = readPtr[0];
Uint32 word2 = readPtr[1];
Uint32 word3 = readPtr[2];
@@ -368,7 +378,7 @@ TransporterRegistry::unpack(TransporterR
sectionData += sz;
}
- recvHandle.deliver_signal(&signalHeader, prio, signalData, ptr);
+ doStopReceiving = recvHandle.deliver_signal(&signalHeader, prio, signalData, ptr);
} else {
DEBUG("prepareReceive(...) - Discarding message to block: "
<< rBlockNum << " from Node: " << remoteNodeId);
@@ -377,6 +387,7 @@ TransporterRegistry::unpack(TransporterR
readPtr += messageLen32;
}//while
}//if
+ stopReceiving = doStopReceiving;
return readPtr;
}
=== modified file 'storage/ndb/src/common/transporter/TransporterRegistry.cpp'
--- a/storage/ndb/src/common/transporter/TransporterRegistry.cpp 2012-03-14 10:31:02 +0000
+++ b/storage/ndb/src/common/transporter/TransporterRegistry.cpp 2012-05-31 18:26:14 +0000
@@ -1333,6 +1333,7 @@ TransporterRegistry::performReceive(Tran
assert((receiveHandle == &recvdata) || (receiveHandle == 0));
bool hasReceived = false;
+ bool stopReceiving = false;
if (recvdata.m_has_data_transporters.get(0))
{
@@ -1357,7 +1358,7 @@ TransporterRegistry::performReceive(Tran
#ifdef NDB_TCP_TRANSPORTER
for(Uint32 id = recvdata.m_has_data_transporters.find_first();
- id != BitmaskImpl::NotFound;
+ id != BitmaskImpl::NotFound && !stopReceiving;
id = recvdata.m_has_data_transporters.find_next(id + 1))
{
bool hasdata = false;
@@ -1376,7 +1377,7 @@ TransporterRegistry::performReceive(Tran
Uint32 * ptr;
Uint32 sz = t->getReceiveData(&ptr);
recvdata.transporter_recv_from(id);
- Uint32 szUsed = unpack(recvdata, ptr, sz, id, ioStates[id]);
+ Uint32 szUsed = unpack(recvdata, ptr, sz, id, ioStates[id], stopReceiving);
t->updateReceiveDataPtr(szUsed);
hasdata = t->hasReceiveData();
}
@@ -1389,7 +1390,7 @@ TransporterRegistry::performReceive(Tran
#ifdef NDB_SCI_TRANSPORTER
//performReceive
//do prepareReceive on the SCI transporters (prepareReceive(t,,,,))
- for (int i=0; i<nSCITransporters; i++)
+ for (int i=0; i<nSCITransporters && !stopReceiving; i++)
{
SCI_Transporter *t = theSCITransporters[i];
const NodeId nodeId = t->getRemoteNodeId();
@@ -1404,14 +1405,14 @@ TransporterRegistry::performReceive(Tran
Uint32 * readPtr, * eodPtr;
t->getReceivePtr(&readPtr, &eodPtr);
callbackObj->transporter_recv_from(nodeId);
- Uint32 *newPtr = unpack(readPtr, eodPtr, nodeId, ioStates[nodeId]);
+ Uint32 *newPtr = unpack(readPtr, eodPtr, nodeId, ioStates[nodeId], stopReceiving);
t->updateReceivePtr(newPtr);
}
}
}
#endif
#ifdef NDB_SHM_TRANSPORTER
- for (int i=0; i<nSHMTransporters; i++)
+ for (int i=0; i<nSHMTransporters && !stopReceiving; i++)
{
SHM_Transporter *t = theSHMTransporters[i];
const NodeId nodeId = t->getRemoteNodeId();
@@ -1426,7 +1427,8 @@ TransporterRegistry::performReceive(Tran
t->getReceivePtr(&readPtr, &eodPtr);
recvdata.transporter_recv_from(nodeId);
Uint32 *newPtr = unpack(recvdata,
- readPtr, eodPtr, nodeId, ioStates[nodeId]);
+ readPtr, eodPtr, nodeId, ioStates[nodeId],
+ stopReceiving);
t->updateReceivePtr(newPtr);
}
}
=== modified file 'storage/ndb/src/kernel/vm/TransporterCallback.cpp'
--- a/storage/ndb/src/kernel/vm/TransporterCallback.cpp 2012-01-24 06:20:13 +0000
+++ b/storage/ndb/src/kernel/vm/TransporterCallback.cpp 2012-05-31 18:26:14 +0000
@@ -133,7 +133,7 @@ void mt_init_receiver_cache(){}
void mt_set_section_chunk_size(){}
#endif
-void
+bool
TransporterReceiveHandleKernel::deliver_signal(SignalHeader * const header,
Uint8 prio,
Uint32 * const theData,
@@ -198,7 +198,7 @@ TransporterReceiveHandleKernel::deliver_
header, theData, secPtrI);
#endif
- return;
+ return false;
}
/**
@@ -234,6 +234,7 @@ TransporterReceiveHandleKernel::deliver_
sendprioa(m_thr_no /* self */,
header, theData, NULL);
#endif
+ return false;
}
NdbOut &
=== modified file 'storage/ndb/src/kernel/vm/TransporterCallbackKernel.hpp'
--- a/storage/ndb/src/kernel/vm/TransporterCallbackKernel.hpp 2012-01-23 20:23:08 +0000
+++ b/storage/ndb/src/kernel/vm/TransporterCallbackKernel.hpp 2012-05-31 18:26:14 +0000
@@ -44,7 +44,7 @@ public:
#endif
/* TransporterCallback interface. */
- void deliver_signal(SignalHeader * const header,
+ bool deliver_signal(SignalHeader * const header,
Uint8 prio,
Uint32 * const signalData,
LinearSectionPtr ptr[3]);
=== modified file 'storage/ndb/src/ndbapi/ClusterMgr.cpp'
--- a/storage/ndb/src/ndbapi/ClusterMgr.cpp 2011-07-04 16:30:34 +0000
+++ b/storage/ndb/src/ndbapi/ClusterMgr.cpp 2012-05-31 18:26:14 +0000
@@ -217,12 +217,12 @@ ClusterMgr::doStop( ){
void
ClusterMgr::forceHB()
{
- theFacade.lock_mutex();
+ theFacade.lock_poll_mutex();
if(waitingForHB)
{
- NdbCondition_WaitTimeout(waitForHBCond, theFacade.theMutexPtr, 1000);
- theFacade.unlock_mutex();
+ NdbCondition_WaitTimeout(waitForHBCond, theFacade.thePollMutex, 1000);
+ theFacade.unlock_poll_mutex();
return;
}
@@ -243,7 +243,7 @@ ClusterMgr::forceHB()
}
}
waitForHBFromNodes.bitAND(ndb_nodes);
- theFacade.unlock_mutex();
+ theFacade.unlock_poll_mutex();
#ifdef DEBUG_REG
char buf[128];
@@ -273,18 +273,19 @@ ClusterMgr::forceHB()
#endif
raw_sendSignal(&signal, nodeId);
}
+ flush_send_buffers();
unlock();
}
/* Wait for nodes to reply - if any heartbeats was sent */
- theFacade.lock_mutex();
+ theFacade.lock_poll_mutex();
if (!waitForHBFromNodes.isclear())
- NdbCondition_WaitTimeout(waitForHBCond, theFacade.theMutexPtr, 1000);
+ NdbCondition_WaitTimeout(waitForHBCond, theFacade.thePollMutex, 1000);
waitingForHB= false;
#ifdef DEBUG_REG
ndbout << "Still waiting for HB from " << waitForHBFromNodes.getText(buf) << endl;
#endif
- theFacade.unlock_mutex();
+ theFacade.unlock_poll_mutex();
}
void
@@ -298,12 +299,14 @@ ClusterMgr::startup()
lock();
theFacade.doConnect(nodeId);
+ flush_send_buffers();
unlock();
for (Uint32 i = 0; i<3000; i++)
{
lock();
theFacade.theTransporterRegistry->update_connections();
+ flush_send_buffers();
unlock();
if (theNode.is_connected())
break;
@@ -345,9 +348,9 @@ ClusterMgr::threadMain()
{
/* Sleep at 100ms between each heartbeat check */
NDB_TICKS before = now;
- for (Uint32 i = 0; i<10; i++)
+ for (Uint32 i = 0; i<5; i++)
{
- NdbSleep_MilliSleep(10);
+ NdbSleep_MilliSleep(20);
{
Guard g(clusterMgrThreadMutex);
/**
@@ -381,7 +384,7 @@ ClusterMgr::threadMain()
nodeFailRep->noOfNodes = 0;
NodeBitmask::clear(nodeFailRep->theNodes);
- trp_client::lock();
+ lock();
for (int i = 1; i < MAX_NODES; i++){
/**
* Send register request (heartbeat) to all available nodes
@@ -444,12 +447,16 @@ ClusterMgr::threadMain()
NodeBitmask::set(nodeFailRep->theNodes, nodeId);
}
}
+ flush_send_buffers();
+ unlock();
if (nodeFailRep->noOfNodes)
{
+ lock();
raw_sendSignal(&nodeFail_signal, getOwnNodeId());
+ flush_send_buffers();
+ unlock();
}
- trp_client::unlock();
}
}
@@ -530,7 +537,7 @@ ClusterMgr::trp_deliver_signal(const Ndb
tSignal.theReceiversBlockNumber= refToBlock(ref);
tSignal.theVerId_signalNumber= GSN_SUB_GCP_COMPLETE_ACK;
tSignal.theSendersBlockRef = API_CLUSTERMGR;
- safe_sendSignal(&tSignal, aNodeId);
+ safe_noflush_sendSignal(&tSignal, aNodeId);
}
break;
}
@@ -1435,7 +1442,7 @@ ArbitMgr::sendSignalToQmgr(ArbitSignal&
{
m_clusterMgr.lock();
m_clusterMgr.raw_sendSignal(&signal, aSignal.data.sender);
+ m_clusterMgr.flush_send_buffers();
m_clusterMgr.unlock();
}
}
-
=== modified file 'storage/ndb/src/ndbapi/NdbScanOperation.cpp'
--- a/storage/ndb/src/ndbapi/NdbScanOperation.cpp 2011-11-16 08:17:17 +0000
+++ b/storage/ndb/src/ndbapi/NdbScanOperation.cpp 2012-05-31 18:26:14 +0000
@@ -1669,7 +1669,6 @@ NdbScanOperation::executeCursor(int node
* Call finaliseScanOldApi() for old style scans before
* proceeding
*/
- bool locked = false;
NdbImpl* theImpl = theNdb->theImpl;
int res = 0;
@@ -1680,9 +1679,7 @@ NdbScanOperation::executeCursor(int node
}
{
- locked = true;
NdbTransaction * tCon = theNdbCon;
- theImpl->lock();
Uint32 seq = tCon->theNodeSequence;
@@ -1733,9 +1730,6 @@ done:
m_api_receivers_count = theParallelism;
}
- if (locked)
- theImpl->unlock();
-
return res;
}
=== modified file 'storage/ndb/src/ndbapi/NdbTransaction.cpp'
--- a/storage/ndb/src/ndbapi/NdbTransaction.cpp 2011-11-16 08:17:17 +0000
+++ b/storage/ndb/src/ndbapi/NdbTransaction.cpp 2012-05-31 18:26:14 +0000
@@ -733,27 +733,6 @@ NdbTransaction::executeAsynchPrepare(Ndb
releaseCompletedQueries();
}
- NdbScanOperation* tcOp = m_theFirstScanOperation;
- if (tcOp != 0){
- // Execute any cursor operations
- while (tcOp != NULL) {
- int tReturnCode;
- tReturnCode = tcOp->executeCursor(theDBnode);
- if (tReturnCode == -1) {
- DBUG_VOID_RETURN;
- }//if
- tcOp->postExecuteRelease(); // Release unneeded resources
- // outside TP mutex
- tcOp = (NdbScanOperation*)tcOp->next();
- } // while
- m_theLastScanOperation->next(m_firstExecutedScanOp);
- m_firstExecutedScanOp = m_theFirstScanOperation;
- // Discard cursor operations, since these are also
- // in the complete operations list we do not need
- // to release them.
- m_theFirstScanOperation = m_theLastScanOperation = NULL;
- }
-
bool tTransactionIsStarted = theTransactionIsStarted;
NdbOperation* tLastOp = theLastOpInList;
Ndb* tNdb = theNdb;
@@ -1013,6 +992,7 @@ NdbTransaction::sendTC_HBREP() // Send
tNdb->theImpl->lock();
const int res = tNdb->theImpl->sendSignal(tSignal,theDBnode);
+ tNdb->theImpl->flush_send_buffers();
tNdb->theImpl->unlock();
tNdb->releaseSignal(tSignal);
@@ -1040,6 +1020,26 @@ NdbTransaction::doSend()
This method assumes that at least one operation or query have been defined.
This is ensured by the caller of this routine (=execute).
*/
+ NdbScanOperation* tcOp = m_theFirstScanOperation;
+ if (tcOp != 0){
+ // Execute any cursor operations
+ while (tcOp != NULL) {
+ int tReturnCode;
+ tReturnCode = tcOp->executeCursor(theDBnode);
+ if (tReturnCode == -1) {
+ goto fail;
+ }//if
+ tcOp->postExecuteRelease(); // Release unneeded resources
+ // outside TP mutex
+ tcOp = (NdbScanOperation*)tcOp->next();
+ } // while
+ m_theLastScanOperation->next(m_firstExecutedScanOp);
+ m_firstExecutedScanOp = m_theFirstScanOperation;
+ // Discard cursor operations, since these are also
+ // in the complete operations list we do not need
+ // to release them.
+ m_theFirstScanOperation = m_theLastScanOperation = NULL;
+ }
switch(theSendStatus){
case sendOperations: {
=== modified file 'storage/ndb/src/ndbapi/Ndbif.cpp'
--- a/storage/ndb/src/ndbapi/Ndbif.cpp 2012-02-23 15:41:31 +0000
+++ b/storage/ndb/src/ndbapi/Ndbif.cpp 2012-05-31 18:26:14 +0000
@@ -73,7 +73,7 @@ Ndb::init(int aMaxNoOfTransactions)
}//if
theInitState = StartingInit;
TransporterFacade * theFacade = theImpl->m_transporter_facade;
- theEventBuffer->m_mutex = theFacade->theMutexPtr;
+ theEventBuffer->m_mutex = theImpl->m_mutex;
const Uint32 tRef = theImpl->open(theFacade);
@@ -92,9 +92,9 @@ Ndb::init(int aMaxNoOfTransactions)
}
/* Init cached min node version */
- theFacade->lock_mutex();
+ theFacade->lock_poll_mutex();
theCachedMinDbNodeVersion = theFacade->getMinDbNodeVersion();
- theFacade->unlock_mutex();
+ theFacade->unlock_poll_mutex();
theDictionary->setTransporter(this, theFacade);
@@ -1465,7 +1465,7 @@ NdbTransaction::sendTC_COMMIT_ACK(NdbImp
Uint32 * dataPtr = aSignal->getDataPtrSend();
dataPtr[0] = transId1;
dataPtr[1] = transId2;
- impl->safe_sendSignal(aSignal, refToNode(aTCRef));
+ impl->safe_noflush_sendSignal(aSignal, refToNode(aTCRef));
}
int
@@ -1503,6 +1503,7 @@ NdbImpl::send_event_report(bool has_lock
done:
if (!has_lock)
{
+ flush_send_buffers();
unlock();
}
return ret;
=== modified file 'storage/ndb/src/ndbapi/TransporterFacade.cpp'
--- a/storage/ndb/src/ndbapi/TransporterFacade.cpp 2012-02-23 15:41:31 +0000
+++ b/storage/ndb/src/ndbapi/TransporterFacade.cpp 2012-05-31 18:26:14 +0000
@@ -59,6 +59,9 @@ static int indexToNumber(int index)
#define TRP_DEBUG(t)
#endif
+#define DBG_POLL 0
+#define dbg(x,y) if (DBG_POLL) printf("%llu : " x "\n", NdbTick_CurrentNanosecond() / 1000, y)
+
/*****************************************************************************
* Call back functions
*****************************************************************************/
@@ -206,7 +209,7 @@ TRACE_GSN(Uint32 gsn)
/**
* The execute function : Handle received signal
*/
-void
+bool
TransporterFacade::deliver_signal(SignalHeader * const header,
Uint8 prio, Uint32 * const theData,
LinearSectionPtr ptr[3])
@@ -224,11 +227,13 @@ TransporterFacade::deliver_signal(Signal
}
#endif
+ bool last_message = false;
if (tRecBlockNo >= MIN_API_BLOCK_NO)
{
trp_client * clnt = m_threads.get(tRecBlockNo);
if (clnt != 0)
{
+ last_message = m_poll_owner->m_poll.lock_client(clnt);
/**
* Handle received signal immediately to avoid any unnecessary
* copying of data, allocation of memory and other things. Copying
@@ -285,13 +290,13 @@ TransporterFacade::deliver_signal(Signal
NdbApiSignal tmpSignal(*header);
NdbApiSignal * tSignal = &tmpSignal;
tSignal->setDataPtr(tDataPtr);
+ last_message = m_poll_owner->m_poll.lock_client(clnt);
clnt->trp_deliver_signal(tSignal, 0);
}
}
}
}
}
- return;
}
else if (tRecBlockNo >= MIN_API_FIXED_BLOCK_NO &&
tRecBlockNo <= MAX_API_FIXED_BLOCK_NO)
@@ -303,6 +308,7 @@ TransporterFacade::deliver_signal(Signal
NdbApiSignal tmpSignal(*header);
NdbApiSignal * tSignal = &tmpSignal;
tSignal->setDataPtr(theData);
+ last_message = m_poll_owner->m_poll.lock_client(clnt);
clnt->trp_deliver_signal(tSignal, ptr);
}//if
}
@@ -317,6 +323,7 @@ TransporterFacade::deliver_signal(Signal
abort();
}
}
+ return last_message;
}
// These symbols are needed, but not used in the API
@@ -351,7 +358,7 @@ TransporterFacade::start_instance(NodeId
(void)signal(SIGPIPE, SIG_IGN);
#endif
- theTransporterRegistry = new TransporterRegistry(this, this);
+ theTransporterRegistry = new TransporterRegistry(this, this, false);
if (theTransporterRegistry == NULL)
return -1;
@@ -446,6 +453,41 @@ runSendRequest_C(void * me)
return 0;
}
+inline
+void
+link_buffer(TFBuffer* dst, const TFBuffer * src)
+{
+ assert(dst);
+ assert(src);
+ assert(src->m_head);
+ assert(src->m_tail);
+ TFBufferGuard g0(* dst);
+ TFBufferGuard g1(* src);
+ if (dst->m_head == 0)
+ {
+ dst->m_head = src->m_head;
+ }
+ else
+ {
+ dst->m_tail->m_next = src->m_head;
+ }
+ dst->m_tail = src->m_tail;
+ dst->m_bytes_in_buffer += src->m_bytes_in_buffer;
+}
+
+static const Uint32 SEND_THREAD_NO = 0;
+
+void
+TransporterFacade::wakeup_send_thread(Uint32 node)
+{
+ Guard g(m_send_thread_mutex);
+ if (m_send_thread_nodes.get(SEND_THREAD_NO) == false)
+ {
+ NdbCondition_Signal(m_send_thread_cond);
+ }
+ m_send_thread_nodes.set(SEND_THREAD_NO);
+}
+
void TransporterFacade::threadMainSend(void)
{
theTransporterRegistry->startSending();
@@ -456,15 +498,61 @@ void TransporterFacade::threadMainSend(v
m_socket_server.startServer();
- while(!theStopReceive) {
- NdbSleep_MilliSleep(sendThreadWaitMillisec);
- NdbMutex_Lock(theMutexPtr);
- if (sendPerformedLastInterval == 0) {
- theTransporterRegistry->performSend();
- }
- sendPerformedLastInterval = 0;
- NdbMutex_Unlock(theMutexPtr);
+ NdbMutex_Lock(m_send_thread_mutex);
+ while(!theStopReceive)
+ {
+ if (m_send_thread_nodes.get(SEND_THREAD_NO) == false)
+ {
+ NdbCondition_WaitTimeout(m_send_thread_cond,
+ m_send_thread_mutex,
+ sendThreadWaitMillisec);
+ }
+ m_send_thread_nodes.clear(SEND_THREAD_NO);
+ NdbMutex_Unlock(m_send_thread_mutex);
+ bool all_empty = true;
+ do
+ {
+ all_empty = true;
+ for (Uint32 i = 0; i<MAX_NODES; i++)
+ {
+ struct TFSendBuffer * b = m_send_buffers + i;
+ NdbMutex_Lock(&b->m_mutex);
+ if (b->m_sending)
+ {
+ /**
+ * sender does stuff when clearing m_sending
+ */
+ }
+ else if (b->m_buffer.m_bytes_in_buffer > 0 ||
+ b->m_out_buffer.m_bytes_in_buffer > 0)
+ {
+ /**
+ * Move all pending data from m_buffer to m_out_buffer (the chain is relinked, no bytes are copied)
+ */
+ TFBuffer copy = b->m_buffer;
+ bzero(&b->m_buffer, sizeof(b->m_buffer));
+ b->m_sending = true;
+ NdbMutex_Unlock(&b->m_mutex);
+ if (copy.m_bytes_in_buffer > 0)
+ {
+ link_buffer(&b->m_out_buffer, &copy);
+ }
+ theTransporterRegistry->performSend(i);
+ NdbMutex_Lock(&b->m_mutex);
+ b->m_sending = false;
+ if (b->m_buffer.m_bytes_in_buffer > 0 ||
+ b->m_out_buffer.m_bytes_in_buffer > 0)
+ {
+ all_empty = false;
+ }
+ }
+ NdbMutex_Unlock(&b->m_mutex);
+ }
+ } while (!theStopReceive && all_empty == false);
+
+ NdbMutex_Lock(m_send_thread_mutex);
}
+ NdbMutex_Unlock(m_send_thread_mutex);
theTransporterRegistry->stopSending();
m_socket_server.stopServer();
@@ -499,6 +587,7 @@ void TransporterFacade::threadMainReceiv
{
theClusterMgr->lock();
theTransporterRegistry->update_connections();
+ theClusterMgr->flush_send_buffers();
theClusterMgr->unlock();
NdbSleep_MilliSleep(100);
}//while
@@ -510,10 +599,9 @@ void TransporterFacade::threadMainReceiv
and returns to caller. It will quickly come back here if not all
data was received for the worker thread.
*/
-void TransporterFacade::external_poll(Uint32 wait_time)
+void
+TransporterFacade::external_poll(trp_client* clnt, Uint32 wait_time)
{
- NdbMutex_Unlock(theMutexPtr);
-
#ifdef NDB_SHM_TRANSPORTER
/*
If shared memory transporters are used we need to set our sigmask
@@ -529,10 +617,11 @@ void TransporterFacade::external_poll(Ui
NdbThread_set_shm_sigmask(TRUE);
#endif
- NdbMutex_Lock(theMutexPtr);
if (res > 0)
{
+ m_poll_cnt++;
theTransporterRegistry->performReceive();
+ m_poll_cnt++;
}
}
@@ -552,11 +641,22 @@ TransporterFacade::TransporterFacade(Glo
theSendThread(NULL),
theReceiveThread(NULL),
m_fragmented_signal_id(0),
- m_globalDictCache(cache)
+ m_globalDictCache(cache),
+ m_send_buffer("sendbufferpool")
{
DBUG_ENTER("TransporterFacade::TransporterFacade");
- theMutexPtr = NdbMutex_CreateWithName("TTFM");
+ thePollMutex = NdbMutex_CreateWithName("PollMutex");
sendPerformedLastInterval = 0;
+ m_open_close_mutex = NdbMutex_Create();
+ for (Uint32 i = 0; i<NDB_ARRAY_SIZE(m_send_buffers); i++)
+ {
+ BaseString n;
+ n.assfmt("sendbuffer:%u", i);
+ NdbMutex_InitWithName(&m_send_buffers[i].m_mutex, n.c_str());
+ }
+
+ m_send_thread_cond = NdbCondition_Create();
+ m_send_thread_mutex = NdbMutex_CreateWithName("SendThreadMutex");
for (int i = 0; i < NO_API_FIXED_BLOCKS; i++)
m_fixed2dynamic[i]= RNIL;
@@ -567,6 +667,7 @@ TransporterFacade::TransporterFacade(Glo
theClusterMgr = new ClusterMgr(*this);
+ m_poll_cnt = 0;
DBUG_VOID_RETURN;
}
@@ -641,12 +742,28 @@ TransporterFacade::configure(NodeId node
DBUG_RETURN(false);
// Configure send buffers
- Uint32 total_send_buffer = 0;
- iter.get(CFG_TOTAL_SEND_BUFFER_MEMORY, &total_send_buffer);
- Uint64 extra_send_buffer = 0;
- iter.get(CFG_EXTRA_SEND_BUFFER_MEMORY, &extra_send_buffer);
- theTransporterRegistry->allocate_send_buffers(total_send_buffer,
- extra_send_buffer);
+ if (!m_send_buffer.inited())
+ {
+ Uint32 total_send_buffer = 0;
+ iter.get(CFG_TOTAL_SEND_BUFFER_MEMORY, &total_send_buffer);
+
+ if (total_send_buffer == 0)
+ {
+ total_send_buffer = theTransporterRegistry->get_total_max_send_buffer();
+ }
+
+ Uint64 extra_send_buffer = 0;
+ iter.get(CFG_EXTRA_SEND_BUFFER_MEMORY, &extra_send_buffer);
+
+ total_send_buffer += extra_send_buffer;
+ if (!m_send_buffer.init(total_send_buffer))
+ {
+ ndbout << "Unable to allocate "
+ << total_send_buffer
+ << " bytes of memory for send buffers!!" << endl;
+ return false;
+ }
+ }
Uint32 auto_reconnect=1;
iter.get(CFG_AUTO_RECONNECT, &auto_reconnect);
@@ -686,15 +803,52 @@ TransporterFacade::for_each(trp_client*
const NdbApiSignal* aSignal,
const LinearSectionPtr ptr[3])
{
+ trp_client * woken[16];
+ Uint32 cnt_woken = 0;
Uint32 sz = m_threads.m_statusNext.size();
for (Uint32 i = 0; i < sz ; i ++)
{
trp_client * clnt = m_threads.m_objectExecute[i];
if (clnt != 0 && clnt != sender)
{
- clnt->trp_deliver_signal(aSignal, ptr);
+ bool res = m_poll_owner->m_poll.check_if_locked(clnt);
+ if (res)
+ {
+ clnt->trp_deliver_signal(aSignal, ptr);
+ }
+ else
+ {
+ NdbMutex_Lock(clnt->m_mutex);
+ int save = clnt->m_poll.m_waiting;
+ clnt->trp_deliver_signal(aSignal, ptr);
+ if (save != clnt->m_poll.m_waiting &&
+ clnt->m_poll.m_waiting == trp_client::PollQueue::PQ_WOKEN)
+ {
+ woken[cnt_woken++] = clnt;
+ if (cnt_woken == NDB_ARRAY_SIZE(woken))
+ {
+ lock_poll_mutex();
+ remove_from_poll_queue(woken, cnt_woken);
+ unlock_poll_mutex();
+ unlock_and_signal(woken, cnt_woken);
+ cnt_woken = 0;
+ }
+ }
+ else
+ {
+ NdbMutex_Unlock(clnt->m_mutex);
+ }
+ }
}
}
+
+ if (cnt_woken != 0)
+ {
+ lock_poll_mutex();
+ remove_from_poll_queue(woken, cnt_woken);
+ unlock_poll_mutex();
+ unlock_and_signal(woken, cnt_woken);
+ }
}
void
@@ -732,7 +886,8 @@ TransporterFacade::close_clnt(trp_client
int ret = -1;
if (clnt)
{
- NdbMutex_Lock(theMutexPtr);
+ Guard g(m_open_close_mutex);
+ if (DBG_POLL) ndbout_c("close(%p)", clnt);
if (m_threads.get(clnt->m_blockNo) == clnt)
{
m_threads.close(clnt->m_blockNo);
@@ -742,7 +897,20 @@ TransporterFacade::close_clnt(trp_client
{
assert(0);
}
- NdbMutex_Unlock(theMutexPtr);
+
+ /**
+ * Wait for any ongoing poll
+ */
+ clnt->lock();
+ Uint32 start_val = m_poll_cnt;
+ clnt->unlock();
+ Uint32 curr_val;
+ do
+ {
+ clnt->lock();
+ curr_val = m_poll_cnt;
+ clnt->unlock();
+ } while (start_val == curr_val && ((start_val & 1) == 1));
}
return ret;
}
@@ -751,7 +919,8 @@ Uint32
TransporterFacade::open_clnt(trp_client * clnt, int blockNo)
{
DBUG_ENTER("TransporterFacade::open");
- Guard g(theMutexPtr);
+ Guard g(m_open_close_mutex);
+ if (DBG_POLL) ndbout_c("open(%p)", clnt);
int r= m_threads.open(clnt);
if (r < 0)
{
@@ -785,10 +954,13 @@ TransporterFacade::~TransporterFacade()
DBUG_ENTER("TransporterFacade::~TransporterFacade");
delete theClusterMgr;
- NdbMutex_Lock(theMutexPtr);
+ NdbMutex_Lock(thePollMutex);
delete theTransporterRegistry;
- NdbMutex_Unlock(theMutexPtr);
- NdbMutex_Destroy(theMutexPtr);
+ NdbMutex_Unlock(thePollMutex);
+ NdbMutex_Destroy(thePollMutex);
+ NdbMutex_Destroy(m_open_close_mutex);
+ NdbMutex_Destroy(m_send_thread_mutex);
+ NdbCondition_Destroy(m_send_thread_cond);
#ifdef API_TRACE
signalLogger.setOutputStream(0);
#endif
@@ -821,6 +993,7 @@ TransporterFacade::calculateSendLimit()
// adaptive algorithm.
//-------------------------------------------------
void TransporterFacade::forceSend(Uint32 block_number) {
+#if 0
checkCounter--;
m_threads.m_statusNext[numberToIndex(block_number)] = ThreadData::ACTIVE;
sendPerformedLastInterval = 1;
@@ -828,6 +1001,7 @@ void TransporterFacade::forceSend(Uint32
calculateSendLimit();
}
theTransporterRegistry->forceSendCheck(0);
+#endif
}
//-------------------------------------------------
@@ -835,6 +1009,7 @@ void TransporterFacade::forceSend(Uint32
//-------------------------------------------------
int
TransporterFacade::checkForceSend(Uint32 block_number) {
+#if 0
m_threads.m_statusNext[numberToIndex(block_number)] = ThreadData::ACTIVE;
//-------------------------------------------------
// This code is an adaptive algorithm to discover when
@@ -856,6 +1031,9 @@ TransporterFacade::checkForceSend(Uint32
calculateSendLimit();
}
return did_send;
+#else
+ return 0;
+#endif
}
@@ -863,7 +1041,8 @@ TransporterFacade::checkForceSend(Uint32
* SEND SIGNAL METHODS
*****************************************************************************/
int
-TransporterFacade::sendSignal(const NdbApiSignal * aSignal, NodeId aNode)
+TransporterFacade::sendSignal(trp_client* clnt,
+ const NdbApiSignal * aSignal, NodeId aNode)
{
const Uint32* tDataPtr = aSignal->getConstDataPtrSend();
Uint32 Tlen = aSignal->theLength;
@@ -881,7 +1060,8 @@ TransporterFacade::sendSignal(const NdbA
}
#endif
if ((Tlen != 0) && (Tlen <= 25) && (TBno != 0)) {
- SendStatus ss = theTransporterRegistry->prepareSend(aSignal,
+ SendStatus ss = theTransporterRegistry->prepareSend(clnt,
+ aSignal,
1, // JBB
tDataPtr,
aNode,
@@ -1129,7 +1309,8 @@ public:
* boundaries to simplify reassembly in the kernel.
*/
int
-TransporterFacade::sendFragmentedSignal(const NdbApiSignal* inputSignal,
+TransporterFacade::sendFragmentedSignal(trp_client* clnt,
+ const NdbApiSignal* inputSignal,
NodeId aNode,
const GenericSectionPtr ptr[3],
Uint32 secs)
@@ -1144,7 +1325,7 @@ TransporterFacade::sendFragmentedSignal(
/* If there's no need to fragment, send normally */
if (totalSectionLength <= CHUNK_SZ)
- return sendSignal(aSignal, aNode, ptr, secs);
+ return sendSignal(clnt, aSignal, aNode, ptr, secs);
// TODO : Consider tracing fragment signals?
#ifdef API_TRACE
@@ -1263,7 +1444,8 @@ TransporterFacade::sendFragmentedSignal(
// do prepare send
{
SendStatus ss = theTransporterRegistry->prepareSend
- (&tmp_signal,
+ (clnt,
+ &tmp_signal,
1, /*JBB*/
tmp_signal_data,
aNode,
@@ -1318,7 +1500,8 @@ TransporterFacade::sendFragmentedSignal(
int ret;
{
SendStatus ss = theTransporterRegistry->prepareSend
- (aSignal,
+ (clnt,
+ aSignal,
1/*JBB*/,
aSignal->getConstDataPtrSend(),
aNode,
@@ -1338,7 +1521,8 @@ TransporterFacade::sendFragmentedSignal(
}
int
-TransporterFacade::sendFragmentedSignal(const NdbApiSignal* aSignal,
+TransporterFacade::sendFragmentedSignal(trp_client* clnt,
+ const NdbApiSignal* aSignal,
NodeId aNode,
const LinearSectionPtr ptr[3],
Uint32 secs)
@@ -1364,12 +1548,13 @@ TransporterFacade::sendFragmentedSignal(
tmpPtr[2].sz= linCopy[2].sz;
tmpPtr[2].sectionIter= &two;
- return sendFragmentedSignal(aSignal, aNode, tmpPtr, secs);
+ return sendFragmentedSignal(clnt, aSignal, aNode, tmpPtr, secs);
}
int
-TransporterFacade::sendSignal(const NdbApiSignal* aSignal, NodeId aNode,
+TransporterFacade::sendSignal(trp_client* clnt,
+ const NdbApiSignal* aSignal, NodeId aNode,
const LinearSectionPtr ptr[3], Uint32 secs)
{
Uint32 save = aSignal->m_noOfSections;
@@ -1386,7 +1571,8 @@ TransporterFacade::sendSignal(const NdbA
}
#endif
SendStatus ss = theTransporterRegistry->prepareSend
- (aSignal,
+ (clnt,
+ aSignal,
1, // JBB
aSignal->getConstDataPtrSend(),
aNode,
@@ -1402,7 +1588,8 @@ TransporterFacade::sendSignal(const NdbA
}
int
-TransporterFacade::sendSignal(const NdbApiSignal* aSignal, NodeId aNode,
+TransporterFacade::sendSignal(trp_client* clnt,
+ const NdbApiSignal* aSignal, NodeId aNode,
const GenericSectionPtr ptr[3], Uint32 secs)
{
Uint32 save = aSignal->m_noOfSections;
@@ -1421,7 +1608,8 @@ TransporterFacade::sendSignal(const NdbA
}
#endif
SendStatus ss = theTransporterRegistry->prepareSend
- (aSignal,
+ (clnt,
+ aSignal,
1, // JBB
aSignal->getConstDataPtrSend(),
aNode,
@@ -1566,21 +1754,25 @@ TransporterFacade::get_active_ndb_object
return m_threads.m_use_cnt;
}
-
void
TransporterFacade::start_poll(trp_client* clnt)
{
- lock_mutex();
- clnt->m_poll.m_locked = true;
+ assert(clnt->m_poll.m_locked == true);
+ assert(clnt->m_poll.m_poll_owner == false);
+ assert(clnt->m_poll.m_poll_queue == false);
+ assert(clnt->m_poll.m_waiting == trp_client::PollQueue::PQ_IDLE);
}
void
TransporterFacade::do_poll(trp_client* clnt, Uint32 wait_time)
{
- clnt->m_poll.m_waiting = true;
+ dbg("do_poll(%p)", clnt);
+ clnt->m_poll.m_waiting = trp_client::PollQueue::PQ_WAITING;
assert(clnt->m_poll.m_locked == true);
- trp_client* owner = m_poll_owner;
- if (owner != NULL && owner != clnt)
+ assert(clnt->m_poll.m_poll_owner == false);
+ assert(clnt->m_poll.m_poll_queue == false);
+ lock_poll_mutex();
+ if (m_poll_owner != NULL)
{
/*
We didn't get hold of the poll "right". We will sleep on a
@@ -1591,82 +1783,371 @@ TransporterFacade::do_poll(trp_client* c
queue if it hasn't happened already. It is usually already out of the
queue but at time-out it could be that the object is still there.
*/
- assert(clnt->m_poll.m_poll_owner == false);
add_to_poll_queue(clnt);
- NdbCondition_WaitTimeout(clnt->m_poll.m_condition, theMutexPtr,
- wait_time);
- if (clnt != m_poll_owner && clnt->m_poll.m_waiting)
+ unlock_poll_mutex();
+ dbg("cond_wait(%p)", clnt);
+ NdbCondition_WaitTimeout(clnt->m_poll.m_condition,
+ clnt->m_mutex,
+ wait_time);
+
+ switch(clnt->m_poll.m_waiting) {
+ case trp_client::PollQueue::PQ_WOKEN:
+ dbg("%p - PQ_WOKEN", clnt);
+ // we have already been taken out of poll queue
+ assert(clnt->m_poll.m_poll_queue == false);
+
+ /**
+ * clear m_poll_owner
+ * it can be that we were proposed as poll owner
+ * and later woken by another thread that became poll owner
+ */
+ clnt->m_poll.m_poll_owner = false;
+ clnt->m_poll.m_waiting = trp_client::PollQueue::PQ_IDLE;
+ return;
+ case trp_client::PollQueue::PQ_IDLE:
+ dbg("%p - PQ_IDLE", clnt);
+ assert(false); // should not happen!!
+ // ...treat as timeout...fall-through
+ case trp_client::PollQueue::PQ_WAITING:
+ dbg("%p - PQ_WAITING", clnt);
+ break;
+ }
+
+ lock_poll_mutex();
+ if (clnt->m_poll.m_poll_owner == false)
{
+ /**
+ * We got timeout...hopefully rare...
+ */
+ assert(clnt->m_poll.m_poll_queue == true);
remove_from_poll_queue(clnt);
+ unlock_poll_mutex();
+ clnt->m_poll.m_waiting = trp_client::PollQueue::PQ_IDLE;
+ dbg("%p - PQ_WAITING poll_owner == false => return", clnt);
+ return;
+ }
+ else if (m_poll_owner != 0)
+ {
+ /**
+ * We were proposed as new poll owner...but someone else beat us to it
+ * break out...and retry the whole thing...
+ */
+ clnt->m_poll.m_poll_owner = false;
+ assert(clnt->m_poll.m_poll_queue == false);
+ unlock_poll_mutex();
+ clnt->m_poll.m_waiting = trp_client::PollQueue::PQ_IDLE;
+ dbg("%p - PQ_WAITING m_poll_owner != 0 => return", clnt);
+ return;
}
+
+ /**
+ * We were proposed as new poll owner, and was first to wakeup
+ */
+ dbg("%p - PQ_WAITING => new poll_owner", clnt);
}
- else
- {
- /*
- We got the poll "right" and we poll until data is received. After
- receiving data we will check if all data is received, if not we
- poll again.
+ m_poll_owner = clnt;
+ unlock_poll_mutex();
+
+ /**
+ * We have the poll "right" and we poll until data is received. After
+ * receiving data we will check if all data is received,
+ * if not we poll again.
*/
- assert(owner == clnt || clnt->m_poll.m_poll_owner == false);
- m_poll_owner = clnt;
- clnt->m_poll.m_poll_owner = true;
- external_poll(wait_time);
+ clnt->m_poll.m_poll_owner = true;
+ clnt->m_poll.start_poll(clnt);
+ dbg("%p->external_poll", clnt);
+ external_poll(clnt, wait_time);
+
+#ifndef NDEBUG
+ {
+ Uint32 cnt = clnt->m_poll.m_locked_cnt;
+ assert(cnt >= 1);
+ assert(cnt <= NDB_ARRAY_SIZE(clnt->m_poll.m_locked_clients));
+ assert(clnt->m_poll.m_locked_clients[0] == clnt);
+ // no duplicates
+ if (DBG_POLL) printf("after external_poll: cnt: %u ", cnt);
+ for (Uint32 i = 0; i < cnt; i++)
+ {
+ trp_client * tmp = clnt->m_poll.m_locked_clients[i];
+ if (DBG_POLL) printf("%p(%u) ", tmp, tmp->m_poll.m_waiting);
+ for (Uint32 j = i + 1; j < cnt; j++)
+ {
+ assert(tmp != clnt->m_poll.m_locked_clients[j]);
+ }
+ }
+ if (DBG_POLL) printf("\n");
+
+ for (Uint32 i = 1; i < cnt; i++)
+ {
+ trp_client * tmp = clnt->m_poll.m_locked_clients[i];
+ if (tmp->m_poll.m_locked == true)
+ {
+ assert(tmp->m_poll.m_waiting != trp_client::PollQueue::PQ_IDLE);
+ }
+ else
+ {
+ assert(tmp->m_poll.m_poll_owner == false);
+ assert(tmp->m_poll.m_poll_queue == false);
+ assert(tmp->m_poll.m_waiting == trp_client::PollQueue::PQ_IDLE);
+ }
+ }
}
-}
+#endif
-void
-TransporterFacade::wakeup(trp_client* clnt)
-{
- if (clnt->m_poll.m_waiting)
+ /**
+ * we're finished polling
+ */
+ clnt->m_poll.m_poll_owner = false;
+ clnt->m_poll.m_waiting = trp_client::PollQueue::PQ_IDLE;
+
+ /**
+ * count woken clients
+ * and put them to the left in array
+ */
+ Uint32 cnt = clnt->m_poll.m_locked_cnt - 1; // skip self
+ Uint32 cnt_woken = 0;
+ trp_client ** arr = clnt->m_poll.m_locked_clients + 1; // skip self
+ for (Uint32 i = 0; i < cnt; i++)
{
- clnt->m_poll.m_waiting = false;
- if (m_poll_owner != clnt)
+ trp_client * tmp = arr[i];
+ if (tmp->m_poll.m_waiting == trp_client::PollQueue::PQ_WOKEN)
{
- remove_from_poll_queue(clnt);
- NdbCondition_Signal(clnt->m_poll.m_condition);
+ arr[i] = arr[cnt_woken];
+ arr[cnt_woken] = tmp;
+ cnt_woken++;
+ }
+ }
+
+ if (DBG_POLL)
+ {
+ Uint32 cnt = clnt->m_poll.m_locked_cnt;
+ printf("after sort: cnt: %u ", cnt);
+ for (Uint32 i = 0; i < cnt; i++)
+ {
+ trp_client * tmp = clnt->m_poll.m_locked_clients[i];
+ printf("%p(%u) ", tmp, tmp->m_poll.m_waiting);
}
+ printf("\n");
}
+
+ lock_poll_mutex();
+
+ /**
+ * now remove all woken from poll queue
+ * note: poll mutex held
+ */
+ remove_from_poll_queue(arr, cnt_woken);
+
+ /**
+ * take last client in poll queue and try lock it
+ */
+ bool new_owner_locked = true;
+ trp_client * new_owner = remove_last_from_poll_queue();
+ assert(new_owner != clnt);
+ if (new_owner != 0)
+ {
+ dbg("0 new_owner: %p", new_owner);
+ /**
+ * Note: we can only try lock here, to prevent potential deadlock
+ * given that we acquire mutex in different order when starting to poll
+ */
+ if (NdbMutex_Trylock(new_owner->m_mutex) != 0)
+ {
+ /**
+ * If we fail to try lock...we put him back into poll-queue
+ */
+ new_owner_locked = false;
+ add_to_poll_queue(new_owner);
+ dbg("try-lock failed %p", new_owner);
+ }
+ }
+
+ /**
+ * clear poll owner variable and unlock
+ */
+ m_poll_owner = 0;
+ unlock_poll_mutex();
+
+ if (new_owner && new_owner_locked)
+ {
+ /**
+ * Propose a poll owner
+ * Wakeup a client, that will race to become poll-owner
+ * I.e. we don't assign m_poll_owner but let the waking-up thread
+ * do this itself, if it is first
+ */
+ dbg("wake new_owner(%p)", new_owner);
+#ifndef NDEBUG
+ for (Uint32 i = 0; i <= cnt; i++)
+ {
+ assert(clnt->m_poll.m_locked_clients[i] != new_owner);
+ }
+#endif
+ assert(new_owner->m_poll.m_waiting == trp_client::PollQueue::PQ_WAITING);
+ new_owner->m_poll.m_poll_owner = true;
+ NdbCondition_Signal(new_owner->m_poll.m_condition);
+ NdbMutex_Unlock(new_owner->m_mutex);
+ }
+
+ /**
+ * Now wake all the woken clients
+ */
+ unlock_and_signal(arr, cnt_woken);
+
+ /**
+ * And unlock the rest that we delivered messages to
+ */
+ for (Uint32 i = cnt_woken; i < cnt; i++)
+ {
+ dbg("unlock (%p)", arr[i]);
+ NdbMutex_Unlock(arr[i]->m_mutex);
+ }
+
+ /**
+ * If we failed to propose new poll owner above, then we retry it here
+ */
+ if (new_owner_locked == false)
+ {
+ dbg("new_owner_locked == false", 0);
+ trp_client * new_owner;
+ while (true)
+ {
+ new_owner = 0;
+ lock_poll_mutex();
+ if (m_poll_owner != 0)
+ {
+ /**
+ * new poll owner already appointed...no need to do anything
+ */
+ break;
+ }
+
+ new_owner = remove_last_from_poll_queue();
+ if (new_owner == 0)
+ {
+ /**
+ * poll queue empty...no need to do anything
+ */
+ break;
+ }
+
+ if (NdbMutex_Trylock(new_owner->m_mutex) == 0)
+ {
+ /**
+ * We locked a client that we will propose as poll owner
+ */
+ break;
+ }
+
+ /**
+ * Failed to lock new owner, put him back on queue, and retry
+ */
+ add_to_poll_queue(new_owner);
+ unlock_poll_mutex();
+ }
+
+ unlock_poll_mutex();
+
+ if (new_owner)
+ {
+ /**
+ * Propose a poll owner
+ */
+ assert(new_owner->m_poll.m_waiting == trp_client::PollQueue::PQ_WAITING);
+ new_owner->m_poll.m_poll_owner = true;
+ NdbCondition_Signal(new_owner->m_poll.m_condition);
+ NdbMutex_Unlock(new_owner->m_mutex);
+ }
+ }
+
+ clnt->m_poll.m_locked_cnt = 0;
+ dbg("%p->do_poll return", clnt);
}
void
-TransporterFacade::complete_poll(trp_client* clnt)
+TransporterFacade::wakeup(trp_client* clnt)
{
- clnt->m_poll.m_waiting = false;
- if (!clnt->m_poll.m_locked)
- {
- assert(clnt->m_poll.m_poll_owner == false);
- return;
+ switch(clnt->m_poll.m_waiting) {
+ case trp_client::PollQueue::PQ_WAITING:
+ dbg("TransporterFacade::wakeup(%p) PQ_WAITING => PQ_WOKEN", clnt);
+ clnt->m_poll.m_waiting = trp_client::PollQueue::PQ_WOKEN;
+ break;
+ case trp_client::PollQueue::PQ_WOKEN:
+ dbg("TransporterFacade::wakeup(%p) PQ_WOKEN", clnt);
+ break;
+ case trp_client::PollQueue::PQ_IDLE:
+ dbg("TransporterFacade::wakeup(%p) PQ_IDLE", clnt);
+ break;
}
+}
- /*
- When completing the poll for this thread we must return the poll
- ownership if we own it. We will give it to the last thread that
- came here (the most recent) which is likely to be the one also
- last to complete. We will remove that thread from the conditional
- wait queue and set him as the new owner of the poll "right".
- We will wait however with the signal until we have unlocked the
- mutex for performance reasons.
- See Stevens book on Unix NetworkProgramming: The Sockets Networking
- API Volume 1 Third Edition on page 703-704 for a discussion on this
- subject.
- */
- trp_client* new_owner = 0;
- if (m_poll_owner == clnt)
+void
+TransporterFacade::unlock_and_signal(trp_client ** arr, Uint32 cnt)
+{
+ for (Uint32 i = 0; i < cnt; i++)
{
- assert(clnt->m_poll.m_poll_owner == true);
- m_poll_owner = new_owner = remove_last_from_poll_queue();
+ NdbCondition_Signal(arr[i]->m_poll.m_condition);
+ NdbMutex_Unlock(arr[i]->m_mutex);
}
- if (new_owner)
+}
+
+void
+TransporterFacade::complete_poll(trp_client* clnt)
+{
+ assert(clnt->m_poll.m_poll_owner == false);
+ assert(clnt->m_poll.m_poll_queue == false);
+ assert(clnt->m_poll.m_waiting == trp_client::PollQueue::PQ_IDLE);
+ clnt->flush_send_buffers();
+}
+
+/**
+ * Reset the list of locked clients so that it holds exactly one entry:
+ * the client that owns this PollQueue.
+ *
+ * The asserts spell out the expected state on entry: the queue is marked
+ * PQ_WAITING, locked and poll owner, nothing is recorded as locked yet,
+ * and self is the client embedding this PollQueue.
+ */
+inline
+void
+trp_client::PollQueue::start_poll(trp_client* self)
+{
+  assert(m_waiting == PQ_WAITING);
+  assert(m_locked);
+  assert(m_poll_owner);
+  assert(m_locked_cnt == 0);
+  assert(&self->m_poll == this);
+
+  // slot 0 is always the poll owner itself
+  m_locked_clients[0] = self;
+  m_locked_cnt = 1;
+}
+
+inline
+bool
+trp_client::PollQueue::check_if_locked(const trp_client* clnt) const
+{
+ for (Uint32 i = 0; i<m_locked_cnt; i++)
{
- assert(new_owner->m_poll.m_poll_owner == false);
- assert(new_owner->m_poll.m_locked == true);
- assert(new_owner->m_poll.m_waiting == true);
- NdbCondition_Signal(new_owner->m_poll.m_condition);
- new_owner->m_poll.m_poll_owner = true;
+ if (m_locked_clients[i] == clnt) // already locked
+ return true;
}
- clnt->m_poll.m_locked = false;
- clnt->m_poll.m_poll_owner = false;
- unlock_mutex();
+ return false;
+}
+
+/**
+ * Lock clnt's mutex and record it in m_locked_clients, unless it is
+ * already recorded there.
+ *
+ * @param clnt client whose m_mutex should be taken
+ * @return false if clnt was already in the list (nothing is locked);
+ * otherwise true once fewer than MAX_MESSAGES_IN_API_PACKED free
+ * slots (plus one) remain in m_locked_clients, false while there is
+ * more room.
+ * NOTE(review): presumably "true" is what tells the receiver to stop
+ * delivering signals - confirm against the caller.
+ */
+inline
+bool
+trp_client::PollQueue::lock_client (trp_client* clnt)
+{
+ assert(m_locked_cnt <= NDB_ARRAY_SIZE(m_locked_clients));
+ if (check_if_locked(clnt))
+ return false;
+
+ dbg("lock_client(%p)", clnt);
+
+ /**
+ * API_PACKED contains a number of messages,
+ * so even if we signal "last_message"...
+ * there will(might) be a few more
+ *
+ * TODO: Check that 3 is correct number!!
+ */
+ const Uint32 MAX_MESSAGES_IN_API_PACKED = 3;
+
+ // there must be a free slot before the mutex is taken and recorded
+ assert(m_locked_cnt < NDB_ARRAY_SIZE(m_locked_clients));
+ NdbMutex_Lock(clnt->m_mutex);
+ m_locked_clients[m_locked_cnt++] = clnt;
+ // report "nearly full" early, leaving headroom for the packed messages
+ return m_locked_cnt + MAX_MESSAGES_IN_API_PACKED >= NDB_ARRAY_SIZE(m_locked_clients);
}
void
@@ -1677,7 +2158,9 @@ TransporterFacade::add_to_poll_queue(trp
assert(clnt->m_poll.m_next == 0);
assert(clnt->m_poll.m_locked == true);
assert(clnt->m_poll.m_poll_owner == false);
+ assert(clnt->m_poll.m_poll_queue == false);
+ clnt->m_poll.m_poll_queue = true;
if (m_poll_queue_head == 0)
{
assert(m_poll_queue_tail == 0);
@@ -1694,12 +2177,26 @@ TransporterFacade::add_to_poll_queue(trp
}
void
+TransporterFacade::remove_from_poll_queue(trp_client** arr, Uint32 cnt)
+{
+  /**
+   * Walk the first cnt entries of arr and unlink every client that is
+   * still flagged as being in the poll queue; the others are skipped.
+   */
+  trp_client** const end = arr + cnt;
+  for (trp_client** it = arr; it != end; ++it)
+  {
+    trp_client* const candidate = *it;
+    if (!candidate->m_poll.m_poll_queue)
+      continue;
+
+    remove_from_poll_queue(candidate);
+  }
+}
+
+void
TransporterFacade::remove_from_poll_queue(trp_client* clnt)
{
assert(clnt != 0);
assert(clnt->m_poll.m_locked == true);
assert(clnt->m_poll.m_poll_owner == false);
+ assert(clnt->m_poll.m_poll_queue == true);
+ clnt->m_poll.m_poll_queue = false;
if (clnt->m_poll.m_prev != 0)
{
clnt->m_poll.m_prev->m_poll.m_next = clnt->m_poll.m_next;
@@ -1758,6 +2255,174 @@ SignalSectionIterator::getNextWords(Uint
return NULL;
}
+/**
+ * Hand the pages of sb over to the scheduled send buffer (m_buffer) of
+ * node by calling link_buffer() while holding that node's send-buffer
+ * mutex. Nothing is sent here.
+ *
+ * @param node index into m_send_buffers
+ * @param sb   buffer passed on to link_buffer()
+ */
+void
+TransporterFacade::flush_send_buffer(Uint32 node, const TFBuffer * sb)
+{
+  // check the index before it is used to select the mutex to lock
+  assert(node < NDB_ARRAY_SIZE(m_send_buffers));
+  Guard g(&m_send_buffers[node].m_mutex);
+  link_buffer(&m_send_buffers[node].m_buffer, sb);
+}
+
+/**
+ * Link sb into the scheduled buffer (m_buffer) of node and, unless the
+ * node's m_sending flag is already set, move everything scheduled so far
+ * into m_out_buffer and call performSend() for the node.
+ *
+ * m_mutex protects m_buffer and m_sending. It is released while the data
+ * is linked into m_out_buffer and while performSend() runs; m_sending
+ * stays true for that whole window. If either buffer still holds bytes
+ * after the send, the send thread is woken for this node.
+ *
+ * @param node index into m_send_buffers
+ * @param sb   buffer passed on to link_buffer()
+ */
+void
+TransporterFacade::flush_and_send_buffer(Uint32 node, const TFBuffer * sb)
+{
+  assert(node < NDB_ARRAY_SIZE(m_send_buffers));
+  struct TFSendBuffer * b = m_send_buffers + node;
+  bool wake = false;
+  NdbMutex_Lock(&b->m_mutex);
+  link_buffer(&b->m_buffer, sb);
+
+  if (b->m_sending == true)
+  {
+    /**
+     * Sender will check if there is data, and wake send-thread
+     * if needed
+     */
+  }
+  else
+  {
+    b->m_sending = true;
+
+    /**
+     * Move all data from m_buffer to m_out_buffer:
+     * take a copy of the buffer header, clear m_buffer under the mutex,
+     * then link the copy into m_out_buffer with the mutex released
+     */
+    TFBuffer copy = b->m_buffer;
+    bzero(&b->m_buffer, sizeof(b->m_buffer));
+    NdbMutex_Unlock(&b->m_mutex);
+    link_buffer(&b->m_out_buffer, &copy);
+    theTransporterRegistry->performSend(node);
+    NdbMutex_Lock(&b->m_mutex);
+    b->m_sending = false;
+    if (b->m_buffer.m_bytes_in_buffer > 0 ||
+        b->m_out_buffer.m_bytes_in_buffer > 0)
+    {
+      // something was scheduled meanwhile, or not everything got sent
+      wake = true;
+    }
+  }
+  NdbMutex_Unlock(&b->m_mutex);
+
+  if (wake)
+  {
+    wakeup_send_thread(node);
+  }
+}
+
+/**
+ * Describe the pages queued in the out-buffer of node as iovec entries.
+ *
+ * @param node index into m_send_buffers
+ * @param dst  array receiving one entry per page (base = data + start,
+ *             length = bytes in page)
+ * @param max  capacity of dst
+ * @return number of entries written, never more than max
+ */
+Uint32
+TransporterFacade::get_bytes_to_send_iovec(NodeId node, struct iovec *dst,
+                                           Uint32 max)
+{
+  if (max == 0)
+    return 0;
+
+  TFBuffer *out = &m_send_buffers[node].m_out_buffer;
+  TFBufferGuard g0(* out);
+
+  Uint32 filled = 0;
+  for (TFPage *pg = out->m_head;
+       pg != NULL && filled < max;
+       pg = pg->m_next, filled++)
+  {
+    struct iovec * const slot = dst + filled;
+    slot->iov_base = pg->m_data+pg->m_start;
+    slot->iov_len = pg->m_bytes;
+    assert(pg->m_start + pg->m_bytes <= pg->max_data_bytes());
+  }
+
+  return filled;
+}
+
+/**
+ * Account for bytes that have been sent from the out-buffer of node and
+ * release the pages that are now completely consumed.
+ *
+ * @param node index into m_send_buffers
+ * @param bytes number of bytes sent; 0 only queries the current count
+ * @return bytes still left in the out-buffer after the update
+ */
+Uint32
+TransporterFacade::bytes_sent(NodeId node, Uint32 bytes)
+{
+ TFBuffer *b = &m_send_buffers[node].m_out_buffer;
+ TFBufferGuard g0(* b);
+ Uint32 used_bytes = b->m_bytes_in_buffer;
+
+ if (bytes == 0)
+ {
+ return used_bytes;
+ }
+
+ assert(used_bytes >= bytes);
+ used_bytes -= bytes;
+ b->m_bytes_in_buffer = used_bytes;
+
+ // skip over every page that was sent in full;
+ // prev ends up as the last such page (0 if there is none)
+ TFPage *page = b->m_head;
+ TFPage *prev = 0;
+ while (bytes && bytes >= page->m_bytes)
+ {
+ prev = page;
+ bytes -= page->m_bytes;
+ page = page->m_next;
+ }
+
+ if (used_bytes == 0)
+ {
+ // everything was sent: give back the whole page list
+ m_send_buffer.release(b->m_head, b->m_tail);
+ b->m_head = 0;
+ b->m_tail = 0;
+ }
+ else
+ {
+ if (prev)
+ {
+ // give back the fully sent pages head..prev
+ m_send_buffer.release(b->m_head, prev);
+ }
+
+ // page is only partly sent (or untouched): advance past the sent part
+ page->m_start += bytes;
+ page->m_bytes -= bytes;
+ assert(page->m_start + page->m_bytes <= page->max_data_bytes());
+ b->m_head = page;
+ }
+
+ return used_bytes;
+}
+
+/**
+ * Placeholder for has_data_to_send: this implementation never returns
+ * a value, it terminates the process through abort() when called.
+ */
+bool
+TransporterFacade::has_data_to_send(NodeId node)
+{
+ /**
+ * Not used...
+ */
+ abort();
+}
+
+/**
+ * Drop everything queued for node, both in the scheduled buffer and in
+ * the out-buffer, and return the pages to the send buffer pool.
+ *
+ * When should_be_empty is set the buffers are expected to hold no data
+ * already; this is asserted so that stray signals written to the send
+ * buffer while not being connected are caught quickly.
+ */
+void
+TransporterFacade::reset_send_buffer(NodeId node, bool should_be_empty)
+{
+  TFBuffer * const bufs[2] = {
+    &m_send_buffers[node].m_buffer,
+    &m_send_buffers[node].m_out_buffer
+  };
+
+  // data is pending if the first page of either buffer holds bytes
+  bool pending = false;
+  for (Uint32 i = 0; i < 2 && !pending; i++)
+  {
+    const TFPage * first = bufs[i]->m_head;
+    pending = (first != NULL && first->m_bytes);
+  }
+
+  if (should_be_empty && !pending)
+    return;
+  assert(!should_be_empty);
+
+  // release scheduled buffer first, then the out-buffer
+  for (Uint32 i = 0; i < 2; i++)
+  {
+    TFBuffer * cur = bufs[i];
+    if (cur->m_head != 0)
+    {
+      m_send_buffer.release(cur->m_head, cur->m_tail);
+    }
+    bzero(cur, sizeof(* cur));
+  }
+}
+
#ifdef UNIT_TEST
// Unit test code starts
@@ -2035,6 +2700,7 @@ TransporterFacade::ext_update_connection
{
theClusterMgr->lock();
theTransporterRegistry->update_connections();
+ theClusterMgr->flush_send_buffers();
theClusterMgr->unlock();
}
@@ -2074,11 +2740,11 @@ TransporterFacade::setupWakeup()
{
/* Ask TransporterRegistry to setup wakeup sockets */
bool rc;
- lock_mutex();
+ lock_poll_mutex();
{
rc = theTransporterRegistry->setup_wakeup_socket();
}
- unlock_mutex();
+ unlock_poll_mutex();
return rc;
}
=== modified file 'storage/ndb/src/ndbapi/TransporterFacade.hpp'
--- a/storage/ndb/src/ndbapi/TransporterFacade.hpp 2012-02-23 15:41:31 +0000
+++ b/storage/ndb/src/ndbapi/TransporterFacade.hpp 2012-05-31 18:26:14 +0000
@@ -26,6 +26,7 @@
#include "DictCache.hpp"
#include <BlockNumbers.h>
#include <mgmapi.h>
+#include "trp_buffer.hpp"
class ClusterMgr;
class ArbitMgr;
@@ -81,14 +82,14 @@ public:
// Only sends to nodes which are alive
private:
- int sendSignal(const NdbApiSignal * signal, NodeId nodeId);
- int sendSignal(const NdbApiSignal*, NodeId,
+ int sendSignal(trp_client*, const NdbApiSignal *, NodeId nodeId);
+ int sendSignal(trp_client*, const NdbApiSignal*, NodeId,
const LinearSectionPtr ptr[3], Uint32 secs);
- int sendSignal(const NdbApiSignal*, NodeId,
+ int sendSignal(trp_client*, const NdbApiSignal*, NodeId,
const GenericSectionPtr ptr[3], Uint32 secs);
- int sendFragmentedSignal(const NdbApiSignal*, NodeId,
+ int sendFragmentedSignal(trp_client*, const NdbApiSignal*, NodeId,
const LinearSectionPtr ptr[3], Uint32 secs);
- int sendFragmentedSignal(const NdbApiSignal*, NodeId,
+ int sendFragmentedSignal(trp_client*, const NdbApiSignal*, NodeId,
const GenericSectionPtr ptr[3], Uint32 secs);
public:
@@ -129,8 +130,8 @@ public:
void for_each(trp_client* clnt,
const NdbApiSignal* aSignal, const LinearSectionPtr ptr[3]);
- void lock_mutex();
- void unlock_mutex();
+ void lock_poll_mutex();
+ void unlock_poll_mutex();
// Improving the API performance
void forceSend(Uint32 block_number);
@@ -159,7 +160,10 @@ public:
void complete_poll(trp_client*);
void wakeup(trp_client*);
- void external_poll(Uint32 wait_time);
+ void external_poll(trp_client* clnt, Uint32 wait_time);
+
+ void remove_from_poll_queue(trp_client**, Uint32 cnt);
+ void unlock_and_signal(trp_client**, Uint32 cnt);
trp_client* get_poll_owner(bool) const { return m_poll_owner;}
trp_client* remove_last_from_poll_queue();
@@ -177,7 +181,7 @@ public:
int get_auto_reconnect() const;
/* TransporterCallback interface. */
- void deliver_signal(SignalHeader * const header,
+ bool deliver_signal(SignalHeader * const header,
Uint8 prio,
Uint32 * const signalData,
LinearSectionPtr ptr[3]);
@@ -189,22 +193,7 @@ public:
void reportError(NodeId nodeId, TransporterError errorCode,
const char *info = 0);
void transporter_recv_from(NodeId node);
- Uint32 get_bytes_to_send_iovec(NodeId node, struct iovec *dst, Uint32 max)
- {
- return theTransporterRegistry->get_bytes_to_send_iovec(node, dst, max);
- }
- Uint32 bytes_sent(NodeId node, Uint32 bytes)
- {
- return theTransporterRegistry->bytes_sent(node, bytes);
- }
- bool has_data_to_send(NodeId node)
- {
- return theTransporterRegistry->has_data_to_send(node);
- }
- void reset_send_buffer(NodeId node, bool should_be_empty)
- {
- theTransporterRegistry->reset_send_buffer(node, should_be_empty);
- }
+
/**
* Wakeup
*
@@ -300,24 +289,67 @@ private:
Uint32 m_fragmented_signal_id;
public:
- NdbMutex* theMutexPtr;
+ volatile Uint32 m_poll_cnt;
+ NdbMutex* m_open_close_mutex;
+ NdbMutex* thePollMutex;
public:
GlobalDictCache *m_globalDictCache;
+
+public:
+ /**
+ * Add a send buffer to out-buffer
+ */
+ void flush_send_buffer(Uint32 node, const TFBuffer* buffer);
+ void flush_and_send_buffer(Uint32 node, const TFBuffer* buffer);
+
+ /**
+ * Allocate a send buffer
+ */
+ TFPage *alloc_sb_page() { return m_send_buffer.try_alloc(1);}
+
+ Uint32 get_bytes_to_send_iovec(NodeId node, struct iovec *dst, Uint32 max);
+ Uint32 bytes_sent(NodeId node, Uint32 bytes);
+ bool has_data_to_send(NodeId node);
+ void reset_send_buffer(NodeId node, bool should_be_empty);
+
+private:
+ TFMTPool m_send_buffer;
+ struct TFSendBuffer
+ {
+ TFSendBuffer() { m_sending = false; }
+ NdbMutex m_mutex;
+ bool m_sending;
+
+ /**
+ * This is data that have been "scheduled" to be sent
+ */
+ TFBuffer m_buffer;
+
+ /**
+ * This is data that is being sent
+ */
+ TFBuffer m_out_buffer;
+ } m_send_buffers[MAX_NODES];
+
+ void wakeup_send_thread(Uint32 node);
+ NdbMutex * m_send_thread_mutex;
+ NdbCondition * m_send_thread_cond;
+ NodeBitmask m_send_thread_nodes;
};
inline
void
-TransporterFacade::lock_mutex()
+TransporterFacade::lock_poll_mutex()
{
- NdbMutex_Lock(theMutexPtr);
+ NdbMutex_Lock(thePollMutex);
}
inline
void
-TransporterFacade::unlock_mutex()
+TransporterFacade::unlock_poll_mutex()
{
- NdbMutex_Unlock(theMutexPtr);
+ NdbMutex_Unlock(thePollMutex);
}
#include "ClusterMgr.hpp"
=== modified file 'storage/ndb/src/ndbapi/ndb_cluster_connection.cpp'
--- a/storage/ndb/src/ndbapi/ndb_cluster_connection.cpp 2012-03-05 13:04:02 +0000
+++ b/storage/ndb/src/ndbapi/ndb_cluster_connection.cpp 2012-05-31 18:26:14 +0000
@@ -204,13 +204,13 @@ Ndb_cluster_connection_impl::get_next_al
while ((id = get_next_node(iter)))
{
- tp->lock_mutex();
+ tp->lock_poll_mutex();
if (tp->get_node_alive(id) != 0)
{
- tp->unlock_mutex();
+ tp->unlock_poll_mutex();
return id;
}
- tp->unlock_mutex();
+ tp->unlock_poll_mutex();
}
return 0;
}
@@ -235,7 +235,7 @@ Ndb_cluster_connection::max_nodegroup()
return 0;
Bitmask<MAX_NDB_NODES> ng;
- tp->lock_mutex();
+ tp->lock_poll_mutex();
for(unsigned i= 0; i < no_db_nodes(); i++)
{
//************************************************
@@ -245,7 +245,7 @@ Ndb_cluster_connection::max_nodegroup()
if (n.is_confirmed() && n.m_state.nodeGroup <= MAX_NDB_NODES)
ng.set(n.m_state.nodeGroup);
}
- tp->unlock_mutex();
+ tp->unlock_poll_mutex();
if (ng.isclear())
return 0;
@@ -267,7 +267,7 @@ int Ndb_cluster_connection::get_no_ready
return -1;
unsigned int foundAliveNode = 0;
- tp->lock_mutex();
+ tp->lock_poll_mutex();
for(unsigned i= 0; i < no_db_nodes(); i++)
{
//************************************************
@@ -277,7 +277,7 @@ int Ndb_cluster_connection::get_no_ready
foundAliveNode++;
}
}
- tp->unlock_mutex();
+ tp->unlock_poll_mutex();
return foundAliveNode;
}
=== modified file 'storage/ndb/src/ndbapi/trp_buffer.cpp'
--- a/storage/ndb/src/ndbapi/trp_buffer.cpp 2011-02-02 00:40:07 +0000
+++ b/storage/ndb/src/ndbapi/trp_buffer.cpp 2012-05-31 18:26:14 +0000
@@ -44,6 +44,11 @@ TFPool::~TFPool()
free (m_alloc_ptr);
}
+/**
+ * Initialise the mutex embedded in the pool, passing name on to
+ * NdbMutex_InitWithName().
+ */
+TFMTPool::TFMTPool(const char * name)
+{
+ NdbMutex_InitWithName(&m_mutex, name);
+}
+
void
TFBuffer::validate() const
{
@@ -52,8 +57,8 @@ TFBuffer::validate() const
assert(m_head == m_tail);
if (m_head)
{
- assert(m_head->m_bytes < m_head->m_size); // Full pages should be release
- assert(m_head->m_bytes == m_head->m_start);
+ assert(m_head->m_start < m_head->m_size); // Full pages should be release
+ assert(m_head->m_bytes == 0);
}
return;
}
@@ -67,10 +72,10 @@ TFBuffer::validate() const
while (p)
{
assert(p->m_bytes <= p->m_size);
- assert(p->m_start <= p->m_bytes);
- assert((p->m_start & 3) == 0);
- assert(p->m_bytes - p->m_start > 0);
- assert(p->m_bytes - p->m_start <= (int)m_bytes_in_buffer);
+ assert(p->m_start <= p->m_size);
+ assert((p->m_bytes & 3) == 0);
+ assert(p->m_start + p->m_bytes <= p->m_size);
+ assert(p->m_bytes <= (int)m_bytes_in_buffer);
assert(p->m_next != p);
if (p == m_tail)
{
@@ -80,7 +85,7 @@ TFBuffer::validate() const
{
assert(p->m_next != 0);
}
- sum += p->m_bytes - p->m_start;
+ sum += p->m_bytes;
p = p->m_next;
}
assert(sum == m_bytes_in_buffer);
=== modified file 'storage/ndb/src/ndbapi/trp_buffer.hpp'
--- a/storage/ndb/src/ndbapi/trp_buffer.hpp 2011-02-02 00:40:07 +0000
+++ b/storage/ndb/src/ndbapi/trp_buffer.hpp 2012-05-31 18:26:14 +0000
@@ -20,6 +20,7 @@
#include <ndb_global.h>
#include <ndb_socket.h> // struct iovec
+#include <portlib/NdbMutex.h>
struct TFPage
{
@@ -98,9 +99,9 @@ struct TFSentinel
struct TFBuffer
{
TFBuffer() { m_bytes_in_buffer = 0; m_head = m_tail = 0;}
- Uint32 m_bytes_in_buffer;
struct TFPage * m_head;
struct TFPage * m_tail;
+ Uint32 m_bytes_in_buffer;
void validate() const;
};
@@ -139,6 +140,37 @@ public:
void release_list(TFPage*);
};
+/**
+ * TFPool variant whose try_alloc() and release() take m_mutex around the
+ * call into the underlying TFPool. init() and inited() are forwarded
+ * without locking.
+ */
+class TFMTPool : private TFPool
+{
+  NdbMutex m_mutex;
+public:
+  TFMTPool(const char * name = 0);
+
+  /** Release the mutex that the constructor initialised */
+  ~TFMTPool() {
+    NdbMutex_Deinit(&m_mutex);
+  }
+
+  bool init(size_t total_memory, size_t page_sz = 32768) {
+    return TFPool::init(total_memory, page_sz);
+  }
+  bool inited() const {
+    return TFPool::inited();
+  }
+
+  /** Allocate under the pool mutex; result is whatever TFPool returns */
+  TFPage* try_alloc(Uint32 N) {
+    Guard g(&m_mutex);
+    return TFPool::try_alloc(N);
+  }
+
+  /** Return the page chain first..last to the pool under the mutex */
+  void release(TFPage* first, TFPage* last) {
+    Guard g(&m_mutex);
+    TFPool::release(first, last);
+  }
+
+  /** Return a whole m_next-linked list; an empty list is a no-op */
+  void release_list(TFPage* head) {
+    if (head == 0)
+      return;
+    TFPage * tail = head;
+    while (tail->m_next != 0)
+      tail = tail->m_next;
+    release(head, tail);
+  }
+};
+
inline
TFPage *
TFPool::try_alloc(Uint32 n)
=== modified file 'storage/ndb/src/ndbapi/trp_client.cpp'
--- a/storage/ndb/src/ndbapi/trp_client.cpp 2011-09-08 06:22:07 +0000
+++ b/storage/ndb/src/ndbapi/trp_client.cpp 2012-05-31 18:26:14 +0000
@@ -21,12 +21,10 @@
trp_client::trp_client()
: m_blockNo(~Uint32(0)), m_facade(0)
{
- m_poll.m_waiting = false;
- m_poll.m_locked = false;
- m_poll.m_poll_owner = false;
- m_poll.m_next = 0;
- m_poll.m_prev = 0;
- m_poll.m_condition = NdbCondition_Create();
+ m_mutex = NdbMutex_Create();
+
+ m_send_nodes_cnt = 0;
+ m_send_buffers = new TFBuffer[MAX_NODES];
}
trp_client::~trp_client()
@@ -35,13 +33,14 @@ trp_client::~trp_client()
* require that trp_client user
* doesnt destroy object when holding any locks
*/
- assert(m_poll.m_locked == 0);
- assert(m_poll.m_poll_owner == false);
- assert(m_poll.m_next == 0);
- assert(m_poll.m_prev == 0);
+ m_poll.assert_destroy();
close();
NdbCondition_Destroy(m_poll.m_condition);
+ NdbMutex_Destroy(m_mutex);
+
+ assert(m_send_nodes_cnt == 0);
+ delete [] m_send_buffers;
}
Uint32
@@ -86,6 +85,9 @@ trp_client::close()
void
trp_client::start_poll()
{
+ NdbMutex_Lock(m_mutex);
+ assert(m_poll.m_locked == false);
+ m_poll.m_locked = true;
m_facade->start_poll(this);
}
@@ -98,30 +100,163 @@ trp_client::do_poll(Uint32 to)
void
trp_client::complete_poll()
{
- m_facade->complete_poll(this);
+ if (m_poll.m_locked == true)
+ {
+ m_facade->complete_poll(this);
+ m_poll.m_locked = false;
+ NdbMutex_Unlock(m_mutex);
+ }
}
int
trp_client::do_forceSend(int val)
{
- int did_send = 1;
+ /**
+ * since force send is disabled in this "version"
+ * set forceSend=1 always...
+ */
+ val = 1;
+
if (val == 0)
{
- did_send = m_facade->checkForceSend(m_blockNo);
+ flush_send_buffers();
+ return 0;
}
else if (val == 1)
{
- m_facade->forceSend(m_blockNo);
+ for (Uint32 i = 0; i < m_send_nodes_cnt; i++)
+ {
+ Uint32 n = m_send_nodes_list[i];
+ TFBuffer* b = m_send_buffers + n;
+ TFBufferGuard g0(* b);
+ m_facade->flush_and_send_buffer(n, b);
+ bzero(b, sizeof(* b));
+ }
+ m_send_nodes_cnt = 0;
+ m_send_nodes_mask.clear();
+ return 1;
}
- return did_send;
+ return 0;
}
int
-trp_client::safe_sendSignal(const NdbApiSignal* signal, Uint32 nodeId)
+trp_client::safe_noflush_sendSignal(const NdbApiSignal* signal, Uint32 nodeId)
{
return m_facade->m_poll_owner->raw_sendSignal(signal, nodeId);
}
+/**
+ * Send signal through the current poll owner and, unless the send
+ * returned -1, flush the poll owner's send buffers afterwards.
+ *
+ * @return the result of safe_noflush_sendSignal()
+ */
+int
+trp_client::safe_sendSignal(const NdbApiSignal* signal, Uint32 nodeId)
+{
+  const int res = safe_noflush_sendSignal(signal, nodeId);
+  if (res == -1)
+  {
+    return res;
+  }
+
+  m_facade->m_poll_owner->flush_send_buffers();
+  return res;
+}
+
+/**
+ * Return a position in this client's local send buffer for node where
+ * lenBytes can be written, allocating a new page when needed.
+ *
+ * The first use of a node registers it in m_send_nodes_mask and
+ * m_send_nodes_list. prio and max_use are not used here.
+ *
+ * @return write position, or NULL if no page could be allocated
+ */
+Uint32 *
+trp_client::getWritePtr(NodeId node, Uint32 lenBytes, Uint32 prio,
+ Uint32 max_use)
+{
+ TFBuffer* b = m_send_buffers+node;
+ TFBufferGuard g0(* b);
+ bool found = m_send_nodes_mask.get(node);
+ if (likely(found))
+ {
+ // node already has pages: reuse the tail page if lenBytes still fits
+ TFPage * page = b->m_tail;
+ assert(page != 0);
+ if (page->m_bytes + page->m_start + lenBytes <= page->max_data_bytes())
+ {
+ return (Uint32 *)(page->m_data + page->m_start + page->m_bytes);
+ }
+ }
+ else
+ {
+ // first data for this node: remember that it needs flushing
+ Uint32 cnt = m_send_nodes_cnt;
+ m_send_nodes_mask.set(node);
+ m_send_nodes_list[cnt] = node;
+ m_send_nodes_cnt = cnt + 1;
+ }
+
+ // no usable tail page: get a fresh one and append it
+ TFPage* page = m_facade->alloc_sb_page();
+ if (likely(page != 0))
+ {
+ page->init();
+
+ if (b->m_tail == NULL)
+ {
+ assert(!found);
+ b->m_head = page;
+ b->m_tail = page;
+ }
+ else
+ {
+ assert(found);
+ assert(b->m_head != NULL);
+ b->m_tail->m_next = page;
+ b->m_tail = page;
+ }
+ return (Uint32 *)(page->m_data);
+ }
+
+ // allocation failed: undo the registration done above for a new node
+ if (b->m_tail == 0)
+ {
+ assert(!found);
+ m_send_nodes_mask.clear(node);
+ m_send_nodes_cnt--;
+ }
+ else
+ {
+ assert(found);
+ }
+
+ return NULL;
+}
+
+/**
+ * Record that lenBytes were written to the tail page of this client's
+ * local send buffer for node. prio is not used here.
+ *
+ * @return total number of bytes now held in the local buffer for node
+ */
+Uint32
+trp_client::updateWritePtr(NodeId node, Uint32 lenBytes, Uint32 prio)
+{
+  TFBuffer* b = m_send_buffers+node;
+  TFBufferGuard g0(* b);
+  assert(m_send_nodes_mask.get(node));
+  assert(b->m_head != 0);
+  assert(b->m_tail != 0);
+
+  TFPage *page = b->m_tail;
+  // data in a page begins at m_start, so the bound has to include it
+  // (the same bound getWritePtr() tests before handing out the position)
+  assert(page->m_start + page->m_bytes + lenBytes <= page->max_data_bytes());
+  page->m_bytes += lenBytes;
+  b->m_bytes_in_buffer += lenBytes;
+  return b->m_bytes_in_buffer;
+}
+
+/**
+ * Pass every local per-node send buffer that has been written to on to
+ * the facade (flush_send_buffer), then forget the local copies.
+ *
+ * The client is expected to be locked (asserted).
+ */
+void
+trp_client::flush_send_buffers()
+{
+  assert(m_poll.m_locked);
+
+  const Uint32 pending = m_send_nodes_cnt;
+  for (Uint32 idx = 0; idx != pending; idx++)
+  {
+    const Uint32 node = m_send_nodes_list[idx];
+    assert(m_send_nodes_mask.get(node));
+
+    TFBuffer* local = &m_send_buffers[node];
+    TFBufferGuard g0(* local);
+    m_facade->flush_send_buffer(node, local);
+    // the pages now belong to the facade; clear the local header
+    bzero(local, sizeof(* local));
+  }
+
+  m_send_nodes_mask.clear();
+  m_send_nodes_cnt = 0;
+}
+
+/**
+ * Force-send hook: the node argument is ignored, do_forceSend() is
+ * called with its default argument and true is always returned.
+ */
+bool
+trp_client::forceSend(NodeId node)
+{
+ do_forceSend();
+ return true;
+}
+
#include "NdbImpl.hpp"
PollGuard::PollGuard(NdbImpl& impl)
=== modified file 'storage/ndb/src/ndbapi/trp_client.hpp'
--- a/storage/ndb/src/ndbapi/trp_client.hpp 2011-09-09 12:29:43 +0000
+++ b/storage/ndb/src/ndbapi/trp_client.hpp 2012-05-31 18:26:14 +0000
@@ -20,13 +20,17 @@
#include <ndb_global.h>
#include <NdbCondition.h>
+#include <TransporterRegistry.hpp>
+#include <NodeBitmask.hpp>
struct trp_node;
class NdbApiSignal;
struct LinearSectionPtr;
struct GenericSectionPtr;
+struct TFBuffer;
+class trp_client;
-class trp_client
+class trp_client : TransporterSendBufferHandle
{
friend class TransporterFacade;
public:
@@ -46,6 +50,7 @@ public:
void complete_poll();
void wakeup();
+ void flush_send_buffers();
int do_forceSend(int val = 1);
int raw_sendSignal(const NdbApiSignal*, Uint32 nodeId);
@@ -71,10 +76,29 @@ public:
/**
* This sendSignal variant can be called on any trp_client
* but perform the send on the trp_client-object that
- * is currently receiving
+ * is currently receiving (m_poll_owner)
+ *
+ * This variant does flush thread-local send-buffer
*/
int safe_sendSignal(const NdbApiSignal*, Uint32 nodeId);
+ /**
+ * This sendSignal variant can be called on any trp_client
+ * but perform the send on the trp_client-object that
+ * is currently receiving (m_poll_owner)
+ *
+ * This variant does not flush thread-local send-buffer
+ */
+ int safe_noflush_sendSignal(const NdbApiSignal*, Uint32 nodeId);
+private:
+ /**
+ * TransporterSendBufferHandle interface
+ */
+ virtual Uint32 *getWritePtr(NodeId node, Uint32 lenBytes, Uint32 prio,
+ Uint32 max_use);
+ virtual Uint32 updateWritePtr(NodeId node, Uint32 lenBytes, Uint32 prio);
+ virtual bool forceSend(NodeId node);
+
private:
Uint32 m_blockNo;
TransporterFacade * m_facade;
@@ -82,15 +106,41 @@ private:
/**
* This is used for polling
*/
+public:
+ NdbMutex* m_mutex; // thread local mutex...
+private:
struct PollQueue
{
+ PollQueue();
+ void assert_destroy() const;
+
bool m_locked;
bool m_poll_owner;
- bool m_waiting;
+ bool m_poll_queue;
+ enum { PQ_WOKEN, PQ_IDLE, PQ_WAITING } m_waiting;
trp_client *m_prev;
trp_client *m_next;
NdbCondition * m_condition;
+
+ /**
+ * This is called by the poll owner
+ * before doing an external poll
+ */
+ void start_poll(trp_client* self);
+
+ bool lock_client(trp_client*);
+ bool check_if_locked(const trp_client*) const;
+ Uint32 m_locked_cnt;
+ trp_client * m_locked_clients[16];
} m_poll;
+
+ /**
+ * This is used for sending
+ */
+ Uint32 m_send_nodes_cnt;
+ Uint8 m_send_nodes_list[MAX_NODES];
+ NodeBitmask m_send_nodes_mask;
+ TFBuffer* m_send_buffers;
};
class PollGuard
@@ -115,7 +165,7 @@ inline
void
trp_client::lock()
{
- NdbMutex_Lock(m_facade->theMutexPtr);
+ NdbMutex_Lock(m_mutex);
assert(m_poll.m_locked == false);
m_poll.m_locked = true;
}
@@ -124,9 +174,10 @@ inline
void
trp_client::unlock()
{
+ assert(m_send_nodes_mask.isclear()); // Nothing unsent when unlocking...
assert(m_poll.m_locked == true);
m_poll.m_locked = false;
- NdbMutex_Unlock(m_facade->theMutexPtr);
+ NdbMutex_Unlock(m_mutex);
}
inline
@@ -141,7 +192,7 @@ int
trp_client::raw_sendSignal(const NdbApiSignal * signal, Uint32 nodeId)
{
assert(m_poll.m_locked);
- return m_facade->sendSignal(signal, nodeId);
+ return m_facade->sendSignal(this, signal, nodeId);
}
inline
@@ -150,7 +201,7 @@ trp_client::raw_sendSignal(const NdbApiS
const LinearSectionPtr ptr[3], Uint32 secs)
{
assert(m_poll.m_locked);
- return m_facade->sendSignal(signal, nodeId, ptr, secs);
+ return m_facade->sendSignal(this, signal, nodeId, ptr, secs);
}
inline
@@ -159,7 +210,7 @@ trp_client::raw_sendSignal(const NdbApiS
const GenericSectionPtr ptr[3], Uint32 secs)
{
assert(m_poll.m_locked);
- return m_facade->sendSignal(signal, nodeId, ptr, secs);
+ return m_facade->sendSignal(this, signal, nodeId, ptr, secs);
}
inline
@@ -168,7 +219,7 @@ trp_client::raw_sendFragmentedSignal(con
const LinearSectionPtr ptr[3], Uint32 secs)
{
assert(m_poll.m_locked);
- return m_facade->sendFragmentedSignal(signal, nodeId, ptr, secs);
+ return m_facade->sendFragmentedSignal(this, signal, nodeId, ptr, secs);
}
inline
@@ -178,7 +229,33 @@ trp_client::raw_sendFragmentedSignal(con
Uint32 secs)
{
assert(m_poll.m_locked);
- return m_facade->sendFragmentedSignal(signal, nodeId, ptr, secs);
+ return m_facade->sendFragmentedSignal(this, signal, nodeId, ptr, secs);
+}
+
+inline
+trp_client::PollQueue::PollQueue()
+{
+ m_waiting = PQ_IDLE;
+ m_locked = false;
+ m_poll_owner = false;
+ m_poll_queue = false;
+ m_next = 0;
+ m_prev = 0;
+ m_condition = NdbCondition_Create();
+ m_locked_cnt = 0;
+}
+
+inline
+void
+trp_client::PollQueue::assert_destroy() const
+{
+ assert(m_waiting == PQ_IDLE);
+ assert(m_locked == false);
+ assert(m_poll_owner == false);
+ assert(m_poll_queue == false);
+ assert(m_next == 0);
+ assert(m_prev == 0);
+ assert(m_locked_cnt == 0);
}
#endif
No bundle (reason: useless for push emails).
| Thread |
|---|
| • bzr push into mysql-5.5-cluster-7.2 branch (john.duncan:3929 to 3930) | John David Duncan | 31 May |