Below is the list of changes that have just been committed into a local
5.1 repository of jonas. When jonas does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html
ChangeSet@stripped, 2007-08-08 13:14:22+02:00, jonas@stripped +3 -0
Merge eel.hemma.oreland.se:/home/jonas/src/51-telco
into eel.hemma.oreland.se:/home/jonas/src/51-telco-trp
MERGE: 1.2514.1.23
storage/ndb/src/common/transporter/TCP_Transporter.hpp@stripped, 2007-08-08 13:14:16+02:00, jonas@stripped +0 -0
Auto merged
MERGE: 1.13.1.1
storage/ndb/src/common/transporter/TransporterRegistry.cpp@stripped, 2007-08-08 13:14:17+02:00, jonas@stripped +0 -0
Auto merged
MERGE: 1.75.1.1
storage/ndb/src/ndbapi/TransporterFacade.cpp@stripped, 2007-08-08 13:14:17+02:00, jonas@stripped +0 -0
Auto merged
MERGE: 1.66.1.1
# This is a BitKeeper patch. What follows are the unified diffs for the
# set of deltas contained in the patch. The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User: jonas
# Host: eel.hemma.oreland.se
# Root: /home/jonas/src/51-telco-trp/RESYNC
--- 1.14/storage/ndb/src/common/transporter/TCP_Transporter.hpp 2007-06-14 18:46:33 +02:00
+++ 1.15/storage/ndb/src/common/transporter/TCP_Transporter.hpp 2007-08-08 13:14:16 +02:00
@@ -113,10 +113,9 @@
virtual void disconnectImpl();
private:
- /**
- * Send buffers
- */
- SendBuffer m_sendBuffer;
+ Uint32 m_send_bytes;
+ Uint32 m_send_buffer_pos;
+ struct iovec * m_send_buffers;
// Sending/Receiving socket used by both client and server
NDB_SOCKET_TYPE theSocket;
@@ -176,7 +175,7 @@
inline
bool
TCP_Transporter::hasDataToSend() const {
- return m_sendBuffer.dataSize > 0;
+ return m_send_bytes > 0;
}
inline
--- 1.76/storage/ndb/src/common/transporter/TransporterRegistry.cpp 2007-06-14 18:46:33 +02:00
+++ 1.77/storage/ndb/src/common/transporter/TransporterRegistry.cpp 2007-08-08 13:14:17 +02:00
@@ -99,7 +99,8 @@
nTCPTransporters = 0;
nSCITransporters = 0;
nSHMTransporters = 0;
-
+ m_send_buffer_size = 0;
+
// Initialize the transporter arrays
for (unsigned i=0; i<maxTransporters; i++) {
theTCPTransporters[i] = NULL;
@@ -270,7 +271,7 @@
bool
TransporterRegistry::createTCPTransporter(TransporterConfiguration *config) {
#ifdef NDB_TCP_TRANSPORTER
-
+
if(!nodeIdSpecified){
init(config->localNodeId);
}
@@ -280,23 +281,9 @@
if(theTransporters[config->remoteNodeId] != NULL)
return false;
-
+
TCP_Transporter * t = new TCP_Transporter(*this, config);
-
-#if 0
- config->tcp.sendBufferSize,
- config->tcp.maxReceiveSize,
- config->localHostName,
- config->remoteHostName,
- config->s_port,
- config->isMgmConnection,
- localNodeId,
- config->remoteNodeId,
- config->serverNodeId,
- config->checksum,
- config->signalId);
-#endif
-
+
if (t == NULL)
return false;
else if (!t->initTransporter()) {
@@ -311,7 +298,8 @@
performStates[t->getRemoteNodeId()] = DISCONNECTED;
nTransporters++;
nTCPTransporters++;
-
+
+ m_send_buffer_size += config->tcp.sendBufferSize;
return true;
#else
return false;
@@ -550,7 +538,7 @@
reportError(callbackObj, nodeId, TE_SEND_BUFFER_FULL);
return SEND_OK;
}
-
+
WARNING("Signal to " << nodeId << " lost(buffer)");
reportError(callbackObj, nodeId, TE_SIGNAL_LOST_SEND_BUFFER_FULL);
return SEND_BUFFER_FULL;
@@ -643,6 +631,85 @@
return SEND_BLOCKED;
}
+}
+
+
+/**
+ * Transporter(s) that does not support (nativly)
+ * the thr_client_r interface
+ * uses this method...
+ */
+SendStatus
+TransporterRegistry::prepareSend(Transporter* t,
+ struct iovec* buf,
+ Uint32 lenBytes)
+{
+ Uint32 i;
+ Uint32 cnt = (lenBytes + 32767) / 32768;
+ SendStatus ss = SEND_OK;
+
+ if(t->isConnected())
+ {
+ Uint32 * insertPtr = t->getWritePtr(lenBytes, 0);
+ if(insertPtr != 0)
+ {
+ for (i = 0; i<cnt; i++)
+ {
+ Uint32 sz = buf[i].iov_len;
+ assert((sz & 3) == 0);
+ sz /= 4;
+ memcpy(insertPtr, buf[i].iov_base, sz);
+ insertPtr += sz;
+ }
+ t->updateWritePtr(lenBytes, 0);
+ goto done;
+ }
+
+ /**
+ * @note: on linux/i386 the granularity is 10ms
+ * so sleepTime = 2 generates a 10 ms sleep.
+ */
+ int sleepTime = 2;
+ for(Uint32 loop = 0; loop<50; loop++){
+ if((nSHMTransporters+nSCITransporters) == 0)
+ NdbSleep_MilliSleep(sleepTime);
+ insertPtr = t->getWritePtr(lenBytes, 0);
+ if(insertPtr != 0)
+ {
+ for (i = 0; i<cnt; i++)
+ {
+ Uint32 sz = buf[i].iov_len;
+ assert((sz & 3) == 0);
+ sz /= 4;
+ memcpy(insertPtr, buf[i].iov_base, sz);
+ insertPtr += sz;
+ }
+ t->updateWritePtr(lenBytes, 0);
+ break;
+ }
+ }
+
+ Uint32 nodeId = t->remoteNodeId;
+ if(insertPtr != 0)
+ {
+ /**
+ * Send buffer full, but resend works
+ */
+ reportError(callbackObj, nodeId, TE_SEND_BUFFER_FULL);
+ goto done;
+ }
+
+ WARNING("Signal to " << nodeId << " lost(buffer)");
+ reportError(callbackObj, nodeId, TE_SIGNAL_LOST_SEND_BUFFER_FULL);
+ ss = SEND_BUFFER_FULL;
+ goto done;
+ }
+
+ ss = SEND_DISCONNECTED;
+
+done:
+ release_buffers(buf, cnt);
+ return ss;
}
void
--- 1.68/storage/ndb/src/ndbapi/TransporterFacade.cpp 2007-06-20 06:34:48 +02:00
+++ 1.69/storage/ndb/src/ndbapi/TransporterFacade.cpp 2007-08-08 13:14:17 +02:00
@@ -691,6 +691,11 @@
TRP_DEBUG( "configureTransporters returned 0 or less" );
DBUG_RETURN(false);
}
+
+ if (theTransporterRegistry->alloc_buffers() != true)
+ {
+ DBUG_RETURN(false);
+ }
ndb_mgm_configuration_iterator iter(* props, CFG_SECTION_NODE);
iter.first();
| Thread |
|---|
| • bk commit into 5.1 tree (jonas:1.2563) | jonas | 8 Aug |