List:Commits« Previous MessageNext Message »
From:Jonas Oreland Date:October 27 2008 6:57pm
Subject:bzr push into mysql-5.1 branch (jonas:3019 to 3020)
View as plain text  
 3020 Jonas Oreland	2008-10-27
      ndb(mt)d - rework unified sendbuffer handling - to remove spurious corrupt messages
in high load + fix high-load-read performance breakdown
modified:
  storage/ndb/include/transporter/TransporterCallback.hpp
  storage/ndb/include/transporter/TransporterRegistry.hpp
  storage/ndb/src/common/transporter/SHM_Transporter.cpp
  storage/ndb/src/common/transporter/SHM_Transporter.hpp
  storage/ndb/src/common/transporter/TCP_Transporter.cpp
  storage/ndb/src/common/transporter/TCP_Transporter.hpp
  storage/ndb/src/common/transporter/Transporter.cpp
  storage/ndb/src/common/transporter/Transporter.hpp
  storage/ndb/src/common/transporter/TransporterRegistry.cpp
  storage/ndb/src/kernel/vm/TransporterCallback.cpp
  storage/ndb/src/kernel/vm/mt.cpp
  storage/ndb/src/ndbapi/TransporterFacade.hpp

 3019 Magnus Svensson	2008-10-27
      WL#4350 Ignore API_REGCONF in all Ndb blocks
modified:
  storage/ndb/src/ndbapi/Ndbif.cpp

=== modified file 'storage/ndb/include/transporter/TransporterCallback.hpp'
--- a/storage/ndb/include/transporter/TransporterCallback.hpp	2008-05-29 15:06:11 +0000
+++ b/storage/ndb/include/transporter/TransporterCallback.hpp	2008-10-27 17:56:57 +0000
@@ -411,8 +411,8 @@ public:
    * Will be called from the thread that does performSend(), so multi-threaded
    * use cases must be prepared for that and do any necessary locking.
    */
-  virtual int get_bytes_to_send_iovec(NodeId node, struct iovec *dst,
-                                      Uint32 max) = 0;
+  virtual Uint32 get_bytes_to_send_iovec(NodeId, struct iovec *dst, Uint32) = 0;
+
   /**
    * Called when data has been sent, allowing to free / reuse the space. Passes
    * number of bytes sent.
@@ -426,8 +426,7 @@ public:
    *
    * Like get_bytes_to_send_iovec(), this is called during performSend().
    */
-  virtual Uint32 bytes_sent(NodeId node, const struct iovec *src,
-                            Uint32 bytes) = 0;
+  virtual Uint32 bytes_sent(NodeId node, Uint32 bytes) = 0;
 
   /**
    * Called to check if any data is available for sending with doSend().

=== modified file 'storage/ndb/include/transporter/TransporterRegistry.hpp'
--- a/storage/ndb/include/transporter/TransporterRegistry.hpp	2008-06-17 20:28:45 +0000
+++ b/storage/ndb/include/transporter/TransporterRegistry.hpp	2008-10-27 17:56:57 +0000
@@ -300,7 +300,7 @@ public:
   
   Uint32 pollReceive(Uint32 timeOutMillis);
   void performReceive();
-  void performSend(NodeId nodeId);
+  int performSend(NodeId nodeId);
   void performSend();
   
   /**
@@ -455,31 +455,23 @@ private:
 
     /* Send buffer for one transporter is kept in a single-linked list. */
     struct SendBufferPage *m_next;
+
     /* Bytes of send data available in this page. */
-    Uint32 m_bytes;
+    Uint16 m_bytes;
+    /* Start of unsent data */
+    Uint16 m_start;
+
     /* Data; real size is to the end of one page. */
-    unsigned char m_data[1];
+    unsigned char m_data[2];
   };
 
   /* Send buffer for one transporter. */
   struct SendBuffer {
+    /* Total size of data in buffer, from m_offset_start_data to end. */
+    Uint32 m_used_bytes;
     /* Linked list of active buffer pages with first and last pointer. */
     SendBufferPage *m_first_page;
     SendBufferPage *m_last_page;
-    /**
-     * Current page == the first one with data not yet returned from
-     * get_bytes_to_send_iovec().
-     */
-    SendBufferPage *m_current_page;
-    /**
-     * Offset (in m_current_page) of next data to return from
-     * get_bytes_to_send_iovec().
-     */
-    Uint32 m_offset_unsent_data;
-    /* Offset (in m_first_page) of data not yet passed to bytes_sent(). */
-    Uint32 m_offset_start_data;
-    /* Total size of data in buffer, from m_offset_start_data to end. */
-    Uint32 m_used_bytes;
   };
 
   SendBufferPage *alloc_page();
@@ -501,8 +493,8 @@ private:
   Uint32 m_total_max_send_buffer;
 
 public:
-  int get_bytes_to_send_iovec(NodeId node, struct iovec *dst, Uint32 max);
-  Uint32 bytes_sent(NodeId node, const struct iovec *src, Uint32 bytes);
+  Uint32 get_bytes_to_send_iovec(NodeId node, struct iovec *dst, Uint32 max);
+  Uint32 bytes_sent(NodeId node, Uint32 bytes);
   bool has_data_to_send(NodeId node);
 
   void reset_send_buffer(NodeId node);

=== modified file 'storage/ndb/src/common/transporter/SHM_Transporter.cpp'
--- a/storage/ndb/src/common/transporter/SHM_Transporter.cpp	2008-08-27 14:34:22 +0000
+++ b/storage/ndb/src/common/transporter/SHM_Transporter.cpp	2008-10-27 17:56:57 +0000
@@ -358,23 +358,37 @@ SHM_Transporter::connect_common(NDB_SOCK
   return false;
 }
 
-bool
+int
 SHM_Transporter::doSend()
 {
-  if (!fetch_send_iovec_data())
-    return false;
+  struct iovec iov[64];
+  Uint32 cnt = fetch_send_iovec_data(iov, NDB_ARRAY_SIZE(iov));
 
-  Uint32 used = m_send_iovec_used;
-  if (used == 0)
-    return true;                                // Nothing to send
+  if (cnt == 0)
+  {
+    return 0;
+  }
+
+  Uint32 sum = 0;
+  for(Uint32 i = 0; i<cnt; i++)
+  {
+    assert(iov[i].iov_len);
+    sum += iov[i].iov_len;
+  }
 
-  int nBytesSent = writer->writev(m_send_iovec, used);
+  int nBytesSent = writer->writev(iov, cnt);
 
   if (nBytesSent > 0)
   {
     kill(m_remote_pid, g_ndb_shm_signum);
     iovec_data_sent(nBytesSent);
+
+    if (Uint32(nBytesSent) == sum && (cnt != NDB_ARRAY_SIZE(iov)))
+    {
+      return 0;
+    }
+    return 1;
   }
 
-  return true;
+  return 1;
 }

=== modified file 'storage/ndb/src/common/transporter/SHM_Transporter.hpp'
--- a/storage/ndb/src/common/transporter/SHM_Transporter.hpp	2008-05-29 15:06:11 +0000
+++ b/storage/ndb/src/common/transporter/SHM_Transporter.hpp	2008-10-27 17:56:57 +0000
@@ -119,7 +119,7 @@ protected:
   /**
    * doSend (i.e signal receiver)
    */
-  bool doSend();
+  int doSend();
   int m_remote_pid;
   Uint32 m_signal_threshold;
 

=== modified file 'storage/ndb/src/common/transporter/TCP_Transporter.cpp'
--- a/storage/ndb/src/common/transporter/TCP_Transporter.cpp	2008-10-11 09:22:33 +0000
+++ b/storage/ndb/src/common/transporter/TCP_Transporter.cpp	2008-10-27 17:56:57 +0000
@@ -246,49 +246,112 @@ TCP_Transporter::sendIsPossible(struct t
                (!((sz == -1) && (e == SOCKET_EAGAIN) || (e == SOCKET_EWOULDBLOCK)
|| (e == SOCKET_EINTR))))
 
 
-bool
+int
 TCP_Transporter::doSend() {
-  if (!fetch_send_iovec_data())
-    return false;
+  struct iovec iov[64];
+  Uint32 cnt = fetch_send_iovec_data(iov, NDB_ARRAY_SIZE(iov));
 
-  Uint32 used = m_send_iovec_used;
-  if (used == 0)
-    return true;                                // Nothing to send
+  if (cnt == 0)
+  {
+    return 0;
+  }
+
+  Uint32 sum = 0;
+  for(Uint32 i = 0; i<cnt; i++)
+  {
+    assert(iov[i].iov_len);
+    sum += iov[i].iov_len;
+  }
 
-  Uint32 iovcnt = used > m_os_max_iovec ? m_os_max_iovec : used;
-  int nBytesSent = my_socket_writev(theSocket, m_send_iovec, iovcnt);
+  Uint32 pos = 0;
+  Uint32 sum_sent = 0;
+  Uint32 send_cnt = 0;
+  Uint32 remain = sum;
 
-  if (nBytesSent > 0)
+  if (cnt == NDB_ARRAY_SIZE(iov))
   {
-    iovec_data_sent(nBytesSent);
+    // If pulling all iov's make sure that we never return everyting
+    // flushed
+    sum++;
+  }
+
+  while (send_cnt < 5)
+  {
+    send_cnt++;
+    Uint32 iovcnt = cnt > m_os_max_iovec ? m_os_max_iovec : cnt;
+    int nBytesSent = my_socket_writev(theSocket, iov+pos, iovcnt);
+    assert(nBytesSent <= (int)remain);
 
-    sendCount ++;
-    sendSize  += nBytesSent;
-    if(sendCount == reportFreq)
+    if (Uint32(nBytesSent) == remain)
     {
-      get_callback_obj()->reportSendLen(remoteNodeId, sendCount, sendSize);
-      sendCount = 0;
-      sendSize  = 0;
+      sum_sent += nBytesSent;
+      goto ok;
     }
-    return true;
-  }
-  else
-  {
-    /* Send failed. */
+    else if (nBytesSent > 0)
+    {
+      sum_sent += nBytesSent;
+      remain -= nBytesSent;
+
+      /**
+       * Forward in iovec
+       */
+      while (Uint32(nBytesSent) >= iov[pos].iov_len)
+      {
+        assert(iov[pos].iov_len > 0);
+        nBytesSent -= iov[pos].iov_len;
+        pos++;
+        cnt--;
+      }
+
+      if (nBytesSent)
+      {
+        assert(iov[pos].iov_len > Uint32(nBytesSent));
+        iov[pos].iov_len -= nBytesSent;
+        iov[pos].iov_base = ((char*)(iov[pos].iov_base))+nBytesSent;
+      }
+      continue;
+    }
+    else
+    {
+      int err = my_socket_errno();
+      if (!(DISCONNECT_ERRNO(err, nBytesSent)))
+      {
+        if (sum_sent)
+        {
+          goto ok;
+        }
+        else
+        {
+          return remain;
+        }
+      }
+
 #if defined DEBUG_TRANSPORTER
-    g_eventLogger->error("Send Failure(disconnect==%d) to node = %d "
-                         "nBytesSent = %d "
-                         "errno = %d strerror = %s",
-                         DISCONNECT_ERRNO(my_socket_errno(), nBytesSent),
-                         remoteNodeId, nBytesSent, my_socket_errno(),
-                         (char*)ndbstrerror(my_socket_errno()));
+      g_eventLogger->error("Send Failure(disconnect==%d) to node = %d "
+                           "nBytesSent = %d "
+                           "errno = %d strerror = %s",
+                           DISCONNECT_ERRNO(err, nBytesSent),
+                           remoteNodeId, nBytesSent, my_socket_errno(),
+                           (char*)ndbstrerror(err));
 #endif
-    if(DISCONNECT_ERRNO(my_socket_errno(), nBytesSent)){
-      do_disconnect(my_socket_errno());
+      do_disconnect(err);
+      return 0;
     }
+  }
 
-    return false;
+ok:
+  assert(sum >= sum_sent);
+  iovec_data_sent(sum_sent);
+  sendCount += send_cnt;
+  sendSize  += sum_sent;
+  if(sendCount >= reportFreq)
+  {
+    get_callback_obj()->reportSendLen(remoteNodeId, sendCount, sendSize);
+    sendCount = 0;
+    sendSize  = 0;
   }
+
+  return sum - sum_sent; // 0 if every thing flushed else >0
 }
 
 int

=== modified file 'storage/ndb/src/common/transporter/TCP_Transporter.hpp'
--- a/storage/ndb/src/common/transporter/TCP_Transporter.hpp	2008-05-29 15:06:11 +0000
+++ b/storage/ndb/src/common/transporter/TCP_Transporter.hpp	2008-10-27 17:56:57 +0000
@@ -58,7 +58,7 @@ private:
    * Retrieves the contents of the send buffers and writes it on
    * the external TCP/IP interface.
    */
-  bool doSend();
+  int doSend();
   
   /**
    * It reads the external TCP/IP interface once 

=== modified file 'storage/ndb/src/common/transporter/Transporter.cpp'
--- a/storage/ndb/src/common/transporter/Transporter.cpp	2008-10-11 13:45:50 +0000
+++ b/storage/ndb/src/common/transporter/Transporter.cpp	2008-10-27 17:56:57 +0000
@@ -42,7 +42,6 @@ Transporter::Transporter(TransporterRegi
     isServer(lNodeId==serverNodeId),
     m_packer(_signalId, _checksum), m_max_send_buffer(max_send_buffer),
     m_overload_limit(0xFFFFFFFF), isMgmConnection(_isMgmConnection),
-    m_send_iovec_used(0),
     m_type(_type),
     m_transporter_registry(t_reg)
 {
@@ -244,7 +243,6 @@ Transporter::doDisconnect() {
 
   m_connected= false;
 
-  m_send_iovec_used= 0;
   get_callback_obj()->reset_send_buffer(remoteNodeId);
   disconnectImpl();
 }

=== modified file 'storage/ndb/src/common/transporter/Transporter.hpp'
--- a/storage/ndb/src/common/transporter/Transporter.hpp	2008-10-14 21:01:04 +0000
+++ b/storage/ndb/src/common/transporter/Transporter.hpp	2008-10-27 17:56:57 +0000
@@ -90,12 +90,11 @@ public:
                                                  used >= m_overload_limit);
   }
   
-  virtual bool doSend() = 0;
+  virtual int doSend() = 0;
 
   bool has_data_to_send()
   {
-    return (m_send_iovec_used > 0 ||
-            get_callback_obj()->has_data_to_send(remoteNodeId));
+    return get_callback_obj()->has_data_to_send(remoteNodeId);
   }
 
   /* Get the configured maximum send buffer usage. */
@@ -171,10 +170,7 @@ private:
   virtual bool send_limit_reached(int bufsize) = 0;
 
 protected:
-  static const Uint32 SEND_IOVEC_SIZE = 64;
   Uint32 m_os_max_iovec;
-  Uint32 m_send_iovec_used;
-  struct iovec m_send_iovec[SEND_IOVEC_SIZE];
 
   Uint32 getErrorCount();
   Uint32 m_errorCount;
@@ -190,7 +186,7 @@ protected:
   void report_error(enum TransporterError err, const char *info = 0)
     { m_transporter_registry.report_error(remoteNodeId, err, info); };
 
-  bool fetch_send_iovec_data();
+  Uint32 fetch_send_iovec_data(struct iovec dst[], Uint32 cnt);
   void iovec_data_sent(int nBytesSent);
 };
 
@@ -224,56 +220,20 @@ Transporter::getErrorCount()
  * partial send).
  */
 inline
-bool
-Transporter::fetch_send_iovec_data()
+Uint32
+Transporter::fetch_send_iovec_data(struct iovec dst[], Uint32 cnt)
 {
-  Uint32 used = m_send_iovec_used;
-  if (SEND_IOVEC_SIZE > used + 1)
-  {
-    Uint32 avail = (SEND_IOVEC_SIZE - 1) - used;
-    int count = get_callback_obj()->get_bytes_to_send_iovec(remoteNodeId,
-                                                            m_send_iovec + used,
-                                                            avail);
-    if (count < 0)
-      return false;                             // Error
-    m_send_iovec_used = used + count;
-  }
-
-  assert(m_send_iovec_used < SEND_IOVEC_SIZE);
-
-  return true;
+  return get_callback_obj()->get_bytes_to_send_iovec(remoteNodeId,
+                                                     dst, cnt);
 }
 
-/* Drop all iovec's that have been sent (last one maybe partially). */
 inline
 void
 Transporter::iovec_data_sent(int nBytesSent)
 {
   Uint32 used_bytes
-    = get_callback_obj()->bytes_sent(remoteNodeId, m_send_iovec, nBytesSent);
+    = get_callback_obj()->bytes_sent(remoteNodeId, nBytesSent);
   update_status_overloaded(used_bytes);
-
-  Uint32 used = m_send_iovec_used;
-  int sofar = 0;
-  Uint32 i;
-  for (i = 0; i < used; i++)
-  {
-    int len = m_send_iovec[i].iov_len;
-    assert(len >= 0);
-    int new_sofar = sofar + len;
-    if (new_sofar > nBytesSent)
-    {
-      int partial = nBytesSent - sofar;
-      assert(partial >= 0);
-      m_send_iovec[i].iov_base = (char *)m_send_iovec[i].iov_base + partial;
-      m_send_iovec[i].iov_len = len - partial;
-      if (i > 0)
-        memmove(m_send_iovec, m_send_iovec + i, (used - i)*sizeof(iovec));
-      break;
-    }
-    sofar = new_sofar;
-  }
-  m_send_iovec_used = used - i;
 }
 
 #endif // Define of Transporter_H

=== modified file 'storage/ndb/src/common/transporter/TransporterRegistry.cpp'
--- a/storage/ndb/src/common/transporter/TransporterRegistry.cpp	2008-09-21 09:47:14 +0000
+++ b/storage/ndb/src/common/transporter/TransporterRegistry.cpp	2008-10-27 17:56:57 +0000
@@ -161,9 +161,6 @@ TransporterRegistry::allocate_send_buffe
     SendBuffer &b = m_send_buffers[i];
     b.m_first_page = NULL;
     b.m_last_page = NULL;
-    b.m_current_page = NULL;
-    b.m_offset_unsent_data = 0;
-    b.m_offset_start_data = 0;
     b.m_used_bytes = 0;
   }
 
@@ -1166,13 +1163,16 @@ TransporterRegistry::performReceive()
  * In multi-threaded cases, this must be protected by send lock (can use
  * different locks for each node).
  */
-void
+int
 TransporterRegistry::performSend(NodeId nodeId)
 {
   Transporter *t = get_transporter(nodeId);
-  if (t && t->has_data_to_send() && t->isConnected() &&
-      is_connected(nodeId))
-    t->doSend();
+  if (t && t->isConnected() && is_connected(nodeId))
+  {
+    return t->doSend();
+  }
+
+  return 0;
 }
 
 void
@@ -1912,7 +1912,7 @@ TransporterRegistry::updateWritePtr(Tran
   }
 }
 
-int
+Uint32
 TransporterRegistry::get_bytes_to_send_iovec(NodeId node, struct iovec *dst,
                                              Uint32 max)
 {
@@ -1920,48 +1920,24 @@ TransporterRegistry::get_bytes_to_send_i
 
   if (max == 0)
     return 0;
-  SendBuffer *b = m_send_buffers + node;
-
-  SendBufferPage *page = b->m_current_page;
-  if (page == NULL)
-    return 0;
-
-  Uint32 offset = b->m_offset_unsent_data;
-  assert(offset <= page->m_bytes);
-  if (offset == page->m_bytes)
-    return 0;
-
-  dst[0].iov_base = (char*)(page->m_data + offset);
-  dst[0].iov_len = page->m_bytes - offset;
-  Uint32 count = 1;
-  page = page->m_next;
 
+  Uint32 count = 0;
+  SendBuffer *b = m_send_buffers + node;
+  SendBufferPage *page = b->m_first_page;
   while (page != NULL && count < max)
   {
-    dst[count].iov_base = (char*)page->m_data;
+    dst[count].iov_base = page->m_data+page->m_start;
     dst[count].iov_len = page->m_bytes;
+    assert(page->m_start + page->m_bytes <= page->max_data_bytes());
     page = page->m_next;
     count++;
   }
 
-  if (page != NULL)
-  {
-    b->m_current_page = page;
-    b->m_offset_unsent_data = 0;
-  }
-  else
-  {
-    assert(b->m_last_page != NULL);
-    b->m_current_page = b->m_last_page;
-    b->m_offset_unsent_data = b->m_last_page->m_bytes;
-  }
-
   return count;
 }
 
 Uint32
-TransporterRegistry::bytes_sent(NodeId node, const struct iovec *src,
-                                Uint32 bytes)
+TransporterRegistry::bytes_sent(NodeId node, Uint32 bytes)
 {
   assert(m_use_default_send_buffer);
 
@@ -1975,68 +1951,27 @@ TransporterRegistry::bytes_sent(NodeId n
   b->m_used_bytes = used_bytes;
 
   SendBufferPage *page = b->m_first_page;
-  assert(page != NULL);
-
-  /**
-   * On the first page, part of the page may have been sent previously, as
-   * indicated by b->m_offset_start_data.
-   *
-   * Additionally, there may be more data on the page than what was sent, or
-   * else we will need to release this (and possibly more) pages.
-   */
-  assert(b->m_offset_start_data < page->m_bytes);
-  Uint32 rest = page->m_bytes - b->m_offset_start_data;
-  if (rest > bytes)
+  while (bytes && bytes >= page->m_bytes)
   {
-    b->m_offset_start_data += bytes;
-    return used_bytes;
-  }
-  bytes -= rest;
-  /**
-   * Now loop, releasing pages until we find one where not all data has been sent.
-   */
-  for(;;) {
-    if (page == b->m_last_page)
-    {
-      /**
-       * Don't free the last page if emptied completely.
-       * Instead keep it for storing more data later.
-       */
-      break;
-    }
-    SendBufferPage *next = page->m_next;
-    assert(next != NULL);
-    if (page == b->m_current_page)
-    {
-      assert(page->m_bytes == b->m_offset_unsent_data);
-      b->m_current_page = next;
-      b->m_offset_unsent_data = 0;
-    }
-    release_page(page);
-    page = next;
-    if (bytes == 0)
-      break;
-    assert(page != NULL);
-    if (bytes < page->m_bytes)
-      break;
+    SendBufferPage * tmp = page;
     bytes -= page->m_bytes;
+    page = page->m_next;
+    release_page(tmp);
   }
-  if (page == NULL)
+
+  if (bytes)
   {
-    /* We have sent everything we had. */
-    assert(bytes == 0);
-    assert(b->m_current_page == NULL);
-    assert(b->m_offset_unsent_data == 0);
-    b->m_first_page = NULL;
-    b->m_last_page = NULL;
-    b->m_offset_start_data = 0;
+    page->m_start += bytes;
+    page->m_bytes -= bytes;
+    assert(page->m_start + page->m_bytes <= page->max_data_bytes());
+    b->m_first_page = page;
   }
   else
   {
-    /* We have sent only part of a page. */
-    b->m_first_page = page;
-    b->m_offset_start_data = bytes;
+    b->m_first_page = 0;
+    b->m_last_page = 0;
   }
+
   return used_bytes;
 }
 
@@ -2046,8 +1981,7 @@ TransporterRegistry::has_data_to_send(No
   assert(m_use_default_send_buffer);
 
   SendBuffer *b = m_send_buffers + node;
-  return (b->m_current_page != NULL &&
-          b->m_current_page->m_bytes > b->m_offset_unsent_data);
+  return (b->m_first_page != NULL && b->m_first_page->m_bytes);
 }
 
 void
@@ -2065,9 +1999,6 @@ TransporterRegistry::reset_send_buffer(N
   }
   b->m_first_page = NULL;
   b->m_last_page = NULL;
-  b->m_current_page = NULL;
-  b->m_offset_unsent_data = 0;
-  b->m_offset_start_data = 0;
   b->m_used_bytes = 0;
 }
 
@@ -2100,42 +2031,33 @@ TransporterRegistry::getWritePtr(NodeId 
   assert(m_use_default_send_buffer);
 
   SendBuffer *b = m_send_buffers + node;
-  Uint32 *p;
-
-  if (b->m_used_bytes + lenBytes > max_use)
-    return NULL;
 
   /* First check if we have room in already allocated page. */
   SendBufferPage *page = b->m_last_page;
-  if (page != NULL && page->m_bytes + lenBytes <=
page->max_data_bytes())
+  if (page != NULL && page->m_bytes + page->m_start + lenBytes <=
page->max_data_bytes())
   {
-    p = (Uint32 *)(page->m_data + page->m_bytes);
-    return p;
+    return (Uint32 *)(page->m_data + page->m_start + page->m_bytes);
   }
 
+  if (b->m_used_bytes + lenBytes > max_use)
+    return NULL;
+
   /* Allocate a new page. */
   page = alloc_page();
   if (page == NULL)
     return NULL;
   page->m_next = NULL;
   page->m_bytes = 0;
+  page->m_start = 0;
 
   if (b->m_last_page == NULL)
   {
     b->m_first_page = page;
     b->m_last_page = page;
-    b->m_current_page = page;
-    b->m_offset_unsent_data = 0;
-    b->m_offset_start_data = 0;
   }
   else
   {
     assert(b->m_first_page != NULL);
-    if (b->m_current_page == NULL)
-    {
-      b->m_current_page = page;
-      b->m_offset_unsent_data = 0;
-    }
     b->m_last_page->m_next = page;
     b->m_last_page = page;
   }
@@ -2153,40 +2075,6 @@ TransporterRegistry::updateWritePtr(Node
   assert(page->m_bytes + lenBytes <= page->max_data_bytes());
   page->m_bytes += lenBytes;
   b->m_used_bytes += lenBytes;
-
-  /**
-   * If we have no data not returned from get_bytes_to_send_iovec(), and the
-   * first signal spills over into a new page, we move the current pointer to
-   * not have to deal with a page with zero data in get_bytes_to_send_iovec().
-   */
-  if (b->m_current_page != NULL &&
-      b->m_current_page->m_bytes == b->m_offset_unsent_data)
-  {
-    b->m_current_page = b->m_current_page->m_next;
-    assert(b->m_current_page == page);
-    b->m_offset_unsent_data = 0;
-  }
-  /**
-   * If all data has been sent, and the first new signal spills over into a
-   * new page, we get a first page with no data which we need to free.
-   */
-  SendBufferPage *tmp = b->m_first_page;
-  if (tmp != NULL && tmp->m_bytes == b->m_offset_start_data)
-  {
-    b->m_first_page = tmp->m_next;
-    assert(b->m_first_page == page);
-    assert(b->m_current_page == page);
-    release_page(tmp);
-    b->m_offset_start_data = 0;
-  }
-
-  /**
-   * ToDo: To get better buffer utilization, we might at this point attempt
-   * to copy back part of the new data into a previous page.
-   *
-   * This will be especially worthwhile in case of big long signals.
-   */
-
   return b->m_used_bytes;
 }
 

=== modified file 'storage/ndb/src/kernel/vm/TransporterCallback.cpp'
--- a/storage/ndb/src/kernel/vm/TransporterCallback.cpp	2008-10-08 19:09:05 +0000
+++ b/storage/ndb/src/kernel/vm/TransporterCallback.cpp	2008-10-27 17:56:57 +0000
@@ -74,13 +74,13 @@ class TransporterCallbackKernelNonMT : p
    */
   int checkJobBuffer() { return globalScheduler.checkDoJob(); }
   void reportSendLen(NodeId nodeId, Uint32 count, Uint64 bytes);
-  int get_bytes_to_send_iovec(NodeId node, struct iovec *dst, Uint32 max)
+  Uint32 get_bytes_to_send_iovec(NodeId node, struct iovec *dst, Uint32 max)
   {
     return globalTransporterRegistry.get_bytes_to_send_iovec(node, dst, max);
   }
-  Uint32 bytes_sent(NodeId node, const struct iovec *src, Uint32 bytes)
+  Uint32 bytes_sent(NodeId node, Uint32 bytes)
   {
-    return globalTransporterRegistry.bytes_sent(node, src, bytes);
+    return globalTransporterRegistry.bytes_sent(node, bytes);
   }
   bool has_data_to_send(NodeId node)
   {

=== modified file 'storage/ndb/src/kernel/vm/mt.cpp'
--- a/storage/ndb/src/kernel/vm/mt.cpp	2008-10-20 10:01:57 +0000
+++ b/storage/ndb/src/kernel/vm/mt.cpp	2008-10-27 17:56:57 +0000
@@ -71,6 +71,18 @@ static Uint32 ndbmt_threads = 0;
 static Uint32 num_threads = 0;
 static Uint32 receiver_thread_no = 0;
 
+#define NO_SEND_THREAD (MAX_THREADS + 1)
+
+struct mt_lock_stat
+{
+  const void * m_ptr;
+  char * m_name;
+  Uint32 m_contended_count;
+  Uint32 m_spin_count;
+};
+static void register_lock(const void * ptr, const char * name);
+static mt_lock_stat * lookup_lock(const void * ptr);
+
 #if defined(HAVE_LINUX_FUTEX) && defined(NDB_HAVE_XCNG)
 #define USE_FUTEX
 #endif
@@ -230,39 +242,51 @@ struct thr_spin_lock
   thr_spin_lock(const char * name = 0)
   {
     m_lock = 0;
-    m_name = name;
-    m_contended_count = 0;
+    register_lock(this, name);
   }
 
-  const char * m_name;
-  Uint32 m_contended_count;
   volatile Uint32 m_lock;
 };
 
 static
-inline
 void
-lock(struct thr_spin_lock* sl)
+lock_slow(struct thr_spin_lock* sl)
 {
   volatile unsigned* val = &sl->m_lock;
-test:
-  if (likely(xcng(val, 1) == 0))
-    return;
-
-  /*
-   * There is a race conditions here on m_contended_count. But it doesn't
-   * really matter if the counts are not 100% accurate
-   */
-  Uint32 count = sl->m_contended_count++;
-  Uint32 freq = (count > 10000 ? 5000 : (count > 20 ? 200 : 1));
-  if ((count % freq) == 0)
-    printf("%s waiting for lock, contentions~=%u\n", sl->m_name, count);
 
+loop:
+  Uint32 spins = 0;
   do {
+    spins++;
     cpu_pause();
   } while (* val == 1);
 
-  goto test;
+  if (unlikely(xcng(val, 1) != 0))
+    goto loop;
+
+  mt_lock_stat* s = lookup_lock(sl);
+  if (s)
+  {
+    s->m_spin_count += spins;
+    Uint32 count = ++s->m_contended_count;
+    Uint32 freq = (count > 10000 ? 5000 : (count > 20 ? 200 : 1));
+
+    if ((count % freq) == 0)
+      printf("%s waiting for lock, contentions: %u spins: %u\n",
+             s->m_name, count, s->m_spin_count);
+  }
+}
+
+static
+inline
+void
+lock(struct thr_spin_lock* sl)
+{
+  volatile unsigned* val = &sl->m_lock;
+  if (likely(xcng(val, 1) == 0))
+    return;
+
+  lock_slow(sl);
 }
 
 static
@@ -294,10 +318,9 @@ struct thr_mutex
 {
   thr_mutex(const char * name = 0) {
     m_mutex = NdbMutex_Create();
-    m_name = name;
+    register_lock(this, name);
   }
 
-  const char * m_name;
   NdbMutex * m_mutex;
 };
 
@@ -326,6 +349,113 @@ trylock(struct thr_mutex * sl)
 }
 
 /**
+ * thr_safe_pool
+ */
+template<typename T>
+struct thr_safe_pool
+{
+  thr_safe_pool() : m_free_list(0), m_lock("mempool") {}
+
+  T* m_free_list;
+  Ndbd_mem_manager *m_mm;
+  thr_spin_lock m_lock;
+
+  T* seize() {
+    T* ret = 0;
+    lock(&m_lock);
+    if (m_free_list)
+    {
+      ret = m_free_list;
+      m_free_list = *reinterpret_cast<T**>(m_free_list);
+      unlock(&m_lock);
+    }
+    else
+    {
+      Uint32 dummy;
+      unlock(&m_lock);
+      ret = reinterpret_cast<T*>
+        (m_mm->alloc_page(RT_JOB_BUFFER, &dummy,
+                          Ndbd_mem_manager::NDB_ZONE_ANY));
+      // ToDo: How to deal with failed allocation?!?
+      // I think in this case we need to start grabbing buffers kept for signal
+      // trace.
+    }
+    return ret;
+  }
+
+  void release(T* t){
+    lock(&m_lock);
+    T** nextptr = reinterpret_cast<T**>(t);
+    * nextptr = m_free_list;
+    m_free_list = t;
+    unlock(&m_lock);
+  }
+};
+
+/**
+ * thread_local_pool
+ */
+template<typename T, typename U>
+class thread_local_pool
+{
+public:
+  thread_local_pool(thr_safe_pool<U> *global_pool, unsigned max_free) :
+    m_max_free(max_free),
+    m_free(0),
+    m_freelist(0),
+    m_global_pool(global_pool)
+  {
+  }
+
+  T *seize()
+  {
+    T *tmp = m_freelist;
+    if (tmp)
+    {
+      m_freelist = tmp->m_next;
+      assert (m_free > 0);
+      m_free--;
+    }
+    else
+      tmp = (T *)m_global_pool->seize();
+    return tmp;
+  }
+
+  void release(T *t)
+  {
+    unsigned free = m_free;
+    if (free < m_max_free)
+    {
+      m_free = free + 1;
+      t->m_next = m_freelist;
+      m_freelist = t;
+    }
+    else
+      m_global_pool->release((U *)t);
+  }
+
+  void releaseAll()
+  {
+    Uint32 save = m_max_free;
+    m_max_free = 0;
+    while(m_free)
+    {
+      T* t = seize();
+      release(t);
+    }
+    m_max_free = save;
+  }
+
+  void set_pool(thr_safe_pool<U> * pool) { m_global_pool = pool; }
+
+private:
+  unsigned m_max_free;
+  unsigned m_free;
+  T *m_freelist;
+  thr_safe_pool<U> *m_global_pool;
+};
+
+/**
  * Signal buffers.
  *
  * Each thread job queue contains a list of these buffers with signals.
@@ -374,7 +504,6 @@ struct thr_job_queue
  * For example, on Intel core 2 quad processors, there is a ~33%
  * penalty for two cores accessing the same 64-byte cacheline.
  */
-
 struct thr_jb_write_state
 {
   /*
@@ -424,17 +553,20 @@ struct thr_jb_read_state
   Uint32 m_write_pos;
 };
 
+/**
+ * time-queue
+ */
 struct thr_tq
 {
   static const unsigned SQ_SIZE = 512;
   static const unsigned LQ_SIZE = 512;
   static const unsigned PAGES = 32 * (SQ_SIZE + LQ_SIZE) / 8192;
   
+  Uint32 * m_delayed_signals[PAGES];
+  Uint32 m_next_free;
   Uint32 m_next_timer;
   Uint32 m_current_time;
-  Uint32 m_next_free;
   Uint32 m_cnt[2];
-  Uint32 * m_delayed_signals[PAGES];
   Uint32 m_short_queue[SQ_SIZE];
   Uint32 m_long_queue[LQ_SIZE];
 };
@@ -452,9 +584,58 @@ struct thr_tq
  */
 #define THR_FREE_BUF_BATCH 6
 
+/**
+ * a page with send data
+ */
+struct thr_send_page
+{
+  static const Uint32 PGSIZE = 32768;
+  static Uint32 max_bytes() {
+    return PGSIZE - offsetof(thr_send_page, m_data);
+  }
+
+  /* Next page */
+  thr_send_page* m_next;
+
+  /* Bytes of send data available in this page. */
+  Uint16 m_bytes;
+
+  /* Start of unsent data */
+  Uint16 m_start;
+
+  /* Data; real size is to the end of one page. */
+  unsigned char m_data[2];
+};
+
+/**
+ * a linked list with thr_send_page
+ */
+struct thr_send_buffer
+{
+  thr_send_page* m_first_page;
+  thr_send_page* m_last_page;
+};
+
+/**
+ * a ring buffer with linked list of thr_send_page
+ */
+struct thr_send_queue
+{
+  unsigned m_write_index;
+#if SIZEOF_CHARP == 8
+  unsigned m_unused;
+  thr_send_page* m_buffers[7];
+  static const unsigned SIZE = 7;
+#else
+  thr_send_page* m_buffers[15];
+  static const unsigned SIZE = 15;
+#endif
+};
+
 struct thr_data
 {
-  thr_data() : m_jba_write_lock("jbalock") {}
+  thr_data() : m_jba_write_lock("jbalock"),
+               m_send_buffer_pool(0, THR_FREE_BUF_MAX) {}
 
   thr_wait m_waiter;
   unsigned m_thr_no;
@@ -508,16 +689,24 @@ struct thr_data
   Uint32 m_prioa_size;
   Uint32 m_priob_count;
   Uint32 m_priob_size;
+
   /* Array of node ids with pending remote send data. */
   Uint8 m_pending_send_nodes[MAX_NTRANSPORTERS];
   /* Number of node ids in m_pending_send_nodes. */
   Uint32 m_pending_send_count;
+
   /**
    * Bitmap of pending node ids with send data.
    * Used to quickly check if a node id is already in m_pending_send_nodes.
    */
   Bitmask<(MAX_NTRANSPORTERS+31)/32> m_pending_send_mask;
 
+  /* pool for send buffers */
+  struct thread_local_pool<thr_send_page, thr_job_buffer> m_send_buffer_pool;
+
+  /* Send buffer for this thread, these are not touched by any other thread */
+  struct thr_send_buffer m_send_buffers[MAX_NTRANSPORTERS];
+
   /* Block instances (main and worker) handled by this thread. */
   /* Used for sendpacked (send-at-job-buffer-end). */
   Uint32 m_instance_count;
@@ -526,245 +715,92 @@ struct thr_data
   SectionSegmentPool::Cache m_sectionPoolCache;
 };
 
-template<typename T>
-struct thr_safe_pool
+struct mt_send_handle  : public TransporterSendBufferHandle
 {
-  thr_safe_pool() : m_lock("mempool"), m_free_list(0) {}
-
-  thr_spin_lock m_lock;
-  T* m_free_list;
-  Ndbd_mem_manager *m_mm;
+  struct thr_data * m_selfptr;
+  mt_send_handle(thr_data* ptr) : m_selfptr(ptr) {}
+  virtual ~mt_send_handle() {}
+
+  virtual Uint32 *getWritePtr(NodeId node, Uint32 len, Uint32 prio, Uint32 max);
+  virtual Uint32 updateWritePtr(NodeId node, Uint32 lenBytes, Uint32 prio);
+  virtual bool forceSend(NodeId node);
+};
 
-  T* seize() {
-    T* ret = 0;
-    lock(&m_lock);
-    if (m_free_list)
-    {
-      ret = m_free_list;
-      m_free_list = *reinterpret_cast<T**>(m_free_list);
-      unlock(&m_lock);
-    }
-    else
-    {
-      Uint32 dummy;
-      unlock(&m_lock);
-      ret = reinterpret_cast<T*>
-        (m_mm->alloc_page(RT_JOB_BUFFER, &dummy, 
-                          Ndbd_mem_manager::NDB_ZONE_ANY));
-      // ToDo: How to deal with failed allocation?!?
-      // I think in this case we need to start grabbing buffers kept for signal
-      // trace.
-    }
-    return ret;
-  }
+struct trp_callback : public TransporterCallbackKernel
+{
+  trp_callback() {}
 
-  void release(T* t){
-    lock(&m_lock);
-    T** nextptr = reinterpret_cast<T**>(t);
-    * nextptr = m_free_list;
-    m_free_list = t;
-    unlock(&m_lock);
-  }
+  /* Callback interface. */
+  int checkJobBuffer();
+  void reportSendLen(NodeId nodeId, Uint32 count, Uint64 bytes);
+  void lock_transporter(NodeId node);
+  void unlock_transporter(NodeId node);
+  Uint32 get_bytes_to_send_iovec(NodeId node, struct iovec *dst, Uint32 max);
+  Uint32 bytes_sent(NodeId node, Uint32 bytes);
+  bool has_data_to_send(NodeId node);
+  void reset_send_buffer(NodeId node);
 };
 
-template<typename T, typename U>
-class thread_local_pool
+extern trp_callback g_trp_callback;             // Forward declaration
+
+struct thr_repository
 {
-public:
-  thread_local_pool(thr_safe_pool<U> *global_pool, int max_free) :
-    m_global_pool(global_pool), m_max_free(max_free),
-    m_freelist(0), m_free(0)
-  {
-  }
+  thr_repository()
+    : m_receive_lock("recvlock"),
+      m_section_lock("sectionlock"),
+      m_mem_manager_lock("memmanagerlock") {}
 
-  T *seize()
-  {
-    T *tmp = m_freelist;
-    if (tmp)
-    {
-      m_freelist = tmp->m_next;
-      assert (m_free > 0);
-      m_free--;
-    }
-    else
-      tmp = (T *)m_global_pool->seize();
-    return tmp;
-  }
+  unsigned m_thread_count;
+  struct thr_spin_lock m_receive_lock;
+  struct thr_spin_lock m_section_lock;
+  struct thr_spin_lock m_mem_manager_lock;
+  struct thr_data m_thread[MAX_THREADS];
+  struct thr_safe_pool<thr_job_buffer> m_free_list;
 
-  void release(T *t)
-  {
-    int free = m_free;
-    if (free < m_max_free)
-    {
-      m_free = free + 1;
-      t->m_next = m_freelist;
-      m_freelist = t;
-    }
-    else
-      m_global_pool->release((U *)t);
-  }
+  /**
+   * send buffer handling
+   */
 
-  void releaseAll()
+  /* The buffers that are to be sent */
+  struct send_buffer
   {
-    Uint32 save = m_max_free;
-    m_max_free = 0;
-    while(m_free)
-    {
-      T* t = seize();
-      release(t);
-    }
-    m_max_free = save;
-  }
+    /**
+     * pending data
+     */
+    struct thr_send_buffer m_buffer;
 
-private:
-  thr_safe_pool<U> *m_global_pool;
-  int m_max_free;
-  T *m_freelist;
-  int m_free;
-};
-
-/**
- * Send buffer implementation, we have one of these for each thread.
- *
- * Enables lock-free prepareSend(), and per-transporter lock for doSend().
- */
-struct thr_send_buf : public TransporterSendBufferHandle
-{
-  /* One page of send buffer data. */
-  struct page
-  {
-    /* This is the number of words that will fit in one page of send buffer. */
-    static const Uint32 PGSIZE = 32768;
-    static Uint32 max_data_bytes()
-    {
-      return PGSIZE - offsetof(page, m_data);
-    }
-
-    /* Send buffer for one transporter is kept in a single-linked list. */
-    page *m_next;
-#if defined VM_TRACE || defined ERROR_INSERT
-    /* Prev page (only used when splitting signals) */
-    page * m_prev;
-#endif
-    /* Bytes of send data available in this page. */
-    Uint32 m_bytes;
-    /* Start of unsent data (next bytes_sent() will count from here). */
-    Uint32 m_start;
-    /* Offset from where to return data in next get_bytes_to_send_iovec(). */
-    Uint32 m_current;
-    /* Data; real size is to the end of one page. */
-    unsigned char m_data[1];
-  };
+    /**
+     * lock
+     */
+    struct thr_spin_lock m_send_lock;
 
-  /* Linked list of pages for one thread/transporter pair. */
-  struct send_buffer
-  {
-    /* First page, ie. page where next bytes_sent() will count from. */
-    page *m_first_page;
-    /* Last page, ie. page next getWritePtr() will use. */
-    page *m_last_page;
-    /* Page from which next get_bytes_to_send_iovec() will return data. */
-    page *m_current_page;
-    /* Temporary pointer stored in getWritePtr and read in updateWritePtr. */
-    page *m_prev_page;
     /**
-     * Members to keep track of total buffer usage.
-     *
-     * Since we are non-locking, these will only be approximate, as there is no
-     * defined temporal synchronization between readers and writers.
-     *
-     * We keep two separate counters, one updated only by the writer, and one
-     * updated only by the reader, to avoid the need for locking or atomic
-     * operations. An approximate count of bytes available is obtained from
-     * (m_written_bytes - m_read_bytes), C unsigned arithmetics takes care
-     * to handle overflow/wrapover correctly (for <2Gb send buffers at least).
+     * Flag used to coordinate sending to same remote node from different
+     * threads.
      *
-     * It is teoretically possible to get a <0 value (if m_read_bytes is
-     * updated before m_written_bytes), so this needs to be handled.
+     * If two threads need to send to the same node at the same time, the
+     * second thread, rather than wait for the first to finish, will just
+     * set this flag, and the first thread will do an extra send when done
+     * with the first.
      */
-    Uint32 m_written_bytes;
-    Uint32 m_read_bytes;
-
-    Uint32 used_bytes() const
-    {
-      Uint32 used = m_written_bytes - m_read_bytes;
-      return (used >= (Uint32)0x80000000) ? 0 : used;
-    }
+    Uint32 m_force_send;
 
-    void init()
-    {
-      m_first_page = NULL;
-      m_last_page = NULL;
-      m_current_page = NULL;
-      m_prev_page = NULL;
-      m_written_bytes = 0;
-      m_read_bytes = 0;
-    }
-  };
-
-  thr_send_buf(struct trp_callback *trp_cb, Uint32 thread,
-               thr_safe_pool<thr_job_buffer> *global_pool);
-  bool initial_alloc(NodeId node);
-
-  /* Callback interface. */
-  Uint32 *getWritePtr(NodeId node, Uint32 lenBytes, Uint32 prio,
-                      Uint32 max_use);
-  Uint32 updateWritePtr(NodeId node, Uint32 lenBytes, Uint32 prio);
-  bool forceSend(NodeId node);
-
-  const Uint32 m_self;
-  send_buffer m_buffers[MAX_NTRANSPORTERS];
-  thread_local_pool<page, thr_job_buffer> m_pool;
-  const struct trp_callback *m_trp_callback;
-};
-
-struct trp_callback : public TransporterCallbackKernel
-{
-  trp_callback();
-
-  /* Callback interface. */
-  int checkJobBuffer();
-  void reportSendLen(NodeId nodeId, Uint32 count, Uint64 bytes);
-  void lock_transporter(NodeId node);
-  void unlock_transporter(NodeId node);
-  int get_bytes_to_send_iovec(NodeId node, struct iovec *dst, Uint32 max);
-  Uint32 bytes_sent(NodeId node, const struct iovec *src, Uint32 bytes);
-  bool has_data_to_send(NodeId node);
-  void reset_send_buffer(NodeId node);
-
-  Uint32 total_bytes(NodeId node) const
-  {
-    Uint32 total = 0;
-    for (Uint32 i = 0; i < num_threads; i++)
-      total += m_thr_buffers[i]->m_buffers[node].used_bytes();
-    return total;
-  }
-
-  // +1 to handle reset buffers from "other" thread
-  thr_send_buf *m_thr_buffers[MAX_THREADS + 1];
-
-  /**
-   * During send, for each node this holds the id of the thread currently
-   * doing send to that node.
-   */
-  Uint32 m_send_thr[MAX_NTRANSPORTERS];
-};
+    /**
+     * Which thread is currently holding the m_send_lock
+     */
+    Uint32 m_send_thread;
 
-extern trp_callback g_trp_callback;             // Forward declaration
+    /**
+     * bytes pending for this node
+     */
+    Uint32 m_bytes;
 
-struct thr_repository
-{
-  thr_repository()
-    : m_receive_lock("recvlock"),
-      m_section_lock("sectionlock"),
-      m_mem_manager_lock("memmanagerlock") {}
+    /* read index(es) in thr_send_queue */
+    Uint32 m_read_index[MAX_THREADS];
+  } m_send_buffers[MAX_NTRANSPORTERS];
 
-  unsigned m_thread_count;
-  struct thr_spin_lock m_receive_lock;
-  struct thr_spin_lock m_section_lock;
-  struct thr_spin_lock m_mem_manager_lock;
-  struct thr_data m_thread[MAX_THREADS];
-  struct thr_safe_pool<thr_job_buffer> m_free_list;
+  /* The buffers published by threads */
+  thr_send_queue m_thread_send_buffers[MAX_NTRANSPORTERS][MAX_THREADS];
 
   /*
    * These are used to synchronize during crash / trace dumps.
@@ -774,21 +810,6 @@ struct thr_repository
   pthread_mutex_t stop_for_crash_mutex;
   pthread_cond_t stop_for_crash_cond;
   Uint32 stopped_threads;
-
-  /**
-   * Send locks for the transporters, one per possible remote node.
-   */
-  thr_spin_lock m_send_locks[MAX_NTRANSPORTERS];
-  /**
-   * Flag used to coordinate sending to same remote node from different
-   * threads.
-   *
-   * If two threads need to send to the same node at the same time, the
-   * second thread, rather than wait for the first to finish, will just
-   * set this flag, and the first thread will do an extra send when done
-   * with the first.
-   */
-  Uint32 m_force_send[MAX_NTRANSPORTERS];
 };
 
 static
@@ -1037,6 +1058,128 @@ scan_time_queues(struct thr_data* selfpt
   abort();
 }
 
+static
+inline
+Uint32*
+get_free_slot(struct thr_repository* rep,
+	      struct thr_data* selfptr,
+	      Uint32* idxptr)
+{
+  struct thr_tq * tq = &selfptr->m_tq;
+  Uint32 idx = tq->m_next_free;
+retry:
+  Uint32 buf = idx >> 8;
+  Uint32 pos = idx & 0xFF;
+
+  if (idx != RNIL)
+  {
+    Uint32* page = * (tq->m_delayed_signals + buf);
+    Uint32* ptr = page + (32 * pos);
+    tq->m_next_free = * ptr;
+    * idxptr = idx;
+    return ptr;
+  }
+
+  Uint32 thr_no = selfptr->m_thr_no;
+  for (Uint32 i = 0; i<thr_tq::PAGES; i++)
+  {
+    if (tq->m_delayed_signals[i] == 0)
+    {
+      struct thr_job_buffer *jb = seize_buffer(rep, thr_no, false);
+      Uint32 * page = reinterpret_cast<Uint32*>(jb);
+      tq->m_delayed_signals[i] = page;
+
+      ndbout_c("saving %p at %p (%d)", page, tq->m_delayed_signals+i, i);
+
+      /**
+       * Init page
+       */
+      for (Uint32 j = 0; j<255; j ++)
+      {
+	page[j * 32] = (i << 8) + (j + 1);
+      }
+      page[255*32] = RNIL;
+      idx = (i << 8);
+      goto retry;
+    }
+  }
+  abort();
+}
+
+void
+senddelay(Uint32 thr_no, const SignalHeader* s, Uint32 delay)
+{
+  struct thr_repository* rep = &g_thr_repository;
+  struct thr_data * selfptr = rep->m_thread + thr_no;
+  unsigned siglen = (sizeof(*s) >> 2) + s->theLength + s->m_noOfSections;
+
+  Uint32 max;
+  Uint32 * cntptr;
+  Uint32 * queueptr;
+
+  Uint32 alarm = selfptr->m_tq.m_current_time + delay;
+  Uint32 nexttimer = selfptr->m_tq.m_next_timer;
+  if (delay < 100)
+  {
+    cntptr = selfptr->m_tq.m_cnt + 0;
+    queueptr = selfptr->m_tq.m_short_queue;
+    max = thr_tq::SQ_SIZE;
+  }
+  else
+  {
+    cntptr = selfptr->m_tq.m_cnt + 1;
+    queueptr = selfptr->m_tq.m_long_queue;
+    max = thr_tq::LQ_SIZE;
+  }
+
+  Uint32 idx;
+  Uint32* ptr = get_free_slot(rep, selfptr, &idx);
+  memcpy(ptr, s, 4*siglen);
+
+  if (0)
+    ndbout_c("now: %d alarm: %d send %s from %s to %s delay: %d idx: %x %p",
+	     selfptr->m_tq.m_current_time,
+	     alarm,
+	     getSignalName(s->theVerId_signalNumber),
+	     getBlockName(refToBlock(s->theSendersBlockRef)),
+	     getBlockName(s->theReceiversBlockNumber),
+	     delay,
+	     idx, ptr);
+
+  Uint32 i;
+  Uint32 cnt = *cntptr;
+  Uint32 newentry = (idx << 16) | (alarm & 0xFFFF);
+
+  * cntptr = cnt + 1;
+  selfptr->m_tq.m_next_timer = alarm < nexttimer ? alarm : nexttimer;
+
+  if (cnt == 0)
+  {
+    queueptr[0] = newentry;
+    return;
+  }
+  else if (cnt < max)
+  {
+    for (i = 0; i<cnt; i++)
+    {
+      Uint32 save = queueptr[i];
+      if ((save & 0xFFFF) > alarm)
+      {
+	memmove(queueptr+i+1, queueptr+i, 4*(cnt - i));
+	queueptr[i] = newentry;
+	return;
+      }
+    }
+    assert(i == cnt);
+    queueptr[i] = newentry;
+    return;
+  }
+  else
+  {
+    abort();
+  }
+}
+
 /*
  * Flush the write state to the job queue, making any new signals available to
  * receiving threads.
@@ -1132,7 +1275,7 @@ rand_yield(Uint32 limit, void* ptr0, voi
   Uint32 sum = g_rand_yield;
   for (Uint32 i = 0; i<sizeof(tmp); i++)
     sum = 33 * sum + tmpptr[i];
-  
+
   if ((sum % 100) < limit)
   {
     g_rand_yield++;
@@ -1143,187 +1286,7 @@ rand_yield(Uint32 limit, void* ptr0, voi
 static inline void rand_yield(Uint32 limit, void* ptr0, void * ptr1) {}
 #endif
 
-thr_send_buf::thr_send_buf(struct trp_callback *trp_cb, Uint32 thread,
-                           thr_safe_pool<thr_job_buffer> *global_pool) :
-  m_self(thread),
-  m_pool(global_pool, THR_FREE_BUF_MAX),
-  m_trp_callback(trp_cb)
-{
-  for (int i = 0; i < MAX_NTRANSPORTERS; i++)
-    m_buffers[i].init();
-}
-
-bool
-thr_send_buf::initial_alloc(NodeId node)
-{
-  send_buffer *b = m_buffers + node;
-
-  page *pg = m_pool.seize();
-  if (pg == NULL)
-    return false;
-
-  pg->m_next = NULL;
-#if defined VM_TRACE || defined ERROR_INSERT
-  pg->m_prev = NULL;
-#endif
-  pg->m_bytes = 0;
-  pg->m_start = 0;
-  pg->m_current = 0;
-  wmb();   // Commit page init before making visible
-
-  /**
-   * Due to no locking, we need to be very careful about initialisation here.
-   *
-   * Initialisation is done by the writer, so what we need to ensure is that
-   * reader will not see an inconsistent state.
-   *
-   * Since reader is using m_current_page != NULL to mean the page is valid,
-   * we set that last, with a store-store barrier.
-   */
-  b->m_first_page = pg;
-  b->m_last_page = pg;
-  b->m_prev_page = 0;
-  b->m_written_bytes = 0;
-  b->m_read_bytes = 0;
-  wmb();
-  b->m_current_page = pg;
-
-  return true;
-}
-
-Uint32 *
-thr_send_buf::getWritePtr(NodeId node, Uint32 lenBytes, Uint32 prio,
-                          Uint32 max_use)
-{
-  send_buffer *b = m_buffers + node;
-  assert(lenBytes > 0);
-
-  /**
-   * Only allocate send buffer memory on first actual use.
-   * Once allocated, at least one page stays, even if empty.
-   */
-  if (unlikely(b->m_first_page == NULL))
-    if (!initial_alloc(node))
-      return NULL;
-
-  /* Check for common case, free space in existing buffer. */
-  page *last_pg = b->m_last_page;
-  assert(last_pg != NULL);
-  assert(lenBytes < last_pg->max_data_bytes());
-
-  if (last_pg->m_bytes + lenBytes <= last_pg->max_data_bytes())
-  {
-    return (Uint32 *)(last_pg->m_data + last_pg->m_bytes);
-  }
-
-  /**
-   * Check for buffer limit exceeded.
-   *
-   * We do this check only when about to allocate a new page.
-   * This may make us over-use slightly, but that is fine since the remaining
-   * space can in any case not be used for anything else.
-   */
-  if (m_trp_callback->total_bytes(node) + lenBytes > max_use)
-    return NULL;
-
-  /* Need to allocate a new page. */
-  page *new_pg = m_pool.seize();
-  if (new_pg == NULL)
-    return NULL;
-
-#if defined VM_TRACE || defined ERROR_INSERT
-  new_pg->m_prev = 0;
-#endif
-  new_pg->m_next = NULL;
-  new_pg->m_bytes = 0;
-  new_pg->m_start = 0;
-  new_pg->m_current = 0;
-
-  /* Remeber old last page temporarily until updateWritePtr(). */
-  assert(b->m_prev_page == 0);
-  if (last_pg->m_bytes < last_pg->max_data_bytes())
-  {
-    b->m_prev_page = last_pg;
-#if defined VM_TRACE || defined ERROR_INSERT
-    new_pg->m_prev = last_pg;
-#endif
-  }
-  b->m_last_page = new_pg;
-
-  /** Assigning m_next makes the new page available to readers, so need a
-   * memory barrier here.
-   */
-  wmb();
-  last_pg->m_next = new_pg;
-
-  return (Uint32 *)(new_pg->m_data);
-}
-
-Uint32
-thr_send_buf::updateWritePtr(NodeId node, Uint32 lenBytes, Uint32 prio)
-{
-  send_buffer *b = m_buffers + node;
-  page *last_pg = b->m_last_page;
-  assert(lenBytes > 0);
-  assert(last_pg != NULL);
-  assert(last_pg->m_bytes + lenBytes <= last_pg->max_data_bytes());
-
-  b->m_written_bytes += lenBytes;
-  Uint32 used = m_trp_callback->total_bytes(node);
-
-  /* For the first signal in a buffer, split it and move it back so that
-   * previous buffer is 100% utilised.
-   *
-   * This avoids buffer waste for big signals, and also simplifies reader.
-   */
-  if (last_pg->m_bytes == 0)
-  {
-    page *prev_pg = b->m_prev_page;
-#if defined VM_TRACE || defined ERROR_INSERT
-    assert(last_pg->m_prev == 0 || last_pg->m_prev == prev_pg);
-#endif
-    b->m_prev_page = 0;
-    if (prev_pg != 0)
-    {
-      assert(prev_pg->m_bytes < prev_pg->max_data_bytes());
-      Uint32 part = prev_pg->max_data_bytes() - prev_pg->m_bytes;
-      assert(part < lenBytes);
-      memcpy(prev_pg->m_data + prev_pg->m_bytes, last_pg->m_data, part);
-      memmove(last_pg->m_data, last_pg->m_data + part, lenBytes - part);
 
-      last_pg->m_bytes = lenBytes - part;
-
-      rand_yield(1, prev_pg, last_pg); // BUG_39880
-
-      /**
-       * Memory barrier since this makes data available to reader. 
-       * and to serialize the two assignments 
-       */
-      wmb();
-      prev_pg->m_bytes = prev_pg->max_data_bytes();
-      return used;
-
-      /**
-       * If at some later point we need to support messages bigger than the
-       * page size, we could do so here similarly by copying from a separate
-       * temporary big thread-local buffer returned from getWritePtr().
-       */
-    }
-  }
-
-  wmb();
-  last_pg->m_bytes += lenBytes;
-  return used;
-}
-
-trp_callback::trp_callback()
-{
-  // number of threads not yet set so use MAX_THREADS
-  for (Uint32 i = 0; i < NDB_ARRAY_SIZE(m_thr_buffers); i++)
-    m_thr_buffers[i] = new thr_send_buf(this, i, &g_thr_repository.m_free_list);
-  for (Uint32 i = 0; i < NDB_ARRAY_SIZE(m_send_thr); i++)
-    m_send_thr[i] = ~(Uint32)0;
-}
 
 void
 trp_callback::reportSendLen(NodeId nodeId, Uint32 count, Uint64 bytes)
@@ -1340,7 +1303,7 @@ trp_callback::reportSendLen(NodeId nodeI
   signal.theData[2] = (bytes/count);
   signal.header.theVerId_signalNumber = GSN_EVENT_REP;
   signal.header.theReceiversBlockNumber = CMVMI;
-  sendlocal(m_send_thr[nodeId],
+  sendlocal(g_thr_repository.m_send_buffers[nodeId].m_send_thread,
             &signalT.header, signalT.theData, NULL);
 }
 
@@ -1368,7 +1331,7 @@ trp_callback::lock_transporter(NodeId no
    * in any case disconnecting/connecting at this point in time, and sends are
    * non-waiting (so we will not block sending on other transporters).
    */
-  lock(&rep->m_send_locks[node]);
+  lock(&rep->m_send_buffers[node].m_send_lock);
   lock(&rep->m_receive_lock);
 }
 
@@ -1377,7 +1340,7 @@ trp_callback::unlock_transporter(NodeId 
 {
   struct thr_repository* rep = &g_thr_repository;
   unlock(&rep->m_receive_lock);
-  unlock(&rep->m_send_locks[node]);
+  unlock(&rep->m_send_buffers[node].m_send_lock);
 }
 
 int
@@ -1403,228 +1366,341 @@ trp_callback::checkJobBuffer()
   return 0;
 }
 
-int
-trp_callback::get_bytes_to_send_iovec(NodeId node, struct iovec *dst,
-                                      Uint32 max_iovecs)
+/**
+ * Link all send-buffer-pages into *one*
+ *   single linked list of buffers
+ *
+ * TODO: This is not completly fair,
+ *       it would be better to get one entry from each thr_send_queue
+ *       per thread instead (until empty)
+ */
+static
+Uint32
+link_thread_send_buffers(thr_repository::send_buffer * sb, Uint32 node)
 {
-  Uint32 iovecs = 0;
-
-  if (max_iovecs == 0)
-    return 0;
-
-  for (Uint32 thr = 0; thr < num_threads && iovecs < max_iovecs; thr++)
+  Uint32 ri[MAX_THREADS];
+  Uint32 wi[MAX_THREADS];
+  thr_send_queue * src = g_thr_repository.m_thread_send_buffers[node];
+  for (unsigned thr = 0; thr < num_threads; thr++)
   {
-    thr_send_buf::send_buffer *b = m_thr_buffers[thr]->m_buffers + node;
-    thr_send_buf::page *pg = b->m_current_page;
+    ri[thr] = sb->m_read_index[thr];
+    wi[thr] = src[thr].m_write_index;
+  }
 
-    /* Handle not yet allocated buffer. */
-    if (unlikely(pg == NULL))
-      continue;
-    rmb();
+  Uint32 sentinel[thr_send_page::PGSIZE - thr_send_page::max_bytes()];
+  thr_send_page* sentinel_page = (thr_send_page*)sentinel;
+  sentinel_page->m_next = 0;
 
-    bool current_must_be_zero = false;
-    while (iovecs < max_iovecs)
-    {
-      Uint32 bytes = pg->m_bytes;
-      if (current_must_be_zero)
-        assert(pg->m_current == 0);
+  struct thr_send_buffer tmp;
+  tmp.m_first_page = sentinel_page;
+  tmp.m_last_page = sentinel_page;
 
-      /* Make sure we see all updates before seen m_bytes value. */
+  Uint32 bytes = 0;
+  for (unsigned thr = 0; thr < num_threads; thr++, src++)
+  {
+    Uint32 r = ri[thr];
+    Uint32 w = wi[thr];
+    if (r != w)
+    {
       rmb();
-      if (bytes > pg->m_current)
+      while (r != w)
       {
-        dst[iovecs].iov_base = pg->m_data + pg->m_current;
-        dst[iovecs].iov_len = bytes - pg->m_current;
-        iovecs++;
-        pg->m_current = bytes;
+        thr_send_page * p = src->m_buffers[r];
+        assert(p->m_start == 0);
+        bytes += p->m_bytes;
+        tmp.m_last_page->m_next = p;
+        while (p->m_next != 0)
+        {
+          p = p->m_next;
+          assert(p->m_start == 0);
+          bytes += p->m_bytes;
+        }
+        tmp.m_last_page = p;
+        assert(tmp.m_last_page != 0);
+        r = (r + 1) % thr_send_queue::SIZE;
       }
-      if (bytes < pg->max_data_bytes())
-        break;                                  // More data will arrive later
+      sb->m_read_index[thr] = r;
+    }
+  }
 
-      pg = pg->m_next;
-      if (pg == NULL)
-        break;
-      current_must_be_zero = true;
-      b->m_current_page = pg;
+  if (bytes)
+  {
+    if (sb->m_bytes)
+    {
+      assert(sb->m_buffer.m_first_page != 0);
+      assert(sb->m_buffer.m_last_page != 0);
+      sb->m_buffer.m_last_page->m_next = tmp.m_first_page->m_next;
+      sb->m_buffer.m_last_page = tmp.m_last_page;
+    }
+    else
+    {
+      assert(sb->m_buffer.m_first_page == 0);
+      assert(sb->m_buffer.m_last_page == 0);
+      sb->m_buffer.m_first_page = tmp.m_first_page->m_next;
+      sb->m_buffer.m_last_page = tmp.m_last_page;
     }
+    sb->m_bytes += bytes;
   }
 
-  return iovecs;
+  return sb->m_bytes;
 }
 
 Uint32
-trp_callback::bytes_sent(NodeId node, const struct iovec *src, Uint32 bytes)
+trp_callback::get_bytes_to_send_iovec(NodeId node,
+                                      struct iovec *dst, Uint32 max)
 {
-  Uint32 curr_thr = 0;
+  thr_repository::send_buffer * sb = g_thr_repository.m_send_buffers + node;
 
-  while (bytes > 0)
-  {
-    const struct iovec *iov = src++;
+  Uint32 bytes = link_thread_send_buffers(sb, node);
+  if (max == 0 || bytes == 0)
+    return 0;
 
-    /**
-     * This piece of data could be from any thread, so need to search for
-     * which it is.
-     *
-     * Since data is sent in the same order we returned it from
-     * get_bytes_to_send_iovec(), we are likely to find the right one very
-     * quickly by searching in the same order as used in the loop there.
-     */
-#ifdef VM_TRACE
-    Uint32 start_thr = curr_thr;
-#endif
+  /**
+   * Process linked-list and put into iovecs
+   * TODO: Here we would also pack stuff to get better utilization
+   */
+  Uint32 tot = 0;
+  Uint32 pos = 0;
+  thr_send_page * p = sb->m_buffer.m_first_page;
+  do {
+    dst[pos].iov_len = p->m_bytes;
+    dst[pos].iov_base = p->m_data + p->m_start;
+    assert(p->m_start + p->m_bytes <= p->max_bytes());
+    tot += p->m_bytes;
+    pos++;
+    max--;
+    p = p->m_next;
+  } while (max && p != 0);
 
-    thr_send_buf::send_buffer *b;
-    thr_send_buf::page *pg;
-    for (;;)
-    {
-      b = m_thr_buffers[curr_thr]->m_buffers + node;
-      if (b->m_current_page != NULL)
-      {
-        rmb();
+  return pos;
+}
 
-        pg = b->m_first_page;
+static
+void
+release_list(thread_local_pool<thr_send_page, thr_job_buffer>* pool,
+             thr_send_page* head, thr_send_page * tail)
+{
+  while (head != tail)
+  {
+    thr_send_page * tmp = head;
+    head = head->m_next;
+    pool->release(tmp);
+  }
+  pool->release(tail);
+}
 
-        /* Drop the first page if it is completely empty and not last. */
-        if (pg->m_start == pg->max_data_bytes())
-        {
-          thr_send_buf::page *next_pg = pg->m_next;
-          if (next_pg != NULL)
-          {
-            b->m_first_page = next_pg;
-            if (pg == b->m_current_page)
-              b->m_current_page = next_pg;
-            m_thr_buffers[m_send_thr[node]]->m_pool.release(pg);
-            pg = next_pg;
-          }
-        }
 
-        if (pg->m_data + pg->m_start == iov->iov_base)
-          break;                                  // Found it
-      }
+static
+Uint32
+bytes_sent(thread_local_pool<thr_send_page, thr_job_buffer>* pool,
+           thr_repository::send_buffer* sb, NodeId node, Uint32 bytes)
+{
+  assert(bytes);
 
-      curr_thr = (curr_thr + 1) % num_threads;
-#ifdef VM_TRACE
-      assert(curr_thr != start_thr);            // Sent data was not in buffer
-#endif
-    }
+  Uint32 remain = bytes;
+  thr_send_page * prev = 0;
+  thr_send_page * curr = sb->m_buffer.m_first_page;
 
-    assert(pg->m_start + iov->iov_len <= pg->m_current);
-    Uint32 chunk = iov->iov_len;
-    if (chunk > bytes)
-      chunk = bytes;                            // last chunk sent partially
-    bytes -= chunk;
-    pg->m_start += chunk;
-    b->m_read_bytes += chunk;
-    if (pg->m_start == pg->max_data_bytes() && pg->m_next != NULL)
-    {
-      /* All done with this page, de-allocate. */
-      b->m_first_page = pg->m_next;
-      if (b->m_current_page == pg)
-      {
-        assert(pg->m_current == pg->max_data_bytes());
-        b->m_current_page = pg->m_next;
-      }
-      /* Release to send thread thread pool, to avoid need for locks. */
-      m_thr_buffers[m_send_thr[node]]->m_pool.release(pg);
+  assert(sb->m_bytes >= bytes);
+  while (remain && remain >= curr->m_bytes)
+  {
+    remain -= curr->m_bytes;
+    prev = curr;
+    curr = curr->m_next;
+  }
+
+  Uint32 total_bytes = sb->m_bytes;
+  if (total_bytes == bytes)
+  {
+    /**
+     * Every thing was released
+     */
+    release_list(pool, sb->m_buffer.m_first_page, sb->m_buffer.m_last_page);
+    sb->m_buffer.m_first_page = 0;
+    sb->m_buffer.m_last_page = 0;
+    sb->m_bytes = 0;
+    return 0;
+  }
+  else if (remain)
+  {
+    /**
+     * Half a page was released
+     */
+    curr->m_start += remain;
+    assert(curr->m_bytes > remain);
+    curr->m_bytes -= remain;
+    if (prev)
+    {
+      release_list(pool, sb->m_buffer.m_first_page, prev);
+    }
+  }
+  else
+  {
+    /**
+     * X full page(s) was released
+     */
+    if (prev)
+    {
+      release_list(pool, sb->m_buffer.m_first_page, prev);
+    }
+    else
+    {
+      pool->release(sb->m_buffer.m_first_page);
     }
   }
 
-  return total_bytes(node);
+  sb->m_buffer.m_first_page = curr;
+  assert(sb->m_bytes > bytes);
+  sb->m_bytes -= bytes;
+  return sb->m_bytes;
+}
+
+Uint32
+trp_callback::bytes_sent(NodeId node, Uint32 bytes)
+{
+  thr_repository::send_buffer * sb = g_thr_repository.m_send_buffers+node;
+  Uint32 thr_no = sb->m_send_thread;
+  assert(thr_no != NO_SEND_THREAD);
+  return ::bytes_sent(&g_thr_repository.m_thread[thr_no].m_send_buffer_pool,
+                      sb, node, bytes);
 }
 
 bool
 trp_callback::has_data_to_send(NodeId node)
 {
-  for (Uint32 thr = 0; thr < num_threads; thr++)
-  {
-    thr_send_buf::send_buffer *b = m_thr_buffers[thr]->m_buffers + node;
-    thr_send_buf::page *pg = b->m_current_page;
-
-    /* Handle not yet allocated buffer. */
-    if (unlikely(pg == NULL))
-      continue;
-    rmb();
-
-    Uint32 bytes = pg->m_bytes;
-    if (bytes > pg->m_current)
-      return true;
-    if (bytes == pg->max_data_bytes())
-    {
-      thr_send_buf::page *next_pg = pg->m_next;
-      rmb();
-      if (next_pg != NULL && next_pg->m_bytes > next_pg->m_current)
-        return true;
-    }
-  }
-  return false;
+  return true;
 }
 
-/**
- * Clear the send buffers.
- *
- * Works by consuming all data with get_bytes_to_send_iovec() + bytes_sent()
- * (but without sending anything of course). This makes us thread safe, as
- * long as we take the send lock.
- */
 void
 trp_callback::reset_send_buffer(NodeId node)
 {
-  struct thr_repository* rep = &g_thr_repository;
+  thr_repository::send_buffer * sb = g_thr_repository.m_send_buffers+node;
   struct iovec v[32];
 
-  lock(&rep->m_send_locks[node]);
-  void *value= NdbThread_GetTlsKey(NDB_THREAD_TLS_THREAD);
-  const thr_data *selfptr = reinterpret_cast<const thr_data *>(value);
-  if (selfptr)
-  {
-    m_send_thr[node] = selfptr->m_thr_no;  
-  }
-  else
-  {
-    /**
-     * This was done from a "non-mt" thread, use
-     *   faked entry
-     */
-    m_send_thr[node] = NDB_ARRAY_SIZE(m_thr_buffers) - 1;
-  }
+  thread_local_pool<thr_send_page, thr_job_buffer>
+    pool(&g_thr_repository.m_free_list, Uint32(~0));
+
+  lock(&sb->m_send_lock);
 
   for (;;)
   {
-    int count = get_bytes_to_send_iovec(node, v, sizeof(v)/sizeof(v[0]));
+    Uint32 count = get_bytes_to_send_iovec(node, v, sizeof(v)/sizeof(v[0]));
     if (count == 0)
       break;
     int bytes = 0;
-    for (int i = 0; i < count; i++)
+    for (Uint32 i = 0; i < count; i++)
       bytes += v[i].iov_len;
 
-    bytes_sent(node, v, bytes);
+    ::bytes_sent(&pool, sb, node, bytes);
+  }
+
+  unlock(&sb->m_send_lock);
+
+  pool.releaseAll();
+}
+
+static inline
+void
+register_pending_send(thr_data *selfptr, Uint32 nodeId)
+{
+  /* Mark that this node has pending send data. */
+  if (!selfptr->m_pending_send_mask.get(nodeId))
+  {
+    selfptr->m_pending_send_mask.set(nodeId, 1);
+    Uint32 i = selfptr->m_pending_send_count;
+    selfptr->m_pending_send_nodes[i] = nodeId;
+    selfptr->m_pending_send_count = i + 1;
+  }
+}
+
+/**
+ * publish thread-locally prepared send-buffer
+ */
+static
+void
+flush_send_buffer(thr_data* selfptr, Uint32 node)
+{
+  Uint32 thr_no = selfptr->m_thr_no;
+  thr_send_buffer * src = selfptr->m_send_buffers + node;
+  thr_repository* rep = &g_thr_repository;
+
+  if (src->m_first_page == 0)
+  {
+    return;
   }
+  assert(src->m_last_page != 0);
 
-  if (selfptr == 0)
+  thr_send_queue * dst = rep->m_thread_send_buffers[node]+thr_no;
+  thr_repository::send_buffer* sb = rep->m_send_buffers+node;
+
+  Uint32 wi = dst->m_write_index;
+  Uint32 next = (wi + 1) % thr_send_queue::SIZE;
+  Uint32 ri = sb->m_read_index[thr_no];
+
+  if (unlikely(next == ri))
   {
-    /**
-     * This was done from a "non-mt" thread, use
-     *   release buffer directly
-     */
-    m_thr_buffers[m_send_thr[node]]->m_pool.releaseAll();
+    lock(&sb->m_send_lock);
+    link_thread_send_buffers(sb, node);
+    unlock(&sb->m_send_lock);
   }
-  
-  g_trp_callback.m_send_thr[node] = ~(Uint32)0;
-  unlock(&rep->m_send_locks[node]);
+
+  dst->m_buffers[wi] = src->m_first_page;
+  wmb();
+  dst->m_write_index = next;
+
+  src->m_first_page = 0;
+  src->m_last_page = 0;
 }
 
-static inline
+/**
+ * This is used in case send buffer gets full, to force an emergency send,
+ * hopefully freeing up some buffer space for the next signal.
+ */
+bool
+mt_send_handle::forceSend(NodeId nodeId)
+{
+  struct thr_repository *rep = &g_thr_repository;
+  struct thr_data *selfptr = m_selfptr;
+  struct thr_repository::send_buffer * sb = rep->m_send_buffers + nodeId;
+
+  do
+  {
+    sb->m_force_send = 0;
+    lock(&sb->m_send_lock);
+    sb->m_send_thread = selfptr->m_thr_no;
+    globalTransporterRegistry.performSend(nodeId);
+    sb->m_send_thread = NO_SEND_THREAD;
+    unlock(&sb->m_send_lock);
+  } while (sb->m_force_send);
+
+  return true;
+}
+
+/**
+ * try sending data
+ */
+static
 void
-register_pending_send(thr_data *selfptr, Uint32 nodeId)
+try_send(thr_data * selfptr, Uint32 node)
 {
-  /* Mark that this node has pending send data. */
-  if (!selfptr->m_pending_send_mask.get(nodeId))
+  struct thr_repository *rep = &g_thr_repository;
+  struct thr_repository::send_buffer * sb = rep->m_send_buffers + node;
+
+  do
   {
-    selfptr->m_pending_send_mask.set(nodeId, 1);
-    Uint32 i = selfptr->m_pending_send_count;
-    selfptr->m_pending_send_nodes[i] = nodeId;
-    selfptr->m_pending_send_count = i + 1;
-  }
+    if (trylock(&sb->m_send_lock) != 0)
+    {
+      return;
+    }
+
+    sb->m_force_send = 0;
+    mb();
+
+    sb->m_send_thread = selfptr->m_thr_no;
+    globalTransporterRegistry.performSend(node);
+    sb->m_send_thread = NO_SEND_THREAD;
+    unlock(&sb->m_send_lock);
+  } while (sb->m_force_send);
 }
 
 /**
@@ -1642,25 +1718,33 @@ register_pending_send(thr_data *selfptr,
  * other thread (but we will never loose signals due to this).
  */
 static
-void
-do_send(struct thr_repository* rep, struct thr_data* selfptr,
+Uint32
+do_send(struct thr_data* selfptr,
         Uint32 *watchDogCounter, bool must_send)
 {
   Uint32 i;
   Uint32 count = selfptr->m_pending_send_count;
   Uint8 *nodes = selfptr->m_pending_send_nodes;
+  struct thr_repository* rep = &g_thr_repository;
 
   if (count == 0)
-    return;
+  {
+    return 0; // send-buffers empty
+  }
+
   /* Clear the pending list. */
   selfptr->m_pending_send_mask.clear();
   selfptr->m_pending_send_count = 0;
 
   for (i = 0; i < count; i++)
   {
-    NodeId nodeId = nodes[i];
+    Uint32 node = nodes[i];
     *watchDogCounter = 6;
 
+    flush_send_buffer(selfptr, node);
+
+    thr_repository::send_buffer * sb = rep->m_send_buffers + node;
+
     /**
      * If we must send now, set the force_send flag.
      *
@@ -1672,10 +1756,13 @@ do_send(struct thr_repository* rep, stru
      * flag update is flushed to the other thread.
      */
     if (must_send)
-      rep->m_force_send[nodeId] = 1;
+    {
+      sb->m_force_send = 1;
+    }
+
     do
     {
-      if (trylock(&rep->m_send_locks[nodeId]) != 0)
+      if (trylock(&sb->m_send_lock) != 0)
       {
         if (!must_send)
         {
@@ -1685,7 +1772,7 @@ do_send(struct thr_repository* rep, stru
            * As we only add from the start of an empty list, we are safe from
            * overwriting the list while we are iterating over it.
            */
-          register_pending_send(selfptr, nodeId);
+          register_pending_send(selfptr, node);
         }
         else
         {
@@ -1704,58 +1791,61 @@ do_send(struct thr_repository* rep, stru
        * will (thanks to mb()) be able to see and send all of the data already
        * in the first send iteration.
        */
-      rep->m_force_send[nodeId] = 0;
+      sb->m_force_send = 0;
       mb();
 
       /**
        * Set m_send_thr so that our transporter callback can know which thread
        * holds the send lock for this remote node.
        */
-      g_trp_callback.m_send_thr[nodeId] = selfptr->m_thr_no;
-      globalTransporterRegistry.performSend(nodeId);
-      g_trp_callback.m_send_thr[nodeId] = ~(Uint32)0;
-      unlock(&rep->m_send_locks[nodeId]);
-    } while (rep->m_force_send[nodeId]);
+      sb->m_send_thread = selfptr->m_thr_no;
+      int res = globalTransporterRegistry.performSend(node);
+      sb->m_send_thread = NO_SEND_THREAD;
+      unlock(&sb->m_send_lock);
+      if (res)
+      {
+        register_pending_send(selfptr, node);
+      }
+    } while (sb->m_force_send);
   }
+
+  return selfptr->m_pending_send_count;
 }
 
-/**
- * This is used in case send buffer gets full, to force an emergency send,
- * hopefully freeing up some buffer space for the next signal.
- */
-bool
-thr_send_buf::forceSend(NodeId nodeId)
+Uint32 *
+mt_send_handle::getWritePtr(NodeId node, Uint32 len, Uint32 prio, Uint32 max)
 {
-  struct thr_repository *rep = &g_thr_repository;
-  struct thr_data *selfptr = rep->m_thread + m_self;
-
-  do
+  struct thr_send_buffer * b = m_selfptr->m_send_buffers+node;
+  thr_send_page * p = b->m_last_page;
+  if ((p != 0) && (p->m_bytes + p->m_start + len <=
thr_send_page::max_bytes()))
   {
-    rep->m_force_send[nodeId] = 0;
-    lock(&rep->m_send_locks[nodeId]);
-    g_trp_callback.m_send_thr[nodeId] = selfptr->m_thr_no;
-    globalTransporterRegistry.performSend(nodeId);
-    g_trp_callback.m_send_thr[nodeId] = ~(Uint32)0;
-    unlock(&rep->m_send_locks[nodeId]);
-  } while (rep->m_force_send[nodeId]);
+    return (Uint32*)(p->m_data + p->m_start + p->m_bytes);
+  }
+  else if (p != 0)
+  {
+    // TODO: maybe dont always flush on page-boundary ???
+    flush_send_buffer(m_selfptr, node);
+    try_send(m_selfptr, node);
+  }
 
-  return true;
+  if ((p = m_selfptr->m_send_buffer_pool.seize()) != 0)
+  {
+    p->m_bytes = 0;
+    p->m_start = 0;
+    p->m_next = 0;
+    b->m_first_page = b->m_last_page = p;
+    return (Uint32*)p->m_data;
+  }
+  return 0;
 }
 
-static void
-sendpacked(struct thr_data* thr_ptr, Signal* signal)
+Uint32
+mt_send_handle::updateWritePtr(NodeId node, Uint32 lenBytes, Uint32 prio)
 {
-  Uint32 i;
-  for (i = 0; i < thr_ptr->m_instance_count; i++) {
-    BlockReference block = thr_ptr->m_instance_list[i];
-    Uint32 main = blockToMain(block);
-    Uint32 instance = blockToInstance(block);
-    SimulatedBlock* b = globalData.getBlock(main, instance);
-    // wl4391_todo remove useless assert
-    assert(b != 0 && b->getThreadId() == thr_ptr->m_thr_no);
-    /* b->send_at_job_buffer_end(); */
-    b->executeFunction(GSN_SEND_PACKED, signal);
-  }
+  struct thr_send_buffer * b = m_selfptr->m_send_buffers+node;
+  thr_send_page * p = b->m_last_page;
+  p->m_bytes += lenBytes;
+  return p->m_bytes;
 }
 
 /*
@@ -2314,7 +2404,7 @@ mt_receiver_thread_main(void *thr_arg)
       flush_jbb_write_state(selfptr);
     }
 
-    do_send(rep, selfptr, &watchDogCounter, TRUE);
+    do_send(selfptr, &watchDogCounter, TRUE);
 
     watchDogCounter = 7;
 
@@ -2336,6 +2426,23 @@ mt_receiver_thread_main(void *thr_arg)
   return NULL;                  // Return value not currently used
 }
 
+static
+void
+sendpacked(struct thr_data* thr_ptr, Signal* signal)
+{
+  Uint32 i;
+  for (i = 0; i < thr_ptr->m_instance_count; i++) {
+    BlockReference block = thr_ptr->m_instance_list[i];
+    Uint32 main = blockToMain(block);
+    Uint32 instance = blockToInstance(block);
+    SimulatedBlock* b = globalData.getBlock(main, instance);
+    // wl4391_todo remove useless assert
+    assert(b != 0 && b->getThreadId() == thr_ptr->m_thr_no);
+    /* b->send_at_job_buffer_end(); */
+    b->executeFunction(GSN_SEND_PACKED, signal);
+  }
+}
+
 extern "C"
 void *
 mt_job_thread_main(void *thr_arg)
@@ -2347,7 +2454,6 @@ mt_job_thread_main(void *thr_arg)
   nowait.tv_nsec = 10 * 1000000;
   Uint32 thrSignalId = 0;
 
-  struct thr_repository* rep = &g_thr_repository;
   struct thr_data* selfptr = (struct thr_data *)thr_arg;
   init_thread(selfptr);
   Uint32& watchDogCounter = selfptr->m_watchdog_counter;
@@ -2386,14 +2492,12 @@ mt_job_thread_main(void *thr_arg)
       if (sum == 0)
       {
         /* About to sleep, _must_ send now. */
-        do_send(rep, selfptr, &watchDogCounter, TRUE);
-        send_sum = 0;
+        sum = send_sum = do_send(selfptr, &watchDogCounter, TRUE);
       }
       else if (send_sum > MAX_SIGNALS_BEFORE_SEND)
       {
         /* Try to send, but skip for now in case of lock contention. */
-        do_send(rep, selfptr, &watchDogCounter, FALSE);
-        send_sum = 0;
+        send_sum = do_send(selfptr, &watchDogCounter, FALSE);
       }
     }
     
@@ -2502,9 +2606,10 @@ mt_send_remote(Uint32 self, const Signal
   thr_data *selfptr = rep->m_thread + self;
   SendStatus ss;
 
+  mt_send_handle handle(selfptr);
   register_pending_send(selfptr, nodeId);
   /* prepareSend() is lock-free, as we have per-thread send buffers. */
-  ss = globalTransporterRegistry.prepareSend(g_trp_callback.m_thr_buffers[self],
+  ss = globalTransporterRegistry.prepareSend(&handle,
                                              sh, prio, data, nodeId, ptr);
   return ss;
 }
@@ -2519,8 +2624,9 @@ mt_send_remote(Uint32 self, const Signal
   thr_data *selfptr = rep->m_thread + self;
   SendStatus ss;
 
+  mt_send_handle handle(selfptr);
   register_pending_send(selfptr, nodeId);
-  ss = globalTransporterRegistry.prepareSend(g_trp_callback.m_thr_buffers[self],
+  ss = globalTransporterRegistry.prepareSend(&handle,
                                              sh, prio, data, nodeId,
                                              *thePool, ptr);
   return ss;
@@ -2597,128 +2703,9 @@ sendprioa_STOP_FOR_CRASH(Uint32 dst)
   unlock(&dstptr->m_jba_write_lock);
 }
 
-static
-inline
-Uint32*
-get_free_slot(struct thr_repository* rep, 
-	      struct thr_data* selfptr, 
-	      Uint32* idxptr)
-{
-  struct thr_tq * tq = &selfptr->m_tq;
-  Uint32 idx = tq->m_next_free;
-retry:
-  Uint32 buf = idx >> 8;
-  Uint32 pos = idx & 0xFF;
-
-  if (idx != RNIL)
-  {
-    Uint32* page = * (tq->m_delayed_signals + buf);
-    Uint32* ptr = page + (32 * pos);
-    tq->m_next_free = * ptr;
-    * idxptr = idx;
-    return ptr;
-  }
-
-  Uint32 thr_no = selfptr->m_thr_no;
-  for (Uint32 i = 0; i<thr_tq::PAGES; i++)
-  {
-    if (tq->m_delayed_signals[i] == 0)
-    {
-      struct thr_job_buffer *jb = seize_buffer(rep, thr_no, false);
-      Uint32 * page = reinterpret_cast<Uint32*>(jb);
-      tq->m_delayed_signals[i] = page;
-   
-      ndbout_c("saving %p at %p (%d)", page, tq->m_delayed_signals+i, i);
-   
-      /**
-       * Init page
-       */
-      for (Uint32 j = 0; j<255; j ++)
-      {
-	page[j * 32] = (i << 8) + (j + 1);
-      }
-      page[255*32] = RNIL;
-      idx = (i << 8);
-      goto retry;
-    }
-  }
-  abort();
-}
-
-void
-senddelay(Uint32 thr_no, const SignalHeader* s, Uint32 delay)
-{
-  struct thr_repository* rep = &g_thr_repository;
-  struct thr_data * selfptr = rep->m_thread + thr_no;
-  unsigned siglen = (sizeof(*s) >> 2) + s->theLength + s->m_noOfSections;
-
-  Uint32 max;
-  Uint32 * cntptr;
-  Uint32 * queueptr;
-
-  Uint32 alarm = selfptr->m_tq.m_current_time + delay;
-  Uint32 nexttimer = selfptr->m_tq.m_next_timer;
-  if (delay < 100)
-  {
-    cntptr = selfptr->m_tq.m_cnt + 0;
-    queueptr = selfptr->m_tq.m_short_queue;
-    max = thr_tq::SQ_SIZE;
-  }
-  else
-  {
-    cntptr = selfptr->m_tq.m_cnt + 1;
-    queueptr = selfptr->m_tq.m_long_queue;
-    max = thr_tq::LQ_SIZE;
-  }
-
-  Uint32 idx;
-  Uint32* ptr = get_free_slot(rep, selfptr, &idx);
-  memcpy(ptr, s, 4*siglen);
-
-  if (0)
-    ndbout_c("now: %d alarm: %d send %s from %s to %s delay: %d idx: %x %p", 
-	     selfptr->m_tq.m_current_time,
-	     alarm,
-	     getSignalName(s->theVerId_signalNumber),
-	     getBlockName(refToBlock(s->theSendersBlockRef)),
-	     getBlockName(s->theReceiversBlockNumber),
-	     delay,
-	     idx, ptr);
-  
-  Uint32 i;
-  Uint32 cnt = *cntptr;
-  Uint32 newentry = (idx << 16) | (alarm & 0xFFFF);
-
-  * cntptr = cnt + 1;
-  selfptr->m_tq.m_next_timer = alarm < nexttimer ? alarm : nexttimer;
-  
-  if (cnt == 0)
-  {
-    queueptr[0] = newentry;
-    return;
-  }
-  else if (cnt < max)
-  {
-    for (i = 0; i<cnt; i++)
-    {
-      Uint32 save = queueptr[i];
-      if ((save & 0xFFFF) > alarm)
-      {
-	memmove(queueptr+i+1, queueptr+i, 4*(cnt - i));
-	queueptr[i] = newentry;
-	return;
-      }
-    }
-    assert(i == cnt);
-    queueptr[i] = newentry;
-    return;
-  }
-  else
-  {
-    abort();
-  }
-}
-
+/**
+ * init functions
+ */
 static
 void
 queue_init(struct thr_tq* tq)
@@ -2751,6 +2738,7 @@ thr_init(struct thr_repository* rep, str
   selfptr->m_jba_read_state.m_write_index = 0;
   selfptr->m_jba_read_state.m_write_pos = 0;
   selfptr->m_next_buffer = seize_buffer(rep, thr_no, false);
+  selfptr->m_send_buffer_pool.set_pool(&rep->m_free_list);
   
   for (i = 0; i<cnt; i++)
   {
@@ -2778,6 +2766,8 @@ thr_init(struct thr_repository* rep, str
   selfptr->m_instance_count = 0;
   for (i = 0; i < MAX_INSTANCES_PER_THREAD; i++)
     selfptr->m_instance_list[i] = 0;
+
+  bzero(&selfptr->m_send_buffers, sizeof(selfptr->m_send_buffers));
 }
 
 /* Have to do this after init of all m_in_queues is done. */
@@ -2798,6 +2788,20 @@ thr_init2(struct thr_repository* rep, st
 
 static
 void
+send_buffer_init(Uint32 node, thr_repository::send_buffer * sb)
+{
+  char buf[100];
+  snprintf(buf, sizeof(buf), "send lock node %d", node);
+  register_lock(&sb->m_send_lock, buf);
+  sb->m_force_send = 0;
+  sb->m_send_thread = NO_SEND_THREAD;
+  bzero(&sb->m_buffer, sizeof(sb->m_buffer));
+  sb->m_bytes = 0;
+  bzero(sb->m_read_index, sizeof(sb->m_read_index));
+}
+
+static
+void
 rep_init(struct thr_repository* rep, unsigned int cnt, Ndbd_mem_manager *mm)
 {
   rep->m_free_list.m_mm = mm;
@@ -2818,11 +2822,10 @@ rep_init(struct thr_repository* rep, uns
 
   for (int i = 0 ; i < MAX_NTRANSPORTERS; i++)
   {
-    char buf[100];
-    snprintf(buf, sizeof(buf), "send lock node %d", i);
-    rep->m_send_locks[i].m_name = strdup(buf);
-    rep->m_force_send[i] = 0;
+    send_buffer_init(i, rep->m_send_buffers+i);
   }
+
+  bzero(rep->m_thread_send_buffers, sizeof(rep->m_thread_send_buffers));
 }
 
 
@@ -3291,6 +3294,53 @@ mt_mem_manager_unlock()
   unlock(&(g_thr_repository.m_mem_manager_lock));
 }
 
+Vector<mt_lock_stat> g_locks;
+template class Vector<mt_lock_stat>;
+
+static
+void
+register_lock(const void * ptr, const char * name)
+{
+  if (name == 0)
+    return;
+
+  mt_lock_stat* arr = g_locks.getBase();
+  for (size_t i = 0; i<g_locks.size(); i++)
+  {
+    if (arr[i].m_ptr == ptr)
+    {
+      if (arr[i].m_name)
+      {
+        free(arr[i].m_name);
+      }
+      arr[i].m_name = strdup(name);
+      return;
+    }
+  }
+
+  mt_lock_stat ln;
+  ln.m_ptr = ptr;
+  ln.m_name = strdup(name);
+  ln.m_contended_count = 0;
+  ln.m_spin_count = 0;
+  g_locks.push_back(ln);
+}
+
+static
+mt_lock_stat *
+lookup_lock(const void * ptr)
+{
+  mt_lock_stat* arr = g_locks.getBase();
+  for (size_t i = 0; i<g_locks.size(); i++)
+  {
+    if (arr[i].m_ptr == ptr)
+      return arr + i;
+  }
+
+  return 0;
+}
+
+
 /**
  * Global data
  */

=== modified file 'storage/ndb/src/ndbapi/TransporterFacade.hpp'
--- a/storage/ndb/src/ndbapi/TransporterFacade.hpp	2008-10-21 12:41:59 +0000
+++ b/storage/ndb/src/ndbapi/TransporterFacade.hpp	2008-10-27 17:56:57 +0000
@@ -168,13 +168,13 @@ public:
   void reportError(NodeId nodeId, TransporterError errorCode,
                    const char *info = 0);
   void transporter_recv_from(NodeId node);
-  int get_bytes_to_send_iovec(NodeId node, struct iovec *dst, Uint32 max)
+  Uint32 get_bytes_to_send_iovec(NodeId node, struct iovec *dst, Uint32 max)
   {
     return theTransporterRegistry->get_bytes_to_send_iovec(node, dst, max);
   }
-  Uint32 bytes_sent(NodeId node, const struct iovec *src, Uint32 bytes)
+  Uint32 bytes_sent(NodeId node, Uint32 bytes)
   {
-    return theTransporterRegistry->bytes_sent(node, src, bytes);
+    return theTransporterRegistry->bytes_sent(node, bytes);
   }
   bool has_data_to_send(NodeId node)
   {

Thread
bzr push into mysql-5.1 branch (jonas:3019 to 3020) Jonas Oreland27 Oct