3020 Jonas Oreland 2008-10-27
ndb(mt)d - rework unified sendbuffer handling - to remove spurious corrupt messages
in high load + fix high-load-read performance breakdown
modified:
storage/ndb/include/transporter/TransporterCallback.hpp
storage/ndb/include/transporter/TransporterRegistry.hpp
storage/ndb/src/common/transporter/SHM_Transporter.cpp
storage/ndb/src/common/transporter/SHM_Transporter.hpp
storage/ndb/src/common/transporter/TCP_Transporter.cpp
storage/ndb/src/common/transporter/TCP_Transporter.hpp
storage/ndb/src/common/transporter/Transporter.cpp
storage/ndb/src/common/transporter/Transporter.hpp
storage/ndb/src/common/transporter/TransporterRegistry.cpp
storage/ndb/src/kernel/vm/TransporterCallback.cpp
storage/ndb/src/kernel/vm/mt.cpp
storage/ndb/src/ndbapi/TransporterFacade.hpp
3019 Magnus Svensson 2008-10-27
WL#4350 Ignore API_REGCONF in all Ndb blocks
modified:
storage/ndb/src/ndbapi/Ndbif.cpp
=== modified file 'storage/ndb/include/transporter/TransporterCallback.hpp'
--- a/storage/ndb/include/transporter/TransporterCallback.hpp 2008-05-29 15:06:11 +0000
+++ b/storage/ndb/include/transporter/TransporterCallback.hpp 2008-10-27 17:56:57 +0000
@@ -411,8 +411,8 @@ public:
* Will be called from the thread that does performSend(), so multi-threaded
* use cases must be prepared for that and do any necessary locking.
*/
- virtual int get_bytes_to_send_iovec(NodeId node, struct iovec *dst,
- Uint32 max) = 0;
+ virtual Uint32 get_bytes_to_send_iovec(NodeId, struct iovec *dst, Uint32) = 0;
+
/**
* Called when data has been sent, allowing to free / reuse the space. Passes
* number of bytes sent.
@@ -426,8 +426,7 @@ public:
*
* Like get_bytes_to_send_iovec(), this is called during performSend().
*/
- virtual Uint32 bytes_sent(NodeId node, const struct iovec *src,
- Uint32 bytes) = 0;
+ virtual Uint32 bytes_sent(NodeId node, Uint32 bytes) = 0;
/**
* Called to check if any data is available for sending with doSend().
=== modified file 'storage/ndb/include/transporter/TransporterRegistry.hpp'
--- a/storage/ndb/include/transporter/TransporterRegistry.hpp 2008-06-17 20:28:45 +0000
+++ b/storage/ndb/include/transporter/TransporterRegistry.hpp 2008-10-27 17:56:57 +0000
@@ -300,7 +300,7 @@ public:
Uint32 pollReceive(Uint32 timeOutMillis);
void performReceive();
- void performSend(NodeId nodeId);
+ int performSend(NodeId nodeId);
void performSend();
/**
@@ -455,31 +455,23 @@ private:
/* Send buffer for one transporter is kept in a single-linked list. */
struct SendBufferPage *m_next;
+
/* Bytes of send data available in this page. */
- Uint32 m_bytes;
+ Uint16 m_bytes;
+ /* Start of unsent data */
+ Uint16 m_start;
+
/* Data; real size is to the end of one page. */
- unsigned char m_data[1];
+ unsigned char m_data[2];
};
/* Send buffer for one transporter. */
struct SendBuffer {
+ /* Total size of data in buffer, from m_offset_start_data to end. */
+ Uint32 m_used_bytes;
/* Linked list of active buffer pages with first and last pointer. */
SendBufferPage *m_first_page;
SendBufferPage *m_last_page;
- /**
- * Current page == the first one with data not yet returned from
- * get_bytes_to_send_iovec().
- */
- SendBufferPage *m_current_page;
- /**
- * Offset (in m_current_page) of next data to return from
- * get_bytes_to_send_iovec().
- */
- Uint32 m_offset_unsent_data;
- /* Offset (in m_first_page) of data not yet passed to bytes_sent(). */
- Uint32 m_offset_start_data;
- /* Total size of data in buffer, from m_offset_start_data to end. */
- Uint32 m_used_bytes;
};
SendBufferPage *alloc_page();
@@ -501,8 +493,8 @@ private:
Uint32 m_total_max_send_buffer;
public:
- int get_bytes_to_send_iovec(NodeId node, struct iovec *dst, Uint32 max);
- Uint32 bytes_sent(NodeId node, const struct iovec *src, Uint32 bytes);
+ Uint32 get_bytes_to_send_iovec(NodeId node, struct iovec *dst, Uint32 max);
+ Uint32 bytes_sent(NodeId node, Uint32 bytes);
bool has_data_to_send(NodeId node);
void reset_send_buffer(NodeId node);
=== modified file 'storage/ndb/src/common/transporter/SHM_Transporter.cpp'
--- a/storage/ndb/src/common/transporter/SHM_Transporter.cpp 2008-08-27 14:34:22 +0000
+++ b/storage/ndb/src/common/transporter/SHM_Transporter.cpp 2008-10-27 17:56:57 +0000
@@ -358,23 +358,37 @@ SHM_Transporter::connect_common(NDB_SOCK
return false;
}
-bool
+int
SHM_Transporter::doSend()
{
- if (!fetch_send_iovec_data())
- return false;
+ struct iovec iov[64];
+ Uint32 cnt = fetch_send_iovec_data(iov, NDB_ARRAY_SIZE(iov));
- Uint32 used = m_send_iovec_used;
- if (used == 0)
- return true; // Nothing to send
+ if (cnt == 0)
+ {
+ return 0;
+ }
+
+ Uint32 sum = 0;
+ for(Uint32 i = 0; i<cnt; i++)
+ {
+ assert(iov[i].iov_len);
+ sum += iov[i].iov_len;
+ }
- int nBytesSent = writer->writev(m_send_iovec, used);
+ int nBytesSent = writer->writev(iov, cnt);
if (nBytesSent > 0)
{
kill(m_remote_pid, g_ndb_shm_signum);
iovec_data_sent(nBytesSent);
+
+ if (Uint32(nBytesSent) == sum && (cnt != NDB_ARRAY_SIZE(iov)))
+ {
+ return 0;
+ }
+ return 1;
}
- return true;
+ return 1;
}
=== modified file 'storage/ndb/src/common/transporter/SHM_Transporter.hpp'
--- a/storage/ndb/src/common/transporter/SHM_Transporter.hpp 2008-05-29 15:06:11 +0000
+++ b/storage/ndb/src/common/transporter/SHM_Transporter.hpp 2008-10-27 17:56:57 +0000
@@ -119,7 +119,7 @@ protected:
/**
* doSend (i.e signal receiver)
*/
- bool doSend();
+ int doSend();
int m_remote_pid;
Uint32 m_signal_threshold;
=== modified file 'storage/ndb/src/common/transporter/TCP_Transporter.cpp'
--- a/storage/ndb/src/common/transporter/TCP_Transporter.cpp 2008-10-11 09:22:33 +0000
+++ b/storage/ndb/src/common/transporter/TCP_Transporter.cpp 2008-10-27 17:56:57 +0000
@@ -246,49 +246,112 @@ TCP_Transporter::sendIsPossible(struct t
(!((sz == -1) && (e == SOCKET_EAGAIN) || (e == SOCKET_EWOULDBLOCK)
|| (e == SOCKET_EINTR))))
-bool
+int
TCP_Transporter::doSend() {
- if (!fetch_send_iovec_data())
- return false;
+ struct iovec iov[64];
+ Uint32 cnt = fetch_send_iovec_data(iov, NDB_ARRAY_SIZE(iov));
- Uint32 used = m_send_iovec_used;
- if (used == 0)
- return true; // Nothing to send
+ if (cnt == 0)
+ {
+ return 0;
+ }
+
+ Uint32 sum = 0;
+ for(Uint32 i = 0; i<cnt; i++)
+ {
+ assert(iov[i].iov_len);
+ sum += iov[i].iov_len;
+ }
- Uint32 iovcnt = used > m_os_max_iovec ? m_os_max_iovec : used;
- int nBytesSent = my_socket_writev(theSocket, m_send_iovec, iovcnt);
+ Uint32 pos = 0;
+ Uint32 sum_sent = 0;
+ Uint32 send_cnt = 0;
+ Uint32 remain = sum;
- if (nBytesSent > 0)
+ if (cnt == NDB_ARRAY_SIZE(iov))
{
- iovec_data_sent(nBytesSent);
+ // If pulling all iov's make sure that we never return everyting
+ // flushed
+ sum++;
+ }
+
+ while (send_cnt < 5)
+ {
+ send_cnt++;
+ Uint32 iovcnt = cnt > m_os_max_iovec ? m_os_max_iovec : cnt;
+ int nBytesSent = my_socket_writev(theSocket, iov+pos, iovcnt);
+ assert(nBytesSent <= (int)remain);
- sendCount ++;
- sendSize += nBytesSent;
- if(sendCount == reportFreq)
+ if (Uint32(nBytesSent) == remain)
{
- get_callback_obj()->reportSendLen(remoteNodeId, sendCount, sendSize);
- sendCount = 0;
- sendSize = 0;
+ sum_sent += nBytesSent;
+ goto ok;
}
- return true;
- }
- else
- {
- /* Send failed. */
+ else if (nBytesSent > 0)
+ {
+ sum_sent += nBytesSent;
+ remain -= nBytesSent;
+
+ /**
+ * Forward in iovec
+ */
+ while (Uint32(nBytesSent) >= iov[pos].iov_len)
+ {
+ assert(iov[pos].iov_len > 0);
+ nBytesSent -= iov[pos].iov_len;
+ pos++;
+ cnt--;
+ }
+
+ if (nBytesSent)
+ {
+ assert(iov[pos].iov_len > Uint32(nBytesSent));
+ iov[pos].iov_len -= nBytesSent;
+ iov[pos].iov_base = ((char*)(iov[pos].iov_base))+nBytesSent;
+ }
+ continue;
+ }
+ else
+ {
+ int err = my_socket_errno();
+ if (!(DISCONNECT_ERRNO(err, nBytesSent)))
+ {
+ if (sum_sent)
+ {
+ goto ok;
+ }
+ else
+ {
+ return remain;
+ }
+ }
+
#if defined DEBUG_TRANSPORTER
- g_eventLogger->error("Send Failure(disconnect==%d) to node = %d "
- "nBytesSent = %d "
- "errno = %d strerror = %s",
- DISCONNECT_ERRNO(my_socket_errno(), nBytesSent),
- remoteNodeId, nBytesSent, my_socket_errno(),
- (char*)ndbstrerror(my_socket_errno()));
+ g_eventLogger->error("Send Failure(disconnect==%d) to node = %d "
+ "nBytesSent = %d "
+ "errno = %d strerror = %s",
+ DISCONNECT_ERRNO(err, nBytesSent),
+ remoteNodeId, nBytesSent, my_socket_errno(),
+ (char*)ndbstrerror(err));
#endif
- if(DISCONNECT_ERRNO(my_socket_errno(), nBytesSent)){
- do_disconnect(my_socket_errno());
+ do_disconnect(err);
+ return 0;
}
+ }
- return false;
+ok:
+ assert(sum >= sum_sent);
+ iovec_data_sent(sum_sent);
+ sendCount += send_cnt;
+ sendSize += sum_sent;
+ if(sendCount >= reportFreq)
+ {
+ get_callback_obj()->reportSendLen(remoteNodeId, sendCount, sendSize);
+ sendCount = 0;
+ sendSize = 0;
}
+
+ return sum - sum_sent; // 0 if every thing flushed else >0
}
int
=== modified file 'storage/ndb/src/common/transporter/TCP_Transporter.hpp'
--- a/storage/ndb/src/common/transporter/TCP_Transporter.hpp 2008-05-29 15:06:11 +0000
+++ b/storage/ndb/src/common/transporter/TCP_Transporter.hpp 2008-10-27 17:56:57 +0000
@@ -58,7 +58,7 @@ private:
* Retrieves the contents of the send buffers and writes it on
* the external TCP/IP interface.
*/
- bool doSend();
+ int doSend();
/**
* It reads the external TCP/IP interface once
=== modified file 'storage/ndb/src/common/transporter/Transporter.cpp'
--- a/storage/ndb/src/common/transporter/Transporter.cpp 2008-10-11 13:45:50 +0000
+++ b/storage/ndb/src/common/transporter/Transporter.cpp 2008-10-27 17:56:57 +0000
@@ -42,7 +42,6 @@ Transporter::Transporter(TransporterRegi
isServer(lNodeId==serverNodeId),
m_packer(_signalId, _checksum), m_max_send_buffer(max_send_buffer),
m_overload_limit(0xFFFFFFFF), isMgmConnection(_isMgmConnection),
- m_send_iovec_used(0),
m_type(_type),
m_transporter_registry(t_reg)
{
@@ -244,7 +243,6 @@ Transporter::doDisconnect() {
m_connected= false;
- m_send_iovec_used= 0;
get_callback_obj()->reset_send_buffer(remoteNodeId);
disconnectImpl();
}
=== modified file 'storage/ndb/src/common/transporter/Transporter.hpp'
--- a/storage/ndb/src/common/transporter/Transporter.hpp 2008-10-14 21:01:04 +0000
+++ b/storage/ndb/src/common/transporter/Transporter.hpp 2008-10-27 17:56:57 +0000
@@ -90,12 +90,11 @@ public:
used >= m_overload_limit);
}
- virtual bool doSend() = 0;
+ virtual int doSend() = 0;
bool has_data_to_send()
{
- return (m_send_iovec_used > 0 ||
- get_callback_obj()->has_data_to_send(remoteNodeId));
+ return get_callback_obj()->has_data_to_send(remoteNodeId);
}
/* Get the configured maximum send buffer usage. */
@@ -171,10 +170,7 @@ private:
virtual bool send_limit_reached(int bufsize) = 0;
protected:
- static const Uint32 SEND_IOVEC_SIZE = 64;
Uint32 m_os_max_iovec;
- Uint32 m_send_iovec_used;
- struct iovec m_send_iovec[SEND_IOVEC_SIZE];
Uint32 getErrorCount();
Uint32 m_errorCount;
@@ -190,7 +186,7 @@ protected:
void report_error(enum TransporterError err, const char *info = 0)
{ m_transporter_registry.report_error(remoteNodeId, err, info); };
- bool fetch_send_iovec_data();
+ Uint32 fetch_send_iovec_data(struct iovec dst[], Uint32 cnt);
void iovec_data_sent(int nBytesSent);
};
@@ -224,56 +220,20 @@ Transporter::getErrorCount()
* partial send).
*/
inline
-bool
-Transporter::fetch_send_iovec_data()
+Uint32
+Transporter::fetch_send_iovec_data(struct iovec dst[], Uint32 cnt)
{
- Uint32 used = m_send_iovec_used;
- if (SEND_IOVEC_SIZE > used + 1)
- {
- Uint32 avail = (SEND_IOVEC_SIZE - 1) - used;
- int count = get_callback_obj()->get_bytes_to_send_iovec(remoteNodeId,
- m_send_iovec + used,
- avail);
- if (count < 0)
- return false; // Error
- m_send_iovec_used = used + count;
- }
-
- assert(m_send_iovec_used < SEND_IOVEC_SIZE);
-
- return true;
+ return get_callback_obj()->get_bytes_to_send_iovec(remoteNodeId,
+ dst, cnt);
}
-/* Drop all iovec's that have been sent (last one maybe partially). */
inline
void
Transporter::iovec_data_sent(int nBytesSent)
{
Uint32 used_bytes
- = get_callback_obj()->bytes_sent(remoteNodeId, m_send_iovec, nBytesSent);
+ = get_callback_obj()->bytes_sent(remoteNodeId, nBytesSent);
update_status_overloaded(used_bytes);
-
- Uint32 used = m_send_iovec_used;
- int sofar = 0;
- Uint32 i;
- for (i = 0; i < used; i++)
- {
- int len = m_send_iovec[i].iov_len;
- assert(len >= 0);
- int new_sofar = sofar + len;
- if (new_sofar > nBytesSent)
- {
- int partial = nBytesSent - sofar;
- assert(partial >= 0);
- m_send_iovec[i].iov_base = (char *)m_send_iovec[i].iov_base + partial;
- m_send_iovec[i].iov_len = len - partial;
- if (i > 0)
- memmove(m_send_iovec, m_send_iovec + i, (used - i)*sizeof(iovec));
- break;
- }
- sofar = new_sofar;
- }
- m_send_iovec_used = used - i;
}
#endif // Define of Transporter_H
=== modified file 'storage/ndb/src/common/transporter/TransporterRegistry.cpp'
--- a/storage/ndb/src/common/transporter/TransporterRegistry.cpp 2008-09-21 09:47:14 +0000
+++ b/storage/ndb/src/common/transporter/TransporterRegistry.cpp 2008-10-27 17:56:57 +0000
@@ -161,9 +161,6 @@ TransporterRegistry::allocate_send_buffe
SendBuffer &b = m_send_buffers[i];
b.m_first_page = NULL;
b.m_last_page = NULL;
- b.m_current_page = NULL;
- b.m_offset_unsent_data = 0;
- b.m_offset_start_data = 0;
b.m_used_bytes = 0;
}
@@ -1166,13 +1163,16 @@ TransporterRegistry::performReceive()
* In multi-threaded cases, this must be protected by send lock (can use
* different locks for each node).
*/
-void
+int
TransporterRegistry::performSend(NodeId nodeId)
{
Transporter *t = get_transporter(nodeId);
- if (t && t->has_data_to_send() && t->isConnected() &&
- is_connected(nodeId))
- t->doSend();
+ if (t && t->isConnected() && is_connected(nodeId))
+ {
+ return t->doSend();
+ }
+
+ return 0;
}
void
@@ -1912,7 +1912,7 @@ TransporterRegistry::updateWritePtr(Tran
}
}
-int
+Uint32
TransporterRegistry::get_bytes_to_send_iovec(NodeId node, struct iovec *dst,
Uint32 max)
{
@@ -1920,48 +1920,24 @@ TransporterRegistry::get_bytes_to_send_i
if (max == 0)
return 0;
- SendBuffer *b = m_send_buffers + node;
-
- SendBufferPage *page = b->m_current_page;
- if (page == NULL)
- return 0;
-
- Uint32 offset = b->m_offset_unsent_data;
- assert(offset <= page->m_bytes);
- if (offset == page->m_bytes)
- return 0;
-
- dst[0].iov_base = (char*)(page->m_data + offset);
- dst[0].iov_len = page->m_bytes - offset;
- Uint32 count = 1;
- page = page->m_next;
+ Uint32 count = 0;
+ SendBuffer *b = m_send_buffers + node;
+ SendBufferPage *page = b->m_first_page;
while (page != NULL && count < max)
{
- dst[count].iov_base = (char*)page->m_data;
+ dst[count].iov_base = page->m_data+page->m_start;
dst[count].iov_len = page->m_bytes;
+ assert(page->m_start + page->m_bytes <= page->max_data_bytes());
page = page->m_next;
count++;
}
- if (page != NULL)
- {
- b->m_current_page = page;
- b->m_offset_unsent_data = 0;
- }
- else
- {
- assert(b->m_last_page != NULL);
- b->m_current_page = b->m_last_page;
- b->m_offset_unsent_data = b->m_last_page->m_bytes;
- }
-
return count;
}
Uint32
-TransporterRegistry::bytes_sent(NodeId node, const struct iovec *src,
- Uint32 bytes)
+TransporterRegistry::bytes_sent(NodeId node, Uint32 bytes)
{
assert(m_use_default_send_buffer);
@@ -1975,68 +1951,27 @@ TransporterRegistry::bytes_sent(NodeId n
b->m_used_bytes = used_bytes;
SendBufferPage *page = b->m_first_page;
- assert(page != NULL);
-
- /**
- * On the first page, part of the page may have been sent previously, as
- * indicated by b->m_offset_start_data.
- *
- * Additionally, there may be more data on the page than what was sent, or
- * else we will need to release this (and possibly more) pages.
- */
- assert(b->m_offset_start_data < page->m_bytes);
- Uint32 rest = page->m_bytes - b->m_offset_start_data;
- if (rest > bytes)
+ while (bytes && bytes >= page->m_bytes)
{
- b->m_offset_start_data += bytes;
- return used_bytes;
- }
- bytes -= rest;
- /**
- * Now loop, releasing pages until we find one where not all data has been sent.
- */
- for(;;) {
- if (page == b->m_last_page)
- {
- /**
- * Don't free the last page if emptied completely.
- * Instead keep it for storing more data later.
- */
- break;
- }
- SendBufferPage *next = page->m_next;
- assert(next != NULL);
- if (page == b->m_current_page)
- {
- assert(page->m_bytes == b->m_offset_unsent_data);
- b->m_current_page = next;
- b->m_offset_unsent_data = 0;
- }
- release_page(page);
- page = next;
- if (bytes == 0)
- break;
- assert(page != NULL);
- if (bytes < page->m_bytes)
- break;
+ SendBufferPage * tmp = page;
bytes -= page->m_bytes;
+ page = page->m_next;
+ release_page(tmp);
}
- if (page == NULL)
+
+ if (bytes)
{
- /* We have sent everything we had. */
- assert(bytes == 0);
- assert(b->m_current_page == NULL);
- assert(b->m_offset_unsent_data == 0);
- b->m_first_page = NULL;
- b->m_last_page = NULL;
- b->m_offset_start_data = 0;
+ page->m_start += bytes;
+ page->m_bytes -= bytes;
+ assert(page->m_start + page->m_bytes <= page->max_data_bytes());
+ b->m_first_page = page;
}
else
{
- /* We have sent only part of a page. */
- b->m_first_page = page;
- b->m_offset_start_data = bytes;
+ b->m_first_page = 0;
+ b->m_last_page = 0;
}
+
return used_bytes;
}
@@ -2046,8 +1981,7 @@ TransporterRegistry::has_data_to_send(No
assert(m_use_default_send_buffer);
SendBuffer *b = m_send_buffers + node;
- return (b->m_current_page != NULL &&
- b->m_current_page->m_bytes > b->m_offset_unsent_data);
+ return (b->m_first_page != NULL && b->m_first_page->m_bytes);
}
void
@@ -2065,9 +1999,6 @@ TransporterRegistry::reset_send_buffer(N
}
b->m_first_page = NULL;
b->m_last_page = NULL;
- b->m_current_page = NULL;
- b->m_offset_unsent_data = 0;
- b->m_offset_start_data = 0;
b->m_used_bytes = 0;
}
@@ -2100,42 +2031,33 @@ TransporterRegistry::getWritePtr(NodeId
assert(m_use_default_send_buffer);
SendBuffer *b = m_send_buffers + node;
- Uint32 *p;
-
- if (b->m_used_bytes + lenBytes > max_use)
- return NULL;
/* First check if we have room in already allocated page. */
SendBufferPage *page = b->m_last_page;
- if (page != NULL && page->m_bytes + lenBytes <=
page->max_data_bytes())
+ if (page != NULL && page->m_bytes + page->m_start + lenBytes <=
page->max_data_bytes())
{
- p = (Uint32 *)(page->m_data + page->m_bytes);
- return p;
+ return (Uint32 *)(page->m_data + page->m_start + page->m_bytes);
}
+ if (b->m_used_bytes + lenBytes > max_use)
+ return NULL;
+
/* Allocate a new page. */
page = alloc_page();
if (page == NULL)
return NULL;
page->m_next = NULL;
page->m_bytes = 0;
+ page->m_start = 0;
if (b->m_last_page == NULL)
{
b->m_first_page = page;
b->m_last_page = page;
- b->m_current_page = page;
- b->m_offset_unsent_data = 0;
- b->m_offset_start_data = 0;
}
else
{
assert(b->m_first_page != NULL);
- if (b->m_current_page == NULL)
- {
- b->m_current_page = page;
- b->m_offset_unsent_data = 0;
- }
b->m_last_page->m_next = page;
b->m_last_page = page;
}
@@ -2153,40 +2075,6 @@ TransporterRegistry::updateWritePtr(Node
assert(page->m_bytes + lenBytes <= page->max_data_bytes());
page->m_bytes += lenBytes;
b->m_used_bytes += lenBytes;
-
- /**
- * If we have no data not returned from get_bytes_to_send_iovec(), and the
- * first signal spills over into a new page, we move the current pointer to
- * not have to deal with a page with zero data in get_bytes_to_send_iovec().
- */
- if (b->m_current_page != NULL &&
- b->m_current_page->m_bytes == b->m_offset_unsent_data)
- {
- b->m_current_page = b->m_current_page->m_next;
- assert(b->m_current_page == page);
- b->m_offset_unsent_data = 0;
- }
- /**
- * If all data has been sent, and the first new signal spills over into a
- * new page, we get a first page with no data which we need to free.
- */
- SendBufferPage *tmp = b->m_first_page;
- if (tmp != NULL && tmp->m_bytes == b->m_offset_start_data)
- {
- b->m_first_page = tmp->m_next;
- assert(b->m_first_page == page);
- assert(b->m_current_page == page);
- release_page(tmp);
- b->m_offset_start_data = 0;
- }
-
- /**
- * ToDo: To get better buffer utilization, we might at this point attempt
- * to copy back part of the new data into a previous page.
- *
- * This will be especially worthwhile in case of big long signals.
- */
-
return b->m_used_bytes;
}
=== modified file 'storage/ndb/src/kernel/vm/TransporterCallback.cpp'
--- a/storage/ndb/src/kernel/vm/TransporterCallback.cpp 2008-10-08 19:09:05 +0000
+++ b/storage/ndb/src/kernel/vm/TransporterCallback.cpp 2008-10-27 17:56:57 +0000
@@ -74,13 +74,13 @@ class TransporterCallbackKernelNonMT : p
*/
int checkJobBuffer() { return globalScheduler.checkDoJob(); }
void reportSendLen(NodeId nodeId, Uint32 count, Uint64 bytes);
- int get_bytes_to_send_iovec(NodeId node, struct iovec *dst, Uint32 max)
+ Uint32 get_bytes_to_send_iovec(NodeId node, struct iovec *dst, Uint32 max)
{
return globalTransporterRegistry.get_bytes_to_send_iovec(node, dst, max);
}
- Uint32 bytes_sent(NodeId node, const struct iovec *src, Uint32 bytes)
+ Uint32 bytes_sent(NodeId node, Uint32 bytes)
{
- return globalTransporterRegistry.bytes_sent(node, src, bytes);
+ return globalTransporterRegistry.bytes_sent(node, bytes);
}
bool has_data_to_send(NodeId node)
{
=== modified file 'storage/ndb/src/kernel/vm/mt.cpp'
--- a/storage/ndb/src/kernel/vm/mt.cpp 2008-10-20 10:01:57 +0000
+++ b/storage/ndb/src/kernel/vm/mt.cpp 2008-10-27 17:56:57 +0000
@@ -71,6 +71,18 @@ static Uint32 ndbmt_threads = 0;
static Uint32 num_threads = 0;
static Uint32 receiver_thread_no = 0;
+#define NO_SEND_THREAD (MAX_THREADS + 1)
+
+struct mt_lock_stat
+{
+ const void * m_ptr;
+ char * m_name;
+ Uint32 m_contended_count;
+ Uint32 m_spin_count;
+};
+static void register_lock(const void * ptr, const char * name);
+static mt_lock_stat * lookup_lock(const void * ptr);
+
#if defined(HAVE_LINUX_FUTEX) && defined(NDB_HAVE_XCNG)
#define USE_FUTEX
#endif
@@ -230,39 +242,51 @@ struct thr_spin_lock
thr_spin_lock(const char * name = 0)
{
m_lock = 0;
- m_name = name;
- m_contended_count = 0;
+ register_lock(this, name);
}
- const char * m_name;
- Uint32 m_contended_count;
volatile Uint32 m_lock;
};
static
-inline
void
-lock(struct thr_spin_lock* sl)
+lock_slow(struct thr_spin_lock* sl)
{
volatile unsigned* val = &sl->m_lock;
-test:
- if (likely(xcng(val, 1) == 0))
- return;
-
- /*
- * There is a race conditions here on m_contended_count. But it doesn't
- * really matter if the counts are not 100% accurate
- */
- Uint32 count = sl->m_contended_count++;
- Uint32 freq = (count > 10000 ? 5000 : (count > 20 ? 200 : 1));
- if ((count % freq) == 0)
- printf("%s waiting for lock, contentions~=%u\n", sl->m_name, count);
+loop:
+ Uint32 spins = 0;
do {
+ spins++;
cpu_pause();
} while (* val == 1);
- goto test;
+ if (unlikely(xcng(val, 1) != 0))
+ goto loop;
+
+ mt_lock_stat* s = lookup_lock(sl);
+ if (s)
+ {
+ s->m_spin_count += spins;
+ Uint32 count = ++s->m_contended_count;
+ Uint32 freq = (count > 10000 ? 5000 : (count > 20 ? 200 : 1));
+
+ if ((count % freq) == 0)
+ printf("%s waiting for lock, contentions: %u spins: %u\n",
+ s->m_name, count, s->m_spin_count);
+ }
+}
+
+static
+inline
+void
+lock(struct thr_spin_lock* sl)
+{
+ volatile unsigned* val = &sl->m_lock;
+ if (likely(xcng(val, 1) == 0))
+ return;
+
+ lock_slow(sl);
}
static
@@ -294,10 +318,9 @@ struct thr_mutex
{
thr_mutex(const char * name = 0) {
m_mutex = NdbMutex_Create();
- m_name = name;
+ register_lock(this, name);
}
- const char * m_name;
NdbMutex * m_mutex;
};
@@ -326,6 +349,113 @@ trylock(struct thr_mutex * sl)
}
/**
+ * thr_safe_pool
+ */
+template<typename T>
+struct thr_safe_pool
+{
+ thr_safe_pool() : m_free_list(0), m_lock("mempool") {}
+
+ T* m_free_list;
+ Ndbd_mem_manager *m_mm;
+ thr_spin_lock m_lock;
+
+ T* seize() {
+ T* ret = 0;
+ lock(&m_lock);
+ if (m_free_list)
+ {
+ ret = m_free_list;
+ m_free_list = *reinterpret_cast<T**>(m_free_list);
+ unlock(&m_lock);
+ }
+ else
+ {
+ Uint32 dummy;
+ unlock(&m_lock);
+ ret = reinterpret_cast<T*>
+ (m_mm->alloc_page(RT_JOB_BUFFER, &dummy,
+ Ndbd_mem_manager::NDB_ZONE_ANY));
+ // ToDo: How to deal with failed allocation?!?
+ // I think in this case we need to start grabbing buffers kept for signal
+ // trace.
+ }
+ return ret;
+ }
+
+ void release(T* t){
+ lock(&m_lock);
+ T** nextptr = reinterpret_cast<T**>(t);
+ * nextptr = m_free_list;
+ m_free_list = t;
+ unlock(&m_lock);
+ }
+};
+
+/**
+ * thread_local_pool
+ */
+template<typename T, typename U>
+class thread_local_pool
+{
+public:
+ thread_local_pool(thr_safe_pool<U> *global_pool, unsigned max_free) :
+ m_max_free(max_free),
+ m_free(0),
+ m_freelist(0),
+ m_global_pool(global_pool)
+ {
+ }
+
+ T *seize()
+ {
+ T *tmp = m_freelist;
+ if (tmp)
+ {
+ m_freelist = tmp->m_next;
+ assert (m_free > 0);
+ m_free--;
+ }
+ else
+ tmp = (T *)m_global_pool->seize();
+ return tmp;
+ }
+
+ void release(T *t)
+ {
+ unsigned free = m_free;
+ if (free < m_max_free)
+ {
+ m_free = free + 1;
+ t->m_next = m_freelist;
+ m_freelist = t;
+ }
+ else
+ m_global_pool->release((U *)t);
+ }
+
+ void releaseAll()
+ {
+ Uint32 save = m_max_free;
+ m_max_free = 0;
+ while(m_free)
+ {
+ T* t = seize();
+ release(t);
+ }
+ m_max_free = save;
+ }
+
+ void set_pool(thr_safe_pool<U> * pool) { m_global_pool = pool; }
+
+private:
+ unsigned m_max_free;
+ unsigned m_free;
+ T *m_freelist;
+ thr_safe_pool<U> *m_global_pool;
+};
+
+/**
* Signal buffers.
*
* Each thread job queue contains a list of these buffers with signals.
@@ -374,7 +504,6 @@ struct thr_job_queue
* For example, on Intel core 2 quad processors, there is a ~33%
* penalty for two cores accessing the same 64-byte cacheline.
*/
-
struct thr_jb_write_state
{
/*
@@ -424,17 +553,20 @@ struct thr_jb_read_state
Uint32 m_write_pos;
};
+/**
+ * time-queue
+ */
struct thr_tq
{
static const unsigned SQ_SIZE = 512;
static const unsigned LQ_SIZE = 512;
static const unsigned PAGES = 32 * (SQ_SIZE + LQ_SIZE) / 8192;
+ Uint32 * m_delayed_signals[PAGES];
+ Uint32 m_next_free;
Uint32 m_next_timer;
Uint32 m_current_time;
- Uint32 m_next_free;
Uint32 m_cnt[2];
- Uint32 * m_delayed_signals[PAGES];
Uint32 m_short_queue[SQ_SIZE];
Uint32 m_long_queue[LQ_SIZE];
};
@@ -452,9 +584,58 @@ struct thr_tq
*/
#define THR_FREE_BUF_BATCH 6
+/**
+ * a page with send data
+ */
+struct thr_send_page
+{
+ static const Uint32 PGSIZE = 32768;
+ static Uint32 max_bytes() {
+ return PGSIZE - offsetof(thr_send_page, m_data);
+ }
+
+ /* Next page */
+ thr_send_page* m_next;
+
+ /* Bytes of send data available in this page. */
+ Uint16 m_bytes;
+
+ /* Start of unsent data */
+ Uint16 m_start;
+
+ /* Data; real size is to the end of one page. */
+ unsigned char m_data[2];
+};
+
+/**
+ * a linked list with thr_send_page
+ */
+struct thr_send_buffer
+{
+ thr_send_page* m_first_page;
+ thr_send_page* m_last_page;
+};
+
+/**
+ * a ring buffer with linked list of thr_send_page
+ */
+struct thr_send_queue
+{
+ unsigned m_write_index;
+#if SIZEOF_CHARP == 8
+ unsigned m_unused;
+ thr_send_page* m_buffers[7];
+ static const unsigned SIZE = 7;
+#else
+ thr_send_page* m_buffers[15];
+ static const unsigned SIZE = 15;
+#endif
+};
+
struct thr_data
{
- thr_data() : m_jba_write_lock("jbalock") {}
+ thr_data() : m_jba_write_lock("jbalock"),
+ m_send_buffer_pool(0, THR_FREE_BUF_MAX) {}
thr_wait m_waiter;
unsigned m_thr_no;
@@ -508,16 +689,24 @@ struct thr_data
Uint32 m_prioa_size;
Uint32 m_priob_count;
Uint32 m_priob_size;
+
/* Array of node ids with pending remote send data. */
Uint8 m_pending_send_nodes[MAX_NTRANSPORTERS];
/* Number of node ids in m_pending_send_nodes. */
Uint32 m_pending_send_count;
+
/**
* Bitmap of pending node ids with send data.
* Used to quickly check if a node id is already in m_pending_send_nodes.
*/
Bitmask<(MAX_NTRANSPORTERS+31)/32> m_pending_send_mask;
+ /* pool for send buffers */
+ struct thread_local_pool<thr_send_page, thr_job_buffer> m_send_buffer_pool;
+
+ /* Send buffer for this thread, these are not touched by any other thread */
+ struct thr_send_buffer m_send_buffers[MAX_NTRANSPORTERS];
+
/* Block instances (main and worker) handled by this thread. */
/* Used for sendpacked (send-at-job-buffer-end). */
Uint32 m_instance_count;
@@ -526,245 +715,92 @@ struct thr_data
SectionSegmentPool::Cache m_sectionPoolCache;
};
-template<typename T>
-struct thr_safe_pool
+struct mt_send_handle : public TransporterSendBufferHandle
{
- thr_safe_pool() : m_lock("mempool"), m_free_list(0) {}
-
- thr_spin_lock m_lock;
- T* m_free_list;
- Ndbd_mem_manager *m_mm;
+ struct thr_data * m_selfptr;
+ mt_send_handle(thr_data* ptr) : m_selfptr(ptr) {}
+ virtual ~mt_send_handle() {}
+
+ virtual Uint32 *getWritePtr(NodeId node, Uint32 len, Uint32 prio, Uint32 max);
+ virtual Uint32 updateWritePtr(NodeId node, Uint32 lenBytes, Uint32 prio);
+ virtual bool forceSend(NodeId node);
+};
- T* seize() {
- T* ret = 0;
- lock(&m_lock);
- if (m_free_list)
- {
- ret = m_free_list;
- m_free_list = *reinterpret_cast<T**>(m_free_list);
- unlock(&m_lock);
- }
- else
- {
- Uint32 dummy;
- unlock(&m_lock);
- ret = reinterpret_cast<T*>
- (m_mm->alloc_page(RT_JOB_BUFFER, &dummy,
- Ndbd_mem_manager::NDB_ZONE_ANY));
- // ToDo: How to deal with failed allocation?!?
- // I think in this case we need to start grabbing buffers kept for signal
- // trace.
- }
- return ret;
- }
+struct trp_callback : public TransporterCallbackKernel
+{
+ trp_callback() {}
- void release(T* t){
- lock(&m_lock);
- T** nextptr = reinterpret_cast<T**>(t);
- * nextptr = m_free_list;
- m_free_list = t;
- unlock(&m_lock);
- }
+ /* Callback interface. */
+ int checkJobBuffer();
+ void reportSendLen(NodeId nodeId, Uint32 count, Uint64 bytes);
+ void lock_transporter(NodeId node);
+ void unlock_transporter(NodeId node);
+ Uint32 get_bytes_to_send_iovec(NodeId node, struct iovec *dst, Uint32 max);
+ Uint32 bytes_sent(NodeId node, Uint32 bytes);
+ bool has_data_to_send(NodeId node);
+ void reset_send_buffer(NodeId node);
};
-template<typename T, typename U>
-class thread_local_pool
+extern trp_callback g_trp_callback; // Forward declaration
+
+struct thr_repository
{
-public:
- thread_local_pool(thr_safe_pool<U> *global_pool, int max_free) :
- m_global_pool(global_pool), m_max_free(max_free),
- m_freelist(0), m_free(0)
- {
- }
+ thr_repository()
+ : m_receive_lock("recvlock"),
+ m_section_lock("sectionlock"),
+ m_mem_manager_lock("memmanagerlock") {}
- T *seize()
- {
- T *tmp = m_freelist;
- if (tmp)
- {
- m_freelist = tmp->m_next;
- assert (m_free > 0);
- m_free--;
- }
- else
- tmp = (T *)m_global_pool->seize();
- return tmp;
- }
+ unsigned m_thread_count;
+ struct thr_spin_lock m_receive_lock;
+ struct thr_spin_lock m_section_lock;
+ struct thr_spin_lock m_mem_manager_lock;
+ struct thr_data m_thread[MAX_THREADS];
+ struct thr_safe_pool<thr_job_buffer> m_free_list;
- void release(T *t)
- {
- int free = m_free;
- if (free < m_max_free)
- {
- m_free = free + 1;
- t->m_next = m_freelist;
- m_freelist = t;
- }
- else
- m_global_pool->release((U *)t);
- }
+ /**
+ * send buffer handling
+ */
- void releaseAll()
+ /* The buffers that are to be sent */
+ struct send_buffer
{
- Uint32 save = m_max_free;
- m_max_free = 0;
- while(m_free)
- {
- T* t = seize();
- release(t);
- }
- m_max_free = save;
- }
+ /**
+ * pending data
+ */
+ struct thr_send_buffer m_buffer;
-private:
- thr_safe_pool<U> *m_global_pool;
- int m_max_free;
- T *m_freelist;
- int m_free;
-};
-
-/**
- * Send buffer implementation, we have one of these for each thread.
- *
- * Enables lock-free prepareSend(), and per-transporter lock for doSend().
- */
-struct thr_send_buf : public TransporterSendBufferHandle
-{
- /* One page of send buffer data. */
- struct page
- {
- /* This is the number of words that will fit in one page of send buffer. */
- static const Uint32 PGSIZE = 32768;
- static Uint32 max_data_bytes()
- {
- return PGSIZE - offsetof(page, m_data);
- }
-
- /* Send buffer for one transporter is kept in a single-linked list. */
- page *m_next;
-#if defined VM_TRACE || defined ERROR_INSERT
- /* Prev page (only used when splitting signals) */
- page * m_prev;
-#endif
- /* Bytes of send data available in this page. */
- Uint32 m_bytes;
- /* Start of unsent data (next bytes_sent() will count from here). */
- Uint32 m_start;
- /* Offset from where to return data in next get_bytes_to_send_iovec(). */
- Uint32 m_current;
- /* Data; real size is to the end of one page. */
- unsigned char m_data[1];
- };
+ /**
+ * lock
+ */
+ struct thr_spin_lock m_send_lock;
- /* Linked list of pages for one thread/transporter pair. */
- struct send_buffer
- {
- /* First page, ie. page where next bytes_sent() will count from. */
- page *m_first_page;
- /* Last page, ie. page next getWritePtr() will use. */
- page *m_last_page;
- /* Page from which next get_bytes_to_send_iovec() will return data. */
- page *m_current_page;
- /* Temporary pointer stored in getWritePtr and read in updateWritePtr. */
- page *m_prev_page;
/**
- * Members to keep track of total buffer usage.
- *
- * Since we are non-locking, these will only be approximate, as there is no
- * defined temporal synchronization between readers and writers.
- *
- * We keep two separate counters, one updated only by the writer, and one
- * updated only by the reader, to avoid the need for locking or atomic
- * operations. An approximate count of bytes available is obtained from
- * (m_written_bytes - m_read_bytes), C unsigned arithmetics takes care
- * to handle overflow/wrapover correctly (for <2Gb send buffers at least).
+ * Flag used to coordinate sending to same remote node from different
+ * threads.
*
- * It is teoretically possible to get a <0 value (if m_read_bytes is
- * updated before m_written_bytes), so this needs to be handled.
+ * If two threads need to send to the same node at the same time, the
+ * second thread, rather than wait for the first to finish, will just
+ * set this flag, and the first thread will do an extra send when done
+ * with the first.
*/
- Uint32 m_written_bytes;
- Uint32 m_read_bytes;
-
- Uint32 used_bytes() const
- {
- Uint32 used = m_written_bytes - m_read_bytes;
- return (used >= (Uint32)0x80000000) ? 0 : used;
- }
+ Uint32 m_force_send;
- void init()
- {
- m_first_page = NULL;
- m_last_page = NULL;
- m_current_page = NULL;
- m_prev_page = NULL;
- m_written_bytes = 0;
- m_read_bytes = 0;
- }
- };
-
- thr_send_buf(struct trp_callback *trp_cb, Uint32 thread,
- thr_safe_pool<thr_job_buffer> *global_pool);
- bool initial_alloc(NodeId node);
-
- /* Callback interface. */
- Uint32 *getWritePtr(NodeId node, Uint32 lenBytes, Uint32 prio,
- Uint32 max_use);
- Uint32 updateWritePtr(NodeId node, Uint32 lenBytes, Uint32 prio);
- bool forceSend(NodeId node);
-
- const Uint32 m_self;
- send_buffer m_buffers[MAX_NTRANSPORTERS];
- thread_local_pool<page, thr_job_buffer> m_pool;
- const struct trp_callback *m_trp_callback;
-};
-
-struct trp_callback : public TransporterCallbackKernel
-{
- trp_callback();
-
- /* Callback interface. */
- int checkJobBuffer();
- void reportSendLen(NodeId nodeId, Uint32 count, Uint64 bytes);
- void lock_transporter(NodeId node);
- void unlock_transporter(NodeId node);
- int get_bytes_to_send_iovec(NodeId node, struct iovec *dst, Uint32 max);
- Uint32 bytes_sent(NodeId node, const struct iovec *src, Uint32 bytes);
- bool has_data_to_send(NodeId node);
- void reset_send_buffer(NodeId node);
-
- Uint32 total_bytes(NodeId node) const
- {
- Uint32 total = 0;
- for (Uint32 i = 0; i < num_threads; i++)
- total += m_thr_buffers[i]->m_buffers[node].used_bytes();
- return total;
- }
-
- // +1 to handle reset buffers from "other" thread
- thr_send_buf *m_thr_buffers[MAX_THREADS + 1];
-
- /**
- * During send, for each node this holds the id of the thread currently
- * doing send to that node.
- */
- Uint32 m_send_thr[MAX_NTRANSPORTERS];
-};
+ /**
+ * Which thread is currently holding the m_send_lock
+ */
+ Uint32 m_send_thread;
-extern trp_callback g_trp_callback; // Forward declaration
+ /**
+ * bytes pending for this node
+ */
+ Uint32 m_bytes;
-struct thr_repository
-{
- thr_repository()
- : m_receive_lock("recvlock"),
- m_section_lock("sectionlock"),
- m_mem_manager_lock("memmanagerlock") {}
+ /* read index(es) in thr_send_queue */
+ Uint32 m_read_index[MAX_THREADS];
+ } m_send_buffers[MAX_NTRANSPORTERS];
- unsigned m_thread_count;
- struct thr_spin_lock m_receive_lock;
- struct thr_spin_lock m_section_lock;
- struct thr_spin_lock m_mem_manager_lock;
- struct thr_data m_thread[MAX_THREADS];
- struct thr_safe_pool<thr_job_buffer> m_free_list;
+ /* The buffers published by threads */
+ thr_send_queue m_thread_send_buffers[MAX_NTRANSPORTERS][MAX_THREADS];
/*
* These are used to synchronize during crash / trace dumps.
@@ -774,21 +810,6 @@ struct thr_repository
pthread_mutex_t stop_for_crash_mutex;
pthread_cond_t stop_for_crash_cond;
Uint32 stopped_threads;
-
- /**
- * Send locks for the transporters, one per possible remote node.
- */
- thr_spin_lock m_send_locks[MAX_NTRANSPORTERS];
- /**
- * Flag used to coordinate sending to same remote node from different
- * threads.
- *
- * If two threads need to send to the same node at the same time, the
- * second thread, rather than wait for the first to finish, will just
- * set this flag, and the first thread will do an extra send when done
- * with the first.
- */
- Uint32 m_force_send[MAX_NTRANSPORTERS];
};
static
@@ -1037,6 +1058,128 @@ scan_time_queues(struct thr_data* selfpt
abort();
}
+static
+inline
+Uint32*
+get_free_slot(struct thr_repository* rep,
+ struct thr_data* selfptr,
+ Uint32* idxptr)
+{
+ struct thr_tq * tq = &selfptr->m_tq;
+ Uint32 idx = tq->m_next_free;
+retry:
+ Uint32 buf = idx >> 8;
+ Uint32 pos = idx & 0xFF;
+
+ if (idx != RNIL)
+ {
+ Uint32* page = * (tq->m_delayed_signals + buf);
+ Uint32* ptr = page + (32 * pos);
+ tq->m_next_free = * ptr;
+ * idxptr = idx;
+ return ptr;
+ }
+
+ Uint32 thr_no = selfptr->m_thr_no;
+ for (Uint32 i = 0; i<thr_tq::PAGES; i++)
+ {
+ if (tq->m_delayed_signals[i] == 0)
+ {
+ struct thr_job_buffer *jb = seize_buffer(rep, thr_no, false);
+ Uint32 * page = reinterpret_cast<Uint32*>(jb);
+ tq->m_delayed_signals[i] = page;
+
+ ndbout_c("saving %p at %p (%d)", page, tq->m_delayed_signals+i, i);
+
+ /**
+ * Init page
+ */
+ for (Uint32 j = 0; j<255; j ++)
+ {
+ page[j * 32] = (i << 8) + (j + 1);
+ }
+ page[255*32] = RNIL;
+ idx = (i << 8);
+ goto retry;
+ }
+ }
+ abort();
+}
+
+void
+senddelay(Uint32 thr_no, const SignalHeader* s, Uint32 delay)
+{
+ struct thr_repository* rep = &g_thr_repository;
+ struct thr_data * selfptr = rep->m_thread + thr_no;
+ unsigned siglen = (sizeof(*s) >> 2) + s->theLength + s->m_noOfSections;
+
+ Uint32 max;
+ Uint32 * cntptr;
+ Uint32 * queueptr;
+
+ Uint32 alarm = selfptr->m_tq.m_current_time + delay;
+ Uint32 nexttimer = selfptr->m_tq.m_next_timer;
+ if (delay < 100)
+ {
+ cntptr = selfptr->m_tq.m_cnt + 0;
+ queueptr = selfptr->m_tq.m_short_queue;
+ max = thr_tq::SQ_SIZE;
+ }
+ else
+ {
+ cntptr = selfptr->m_tq.m_cnt + 1;
+ queueptr = selfptr->m_tq.m_long_queue;
+ max = thr_tq::LQ_SIZE;
+ }
+
+ Uint32 idx;
+ Uint32* ptr = get_free_slot(rep, selfptr, &idx);
+ memcpy(ptr, s, 4*siglen);
+
+ if (0)
+ ndbout_c("now: %d alarm: %d send %s from %s to %s delay: %d idx: %x %p",
+ selfptr->m_tq.m_current_time,
+ alarm,
+ getSignalName(s->theVerId_signalNumber),
+ getBlockName(refToBlock(s->theSendersBlockRef)),
+ getBlockName(s->theReceiversBlockNumber),
+ delay,
+ idx, ptr);
+
+ Uint32 i;
+ Uint32 cnt = *cntptr;
+ Uint32 newentry = (idx << 16) | (alarm & 0xFFFF);
+
+ * cntptr = cnt + 1;
+ selfptr->m_tq.m_next_timer = alarm < nexttimer ? alarm : nexttimer;
+
+ if (cnt == 0)
+ {
+ queueptr[0] = newentry;
+ return;
+ }
+ else if (cnt < max)
+ {
+ for (i = 0; i<cnt; i++)
+ {
+ Uint32 save = queueptr[i];
+ if ((save & 0xFFFF) > alarm)
+ {
+ memmove(queueptr+i+1, queueptr+i, 4*(cnt - i));
+ queueptr[i] = newentry;
+ return;
+ }
+ }
+ assert(i == cnt);
+ queueptr[i] = newentry;
+ return;
+ }
+ else
+ {
+ abort();
+ }
+}
+
/*
* Flush the write state to the job queue, making any new signals available to
* receiving threads.
@@ -1132,7 +1275,7 @@ rand_yield(Uint32 limit, void* ptr0, voi
Uint32 sum = g_rand_yield;
for (Uint32 i = 0; i<sizeof(tmp); i++)
sum = 33 * sum + tmpptr[i];
-
+
if ((sum % 100) < limit)
{
g_rand_yield++;
@@ -1143,187 +1286,7 @@ rand_yield(Uint32 limit, void* ptr0, voi
static inline void rand_yield(Uint32 limit, void* ptr0, void * ptr1) {}
#endif
-thr_send_buf::thr_send_buf(struct trp_callback *trp_cb, Uint32 thread,
- thr_safe_pool<thr_job_buffer> *global_pool) :
- m_self(thread),
- m_pool(global_pool, THR_FREE_BUF_MAX),
- m_trp_callback(trp_cb)
-{
- for (int i = 0; i < MAX_NTRANSPORTERS; i++)
- m_buffers[i].init();
-}
-
-bool
-thr_send_buf::initial_alloc(NodeId node)
-{
- send_buffer *b = m_buffers + node;
-
- page *pg = m_pool.seize();
- if (pg == NULL)
- return false;
-
- pg->m_next = NULL;
-#if defined VM_TRACE || defined ERROR_INSERT
- pg->m_prev = NULL;
-#endif
- pg->m_bytes = 0;
- pg->m_start = 0;
- pg->m_current = 0;
- wmb(); // Commit page init before making visible
-
- /**
- * Due to no locking, we need to be very careful about initialisation here.
- *
- * Initialisation is done by the writer, so what we need to ensure is that
- * reader will not see an inconsistent state.
- *
- * Since reader is using m_current_page != NULL to mean the page is valid,
- * we set that last, with a store-store barrier.
- */
- b->m_first_page = pg;
- b->m_last_page = pg;
- b->m_prev_page = 0;
- b->m_written_bytes = 0;
- b->m_read_bytes = 0;
- wmb();
- b->m_current_page = pg;
-
- return true;
-}
-
-Uint32 *
-thr_send_buf::getWritePtr(NodeId node, Uint32 lenBytes, Uint32 prio,
- Uint32 max_use)
-{
- send_buffer *b = m_buffers + node;
- assert(lenBytes > 0);
-
- /**
- * Only allocate send buffer memory on first actual use.
- * Once allocated, at least one page stays, even if empty.
- */
- if (unlikely(b->m_first_page == NULL))
- if (!initial_alloc(node))
- return NULL;
-
- /* Check for common case, free space in existing buffer. */
- page *last_pg = b->m_last_page;
- assert(last_pg != NULL);
- assert(lenBytes < last_pg->max_data_bytes());
-
- if (last_pg->m_bytes + lenBytes <= last_pg->max_data_bytes())
- {
- return (Uint32 *)(last_pg->m_data + last_pg->m_bytes);
- }
-
- /**
- * Check for buffer limit exceeded.
- *
- * We do this check only when about to allocate a new page.
- * This may make us over-use slightly, but that is fine since the remaining
- * space can in any case not be used for anything else.
- */
- if (m_trp_callback->total_bytes(node) + lenBytes > max_use)
- return NULL;
-
- /* Need to allocate a new page. */
- page *new_pg = m_pool.seize();
- if (new_pg == NULL)
- return NULL;
-
-#if defined VM_TRACE || defined ERROR_INSERT
- new_pg->m_prev = 0;
-#endif
- new_pg->m_next = NULL;
- new_pg->m_bytes = 0;
- new_pg->m_start = 0;
- new_pg->m_current = 0;
-
- /* Remeber old last page temporarily until updateWritePtr(). */
- assert(b->m_prev_page == 0);
- if (last_pg->m_bytes < last_pg->max_data_bytes())
- {
- b->m_prev_page = last_pg;
-#if defined VM_TRACE || defined ERROR_INSERT
- new_pg->m_prev = last_pg;
-#endif
- }
- b->m_last_page = new_pg;
-
- /** Assigning m_next makes the new page available to readers, so need a
- * memory barrier here.
- */
- wmb();
- last_pg->m_next = new_pg;
-
- return (Uint32 *)(new_pg->m_data);
-}
-
-Uint32
-thr_send_buf::updateWritePtr(NodeId node, Uint32 lenBytes, Uint32 prio)
-{
- send_buffer *b = m_buffers + node;
- page *last_pg = b->m_last_page;
- assert(lenBytes > 0);
- assert(last_pg != NULL);
- assert(last_pg->m_bytes + lenBytes <= last_pg->max_data_bytes());
-
- b->m_written_bytes += lenBytes;
- Uint32 used = m_trp_callback->total_bytes(node);
-
- /* For the first signal in a buffer, split it and move it back so that
- * previous buffer is 100% utilised.
- *
- * This avoids buffer waste for big signals, and also simplifies reader.
- */
- if (last_pg->m_bytes == 0)
- {
- page *prev_pg = b->m_prev_page;
-#if defined VM_TRACE || defined ERROR_INSERT
- assert(last_pg->m_prev == 0 || last_pg->m_prev == prev_pg);
-#endif
- b->m_prev_page = 0;
- if (prev_pg != 0)
- {
- assert(prev_pg->m_bytes < prev_pg->max_data_bytes());
- Uint32 part = prev_pg->max_data_bytes() - prev_pg->m_bytes;
- assert(part < lenBytes);
- memcpy(prev_pg->m_data + prev_pg->m_bytes, last_pg->m_data, part);
- memmove(last_pg->m_data, last_pg->m_data + part, lenBytes - part);
- last_pg->m_bytes = lenBytes - part;
-
- rand_yield(1, prev_pg, last_pg); // BUG_39880
-
- /**
- * Memory barrier since this makes data available to reader.
- * and to serialize the two assignments
- */
- wmb();
- prev_pg->m_bytes = prev_pg->max_data_bytes();
- return used;
-
- /**
- * If at some later point we need to support messages bigger than the
- * page size, we could do so here similarly by copying from a separate
- * temporary big thread-local buffer returned from getWritePtr().
- */
- }
- }
-
- wmb();
- last_pg->m_bytes += lenBytes;
- return used;
-}
-
-trp_callback::trp_callback()
-{
- // number of threads not yet set so use MAX_THREADS
- for (Uint32 i = 0; i < NDB_ARRAY_SIZE(m_thr_buffers); i++)
- m_thr_buffers[i] = new thr_send_buf(this, i, &g_thr_repository.m_free_list);
- for (Uint32 i = 0; i < NDB_ARRAY_SIZE(m_send_thr); i++)
- m_send_thr[i] = ~(Uint32)0;
-}
void
trp_callback::reportSendLen(NodeId nodeId, Uint32 count, Uint64 bytes)
@@ -1340,7 +1303,7 @@ trp_callback::reportSendLen(NodeId nodeI
signal.theData[2] = (bytes/count);
signal.header.theVerId_signalNumber = GSN_EVENT_REP;
signal.header.theReceiversBlockNumber = CMVMI;
- sendlocal(m_send_thr[nodeId],
+ sendlocal(g_thr_repository.m_send_buffers[nodeId].m_send_thread,
&signalT.header, signalT.theData, NULL);
}
@@ -1368,7 +1331,7 @@ trp_callback::lock_transporter(NodeId no
* in any case disconnecting/connecting at this point in time, and sends are
* non-waiting (so we will not block sending on other transporters).
*/
- lock(&rep->m_send_locks[node]);
+ lock(&rep->m_send_buffers[node].m_send_lock);
lock(&rep->m_receive_lock);
}
@@ -1377,7 +1340,7 @@ trp_callback::unlock_transporter(NodeId
{
struct thr_repository* rep = &g_thr_repository;
unlock(&rep->m_receive_lock);
- unlock(&rep->m_send_locks[node]);
+ unlock(&rep->m_send_buffers[node].m_send_lock);
}
int
@@ -1403,228 +1366,341 @@ trp_callback::checkJobBuffer()
return 0;
}
-int
-trp_callback::get_bytes_to_send_iovec(NodeId node, struct iovec *dst,
- Uint32 max_iovecs)
+/**
+ * Link all send-buffer-pages into *one*
+ * single linked list of buffers
+ *
+ * TODO: This is not completly fair,
+ * it would be better to get one entry from each thr_send_queue
+ * per thread instead (until empty)
+ */
+static
+Uint32
+link_thread_send_buffers(thr_repository::send_buffer * sb, Uint32 node)
{
- Uint32 iovecs = 0;
-
- if (max_iovecs == 0)
- return 0;
-
- for (Uint32 thr = 0; thr < num_threads && iovecs < max_iovecs; thr++)
+ Uint32 ri[MAX_THREADS];
+ Uint32 wi[MAX_THREADS];
+ thr_send_queue * src = g_thr_repository.m_thread_send_buffers[node];
+ for (unsigned thr = 0; thr < num_threads; thr++)
{
- thr_send_buf::send_buffer *b = m_thr_buffers[thr]->m_buffers + node;
- thr_send_buf::page *pg = b->m_current_page;
+ ri[thr] = sb->m_read_index[thr];
+ wi[thr] = src[thr].m_write_index;
+ }
- /* Handle not yet allocated buffer. */
- if (unlikely(pg == NULL))
- continue;
- rmb();
+ Uint32 sentinel[thr_send_page::PGSIZE - thr_send_page::max_bytes()];
+ thr_send_page* sentinel_page = (thr_send_page*)sentinel;
+ sentinel_page->m_next = 0;
- bool current_must_be_zero = false;
- while (iovecs < max_iovecs)
- {
- Uint32 bytes = pg->m_bytes;
- if (current_must_be_zero)
- assert(pg->m_current == 0);
+ struct thr_send_buffer tmp;
+ tmp.m_first_page = sentinel_page;
+ tmp.m_last_page = sentinel_page;
- /* Make sure we see all updates before seen m_bytes value. */
+ Uint32 bytes = 0;
+ for (unsigned thr = 0; thr < num_threads; thr++, src++)
+ {
+ Uint32 r = ri[thr];
+ Uint32 w = wi[thr];
+ if (r != w)
+ {
rmb();
- if (bytes > pg->m_current)
+ while (r != w)
{
- dst[iovecs].iov_base = pg->m_data + pg->m_current;
- dst[iovecs].iov_len = bytes - pg->m_current;
- iovecs++;
- pg->m_current = bytes;
+ thr_send_page * p = src->m_buffers[r];
+ assert(p->m_start == 0);
+ bytes += p->m_bytes;
+ tmp.m_last_page->m_next = p;
+ while (p->m_next != 0)
+ {
+ p = p->m_next;
+ assert(p->m_start == 0);
+ bytes += p->m_bytes;
+ }
+ tmp.m_last_page = p;
+ assert(tmp.m_last_page != 0);
+ r = (r + 1) % thr_send_queue::SIZE;
}
- if (bytes < pg->max_data_bytes())
- break; // More data will arrive later
+ sb->m_read_index[thr] = r;
+ }
+ }
- pg = pg->m_next;
- if (pg == NULL)
- break;
- current_must_be_zero = true;
- b->m_current_page = pg;
+ if (bytes)
+ {
+ if (sb->m_bytes)
+ {
+ assert(sb->m_buffer.m_first_page != 0);
+ assert(sb->m_buffer.m_last_page != 0);
+ sb->m_buffer.m_last_page->m_next = tmp.m_first_page->m_next;
+ sb->m_buffer.m_last_page = tmp.m_last_page;
+ }
+ else
+ {
+ assert(sb->m_buffer.m_first_page == 0);
+ assert(sb->m_buffer.m_last_page == 0);
+ sb->m_buffer.m_first_page = tmp.m_first_page->m_next;
+ sb->m_buffer.m_last_page = tmp.m_last_page;
}
+ sb->m_bytes += bytes;
}
- return iovecs;
+ return sb->m_bytes;
}
Uint32
-trp_callback::bytes_sent(NodeId node, const struct iovec *src, Uint32 bytes)
+trp_callback::get_bytes_to_send_iovec(NodeId node,
+ struct iovec *dst, Uint32 max)
{
- Uint32 curr_thr = 0;
+ thr_repository::send_buffer * sb = g_thr_repository.m_send_buffers + node;
- while (bytes > 0)
- {
- const struct iovec *iov = src++;
+ Uint32 bytes = link_thread_send_buffers(sb, node);
+ if (max == 0 || bytes == 0)
+ return 0;
- /**
- * This piece of data could be from any thread, so need to search for
- * which it is.
- *
- * Since data is sent in the same order we returned it from
- * get_bytes_to_send_iovec(), we are likely to find the right one very
- * quickly by searching in the same order as used in the loop there.
- */
-#ifdef VM_TRACE
- Uint32 start_thr = curr_thr;
-#endif
+ /**
+ * Process linked-list and put into iovecs
+ * TODO: Here we would also pack stuff to get better utilization
+ */
+ Uint32 tot = 0;
+ Uint32 pos = 0;
+ thr_send_page * p = sb->m_buffer.m_first_page;
+ do {
+ dst[pos].iov_len = p->m_bytes;
+ dst[pos].iov_base = p->m_data + p->m_start;
+ assert(p->m_start + p->m_bytes <= p->max_bytes());
+ tot += p->m_bytes;
+ pos++;
+ max--;
+ p = p->m_next;
+ } while (max && p != 0);
- thr_send_buf::send_buffer *b;
- thr_send_buf::page *pg;
- for (;;)
- {
- b = m_thr_buffers[curr_thr]->m_buffers + node;
- if (b->m_current_page != NULL)
- {
- rmb();
+ return pos;
+}
- pg = b->m_first_page;
+static
+void
+release_list(thread_local_pool<thr_send_page, thr_job_buffer>* pool,
+ thr_send_page* head, thr_send_page * tail)
+{
+ while (head != tail)
+ {
+ thr_send_page * tmp = head;
+ head = head->m_next;
+ pool->release(tmp);
+ }
+ pool->release(tail);
+}
- /* Drop the first page if it is completely empty and not last. */
- if (pg->m_start == pg->max_data_bytes())
- {
- thr_send_buf::page *next_pg = pg->m_next;
- if (next_pg != NULL)
- {
- b->m_first_page = next_pg;
- if (pg == b->m_current_page)
- b->m_current_page = next_pg;
- m_thr_buffers[m_send_thr[node]]->m_pool.release(pg);
- pg = next_pg;
- }
- }
- if (pg->m_data + pg->m_start == iov->iov_base)
- break; // Found it
- }
+static
+Uint32
+bytes_sent(thread_local_pool<thr_send_page, thr_job_buffer>* pool,
+ thr_repository::send_buffer* sb, NodeId node, Uint32 bytes)
+{
+ assert(bytes);
- curr_thr = (curr_thr + 1) % num_threads;
-#ifdef VM_TRACE
- assert(curr_thr != start_thr); // Sent data was not in buffer
-#endif
- }
+ Uint32 remain = bytes;
+ thr_send_page * prev = 0;
+ thr_send_page * curr = sb->m_buffer.m_first_page;
- assert(pg->m_start + iov->iov_len <= pg->m_current);
- Uint32 chunk = iov->iov_len;
- if (chunk > bytes)
- chunk = bytes; // last chunk sent partially
- bytes -= chunk;
- pg->m_start += chunk;
- b->m_read_bytes += chunk;
- if (pg->m_start == pg->max_data_bytes() && pg->m_next != NULL)
- {
- /* All done with this page, de-allocate. */
- b->m_first_page = pg->m_next;
- if (b->m_current_page == pg)
- {
- assert(pg->m_current == pg->max_data_bytes());
- b->m_current_page = pg->m_next;
- }
- /* Release to send thread thread pool, to avoid need for locks. */
- m_thr_buffers[m_send_thr[node]]->m_pool.release(pg);
+ assert(sb->m_bytes >= bytes);
+ while (remain && remain >= curr->m_bytes)
+ {
+ remain -= curr->m_bytes;
+ prev = curr;
+ curr = curr->m_next;
+ }
+
+ Uint32 total_bytes = sb->m_bytes;
+ if (total_bytes == bytes)
+ {
+ /**
+ * Every thing was released
+ */
+ release_list(pool, sb->m_buffer.m_first_page, sb->m_buffer.m_last_page);
+ sb->m_buffer.m_first_page = 0;
+ sb->m_buffer.m_last_page = 0;
+ sb->m_bytes = 0;
+ return 0;
+ }
+ else if (remain)
+ {
+ /**
+ * Half a page was released
+ */
+ curr->m_start += remain;
+ assert(curr->m_bytes > remain);
+ curr->m_bytes -= remain;
+ if (prev)
+ {
+ release_list(pool, sb->m_buffer.m_first_page, prev);
+ }
+ }
+ else
+ {
+ /**
+ * X full page(s) was released
+ */
+ if (prev)
+ {
+ release_list(pool, sb->m_buffer.m_first_page, prev);
+ }
+ else
+ {
+ pool->release(sb->m_buffer.m_first_page);
}
}
- return total_bytes(node);
+ sb->m_buffer.m_first_page = curr;
+ assert(sb->m_bytes > bytes);
+ sb->m_bytes -= bytes;
+ return sb->m_bytes;
+}
+
+Uint32
+trp_callback::bytes_sent(NodeId node, Uint32 bytes)
+{
+ thr_repository::send_buffer * sb = g_thr_repository.m_send_buffers+node;
+ Uint32 thr_no = sb->m_send_thread;
+ assert(thr_no != NO_SEND_THREAD);
+ return ::bytes_sent(&g_thr_repository.m_thread[thr_no].m_send_buffer_pool,
+ sb, node, bytes);
}
bool
trp_callback::has_data_to_send(NodeId node)
{
- for (Uint32 thr = 0; thr < num_threads; thr++)
- {
- thr_send_buf::send_buffer *b = m_thr_buffers[thr]->m_buffers + node;
- thr_send_buf::page *pg = b->m_current_page;
-
- /* Handle not yet allocated buffer. */
- if (unlikely(pg == NULL))
- continue;
- rmb();
-
- Uint32 bytes = pg->m_bytes;
- if (bytes > pg->m_current)
- return true;
- if (bytes == pg->max_data_bytes())
- {
- thr_send_buf::page *next_pg = pg->m_next;
- rmb();
- if (next_pg != NULL && next_pg->m_bytes > next_pg->m_current)
- return true;
- }
- }
- return false;
+ return true;
}
-/**
- * Clear the send buffers.
- *
- * Works by consuming all data with get_bytes_to_send_iovec() + bytes_sent()
- * (but without sending anything of course). This makes us thread safe, as
- * long as we take the send lock.
- */
void
trp_callback::reset_send_buffer(NodeId node)
{
- struct thr_repository* rep = &g_thr_repository;
+ thr_repository::send_buffer * sb = g_thr_repository.m_send_buffers+node;
struct iovec v[32];
- lock(&rep->m_send_locks[node]);
- void *value= NdbThread_GetTlsKey(NDB_THREAD_TLS_THREAD);
- const thr_data *selfptr = reinterpret_cast<const thr_data *>(value);
- if (selfptr)
- {
- m_send_thr[node] = selfptr->m_thr_no;
- }
- else
- {
- /**
- * This was done from a "non-mt" thread, use
- * faked entry
- */
- m_send_thr[node] = NDB_ARRAY_SIZE(m_thr_buffers) - 1;
- }
+ thread_local_pool<thr_send_page, thr_job_buffer>
+ pool(&g_thr_repository.m_free_list, Uint32(~0));
+
+ lock(&sb->m_send_lock);
for (;;)
{
- int count = get_bytes_to_send_iovec(node, v, sizeof(v)/sizeof(v[0]));
+ Uint32 count = get_bytes_to_send_iovec(node, v, sizeof(v)/sizeof(v[0]));
if (count == 0)
break;
int bytes = 0;
- for (int i = 0; i < count; i++)
+ for (Uint32 i = 0; i < count; i++)
bytes += v[i].iov_len;
- bytes_sent(node, v, bytes);
+ ::bytes_sent(&pool, sb, node, bytes);
+ }
+
+ unlock(&sb->m_send_lock);
+
+ pool.releaseAll();
+}
+
+static inline
+void
+register_pending_send(thr_data *selfptr, Uint32 nodeId)
+{
+ /* Mark that this node has pending send data. */
+ if (!selfptr->m_pending_send_mask.get(nodeId))
+ {
+ selfptr->m_pending_send_mask.set(nodeId, 1);
+ Uint32 i = selfptr->m_pending_send_count;
+ selfptr->m_pending_send_nodes[i] = nodeId;
+ selfptr->m_pending_send_count = i + 1;
+ }
+}
+
+/**
+ * publish thread-locally prepared send-buffer
+ */
+static
+void
+flush_send_buffer(thr_data* selfptr, Uint32 node)
+{
+ Uint32 thr_no = selfptr->m_thr_no;
+ thr_send_buffer * src = selfptr->m_send_buffers + node;
+ thr_repository* rep = &g_thr_repository;
+
+ if (src->m_first_page == 0)
+ {
+ return;
}
+ assert(src->m_last_page != 0);
- if (selfptr == 0)
+ thr_send_queue * dst = rep->m_thread_send_buffers[node]+thr_no;
+ thr_repository::send_buffer* sb = rep->m_send_buffers+node;
+
+ Uint32 wi = dst->m_write_index;
+ Uint32 next = (wi + 1) % thr_send_queue::SIZE;
+ Uint32 ri = sb->m_read_index[thr_no];
+
+ if (unlikely(next == ri))
{
- /**
- * This was done from a "non-mt" thread, use
- * release buffer directly
- */
- m_thr_buffers[m_send_thr[node]]->m_pool.releaseAll();
+ lock(&sb->m_send_lock);
+ link_thread_send_buffers(sb, node);
+ unlock(&sb->m_send_lock);
}
-
- g_trp_callback.m_send_thr[node] = ~(Uint32)0;
- unlock(&rep->m_send_locks[node]);
+
+ dst->m_buffers[wi] = src->m_first_page;
+ wmb();
+ dst->m_write_index = next;
+
+ src->m_first_page = 0;
+ src->m_last_page = 0;
}
-static inline
+/**
+ * This is used in case send buffer gets full, to force an emergency send,
+ * hopefully freeing up some buffer space for the next signal.
+ */
+bool
+mt_send_handle::forceSend(NodeId nodeId)
+{
+ struct thr_repository *rep = &g_thr_repository;
+ struct thr_data *selfptr = m_selfptr;
+ struct thr_repository::send_buffer * sb = rep->m_send_buffers + nodeId;
+
+ do
+ {
+ sb->m_force_send = 0;
+ lock(&sb->m_send_lock);
+ sb->m_send_thread = selfptr->m_thr_no;
+ globalTransporterRegistry.performSend(nodeId);
+ sb->m_send_thread = NO_SEND_THREAD;
+ unlock(&sb->m_send_lock);
+ } while (sb->m_force_send);
+
+ return true;
+}
+
+/**
+ * try sending data
+ */
+static
void
-register_pending_send(thr_data *selfptr, Uint32 nodeId)
+try_send(thr_data * selfptr, Uint32 node)
{
- /* Mark that this node has pending send data. */
- if (!selfptr->m_pending_send_mask.get(nodeId))
+ struct thr_repository *rep = &g_thr_repository;
+ struct thr_repository::send_buffer * sb = rep->m_send_buffers + node;
+
+ do
{
- selfptr->m_pending_send_mask.set(nodeId, 1);
- Uint32 i = selfptr->m_pending_send_count;
- selfptr->m_pending_send_nodes[i] = nodeId;
- selfptr->m_pending_send_count = i + 1;
- }
+ if (trylock(&sb->m_send_lock) != 0)
+ {
+ return;
+ }
+
+ sb->m_force_send = 0;
+ mb();
+
+ sb->m_send_thread = selfptr->m_thr_no;
+ globalTransporterRegistry.performSend(node);
+ sb->m_send_thread = NO_SEND_THREAD;
+ unlock(&sb->m_send_lock);
+ } while (sb->m_force_send);
}
/**
@@ -1642,25 +1718,33 @@ register_pending_send(thr_data *selfptr,
* other thread (but we will never loose signals due to this).
*/
static
-void
-do_send(struct thr_repository* rep, struct thr_data* selfptr,
+Uint32
+do_send(struct thr_data* selfptr,
Uint32 *watchDogCounter, bool must_send)
{
Uint32 i;
Uint32 count = selfptr->m_pending_send_count;
Uint8 *nodes = selfptr->m_pending_send_nodes;
+ struct thr_repository* rep = &g_thr_repository;
if (count == 0)
- return;
+ {
+ return 0; // send-buffers empty
+ }
+
/* Clear the pending list. */
selfptr->m_pending_send_mask.clear();
selfptr->m_pending_send_count = 0;
for (i = 0; i < count; i++)
{
- NodeId nodeId = nodes[i];
+ Uint32 node = nodes[i];
*watchDogCounter = 6;
+ flush_send_buffer(selfptr, node);
+
+ thr_repository::send_buffer * sb = rep->m_send_buffers + node;
+
/**
* If we must send now, set the force_send flag.
*
@@ -1672,10 +1756,13 @@ do_send(struct thr_repository* rep, stru
* flag update is flushed to the other thread.
*/
if (must_send)
- rep->m_force_send[nodeId] = 1;
+ {
+ sb->m_force_send = 1;
+ }
+
do
{
- if (trylock(&rep->m_send_locks[nodeId]) != 0)
+ if (trylock(&sb->m_send_lock) != 0)
{
if (!must_send)
{
@@ -1685,7 +1772,7 @@ do_send(struct thr_repository* rep, stru
* As we only add from the start of an empty list, we are safe from
* overwriting the list while we are iterating over it.
*/
- register_pending_send(selfptr, nodeId);
+ register_pending_send(selfptr, node);
}
else
{
@@ -1704,58 +1791,61 @@ do_send(struct thr_repository* rep, stru
* will (thanks to mb()) be able to see and send all of the data already
* in the first send iteration.
*/
- rep->m_force_send[nodeId] = 0;
+ sb->m_force_send = 0;
mb();
/**
* Set m_send_thr so that our transporter callback can know which thread
* holds the send lock for this remote node.
*/
- g_trp_callback.m_send_thr[nodeId] = selfptr->m_thr_no;
- globalTransporterRegistry.performSend(nodeId);
- g_trp_callback.m_send_thr[nodeId] = ~(Uint32)0;
- unlock(&rep->m_send_locks[nodeId]);
- } while (rep->m_force_send[nodeId]);
+ sb->m_send_thread = selfptr->m_thr_no;
+ int res = globalTransporterRegistry.performSend(node);
+ sb->m_send_thread = NO_SEND_THREAD;
+ unlock(&sb->m_send_lock);
+ if (res)
+ {
+ register_pending_send(selfptr, node);
+ }
+ } while (sb->m_force_send);
}
+
+ return selfptr->m_pending_send_count;
}
-/**
- * This is used in case send buffer gets full, to force an emergency send,
- * hopefully freeing up some buffer space for the next signal.
- */
-bool
-thr_send_buf::forceSend(NodeId nodeId)
+Uint32 *
+mt_send_handle::getWritePtr(NodeId node, Uint32 len, Uint32 prio, Uint32 max)
{
- struct thr_repository *rep = &g_thr_repository;
- struct thr_data *selfptr = rep->m_thread + m_self;
-
- do
+ struct thr_send_buffer * b = m_selfptr->m_send_buffers+node;
+ thr_send_page * p = b->m_last_page;
+ if ((p != 0) && (p->m_bytes + p->m_start + len <=
thr_send_page::max_bytes()))
{
- rep->m_force_send[nodeId] = 0;
- lock(&rep->m_send_locks[nodeId]);
- g_trp_callback.m_send_thr[nodeId] = selfptr->m_thr_no;
- globalTransporterRegistry.performSend(nodeId);
- g_trp_callback.m_send_thr[nodeId] = ~(Uint32)0;
- unlock(&rep->m_send_locks[nodeId]);
- } while (rep->m_force_send[nodeId]);
+ return (Uint32*)(p->m_data + p->m_start + p->m_bytes);
+ }
+ else if (p != 0)
+ {
+ // TODO: maybe dont always flush on page-boundary ???
+ flush_send_buffer(m_selfptr, node);
+ try_send(m_selfptr, node);
+ }
- return true;
+ if ((p = m_selfptr->m_send_buffer_pool.seize()) != 0)
+ {
+ p->m_bytes = 0;
+ p->m_start = 0;
+ p->m_next = 0;
+ b->m_first_page = b->m_last_page = p;
+ return (Uint32*)p->m_data;
+ }
+ return 0;
}
-static void
-sendpacked(struct thr_data* thr_ptr, Signal* signal)
+Uint32
+mt_send_handle::updateWritePtr(NodeId node, Uint32 lenBytes, Uint32 prio)
{
- Uint32 i;
- for (i = 0; i < thr_ptr->m_instance_count; i++) {
- BlockReference block = thr_ptr->m_instance_list[i];
- Uint32 main = blockToMain(block);
- Uint32 instance = blockToInstance(block);
- SimulatedBlock* b = globalData.getBlock(main, instance);
- // wl4391_todo remove useless assert
- assert(b != 0 && b->getThreadId() == thr_ptr->m_thr_no);
- /* b->send_at_job_buffer_end(); */
- b->executeFunction(GSN_SEND_PACKED, signal);
- }
+ struct thr_send_buffer * b = m_selfptr->m_send_buffers+node;
+ thr_send_page * p = b->m_last_page;
+ p->m_bytes += lenBytes;
+ return p->m_bytes;
}
/*
@@ -2314,7 +2404,7 @@ mt_receiver_thread_main(void *thr_arg)
flush_jbb_write_state(selfptr);
}
- do_send(rep, selfptr, &watchDogCounter, TRUE);
+ do_send(selfptr, &watchDogCounter, TRUE);
watchDogCounter = 7;
@@ -2336,6 +2426,23 @@ mt_receiver_thread_main(void *thr_arg)
return NULL; // Return value not currently used
}
+static
+void
+sendpacked(struct thr_data* thr_ptr, Signal* signal)
+{
+ Uint32 i;
+ for (i = 0; i < thr_ptr->m_instance_count; i++) {
+ BlockReference block = thr_ptr->m_instance_list[i];
+ Uint32 main = blockToMain(block);
+ Uint32 instance = blockToInstance(block);
+ SimulatedBlock* b = globalData.getBlock(main, instance);
+ // wl4391_todo remove useless assert
+ assert(b != 0 && b->getThreadId() == thr_ptr->m_thr_no);
+ /* b->send_at_job_buffer_end(); */
+ b->executeFunction(GSN_SEND_PACKED, signal);
+ }
+}
+
extern "C"
void *
mt_job_thread_main(void *thr_arg)
@@ -2347,7 +2454,6 @@ mt_job_thread_main(void *thr_arg)
nowait.tv_nsec = 10 * 1000000;
Uint32 thrSignalId = 0;
- struct thr_repository* rep = &g_thr_repository;
struct thr_data* selfptr = (struct thr_data *)thr_arg;
init_thread(selfptr);
Uint32& watchDogCounter = selfptr->m_watchdog_counter;
@@ -2386,14 +2492,12 @@ mt_job_thread_main(void *thr_arg)
if (sum == 0)
{
/* About to sleep, _must_ send now. */
- do_send(rep, selfptr, &watchDogCounter, TRUE);
- send_sum = 0;
+ sum = send_sum = do_send(selfptr, &watchDogCounter, TRUE);
}
else if (send_sum > MAX_SIGNALS_BEFORE_SEND)
{
/* Try to send, but skip for now in case of lock contention. */
- do_send(rep, selfptr, &watchDogCounter, FALSE);
- send_sum = 0;
+ send_sum = do_send(selfptr, &watchDogCounter, FALSE);
}
}
@@ -2502,9 +2606,10 @@ mt_send_remote(Uint32 self, const Signal
thr_data *selfptr = rep->m_thread + self;
SendStatus ss;
+ mt_send_handle handle(selfptr);
register_pending_send(selfptr, nodeId);
/* prepareSend() is lock-free, as we have per-thread send buffers. */
- ss = globalTransporterRegistry.prepareSend(g_trp_callback.m_thr_buffers[self],
+ ss = globalTransporterRegistry.prepareSend(&handle,
sh, prio, data, nodeId, ptr);
return ss;
}
@@ -2519,8 +2624,9 @@ mt_send_remote(Uint32 self, const Signal
thr_data *selfptr = rep->m_thread + self;
SendStatus ss;
+ mt_send_handle handle(selfptr);
register_pending_send(selfptr, nodeId);
- ss = globalTransporterRegistry.prepareSend(g_trp_callback.m_thr_buffers[self],
+ ss = globalTransporterRegistry.prepareSend(&handle,
sh, prio, data, nodeId,
*thePool, ptr);
return ss;
@@ -2597,128 +2703,9 @@ sendprioa_STOP_FOR_CRASH(Uint32 dst)
unlock(&dstptr->m_jba_write_lock);
}
-static
-inline
-Uint32*
-get_free_slot(struct thr_repository* rep,
- struct thr_data* selfptr,
- Uint32* idxptr)
-{
- struct thr_tq * tq = &selfptr->m_tq;
- Uint32 idx = tq->m_next_free;
-retry:
- Uint32 buf = idx >> 8;
- Uint32 pos = idx & 0xFF;
-
- if (idx != RNIL)
- {
- Uint32* page = * (tq->m_delayed_signals + buf);
- Uint32* ptr = page + (32 * pos);
- tq->m_next_free = * ptr;
- * idxptr = idx;
- return ptr;
- }
-
- Uint32 thr_no = selfptr->m_thr_no;
- for (Uint32 i = 0; i<thr_tq::PAGES; i++)
- {
- if (tq->m_delayed_signals[i] == 0)
- {
- struct thr_job_buffer *jb = seize_buffer(rep, thr_no, false);
- Uint32 * page = reinterpret_cast<Uint32*>(jb);
- tq->m_delayed_signals[i] = page;
-
- ndbout_c("saving %p at %p (%d)", page, tq->m_delayed_signals+i, i);
-
- /**
- * Init page
- */
- for (Uint32 j = 0; j<255; j ++)
- {
- page[j * 32] = (i << 8) + (j + 1);
- }
- page[255*32] = RNIL;
- idx = (i << 8);
- goto retry;
- }
- }
- abort();
-}
-
-void
-senddelay(Uint32 thr_no, const SignalHeader* s, Uint32 delay)
-{
- struct thr_repository* rep = &g_thr_repository;
- struct thr_data * selfptr = rep->m_thread + thr_no;
- unsigned siglen = (sizeof(*s) >> 2) + s->theLength + s->m_noOfSections;
-
- Uint32 max;
- Uint32 * cntptr;
- Uint32 * queueptr;
-
- Uint32 alarm = selfptr->m_tq.m_current_time + delay;
- Uint32 nexttimer = selfptr->m_tq.m_next_timer;
- if (delay < 100)
- {
- cntptr = selfptr->m_tq.m_cnt + 0;
- queueptr = selfptr->m_tq.m_short_queue;
- max = thr_tq::SQ_SIZE;
- }
- else
- {
- cntptr = selfptr->m_tq.m_cnt + 1;
- queueptr = selfptr->m_tq.m_long_queue;
- max = thr_tq::LQ_SIZE;
- }
-
- Uint32 idx;
- Uint32* ptr = get_free_slot(rep, selfptr, &idx);
- memcpy(ptr, s, 4*siglen);
-
- if (0)
- ndbout_c("now: %d alarm: %d send %s from %s to %s delay: %d idx: %x %p",
- selfptr->m_tq.m_current_time,
- alarm,
- getSignalName(s->theVerId_signalNumber),
- getBlockName(refToBlock(s->theSendersBlockRef)),
- getBlockName(s->theReceiversBlockNumber),
- delay,
- idx, ptr);
-
- Uint32 i;
- Uint32 cnt = *cntptr;
- Uint32 newentry = (idx << 16) | (alarm & 0xFFFF);
-
- * cntptr = cnt + 1;
- selfptr->m_tq.m_next_timer = alarm < nexttimer ? alarm : nexttimer;
-
- if (cnt == 0)
- {
- queueptr[0] = newentry;
- return;
- }
- else if (cnt < max)
- {
- for (i = 0; i<cnt; i++)
- {
- Uint32 save = queueptr[i];
- if ((save & 0xFFFF) > alarm)
- {
- memmove(queueptr+i+1, queueptr+i, 4*(cnt - i));
- queueptr[i] = newentry;
- return;
- }
- }
- assert(i == cnt);
- queueptr[i] = newentry;
- return;
- }
- else
- {
- abort();
- }
-}
-
+/**
+ * init functions
+ */
static
void
queue_init(struct thr_tq* tq)
@@ -2751,6 +2738,7 @@ thr_init(struct thr_repository* rep, str
selfptr->m_jba_read_state.m_write_index = 0;
selfptr->m_jba_read_state.m_write_pos = 0;
selfptr->m_next_buffer = seize_buffer(rep, thr_no, false);
+ selfptr->m_send_buffer_pool.set_pool(&rep->m_free_list);
for (i = 0; i<cnt; i++)
{
@@ -2778,6 +2766,8 @@ thr_init(struct thr_repository* rep, str
selfptr->m_instance_count = 0;
for (i = 0; i < MAX_INSTANCES_PER_THREAD; i++)
selfptr->m_instance_list[i] = 0;
+
+ bzero(&selfptr->m_send_buffers, sizeof(selfptr->m_send_buffers));
}
/* Have to do this after init of all m_in_queues is done. */
@@ -2798,6 +2788,20 @@ thr_init2(struct thr_repository* rep, st
static
void
+send_buffer_init(Uint32 node, thr_repository::send_buffer * sb)
+{
+ char buf[100];
+ snprintf(buf, sizeof(buf), "send lock node %d", node);
+ register_lock(&sb->m_send_lock, buf);
+ sb->m_force_send = 0;
+ sb->m_send_thread = NO_SEND_THREAD;
+ bzero(&sb->m_buffer, sizeof(sb->m_buffer));
+ sb->m_bytes = 0;
+ bzero(sb->m_read_index, sizeof(sb->m_read_index));
+}
+
+static
+void
rep_init(struct thr_repository* rep, unsigned int cnt, Ndbd_mem_manager *mm)
{
rep->m_free_list.m_mm = mm;
@@ -2818,11 +2822,10 @@ rep_init(struct thr_repository* rep, uns
for (int i = 0 ; i < MAX_NTRANSPORTERS; i++)
{
- char buf[100];
- snprintf(buf, sizeof(buf), "send lock node %d", i);
- rep->m_send_locks[i].m_name = strdup(buf);
- rep->m_force_send[i] = 0;
+ send_buffer_init(i, rep->m_send_buffers+i);
}
+
+ bzero(rep->m_thread_send_buffers, sizeof(rep->m_thread_send_buffers));
}
@@ -3291,6 +3294,53 @@ mt_mem_manager_unlock()
unlock(&(g_thr_repository.m_mem_manager_lock));
}
+Vector<mt_lock_stat> g_locks;
+template class Vector<mt_lock_stat>;
+
+static
+void
+register_lock(const void * ptr, const char * name)
+{
+ if (name == 0)
+ return;
+
+ mt_lock_stat* arr = g_locks.getBase();
+ for (size_t i = 0; i<g_locks.size(); i++)
+ {
+ if (arr[i].m_ptr == ptr)
+ {
+ if (arr[i].m_name)
+ {
+ free(arr[i].m_name);
+ }
+ arr[i].m_name = strdup(name);
+ return;
+ }
+ }
+
+ mt_lock_stat ln;
+ ln.m_ptr = ptr;
+ ln.m_name = strdup(name);
+ ln.m_contended_count = 0;
+ ln.m_spin_count = 0;
+ g_locks.push_back(ln);
+}
+
+static
+mt_lock_stat *
+lookup_lock(const void * ptr)
+{
+ mt_lock_stat* arr = g_locks.getBase();
+ for (size_t i = 0; i<g_locks.size(); i++)
+ {
+ if (arr[i].m_ptr == ptr)
+ return arr + i;
+ }
+
+ return 0;
+}
+
+
/**
* Global data
*/
=== modified file 'storage/ndb/src/ndbapi/TransporterFacade.hpp'
--- a/storage/ndb/src/ndbapi/TransporterFacade.hpp 2008-10-21 12:41:59 +0000
+++ b/storage/ndb/src/ndbapi/TransporterFacade.hpp 2008-10-27 17:56:57 +0000
@@ -168,13 +168,13 @@ public:
void reportError(NodeId nodeId, TransporterError errorCode,
const char *info = 0);
void transporter_recv_from(NodeId node);
- int get_bytes_to_send_iovec(NodeId node, struct iovec *dst, Uint32 max)
+ Uint32 get_bytes_to_send_iovec(NodeId node, struct iovec *dst, Uint32 max)
{
return theTransporterRegistry->get_bytes_to_send_iovec(node, dst, max);
}
- Uint32 bytes_sent(NodeId node, const struct iovec *src, Uint32 bytes)
+ Uint32 bytes_sent(NodeId node, Uint32 bytes)
{
- return theTransporterRegistry->bytes_sent(node, src, bytes);
+ return theTransporterRegistry->bytes_sent(node, bytes);
}
bool has_data_to_send(NodeId node)
{
| Thread |
|---|
| • bzr push into mysql-5.1 branch (jonas:3019 to 3020) | Jonas Oreland | 27 Oct |