List:Commits« Previous MessageNext Message »
From:Mikael Ronstrom Date:January 25 2012 3:59pm
Subject:bzr push into mysql-5.5-cluster-7.2 branch (mikael.ronstrom:3722 to 3734)
View as plain text  
 3734 Mikael Ronstrom	2012-01-25
      Add pack send buffers before starting a new loop to minimize risk of send buffer overload

    modified:
      storage/ndb/src/kernel/vm/mt.cpp
 3733 Mikael Ronstrom	2012-01-25
      Fix overload handling of scans

    modified:
      storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp
      storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
 3732 Mikael Ronstrom	2012-01-25
      Regulate scan already on slowdown limit, add some stuff for debugging

    modified:
      storage/ndb/include/transporter/TransporterRegistry.hpp
      storage/ndb/src/common/transporter/TCP_Transporter.cpp
      storage/ndb/src/common/transporter/Transporter.cpp
      storage/ndb/src/common/transporter/Transporter.hpp
      storage/ndb/src/common/transporter/TransporterRegistry.cpp
      storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
 3731 Mikael Ronstrom	2012-01-25
      compilation fix

    modified:
      storage/ndb/src/kernel/vm/mt.cpp
 3730 Mikael Ronstrom	2012-01-25
      Don't send so immediately, only when absolutely necessary from flush_send_buffer

    modified:
      storage/ndb/src/kernel/vm/mt.cpp
 3729 Mikael Ronstrom	2012-01-25
      Need to send when grabbing send node lock

    modified:
      storage/ndb/src/kernel/vm/mt.cpp
 3728 Mikael Ronstrom	2012-01-24
      More to fix send buffer overload issue

    modified:
      storage/ndb/src/kernel/vm/mt.cpp
 3727 Mikael Ronstrom	2012-01-24
      Improved pack page patch

    modified:
      storage/ndb/src/kernel/vm/mt.cpp
 3726 Mikael Ronstrom	2012-01-24
      Release memory and add a debug printout to see how much it is used

    modified:
      storage/ndb/src/kernel/vm/mt.cpp
 3725 Mikael Ronstrom	2012-01-24
      pack sb_pages

    modified:
      storage/ndb/src/kernel/vm/mt.cpp
 3724 Mikael Ronstrom	2012-01-24 [merge]
      merge

    added:
      storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/DbImplForNdbRecord.java
    renamed:
      mysql-test/suite/ndb_rpl/r/ndb_rpl_conflict.result => mysql-test/suite/ndb_rpl/r/ndb_rpl_conflict_basic.result
      mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict.test => mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict_basic.test
    modified:
      storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ClusterConnectionImpl.java
      storage/ndb/src/common/transporter/TransporterRegistry.cpp
      storage/ndb/src/kernel/blocks/trpman.cpp
      storage/ndb/src/kernel/vm/CMakeLists.txt
      storage/ndb/src/kernel/vm/TransporterCallback.cpp
      storage/ndb/src/kernel/vm/TransporterCallbackKernel.hpp
      storage/ndb/src/kernel/vm/mt.cpp
      storage/ndb/src/kernel/vm/mt.hpp
 3723 Mikael Ronstrom	2012-01-24
      Fix

    modified:
      storage/ndb/src/kernel/vm/mt.cpp
 3722 Mikael Ronstrom	2012-01-23 [merge]
      merge

    added:
      storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/StressTest.java
      storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordBlobImpl.java
      storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordImpl.java
      storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordOperationImpl.java
      storage/ndb/clusterj/clusterj-tie/src/test/java/testsuite/clusterj/tie/StressTest.java
      storage/ndb/src/kernel/vm/CountingPool.cpp
      storage/ndb/src/kernel/vm/CountingPool.hpp
    modified:
      mysql-test/suite/ndb/t/clusterj.test
      storage/ndb/clusterj/clusterj-api/pom.xml
      storage/ndb/clusterj/clusterj-bindings/pom.xml
      storage/ndb/clusterj/clusterj-core/pom.xml
      storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/SessionImpl.java
      storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/metadata/DomainFieldHandlerImpl.java
      storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/store/Operation.java
      storage/ndb/clusterj/clusterj-jdbc/pom.xml
      storage/ndb/clusterj/clusterj-jpatest/pom.xml
      storage/ndb/clusterj/clusterj-openjpa/pom.xml
      storage/ndb/clusterj/clusterj-test/pom.xml
      storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/DynamicObjectTest.java
      storage/ndb/clusterj/clusterj-test/src/main/resources/schema.sql
      storage/ndb/clusterj/clusterj-tie/pom.xml
      storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/BlobImpl.java
      storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ClusterConnectionImpl.java
      storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ClusterConnectionServiceImpl.java
      storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ClusterTransactionImpl.java
      storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ColumnImpl.java
      storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/DbImpl.java
      storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/DictionaryImpl.java
      storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/OperationImpl.java
      storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/Utility.java
      storage/ndb/clusterj/clusterj-tie/src/main/resources/com/mysql/clusterj/tie/Bundle.properties
      storage/ndb/clusterj/pom.xml
      storage/ndb/include/ndbapi/Ndb.hpp
      storage/ndb/include/ndbapi/NdbDictionary.hpp
      storage/ndb/include/ndbapi/NdbOperation.hpp
      storage/ndb/include/ndbapi/NdbScanOperation.hpp
      storage/ndb/src/common/transporter/TransporterRegistry.cpp
      storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp
      storage/ndb/src/kernel/blocks/dbdict/Dbdict.hpp
      storage/ndb/src/kernel/blocks/dbtup/tuppage.hpp
      storage/ndb/src/kernel/vm/CMakeLists.txt
      storage/ndb/src/kernel/vm/Pool.cpp
      storage/ndb/src/kernel/vm/Pool.hpp
      storage/ndb/src/ndbjtie/NdbApiWrapper.hpp
      storage/ndb/src/ndbjtie/com/mysql/ndbjtie/ndbapi/Ndb.java
      storage/ndb/src/ndbjtie/com/mysql/ndbjtie/ndbapi/NdbDictionary.java
      storage/ndb/src/ndbjtie/com/mysql/ndbjtie/ndbapi/NdbOperation.java
      storage/ndb/src/ndbjtie/com/mysql/ndbjtie/ndbapi/NdbOperationConst.java
      storage/ndb/src/ndbjtie/com/mysql/ndbjtie/ndbapi/NdbScanOperation.java
      storage/ndb/src/ndbjtie/com/mysql/ndbjtie/ndbapi/issues.txt
      storage/ndb/src/ndbjtie/ndbapi_jtie.hpp
      storage/ndb/test/ndbapi/testSystemRestart.cpp
=== renamed file 'mysql-test/suite/ndb_rpl/r/ndb_rpl_conflict.result' => 'mysql-test/suite/ndb_rpl/r/ndb_rpl_conflict_basic.result'
=== renamed file 'mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict.test' => 'mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict_basic.test'
=== modified file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ClusterConnectionImpl.java'
--- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ClusterConnectionImpl.java	revid:mikael.ronstrom@stripped
+++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ClusterConnectionImpl.java	revid:mikael.ronstrom@stripped
@@ -60,7 +60,7 @@ public class ClusterConnectionImpl
     final int nodeId;
 
     /** All dbs given out by this cluster connection */
-    private Map<DbImpl, Object> dbs = new IdentityHashMap<DbImpl, Object>();
+    private Map<Db, Object> dbs = new IdentityHashMap<Db, Object>();
 
     /** The map of table name to NdbRecordImpl */
     private ConcurrentMap<String, NdbRecordImpl> ndbRecordImplMap = new ConcurrentHashMap<String, NdbRecordImpl>();
@@ -98,7 +98,7 @@ public class ClusterConnectionImpl
                 // create a dictionary for NdbRecord
                 Ndb ndbForNdbRecord = Ndb.create(clusterConnection, database, "def");
                 handleError(ndbForNdbRecord, clusterConnection, connectString, nodeId);
-                DbImpl dbForNdbRecord = new DbImpl(this, ndbForNdbRecord, maxTransactions);
+                DbImplForNdbRecord dbForNdbRecord = new DbImplForNdbRecord(this, ndbForNdbRecord);
                 dbs.put(dbForNdbRecord, null);
                 dictionaryForNdbRecord = dbForNdbRecord.getNdbDictionary();
             }
@@ -167,7 +167,7 @@ public class ClusterConnectionImpl
             }
             ndbRecordImplMap.clear();
             if (dbs.size() != 0) {
-                Map<DbImpl, Object> dbsToClose = new IdentityHashMap<DbImpl, Object>(dbs);
+                Map<Db, Object> dbsToClose = new IdentityHashMap<Db, Object>(dbs);
                 for (Db db: dbsToClose.keySet()) {
                     db.close();
                 }

=== added file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/DbImplForNdbRecord.java'
--- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/DbImplForNdbRecord.java	1970-01-01 00:00:00 +0000
+++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/DbImplForNdbRecord.java	revid:mikael.ronstrom@stripped
@@ -0,0 +1,133 @@
+/*
+ *  Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
+ *
+ *  This program is free software; you can redistribute it and/or modify
+ *  it under the terms of the GNU General Public License as published by
+ *  the Free Software Foundation; version 2 of the License.
+ *
+ *  This program is distributed in the hope that it will be useful,
+ *  but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *  GNU General Public License for more details.
+ *
+ *  You should have received a copy of the GNU General Public License
+ *  along with this program; if not, write to the Free Software
+ *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
+ */
+
+package com.mysql.clusterj.tie;
+
+import java.nio.ByteBuffer;
+
+import java.util.List;
+
+import com.mysql.clusterj.ClusterJDatastoreException;
+import com.mysql.clusterj.ClusterJFatalInternalException;
+
+import com.mysql.clusterj.core.store.ClusterTransaction;
+
+import com.mysql.clusterj.core.util.I18NHelper;
+import com.mysql.clusterj.core.util.Logger;
+import com.mysql.clusterj.core.util.LoggerFactoryService;
+
+import com.mysql.ndbjtie.ndbapi.Ndb;
+import com.mysql.ndbjtie.ndbapi.NdbErrorConst;
+import com.mysql.ndbjtie.ndbapi.NdbTransaction;
+import com.mysql.ndbjtie.ndbapi.NdbDictionary.Dictionary;
+
+/**
+ * This class is used to hold the ndb Dictionary used for NdbRecord. It has the minimum
+ * implementation needed to implement the life cycle of the Ndb. In particular, it omits the
+ * buffer manager and partition key scratch buffer used in the standard DbImpl.
+ */
+class DbImplForNdbRecord implements com.mysql.clusterj.core.store.Db {
+
+    /** My message translator */
+    static final I18NHelper local = I18NHelper.getInstance(DbImplForNdbRecord.class);
+
+    /** My logger */
+    static final Logger logger = LoggerFactoryService.getFactory()
+            .getInstance(DbImplForNdbRecord.class);
+
+    /** The Ndb instance that this instance is wrapping */
+    private Ndb ndb;
+
+    /** A "big enough" size for error information */
+    private int errorBufferSize = 300;
+
+    /** The ndb error detail buffer */
+    private ByteBuffer errorBuffer = ByteBuffer.allocateDirect(errorBufferSize);
+
+    /** The NdbDictionary for this Ndb */
+    private Dictionary ndbDictionary;
+
+    /** The ClusterConnection */
+    private ClusterConnectionImpl clusterConnection;
+
+    public DbImplForNdbRecord(ClusterConnectionImpl clusterConnection, Ndb ndb) {
+        this.clusterConnection = clusterConnection;
+        this.ndb = ndb;
+        int returnCode = ndb.init(1);
+        handleError(returnCode, ndb);
+        ndbDictionary = ndb.getDictionary();
+        handleError(ndbDictionary, ndb);
+    }
+
+    public void close() {
+        Ndb.delete(ndb);
+        clusterConnection.close(this);
+    }
+
+    public com.mysql.clusterj.core.store.Dictionary getDictionary() {
+        throw new ClusterJFatalInternalException(local.message("ERR_Implementation_Should_Not_Occur"));
+    }
+
+    public Dictionary getNdbDictionary() {
+        return ndbDictionary;
+    }
+
+    public ClusterTransaction startTransaction(String joinTransactionId) {
+        throw new ClusterJFatalInternalException(local.message("ERR_Implementation_Should_Not_Occur"));
+    }
+
+    protected void handleError(int returnCode, Ndb ndb) {
+        if (returnCode == 0) {
+            return;
+        } else {
+            NdbErrorConst ndbError = ndb.getNdbError();
+            String detail = getNdbErrorDetail(ndbError);
+            Utility.throwError(returnCode, ndbError, detail);
+        }
+    }
+
+    protected void handleError(Object object, Ndb ndb) {
+        if (object != null) {
+            return;
+        } else {
+            NdbErrorConst ndbError = ndb.getNdbError();
+            String detail = getNdbErrorDetail(ndbError);
+            Utility.throwError(null, ndbError, detail);
+        }
+    }
+
+    public boolean isRetriable(ClusterJDatastoreException ex) {
+        throw new ClusterJFatalInternalException(local.message("ERR_Implementation_Should_Not_Occur"));
+    }
+
+    public String getNdbErrorDetail(NdbErrorConst ndbError) {
+        return ndb.getNdbErrorDetail(ndbError, errorBuffer, errorBuffer.capacity());
+    }
+
+    public NdbTransaction enlist(String tableName, List<KeyPart> keyParts) {
+        throw new ClusterJFatalInternalException(local.message("ERR_Implementation_Should_Not_Occur"));
+    }
+
+    public NdbTransaction enlist(String tableName, int partitionId) {
+        throw new ClusterJFatalInternalException(local.message("ERR_Implementation_Should_Not_Occur"));
+    }
+
+    public NdbTransaction joinTransaction(String coordinatedTransactionId) {
+        throw new ClusterJFatalInternalException(local.message("ERR_Implementation_Should_Not_Occur"));
+    }
+
+}

=== modified file 'storage/ndb/include/transporter/TransporterRegistry.hpp'
--- a/storage/ndb/include/transporter/TransporterRegistry.hpp	revid:mikael.ronstrom@stripped
+++ b/storage/ndb/include/transporter/TransporterRegistry.hpp	revid:mikael.ronstrom@stripped
@@ -292,6 +292,13 @@ public:
   const NodeBitmask& get_status_overloaded() const;
   
   /**
+   * Set or clear slowdown bit.
+   * Query if any slowdown bit is set.
+   */
+  void set_status_slowdown(Uint32 nodeId, bool val);
+  const NodeBitmask& get_status_slowdown() const;
+  
+  /**
    * prepareSend
    *
    * When IOState is HaltOutput or HaltIO do not send or insert any 
@@ -431,8 +438,10 @@ private:
 
   /**
    * Overloaded bits, for fast check.
+   * Similarly slowdown bits for fast check.
    */
   NodeBitmask m_status_overloaded;
+  NodeBitmask m_status_slowdown;
 
   /**
    * Unpack signal data.
@@ -599,4 +608,18 @@ TransporterRegistry::get_status_overload
   return m_status_overloaded;
 }
 
+inline void
+TransporterRegistry::set_status_slowdown(Uint32 nodeId, bool val)
+{
+  assert(nodeId < MAX_NODES);
+  if (val != m_status_slowdown.get(nodeId))
+    m_status_slowdown.set(nodeId, val);
+}
+
+inline const NodeBitmask&
+TransporterRegistry::get_status_slowdown() const
+{
+  return m_status_slowdown;
+}
+
 #endif // Define of TransporterRegistry_H

=== modified file 'storage/ndb/src/common/transporter/TCP_Transporter.cpp'
--- a/storage/ndb/src/common/transporter/TCP_Transporter.cpp	revid:mikael.ronstrom@stripped
+++ b/storage/ndb/src/common/transporter/TCP_Transporter.cpp	revid:mikael.ronstrom@stripped
@@ -112,6 +112,7 @@ TCP_Transporter::TCP_Transporter(Transpo
   setIf(sockOptTcpMaxSeg, conf->tcp.tcpMaxsegSize, 0);
 
   m_overload_limit = overload_limit(conf);
+  m_slowdown_limit = conf->tcp.sendBufferSize/2;
 }
 
 

=== modified file 'storage/ndb/src/common/transporter/Transporter.cpp'
--- a/storage/ndb/src/common/transporter/Transporter.cpp	revid:mikael.ronstrom@stripped
+++ b/storage/ndb/src/common/transporter/Transporter.cpp	revid:mikael.ronstrom@stripped
@@ -43,7 +43,8 @@ Transporter::Transporter(TransporterRegi
   : m_s_port(s_port), remoteNodeId(rNodeId), localNodeId(lNodeId),
     isServer(lNodeId==serverNodeId),
     m_packer(_signalId, _checksum), m_max_send_buffer(max_send_buffer),
-    m_overload_limit(0xFFFFFFFF), isMgmConnection(_isMgmConnection),
+    m_overload_limit(0xFFFFFFFF), m_slowdown_limit(0xFFFFFFFF),
+    isMgmConnection(_isMgmConnection),
     m_connected(false),
     m_type(_type),
     m_transporter_registry(t_reg)

=== modified file 'storage/ndb/src/common/transporter/Transporter.hpp'
--- a/storage/ndb/src/common/transporter/Transporter.hpp	revid:mikael.ronstrom@stripped
+++ b/storage/ndb/src/common/transporter/Transporter.hpp	revid:mikael.ronstrom@stripped
@@ -90,6 +90,8 @@ public:
   {
     m_transporter_registry.set_status_overloaded(remoteNodeId,
                                                  used >= m_overload_limit);
+    m_transporter_registry.set_status_slowdown(remoteNodeId,
+                                               used >= m_slowdown_limit);
   }
 
   virtual int doSend() = 0;
@@ -159,6 +161,7 @@ protected:
   Uint32 m_max_send_buffer;
   /* Overload limit, as configured with the OverloadLimit config parameter. */
   Uint32 m_overload_limit;
+  Uint32 m_slowdown_limit;
 
 private:
 
@@ -226,6 +229,10 @@ Transporter::iovec_data_sent(int nBytesS
 {
   Uint32 used_bytes
     = get_callback_obj()->bytes_sent(remoteNodeId, nBytesSent);
+  if (used_bytes > 256 * 1024)
+  {
+    ndbout_c("%u bytes in send buffer", used_bytes);
+  }
   update_status_overloaded(used_bytes);
 }
 

=== modified file 'storage/ndb/src/common/transporter/TransporterRegistry.cpp'
--- a/storage/ndb/src/common/transporter/TransporterRegistry.cpp	revid:mikael.ronstrom@stripped
+++ b/storage/ndb/src/common/transporter/TransporterRegistry.cpp	revid:mikael.ronstrom@stripped
@@ -763,6 +763,7 @@ TransporterRegistry::prepareSend(Transpo
 	}
 
         set_status_overloaded(nodeId, true);
+        set_status_slowdown(nodeId, true);
         int sleepTime = 2;
 
 	/**
@@ -851,6 +852,7 @@ TransporterRegistry::prepareSend(Transpo
 	 *        so sleepTime = 2 generates a 10 ms sleep.
 	 */
         set_status_overloaded(nodeId, true);
+        set_status_slowdown(nodeId, true);
         int sleepTime = 2;
 	for(int i = 0; i<50; i++){
 	  if((nSHMTransporters+nSCITransporters) == 0)
@@ -934,6 +936,7 @@ TransporterRegistry::prepareSend(Transpo
 	 *        so sleepTime = 2 generates a 10 ms sleep.
 	 */
         set_status_overloaded(nodeId, true);
+        set_status_slowdown(nodeId, true);
         int sleepTime = 2;
 	for(int i = 0; i<50; i++){
 	  if((nSHMTransporters+nSCITransporters) == 0)
@@ -1242,7 +1245,7 @@ TransporterRegistry::poll_TCP(Uint32 tim
   if (extra_socket && recvdata.m_transporters.get(0))
   {
     const NDB_SOCKET_TYPE socket = m_extra_wakeup_sockets[0];
-    assert(receiveHandle);
+    assert(&recvdata == receiveHandle); // not used by ndbmtd...
 
     // Poll the wakup-socket for read
     recvdata.m_socket_poller.add(socket, true, false, false);
@@ -1318,7 +1321,7 @@ TransporterRegistry::performReceive(Tran
   if (recvdata.m_has_data_transporters.get(0))
   {
     assert(recvdata.m_transporters.get(0));
-    assert(receiveHandle);
+    assert(&recvdata == receiveHandle); // not used by ndbmtd
     recvdata.m_has_data_transporters.clear(Uint32(0));
     consume_extra_sockets();
   }
@@ -2293,6 +2296,8 @@ TransporterRegistry::updateWritePtr(Tran
 
   Uint32 used = handle->updateWritePtr(node, lenBytes, prio);
   t->update_status_overloaded(used);
+  if (used > 32768)
+    abort();
 
   if(t->send_limit_reached(used)) {
     //-------------------------------------------------

=== modified file 'storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp	revid:mikael.ronstrom@stripped
+++ b/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp	revid:mikael.ronstrom@stripped
@@ -574,7 +574,8 @@ public:
     Uint8 m_last_row;
     Uint8 m_reserved;
     Uint8 statScan;
-    Uint8 dummy[3]; // align?
+    Uint8 m_stop_batch;
+    Uint8 dummy[2]; // align?
   }; // Size 272 bytes
   typedef Ptr<ScanRecord> ScanRecordPtr;
 
@@ -3300,7 +3301,8 @@ Dblqh::ScanRecord::check_scan_batch_comp
   Uint32 max_rows = m_max_batch_size_rows;
   Uint32 max_bytes = m_max_batch_size_bytes;
 
-  return (max_rows > 0 && (m_curr_batch_size_rows >= max_rows))  ||
+  return m_stop_batch ||
+    (max_rows > 0 && (m_curr_batch_size_rows >= max_rows))  ||
     (max_bytes > 0 && (m_curr_batch_size_bytes >= max_bytes));
 }
 

=== modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp	revid:mikael.ronstrom@stripped
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp	revid:mikael.ronstrom@stripped
@@ -11284,17 +11284,17 @@ void Dblqh::scanTupkeyConfLab(Signal* si
   scanptr.p->m_curr_batch_size_rows = rows + 1;
   scanptr.p->m_last_row = tdata5;
 
-  const NodeBitmask& all = globalTransporterRegistry.get_status_overloaded();
+  const NodeBitmask& all = globalTransporterRegistry.get_status_slowdown();
   if (unlikely(!all.isclear()))
   {
     if (all.get(refToNode(scanptr.p->scanApiBlockref)))
     {
       /**
-       * End scan batch if transporter-buffer are overloaded
+       * End scan batch if transporter-buffer are in slowdown state
        *
        * TODO: We should have counters for this...
        */
-      tdata5 = 1;
+      scanptr.p->m_stop_batch = 1;
     }
   }
 
@@ -11604,6 +11604,7 @@ Uint32 Dblqh::initScanrec(const ScanFrag
   scanptr.p->scanTcrec = tcConnectptr.i;
   scanptr.p->scanSchemaVersion = scanFragReq->schemaVersion;
 
+  scanptr.p->m_stop_batch = 0;
   scanptr.p->m_curr_batch_size_rows = 0;
   scanptr.p->m_curr_batch_size_bytes= 0;
   scanptr.p->m_max_batch_size_rows = max_rows;
@@ -12091,6 +12092,8 @@ void Dblqh::sendScanFragConf(Signal* sig
     scanptr.p->m_curr_batch_size_rows = 0;
     scanptr.p->m_curr_batch_size_bytes= 0;
   }
+
+  scanptr.p->m_stop_batch = 0;
 }//Dblqh::sendScanFragConf()
 
 /* ######################################################################### */

=== modified file 'storage/ndb/src/kernel/blocks/trpman.cpp'
--- a/storage/ndb/src/kernel/blocks/trpman.cpp	revid:mikael.ronstrom@stripped
+++ b/storage/ndb/src/kernel/blocks/trpman.cpp	revid:mikael.ronstrom@stripped
@@ -23,6 +23,8 @@
 #include <signaldata/RouteOrd.hpp>
 #include <signaldata/DumpStateOrd.hpp>
 
+#include <mt.hpp>
+
 Trpman::Trpman(Block_context & ctx, Uint32 instanceno) :
   SimulatedBlock(TRPMAN, ctx, instanceno)
 {
@@ -50,11 +52,7 @@ BLOCK_FUNCTIONS(Trpman)
 #ifdef ERROR_INSERT
 static NodeBitmask c_error_9000_nodes_mask;
 extern Uint32 MAX_RECEIVED_SIGNALS;
-
-class TransporterReceiveHandle *
-mt_get_trp_receive_handle(unsigned instance);
 #endif
-Uint32 mt_get_recv_thread_idx(NodeId nodeId);
 
 bool
 Trpman::handles_this_node(Uint32 nodeId)
@@ -64,7 +62,7 @@ Trpman::handles_this_node(Uint32 nodeId)
 #else
   if (globalData.ndbMtReceiveThreads <= (Uint32)1)
     return true;
-  return (instance()== (mt_get_recv_thread_idx(nodeId) + 1));
+  return (instance()== (mt_get_recv_thread_idx(nodeId) + /* proxy */ 1));
 #endif
 }
 

=== modified file 'storage/ndb/src/kernel/vm/CMakeLists.txt'
--- a/storage/ndb/src/kernel/vm/CMakeLists.txt	revid:mikael.ronstrom@stripped
+++ b/storage/ndb/src/kernel/vm/CMakeLists.txt	revid:mikael.ronstrom@stripped
@@ -72,7 +72,7 @@ SET_TARGET_PROPERTIES(mt_thr_config-t
                       PROPERTIES COMPILE_FLAGS "-DTEST_MT_THR_CONFIG")
 TARGET_LINK_LIBRARIES(mt_thr_config-t ndbgeneral)
 
-FOREACH(testprog IN ITEMS CountingPool DynArr256)
+FOREACH(testprog CountingPool DynArr256)
   ADD_EXECUTABLE(${testprog}-t ${testprog}.cpp)
   SET_TARGET_PROPERTIES(${testprog}-t PROPERTIES COMPILE_FLAGS "-DTAP_TEST")
   TARGET_LINK_LIBRARIES(${testprog}-t ndbtest ndbkernel ndbsched ndberror

=== modified file 'storage/ndb/src/kernel/vm/TransporterCallback.cpp'
--- a/storage/ndb/src/kernel/vm/TransporterCallback.cpp	revid:mikael.ronstrom@stripped
+++ b/storage/ndb/src/kernel/vm/TransporterCallback.cpp	revid:mikael.ronstrom@stripped
@@ -488,10 +488,10 @@ TransporterReceiveHandleKernel::checkJob
 #endif
 }
 
+#ifdef NDBD_MULTITHREADED
 void
 TransporterReceiveHandleKernel::assign_nodes(NodeId *recv_thread_idx_array)
 {
-#ifdef NDBD_MULTITHREADED
   m_transporters.clear(); /* Clear all first */
   for (Uint32 nodeId = 1; nodeId < MAX_NODES; nodeId++)
   {
@@ -499,8 +499,8 @@ TransporterReceiveHandleKernel::assign_n
       m_transporters.set(nodeId); /* Belongs to our receive thread */
   }
   return;
-#endif
 }
+#endif
 
 void
 TransporterReceiveHandleKernel::transporter_recv_from(NodeId nodeId)

=== modified file 'storage/ndb/src/kernel/vm/TransporterCallbackKernel.hpp'
--- a/storage/ndb/src/kernel/vm/TransporterCallbackKernel.hpp	revid:mikael.ronstrom@stripped
+++ b/storage/ndb/src/kernel/vm/TransporterCallbackKernel.hpp	revid:mikael.ronstrom@stripped
@@ -36,6 +36,11 @@ public:
    *   instance() - 1(proxy)
    */
   Uint32 m_receiver_thread_idx;
+
+  /**
+   * Assign nodes to this TransporterReceiveHandle
+   */
+  void assign_nodes(NodeId *recv_thread_idx_array);
 #endif
 
   /* TransporterCallback interface. */
@@ -49,7 +54,6 @@ public:
   void reportError(NodeId nodeId, TransporterError errorCode,
                    const char *info = 0);
   void transporter_recv_from(NodeId node);
-  void assign_nodes(NodeId *recv_thread_idx_array);
   int checkJobBuffer();
   virtual ~TransporterReceiveHandleKernel() { }
 };

=== modified file 'storage/ndb/src/kernel/vm/mt.cpp'
--- a/storage/ndb/src/kernel/vm/mt.cpp	revid:mikael.ronstrom@stripped
+++ b/storage/ndb/src/kernel/vm/mt.cpp	revid:mikael.ronstrom@stripped
@@ -37,16 +37,6 @@
 
 #include "mt-asm.h"
 
-/**
- * Array for mapping nodes to receiver threads and function to access it.
- */
-static NodeId g_node_to_recv_thr_map[MAX_NODES];
-
-Uint32 mt_get_recv_thread_idx(NodeId nodeId)
-{
-  return g_node_to_recv_thr_map[nodeId];
-}
-
 inline
 SimulatedBlock*
 GlobalData::mt_getBlock(BlockNumber blockNo, Uint32 instanceNo)
@@ -555,6 +545,49 @@ public:
     return tmp;
   }
 
+  T* find_last(T* first)
+  {
+    T* next = first;
+    if (!first)
+      return first;
+    while (next->m_next)
+    {
+      next = next->m_next;
+    }
+    return next;
+  }
+
+
+  bool fill(Ndbd_mem_manager *mm, Uint32 rg, Uint32 fill_level)
+  {
+    Uint32 alloced = 0;
+    if (m_free >= fill_level)
+    {
+      return true;
+    }
+    do
+    {
+      T *save_last = find_last(m_freelist);
+      T *new_list = m_global_pool->seize_list(mm, rg,
+                                              fill_level - m_free,
+                                              &alloced);
+      m_free += alloced;
+      if (save_last)
+      {
+        save_last->m_next = new_list;
+      }
+      else
+      {
+        m_freelist = new_list;
+      }
+    } while ((m_free < fill_level) && alloced);
+    if (m_free >= fill_level)
+    {
+      return true;
+    }
+    return false;
+  }
+
   void release(Ndbd_mem_manager *mm, Uint32 rg, T *t) {
     unsigned free = m_free;
     if (free < m_max_free)
@@ -1049,21 +1082,36 @@ struct thr_repository
       m_sb_pool("sendbufferpool")
     {}
 
+  /**
+   * m_receive_lock, m_section_lock, m_mem_manager_lock, m_jb_pool
+   * and m_sb_pool are allvariables globally shared among the threads
+   * and also heavily updated.
+   */
   struct thr_spin_lock<64> m_receive_lock[MAX_NDBMT_RECEIVE_THREADS];
   struct thr_spin_lock<64> m_section_lock;
   struct thr_spin_lock<64> m_mem_manager_lock;
+  /* thr_safe_pool is aligned to be also 64 bytes in size */
   struct thr_safe_pool<thr_job_buffer> m_jb_pool;
   struct thr_safe_pool<thr_send_page> m_sb_pool;
+  /* m_mm and m_thread_count are globally shared and read only variables */
   Ndbd_mem_manager * m_mm;
   unsigned m_thread_count;
-  /* Protect m_mm and m_thread_count from CPU cache misses */
-  char protection_unused[NDB_CL];
+  /**
+   * Protect m_mm and m_thread_count from CPU cache misses, first
+   * part of m_thread (struct thr_data) is globally shared variables.
+   * So sharing cache line with these for these read only variables
+   * isn't a good idea
+   */
+  char protection_unused[NDB_CL - sizeof(void*) - sizeof(unsigned)];
   struct thr_data m_thread[MAX_BLOCK_THREADS];
 
   /**
    * send buffer handling
+   * Put protection cacheline to avoid sharing with last m_thread
+   * instance.
    */
 
+  char protection_unused2[NDB_CL];
   /* The buffers that are to be sent */
   struct send_buffer
   {
@@ -2404,6 +2452,45 @@ release_list(thread_local_pool<thr_send_
   pool->release_local(tail);
 }
 
+static
+void
+pack_sb_pages(thread_local_pool<thr_send_page>* pool,
+              thr_repository::send_buffer* sb)
+{
+  Uint32 release_count = 0;
+  assert(sb->m_buffer.m_first_page != 0);
+  assert(sb->m_buffer.m_last_page != 0);
+  assert(sb->m_buffer.m_last_page->m_next == 0);
+
+  thr_send_page* curr = sb->m_buffer.m_first_page;
+  Uint32 curr_free = curr->max_bytes() - (curr->m_bytes + curr->m_start);
+  while (curr->m_next != 0)
+  {
+    thr_send_page* next = curr->m_next;
+    assert(next->m_start == 0); // only first page should have half sent bytes
+    if (next->m_bytes <= curr_free)
+    {
+      thr_send_page * save = next;
+      memcpy(curr->m_data + (curr->m_bytes + curr->m_start),
+             next->m_data,
+             next->m_bytes);
+
+      curr_free -= next->m_bytes;
+
+      curr->m_bytes += next->m_bytes;
+      curr->m_next = next->m_next;
+      release_count++;
+
+      pool->release_local(save);
+    }
+    else
+    {
+      curr = next;
+      curr_free = curr->max_bytes() - (curr->m_bytes + curr->m_start);
+    }
+  }
+  sb->m_buffer.m_last_page = curr;
+}
 
 static
 Uint32
@@ -2467,6 +2554,14 @@ bytes_sent(thread_local_pool<thr_send_pa
   sb->m_buffer.m_first_page = curr;
   assert(sb->m_bytes > bytes);
   sb->m_bytes -= bytes;
+
+  /**
+   * Since not all bytes were sent...
+   *   spend the time to try to pack the pages
+   *   possibly releasing send-buffer
+   */
+  pack_sb_pages(pool, sb);
+
   return sb->m_bytes;
 }
 
@@ -2548,6 +2643,40 @@ register_pending_send(thr_data *selfptr,
   }
 }
 
+static void try_send(thr_data * selfptr, Uint32 node);
+
+void
+pack_send_buffer(thr_data *selfptr, Uint32 node, Uint32 thr_no)
+{
+  thr_repository* rep = &g_thr_repository;
+  thr_repository::send_buffer* sb = rep->m_send_buffers+node;
+  thread_local_pool<thr_send_page>* pool =
+    &rep->m_thread[thr_no].m_send_buffer_pool;
+
+  lock(&sb->m_send_lock);
+  int bytes = link_thread_send_buffers(sb, node);
+  if (bytes)
+  {
+    pack_sb_pages(pool, sb);
+  }
+  unlock(&sb->m_send_lock);
+  selfptr->m_send_buffer_pool.release_global(rep->m_mm,
+                                             RG_TRANSPORTER_BUFFERS);
+  if (sb->m_force_send)
+  {
+    try_send(selfptr, node);
+  }
+}
+
+void
+pack_send_buffers(thr_data *selfptr, Uint32 thr_no)
+{
+  for (Uint32 i = 1; i < NDB_ARRAY_SIZE(selfptr->m_send_buffers); i++)
+  {
+    pack_send_buffer(selfptr, i, thr_no);
+  }
+}
+
 /**
  * publish thread-locally prepared send-buffer
  */
@@ -2574,9 +2703,7 @@ flush_send_buffer(thr_data* selfptr, Uin
 
   if (unlikely(next == ri))
   {
-    lock(&sb->m_send_lock);
-    link_thread_send_buffers(sb, node);
-    unlock(&sb->m_send_lock);
+    pack_send_buffer(selfptr, node, thr_no);
   }
 
   dst->m_buffers[wi] = src->m_first_page;
@@ -2594,6 +2721,7 @@ flush_send_buffer(thr_data* selfptr, Uin
 bool
 mt_send_handle::forceSend(NodeId nodeId)
 {
+  int res;
   struct thr_repository *rep = &g_thr_repository;
   struct thr_data *selfptr = m_selfptr;
   struct thr_repository::send_buffer * sb = rep->m_send_buffers + nodeId;
@@ -2603,13 +2731,12 @@ mt_send_handle::forceSend(NodeId nodeId)
     sb->m_force_send = 0;
     lock(&sb->m_send_lock);
     sb->m_send_thread = selfptr->m_thr_no;
-    globalTransporterRegistry.performSend(nodeId);
+    res = globalTransporterRegistry.performSend(nodeId);
     sb->m_send_thread = NO_SEND_THREAD;
     unlock(&sb->m_send_lock);
+    selfptr->m_send_buffer_pool.release_global(rep->m_mm,
+                                               RG_TRANSPORTER_BUFFERS);
   } while (sb->m_force_send);
-
-  selfptr->m_send_buffer_pool.release_global(rep->m_mm, RG_TRANSPORTER_BUFFERS);
-
   return true;
 }
 
@@ -2620,6 +2747,7 @@ static
 void
 try_send(thr_data * selfptr, Uint32 node)
 {
+  int res;
   struct thr_repository *rep = &g_thr_repository;
   struct thr_repository::send_buffer * sb = rep->m_send_buffers + node;
 
@@ -2634,12 +2762,12 @@ try_send(thr_data * selfptr, Uint32 node
     mb();
 
     sb->m_send_thread = selfptr->m_thr_no;
-    globalTransporterRegistry.performSend(node);
+    res = globalTransporterRegistry.performSend(node);
     sb->m_send_thread = NO_SEND_THREAD;
     unlock(&sb->m_send_lock);
+    selfptr->m_send_buffer_pool.release_global(rep->m_mm,
+                                               RG_TRANSPORTER_BUFFERS);
   } while (sb->m_force_send);
-
-  selfptr->m_send_buffer_pool.release_global(rep->m_mm, RG_TRANSPORTER_BUFFERS);
 }
 
 /**
@@ -2682,6 +2810,7 @@ static
 Uint32
 do_send(struct thr_data* selfptr, bool must_send)
 {
+  int res = 1;
   Uint32 i;
   Uint32 count = selfptr->m_pending_send_count;
   Uint8 *nodes = selfptr->m_pending_send_nodes;
@@ -2764,17 +2893,21 @@ do_send(struct thr_data* selfptr, bool m
        * holds the send lock for this remote node.
        */
       sb->m_send_thread = selfptr->m_thr_no;
-      int res = globalTransporterRegistry.performSend(node);
+      res = globalTransporterRegistry.performSend(node);
       sb->m_send_thread = NO_SEND_THREAD;
       unlock(&sb->m_send_lock);
       if (res)
       {
         register_pending_send(selfptr, node);
       }
+      if (sb->m_force_send) /* Release memory if looping */
+        selfptr->m_send_buffer_pool.release_global(rep->m_mm,
+                                                   RG_TRANSPORTER_BUFFERS);
     } while (sb->m_force_send);
   }
 
-  selfptr->m_send_buffer_pool.release_global(rep->m_mm, RG_TRANSPORTER_BUFFERS);
+  selfptr->m_send_buffer_pool.release_global(rep->m_mm,
+                                             RG_TRANSPORTER_BUFFERS);
 
   return selfptr->m_pending_send_count;
 }
@@ -3406,6 +3539,11 @@ aligned_signal(unsigned char signal_buf[
 TransporterReceiveHandleKernel *
   g_trp_receive_handle_ptr[MAX_NDBMT_RECEIVE_THREADS];
 
+/**
+ * Array for mapping nodes to receiver threads and function to access it.
+ */
+static NodeId g_node_to_recv_thr_map[MAX_NODES];
+
 extern "C"
 void *
 mt_receiver_thread_main(void *thr_arg)
@@ -3417,7 +3555,7 @@ mt_receiver_thread_main(void *thr_arg)
   unsigned thr_no = selfptr->m_thr_no;
   Uint32& watchDogCounter = selfptr->m_watchdog_counter;
   Uint32 thrSignalId = 0;
-  Uint32 recv_thread_idx = thr_no - first_receiver_thread_no;
+  const Uint32 recv_thread_idx = thr_no - first_receiver_thread_no;
   bool has_received = false;
   int cnt = 0;
 
@@ -3622,6 +3760,14 @@ mt_job_thread_main(void *thr_arg)
   { 
     loops++;
 
+    watchDogCounter = 11;
+    if (!selfptr->m_send_buffer_pool.fill(g_thr_repository.m_mm,
+                                          RG_TRANSPORTER_BUFFERS,
+                                          THR_FREE_BUF_MAX))
+    {
+      pack_send_buffers(selfptr, thr_no);
+    }
+
     watchDogCounter = 2;
     scan_time_queues(selfptr, now);
 
@@ -4173,9 +4319,21 @@ setcpuaffinity(struct thr_repository* re
   }
 }
 
+/**
+ * return receiver thread handling a particular node
+ *   returned number is indexed from 0 and upwards to #receiver threads
+ *   (or MAX_NODES is none)
+ */
+Uint32
+mt_get_recv_thread_idx(NodeId nodeId)
+{
+  assert(nodeId < NDB_ARRAY_SIZE(g_node_to_recv_thr_map));
+  return g_node_to_recv_thr_map[nodeId];
+}
+
 static
 void
-mt_assign_receiver_threads(void)
+assign_receiver_threads(void)
 {
   Uint32 num_recv_threads = globalData.ndbMtReceiveThreads;
   Uint32 recv_thread_idx = 0;
@@ -4221,6 +4379,11 @@ ThreadConfig::ipControlLoop(NdbThread* p
 #endif
   setcpuaffinity(rep);
 
+  /**
+   * assign nodes to receiver threads
+   */
+  assign_receiver_threads();
+
   /* Start the send thread(s) */
   if (g_send_threads)
     g_send_threads->start_send_threads();
@@ -4228,10 +4391,7 @@ ThreadConfig::ipControlLoop(NdbThread* p
   /*
    * Start threads for all execution threads, except for the receiver
    * thread, which runs in the main thread.
-   * 
-   * Assign nodes to receiver threads before starting any threads.
    */
-  mt_assign_receiver_threads();
   for (thr_no = 0; thr_no < num_threads; thr_no++)
   {
     rep->m_thread[thr_no].m_time = NdbTick_CurrentMillisecond();

=== modified file 'storage/ndb/src/kernel/vm/mt.hpp'
--- a/storage/ndb/src/kernel/vm/mt.hpp	revid:mikael.ronstrom@stripped
+++ b/storage/ndb/src/kernel/vm/mt.hpp	revid:mikael.ronstrom@stripped
@@ -110,4 +110,12 @@ mt_get_thr_stat(class SimulatedBlock *, 
 class TransporterReceiveHandle *
 mt_get_trp_receive_handle(unsigned instance);
 
+/**
+ * return receiver thread handling a particular node
+ *   returned number is indexed from 0 and upwards to #receiver threads
+ *   (or MAX_NODES is none)
+ */
+Uint32
+mt_get_recv_thread_idx(NodeId nodeId);
+
 #endif

No bundle (reason: useless for push emails).
Thread
bzr push into mysql-5.5-cluster-7.2 branch (mikael.ronstrom:3722 to 3734) Mikael Ronstrom25 Jan