3734 Mikael Ronstrom 2012-01-25
Add pack send buffers before starting a new loop to minimize risk of send buffer overload
modified:
storage/ndb/src/kernel/vm/mt.cpp
3733 Mikael Ronstrom 2012-01-25
Fix overload handling of scans
modified:
storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp
storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
3732 Mikael Ronstrom 2012-01-25
Regulate scan already on slowdown limit, add some stuff for debugging
modified:
storage/ndb/include/transporter/TransporterRegistry.hpp
storage/ndb/src/common/transporter/TCP_Transporter.cpp
storage/ndb/src/common/transporter/Transporter.cpp
storage/ndb/src/common/transporter/Transporter.hpp
storage/ndb/src/common/transporter/TransporterRegistry.cpp
storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
3731 Mikael Ronstrom 2012-01-25
compilation fix
modified:
storage/ndb/src/kernel/vm/mt.cpp
3730 Mikael Ronstrom 2012-01-25
Don't send so immediately, only when absolutely necessary from flush_send_buffer
modified:
storage/ndb/src/kernel/vm/mt.cpp
3729 Mikael Ronstrom 2012-01-25
Need to send when grabbing send node lock
modified:
storage/ndb/src/kernel/vm/mt.cpp
3728 Mikael Ronstrom 2012-01-24
More to fix send buffer overload issue
modified:
storage/ndb/src/kernel/vm/mt.cpp
3727 Mikael Ronstrom 2012-01-24
Improved pack page patch
modified:
storage/ndb/src/kernel/vm/mt.cpp
3726 Mikael Ronstrom 2012-01-24
Release memory and add a debug printout to see how much it is used
modified:
storage/ndb/src/kernel/vm/mt.cpp
3725 Mikael Ronstrom 2012-01-24
pack sb_pages
modified:
storage/ndb/src/kernel/vm/mt.cpp
3724 Mikael Ronstrom 2012-01-24 [merge]
merge
added:
storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/DbImplForNdbRecord.java
renamed:
mysql-test/suite/ndb_rpl/r/ndb_rpl_conflict.result => mysql-test/suite/ndb_rpl/r/ndb_rpl_conflict_basic.result
mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict.test => mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict_basic.test
modified:
storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ClusterConnectionImpl.java
storage/ndb/src/common/transporter/TransporterRegistry.cpp
storage/ndb/src/kernel/blocks/trpman.cpp
storage/ndb/src/kernel/vm/CMakeLists.txt
storage/ndb/src/kernel/vm/TransporterCallback.cpp
storage/ndb/src/kernel/vm/TransporterCallbackKernel.hpp
storage/ndb/src/kernel/vm/mt.cpp
storage/ndb/src/kernel/vm/mt.hpp
3723 Mikael Ronstrom 2012-01-24
Fix
modified:
storage/ndb/src/kernel/vm/mt.cpp
3722 Mikael Ronstrom 2012-01-23 [merge]
merge
added:
storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/StressTest.java
storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordBlobImpl.java
storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordImpl.java
storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordOperationImpl.java
storage/ndb/clusterj/clusterj-tie/src/test/java/testsuite/clusterj/tie/StressTest.java
storage/ndb/src/kernel/vm/CountingPool.cpp
storage/ndb/src/kernel/vm/CountingPool.hpp
modified:
mysql-test/suite/ndb/t/clusterj.test
storage/ndb/clusterj/clusterj-api/pom.xml
storage/ndb/clusterj/clusterj-bindings/pom.xml
storage/ndb/clusterj/clusterj-core/pom.xml
storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/SessionImpl.java
storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/metadata/DomainFieldHandlerImpl.java
storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/store/Operation.java
storage/ndb/clusterj/clusterj-jdbc/pom.xml
storage/ndb/clusterj/clusterj-jpatest/pom.xml
storage/ndb/clusterj/clusterj-openjpa/pom.xml
storage/ndb/clusterj/clusterj-test/pom.xml
storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/DynamicObjectTest.java
storage/ndb/clusterj/clusterj-test/src/main/resources/schema.sql
storage/ndb/clusterj/clusterj-tie/pom.xml
storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/BlobImpl.java
storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ClusterConnectionImpl.java
storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ClusterConnectionServiceImpl.java
storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ClusterTransactionImpl.java
storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ColumnImpl.java
storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/DbImpl.java
storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/DictionaryImpl.java
storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/OperationImpl.java
storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/Utility.java
storage/ndb/clusterj/clusterj-tie/src/main/resources/com/mysql/clusterj/tie/Bundle.properties
storage/ndb/clusterj/pom.xml
storage/ndb/include/ndbapi/Ndb.hpp
storage/ndb/include/ndbapi/NdbDictionary.hpp
storage/ndb/include/ndbapi/NdbOperation.hpp
storage/ndb/include/ndbapi/NdbScanOperation.hpp
storage/ndb/src/common/transporter/TransporterRegistry.cpp
storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp
storage/ndb/src/kernel/blocks/dbdict/Dbdict.hpp
storage/ndb/src/kernel/blocks/dbtup/tuppage.hpp
storage/ndb/src/kernel/vm/CMakeLists.txt
storage/ndb/src/kernel/vm/Pool.cpp
storage/ndb/src/kernel/vm/Pool.hpp
storage/ndb/src/ndbjtie/NdbApiWrapper.hpp
storage/ndb/src/ndbjtie/com/mysql/ndbjtie/ndbapi/Ndb.java
storage/ndb/src/ndbjtie/com/mysql/ndbjtie/ndbapi/NdbDictionary.java
storage/ndb/src/ndbjtie/com/mysql/ndbjtie/ndbapi/NdbOperation.java
storage/ndb/src/ndbjtie/com/mysql/ndbjtie/ndbapi/NdbOperationConst.java
storage/ndb/src/ndbjtie/com/mysql/ndbjtie/ndbapi/NdbScanOperation.java
storage/ndb/src/ndbjtie/com/mysql/ndbjtie/ndbapi/issues.txt
storage/ndb/src/ndbjtie/ndbapi_jtie.hpp
storage/ndb/test/ndbapi/testSystemRestart.cpp
=== renamed file 'mysql-test/suite/ndb_rpl/r/ndb_rpl_conflict.result' => 'mysql-test/suite/ndb_rpl/r/ndb_rpl_conflict_basic.result'
=== renamed file 'mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict.test' => 'mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict_basic.test'
=== modified file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ClusterConnectionImpl.java'
--- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ClusterConnectionImpl.java revid:mikael.ronstrom@stripped
+++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ClusterConnectionImpl.java revid:mikael.ronstrom@stripped
@@ -60,7 +60,7 @@ public class ClusterConnectionImpl
final int nodeId;
/** All dbs given out by this cluster connection */
- private Map<DbImpl, Object> dbs = new IdentityHashMap<DbImpl, Object>();
+ private Map<Db, Object> dbs = new IdentityHashMap<Db, Object>();
/** The map of table name to NdbRecordImpl */
private ConcurrentMap<String, NdbRecordImpl> ndbRecordImplMap = new ConcurrentHashMap<String, NdbRecordImpl>();
@@ -98,7 +98,7 @@ public class ClusterConnectionImpl
// create a dictionary for NdbRecord
Ndb ndbForNdbRecord = Ndb.create(clusterConnection, database, "def");
handleError(ndbForNdbRecord, clusterConnection, connectString, nodeId);
- DbImpl dbForNdbRecord = new DbImpl(this, ndbForNdbRecord, maxTransactions);
+ DbImplForNdbRecord dbForNdbRecord = new DbImplForNdbRecord(this, ndbForNdbRecord);
dbs.put(dbForNdbRecord, null);
dictionaryForNdbRecord = dbForNdbRecord.getNdbDictionary();
}
@@ -167,7 +167,7 @@ public class ClusterConnectionImpl
}
ndbRecordImplMap.clear();
if (dbs.size() != 0) {
- Map<DbImpl, Object> dbsToClose = new IdentityHashMap<DbImpl, Object>(dbs);
+ Map<Db, Object> dbsToClose = new IdentityHashMap<Db, Object>(dbs);
for (Db db: dbsToClose.keySet()) {
db.close();
}
=== added file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/DbImplForNdbRecord.java'
--- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/DbImplForNdbRecord.java 1970-01-01 00:00:00 +0000
+++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/DbImplForNdbRecord.java revid:mikael.ronstrom@stripped
@@ -0,0 +1,133 @@
+/*
+ * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; version 2 of the License.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+package com.mysql.clusterj.tie;
+
+import java.nio.ByteBuffer;
+
+import java.util.List;
+
+import com.mysql.clusterj.ClusterJDatastoreException;
+import com.mysql.clusterj.ClusterJFatalInternalException;
+
+import com.mysql.clusterj.core.store.ClusterTransaction;
+
+import com.mysql.clusterj.core.util.I18NHelper;
+import com.mysql.clusterj.core.util.Logger;
+import com.mysql.clusterj.core.util.LoggerFactoryService;
+
+import com.mysql.ndbjtie.ndbapi.Ndb;
+import com.mysql.ndbjtie.ndbapi.NdbErrorConst;
+import com.mysql.ndbjtie.ndbapi.NdbTransaction;
+import com.mysql.ndbjtie.ndbapi.NdbDictionary.Dictionary;
+
+/**
+ * This class is used to hold the ndb Dictionary used for NdbRecord. It has the minimum
+ * implementation needed to implement the life cycle of the Ndb. In particular, it omits the
+ * buffer manager and partition key scratch buffer used in the standard DbImpl.
+ */
+class DbImplForNdbRecord implements com.mysql.clusterj.core.store.Db {
+
+ /** My message translator */
+ static final I18NHelper local = I18NHelper.getInstance(DbImplForNdbRecord.class);
+
+ /** My logger */
+ static final Logger logger = LoggerFactoryService.getFactory()
+ .getInstance(DbImplForNdbRecord.class);
+
+ /** The Ndb instance that this instance is wrapping */
+ private Ndb ndb;
+
+ /** A "big enough" size for error information */
+ private int errorBufferSize = 300;
+
+ /** The ndb error detail buffer */
+ private ByteBuffer errorBuffer = ByteBuffer.allocateDirect(errorBufferSize);
+
+ /** The NdbDictionary for this Ndb */
+ private Dictionary ndbDictionary;
+
+ /** The ClusterConnection */
+ private ClusterConnectionImpl clusterConnection;
+
+ public DbImplForNdbRecord(ClusterConnectionImpl clusterConnection, Ndb ndb) {
+ this.clusterConnection = clusterConnection;
+ this.ndb = ndb;
+ int returnCode = ndb.init(1);
+ handleError(returnCode, ndb);
+ ndbDictionary = ndb.getDictionary();
+ handleError(ndbDictionary, ndb);
+ }
+
+ public void close() {
+ Ndb.delete(ndb);
+ clusterConnection.close(this);
+ }
+
+ public com.mysql.clusterj.core.store.Dictionary getDictionary() {
+ throw new ClusterJFatalInternalException(local.message("ERR_Implementation_Should_Not_Occur"));
+ }
+
+ public Dictionary getNdbDictionary() {
+ return ndbDictionary;
+ }
+
+ public ClusterTransaction startTransaction(String joinTransactionId) {
+ throw new ClusterJFatalInternalException(local.message("ERR_Implementation_Should_Not_Occur"));
+ }
+
+ protected void handleError(int returnCode, Ndb ndb) {
+ if (returnCode == 0) {
+ return;
+ } else {
+ NdbErrorConst ndbError = ndb.getNdbError();
+ String detail = getNdbErrorDetail(ndbError);
+ Utility.throwError(returnCode, ndbError, detail);
+ }
+ }
+
+ protected void handleError(Object object, Ndb ndb) {
+ if (object != null) {
+ return;
+ } else {
+ NdbErrorConst ndbError = ndb.getNdbError();
+ String detail = getNdbErrorDetail(ndbError);
+ Utility.throwError(null, ndbError, detail);
+ }
+ }
+
+ public boolean isRetriable(ClusterJDatastoreException ex) {
+ throw new ClusterJFatalInternalException(local.message("ERR_Implementation_Should_Not_Occur"));
+ }
+
+ public String getNdbErrorDetail(NdbErrorConst ndbError) {
+ return ndb.getNdbErrorDetail(ndbError, errorBuffer, errorBuffer.capacity());
+ }
+
+ public NdbTransaction enlist(String tableName, List<KeyPart> keyParts) {
+ throw new ClusterJFatalInternalException(local.message("ERR_Implementation_Should_Not_Occur"));
+ }
+
+ public NdbTransaction enlist(String tableName, int partitionId) {
+ throw new ClusterJFatalInternalException(local.message("ERR_Implementation_Should_Not_Occur"));
+ }
+
+ public NdbTransaction joinTransaction(String coordinatedTransactionId) {
+ throw new ClusterJFatalInternalException(local.message("ERR_Implementation_Should_Not_Occur"));
+ }
+
+}
=== modified file 'storage/ndb/include/transporter/TransporterRegistry.hpp'
--- a/storage/ndb/include/transporter/TransporterRegistry.hpp revid:mikael.ronstrom@stripped
+++ b/storage/ndb/include/transporter/TransporterRegistry.hpp revid:mikael.ronstrom@stripped
@@ -292,6 +292,13 @@ public:
const NodeBitmask& get_status_overloaded() const;
/**
+ * Set or clear slowdown bit.
+ * Query if any slowdown bit is set.
+ */
+ void set_status_slowdown(Uint32 nodeId, bool val);
+ const NodeBitmask& get_status_slowdown() const;
+
+ /**
* prepareSend
*
* When IOState is HaltOutput or HaltIO do not send or insert any
@@ -431,8 +438,10 @@ private:
/**
* Overloaded bits, for fast check.
+ * Similarly slowdown bits for fast check.
*/
NodeBitmask m_status_overloaded;
+ NodeBitmask m_status_slowdown;
/**
* Unpack signal data.
@@ -599,4 +608,18 @@ TransporterRegistry::get_status_overload
return m_status_overloaded;
}
+inline void
+TransporterRegistry::set_status_slowdown(Uint32 nodeId, bool val)
+{
+ assert(nodeId < MAX_NODES);
+ if (val != m_status_slowdown.get(nodeId))
+ m_status_slowdown.set(nodeId, val);
+}
+
+inline const NodeBitmask&
+TransporterRegistry::get_status_slowdown() const
+{
+ return m_status_slowdown;
+}
+
#endif // Define of TransporterRegistry_H
=== modified file 'storage/ndb/src/common/transporter/TCP_Transporter.cpp'
--- a/storage/ndb/src/common/transporter/TCP_Transporter.cpp revid:mikael.ronstrom@stripped
+++ b/storage/ndb/src/common/transporter/TCP_Transporter.cpp revid:mikael.ronstrom@stripped
@@ -112,6 +112,7 @@ TCP_Transporter::TCP_Transporter(Transpo
setIf(sockOptTcpMaxSeg, conf->tcp.tcpMaxsegSize, 0);
m_overload_limit = overload_limit(conf);
+ m_slowdown_limit = conf->tcp.sendBufferSize/2;
}
=== modified file 'storage/ndb/src/common/transporter/Transporter.cpp'
--- a/storage/ndb/src/common/transporter/Transporter.cpp revid:mikael.ronstrom@stripped
+++ b/storage/ndb/src/common/transporter/Transporter.cpp revid:mikael.ronstrom@stripped
@@ -43,7 +43,8 @@ Transporter::Transporter(TransporterRegi
: m_s_port(s_port), remoteNodeId(rNodeId), localNodeId(lNodeId),
isServer(lNodeId==serverNodeId),
m_packer(_signalId, _checksum), m_max_send_buffer(max_send_buffer),
- m_overload_limit(0xFFFFFFFF), isMgmConnection(_isMgmConnection),
+ m_overload_limit(0xFFFFFFFF), m_slowdown_limit(0xFFFFFFFF),
+ isMgmConnection(_isMgmConnection),
m_connected(false),
m_type(_type),
m_transporter_registry(t_reg)
=== modified file 'storage/ndb/src/common/transporter/Transporter.hpp'
--- a/storage/ndb/src/common/transporter/Transporter.hpp revid:mikael.ronstrom@stripped
+++ b/storage/ndb/src/common/transporter/Transporter.hpp revid:mikael.ronstrom@stripped
@@ -90,6 +90,8 @@ public:
{
m_transporter_registry.set_status_overloaded(remoteNodeId,
used >= m_overload_limit);
+ m_transporter_registry.set_status_slowdown(remoteNodeId,
+ used >= m_slowdown_limit);
}
virtual int doSend() = 0;
@@ -159,6 +161,7 @@ protected:
Uint32 m_max_send_buffer;
/* Overload limit, as configured with the OverloadLimit config parameter. */
Uint32 m_overload_limit;
+ Uint32 m_slowdown_limit;
private:
@@ -226,6 +229,10 @@ Transporter::iovec_data_sent(int nBytesS
{
Uint32 used_bytes
= get_callback_obj()->bytes_sent(remoteNodeId, nBytesSent);
+ if (used_bytes > 256 * 1024)
+ {
+ ndbout_c("%u bytes in send buffer", used_bytes);
+ }
update_status_overloaded(used_bytes);
}
=== modified file 'storage/ndb/src/common/transporter/TransporterRegistry.cpp'
--- a/storage/ndb/src/common/transporter/TransporterRegistry.cpp revid:mikael.ronstrom@stripped
+++ b/storage/ndb/src/common/transporter/TransporterRegistry.cpp revid:mikael.ronstrom@stripped
@@ -763,6 +763,7 @@ TransporterRegistry::prepareSend(Transpo
}
set_status_overloaded(nodeId, true);
+ set_status_slowdown(nodeId, true);
int sleepTime = 2;
/**
@@ -851,6 +852,7 @@ TransporterRegistry::prepareSend(Transpo
* so sleepTime = 2 generates a 10 ms sleep.
*/
set_status_overloaded(nodeId, true);
+ set_status_slowdown(nodeId, true);
int sleepTime = 2;
for(int i = 0; i<50; i++){
if((nSHMTransporters+nSCITransporters) == 0)
@@ -934,6 +936,7 @@ TransporterRegistry::prepareSend(Transpo
* so sleepTime = 2 generates a 10 ms sleep.
*/
set_status_overloaded(nodeId, true);
+ set_status_slowdown(nodeId, true);
int sleepTime = 2;
for(int i = 0; i<50; i++){
if((nSHMTransporters+nSCITransporters) == 0)
@@ -1242,7 +1245,7 @@ TransporterRegistry::poll_TCP(Uint32 tim
if (extra_socket && recvdata.m_transporters.get(0))
{
const NDB_SOCKET_TYPE socket = m_extra_wakeup_sockets[0];
- assert(receiveHandle);
+ assert(&recvdata == receiveHandle); // not used by ndbmtd...
// Poll the wakup-socket for read
recvdata.m_socket_poller.add(socket, true, false, false);
@@ -1318,7 +1321,7 @@ TransporterRegistry::performReceive(Tran
if (recvdata.m_has_data_transporters.get(0))
{
assert(recvdata.m_transporters.get(0));
- assert(receiveHandle);
+ assert(&recvdata == receiveHandle); // not used by ndbmtd
recvdata.m_has_data_transporters.clear(Uint32(0));
consume_extra_sockets();
}
@@ -2293,6 +2296,8 @@ TransporterRegistry::updateWritePtr(Tran
Uint32 used = handle->updateWritePtr(node, lenBytes, prio);
t->update_status_overloaded(used);
+ if (used > 32768)
+ abort();
if(t->send_limit_reached(used)) {
//-------------------------------------------------
=== modified file 'storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp revid:mikael.ronstrom@stripped
+++ b/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp revid:mikael.ronstrom@stripped
@@ -574,7 +574,8 @@ public:
Uint8 m_last_row;
Uint8 m_reserved;
Uint8 statScan;
- Uint8 dummy[3]; // align?
+ Uint8 m_stop_batch;
+ Uint8 dummy[2]; // align?
}; // Size 272 bytes
typedef Ptr<ScanRecord> ScanRecordPtr;
@@ -3300,7 +3301,8 @@ Dblqh::ScanRecord::check_scan_batch_comp
Uint32 max_rows = m_max_batch_size_rows;
Uint32 max_bytes = m_max_batch_size_bytes;
- return (max_rows > 0 && (m_curr_batch_size_rows >= max_rows)) ||
+ return m_stop_batch ||
+ (max_rows > 0 && (m_curr_batch_size_rows >= max_rows)) ||
(max_bytes > 0 && (m_curr_batch_size_bytes >= max_bytes));
}
=== modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp revid:mikael.ronstrom@stripped
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp revid:mikael.ronstrom@stripped
@@ -11284,17 +11284,17 @@ void Dblqh::scanTupkeyConfLab(Signal* si
scanptr.p->m_curr_batch_size_rows = rows + 1;
scanptr.p->m_last_row = tdata5;
- const NodeBitmask& all = globalTransporterRegistry.get_status_overloaded();
+ const NodeBitmask& all = globalTransporterRegistry.get_status_slowdown();
if (unlikely(!all.isclear()))
{
if (all.get(refToNode(scanptr.p->scanApiBlockref)))
{
/**
- * End scan batch if transporter-buffer are overloaded
+ * End scan batch if transporter-buffer are in slowdown state
*
* TODO: We should have counters for this...
*/
- tdata5 = 1;
+ scanptr.p->m_stop_batch = 1;
}
}
@@ -11604,6 +11604,7 @@ Uint32 Dblqh::initScanrec(const ScanFrag
scanptr.p->scanTcrec = tcConnectptr.i;
scanptr.p->scanSchemaVersion = scanFragReq->schemaVersion;
+ scanptr.p->m_stop_batch = 0;
scanptr.p->m_curr_batch_size_rows = 0;
scanptr.p->m_curr_batch_size_bytes= 0;
scanptr.p->m_max_batch_size_rows = max_rows;
@@ -12091,6 +12092,8 @@ void Dblqh::sendScanFragConf(Signal* sig
scanptr.p->m_curr_batch_size_rows = 0;
scanptr.p->m_curr_batch_size_bytes= 0;
}
+
+ scanptr.p->m_stop_batch = 0;
}//Dblqh::sendScanFragConf()
/* ######################################################################### */
=== modified file 'storage/ndb/src/kernel/blocks/trpman.cpp'
--- a/storage/ndb/src/kernel/blocks/trpman.cpp revid:mikael.ronstrom@stripped
+++ b/storage/ndb/src/kernel/blocks/trpman.cpp revid:mikael.ronstrom@stripped
@@ -23,6 +23,8 @@
#include <signaldata/RouteOrd.hpp>
#include <signaldata/DumpStateOrd.hpp>
+#include <mt.hpp>
+
Trpman::Trpman(Block_context & ctx, Uint32 instanceno) :
SimulatedBlock(TRPMAN, ctx, instanceno)
{
@@ -50,11 +52,7 @@ BLOCK_FUNCTIONS(Trpman)
#ifdef ERROR_INSERT
static NodeBitmask c_error_9000_nodes_mask;
extern Uint32 MAX_RECEIVED_SIGNALS;
-
-class TransporterReceiveHandle *
-mt_get_trp_receive_handle(unsigned instance);
#endif
-Uint32 mt_get_recv_thread_idx(NodeId nodeId);
bool
Trpman::handles_this_node(Uint32 nodeId)
@@ -64,7 +62,7 @@ Trpman::handles_this_node(Uint32 nodeId)
#else
if (globalData.ndbMtReceiveThreads <= (Uint32)1)
return true;
- return (instance()== (mt_get_recv_thread_idx(nodeId) + 1));
+ return (instance()== (mt_get_recv_thread_idx(nodeId) + /* proxy */ 1));
#endif
}
=== modified file 'storage/ndb/src/kernel/vm/CMakeLists.txt'
--- a/storage/ndb/src/kernel/vm/CMakeLists.txt revid:mikael.ronstrom@stripped
+++ b/storage/ndb/src/kernel/vm/CMakeLists.txt revid:mikael.ronstrom@stripped
@@ -72,7 +72,7 @@ SET_TARGET_PROPERTIES(mt_thr_config-t
PROPERTIES COMPILE_FLAGS "-DTEST_MT_THR_CONFIG")
TARGET_LINK_LIBRARIES(mt_thr_config-t ndbgeneral)
-FOREACH(testprog IN ITEMS CountingPool DynArr256)
+FOREACH(testprog CountingPool DynArr256)
ADD_EXECUTABLE(${testprog}-t ${testprog}.cpp)
SET_TARGET_PROPERTIES(${testprog}-t PROPERTIES COMPILE_FLAGS "-DTAP_TEST")
TARGET_LINK_LIBRARIES(${testprog}-t ndbtest ndbkernel ndbsched ndberror
=== modified file 'storage/ndb/src/kernel/vm/TransporterCallback.cpp'
--- a/storage/ndb/src/kernel/vm/TransporterCallback.cpp revid:mikael.ronstrom@stripped
+++ b/storage/ndb/src/kernel/vm/TransporterCallback.cpp revid:mikael.ronstrom@stripped
@@ -488,10 +488,10 @@ TransporterReceiveHandleKernel::checkJob
#endif
}
+#ifdef NDBD_MULTITHREADED
void
TransporterReceiveHandleKernel::assign_nodes(NodeId *recv_thread_idx_array)
{
-#ifdef NDBD_MULTITHREADED
m_transporters.clear(); /* Clear all first */
for (Uint32 nodeId = 1; nodeId < MAX_NODES; nodeId++)
{
@@ -499,8 +499,8 @@ TransporterReceiveHandleKernel::assign_n
m_transporters.set(nodeId); /* Belongs to our receive thread */
}
return;
-#endif
}
+#endif
void
TransporterReceiveHandleKernel::transporter_recv_from(NodeId nodeId)
=== modified file 'storage/ndb/src/kernel/vm/TransporterCallbackKernel.hpp'
--- a/storage/ndb/src/kernel/vm/TransporterCallbackKernel.hpp revid:mikael.ronstrom@stripped
+++ b/storage/ndb/src/kernel/vm/TransporterCallbackKernel.hpp revid:mikael.ronstrom@stripped
@@ -36,6 +36,11 @@ public:
* instance() - 1(proxy)
*/
Uint32 m_receiver_thread_idx;
+
+ /**
+ * Assign nodes to this TransporterReceiveHandle
+ */
+ void assign_nodes(NodeId *recv_thread_idx_array);
#endif
/* TransporterCallback interface. */
@@ -49,7 +54,6 @@ public:
void reportError(NodeId nodeId, TransporterError errorCode,
const char *info = 0);
void transporter_recv_from(NodeId node);
- void assign_nodes(NodeId *recv_thread_idx_array);
int checkJobBuffer();
virtual ~TransporterReceiveHandleKernel() { }
};
=== modified file 'storage/ndb/src/kernel/vm/mt.cpp'
--- a/storage/ndb/src/kernel/vm/mt.cpp revid:mikael.ronstrom@stripped
+++ b/storage/ndb/src/kernel/vm/mt.cpp revid:mikael.ronstrom@stripped
@@ -37,16 +37,6 @@
#include "mt-asm.h"
-/**
- * Array for mapping nodes to receiver threads and function to access it.
- */
-static NodeId g_node_to_recv_thr_map[MAX_NODES];
-
-Uint32 mt_get_recv_thread_idx(NodeId nodeId)
-{
- return g_node_to_recv_thr_map[nodeId];
-}
-
inline
SimulatedBlock*
GlobalData::mt_getBlock(BlockNumber blockNo, Uint32 instanceNo)
@@ -555,6 +545,49 @@ public:
return tmp;
}
+ T* find_last(T* first)
+ {
+ T* next = first;
+ if (!first)
+ return first;
+ while (next->m_next)
+ {
+ next = next->m_next;
+ }
+ return next;
+ }
+
+
+ bool fill(Ndbd_mem_manager *mm, Uint32 rg, Uint32 fill_level)
+ {
+ Uint32 alloced = 0;
+ if (m_free >= fill_level)
+ {
+ return true;
+ }
+ do
+ {
+ T *save_last = find_last(m_freelist);
+ T *new_list = m_global_pool->seize_list(mm, rg,
+ fill_level - m_free,
+ &alloced);
+ m_free += alloced;
+ if (save_last)
+ {
+ save_last->m_next = new_list;
+ }
+ else
+ {
+ m_freelist = new_list;
+ }
+ } while ((m_free < fill_level) && alloced);
+ if (m_free >= fill_level)
+ {
+ return true;
+ }
+ return false;
+ }
+
void release(Ndbd_mem_manager *mm, Uint32 rg, T *t) {
unsigned free = m_free;
if (free < m_max_free)
@@ -1049,21 +1082,36 @@ struct thr_repository
m_sb_pool("sendbufferpool")
{}
+ /**
+ * m_receive_lock, m_section_lock, m_mem_manager_lock, m_jb_pool
+ * and m_sb_pool are allvariables globally shared among the threads
+ * and also heavily updated.
+ */
struct thr_spin_lock<64> m_receive_lock[MAX_NDBMT_RECEIVE_THREADS];
struct thr_spin_lock<64> m_section_lock;
struct thr_spin_lock<64> m_mem_manager_lock;
+ /* thr_safe_pool is aligned to be also 64 bytes in size */
struct thr_safe_pool<thr_job_buffer> m_jb_pool;
struct thr_safe_pool<thr_send_page> m_sb_pool;
+ /* m_mm and m_thread_count are globally shared and read only variables */
Ndbd_mem_manager * m_mm;
unsigned m_thread_count;
- /* Protect m_mm and m_thread_count from CPU cache misses */
- char protection_unused[NDB_CL];
+ /**
+ * Protect m_mm and m_thread_count from CPU cache misses, first
+ * part of m_thread (struct thr_data) is globally shared variables.
+ * So sharing cache line with these for these read only variables
+ * isn't a good idea
+ */
+ char protection_unused[NDB_CL - sizeof(void*) - sizeof(unsigned)];
struct thr_data m_thread[MAX_BLOCK_THREADS];
/**
* send buffer handling
+ * Put protection cacheline to avoid sharing with last m_thread
+ * instance.
*/
+ char protection_unused2[NDB_CL];
/* The buffers that are to be sent */
struct send_buffer
{
@@ -2404,6 +2452,45 @@ release_list(thread_local_pool<thr_send_
pool->release_local(tail);
}
+static
+void
+pack_sb_pages(thread_local_pool<thr_send_page>* pool,
+ thr_repository::send_buffer* sb)
+{
+ Uint32 release_count = 0;
+ assert(sb->m_buffer.m_first_page != 0);
+ assert(sb->m_buffer.m_last_page != 0);
+ assert(sb->m_buffer.m_last_page->m_next == 0);
+
+ thr_send_page* curr = sb->m_buffer.m_first_page;
+ Uint32 curr_free = curr->max_bytes() - (curr->m_bytes + curr->m_start);
+ while (curr->m_next != 0)
+ {
+ thr_send_page* next = curr->m_next;
+ assert(next->m_start == 0); // only first page should have half sent bytes
+ if (next->m_bytes <= curr_free)
+ {
+ thr_send_page * save = next;
+ memcpy(curr->m_data + (curr->m_bytes + curr->m_start),
+ next->m_data,
+ next->m_bytes);
+
+ curr_free -= next->m_bytes;
+
+ curr->m_bytes += next->m_bytes;
+ curr->m_next = next->m_next;
+ release_count++;
+
+ pool->release_local(save);
+ }
+ else
+ {
+ curr = next;
+ curr_free = curr->max_bytes() - (curr->m_bytes + curr->m_start);
+ }
+ }
+ sb->m_buffer.m_last_page = curr;
+}
static
Uint32
@@ -2467,6 +2554,14 @@ bytes_sent(thread_local_pool<thr_send_pa
sb->m_buffer.m_first_page = curr;
assert(sb->m_bytes > bytes);
sb->m_bytes -= bytes;
+
+ /**
+ * Since not all bytes were sent...
+ * spend the time to try to pack the pages
+ * possibly releasing send-buffer
+ */
+ pack_sb_pages(pool, sb);
+
return sb->m_bytes;
}
@@ -2548,6 +2643,40 @@ register_pending_send(thr_data *selfptr,
}
}
+static void try_send(thr_data * selfptr, Uint32 node);
+
+void
+pack_send_buffer(thr_data *selfptr, Uint32 node, Uint32 thr_no)
+{
+ thr_repository* rep = &g_thr_repository;
+ thr_repository::send_buffer* sb = rep->m_send_buffers+node;
+ thread_local_pool<thr_send_page>* pool =
+ &rep->m_thread[thr_no].m_send_buffer_pool;
+
+ lock(&sb->m_send_lock);
+ int bytes = link_thread_send_buffers(sb, node);
+ if (bytes)
+ {
+ pack_sb_pages(pool, sb);
+ }
+ unlock(&sb->m_send_lock);
+ selfptr->m_send_buffer_pool.release_global(rep->m_mm,
+ RG_TRANSPORTER_BUFFERS);
+ if (sb->m_force_send)
+ {
+ try_send(selfptr, node);
+ }
+}
+
+void
+pack_send_buffers(thr_data *selfptr, Uint32 thr_no)
+{
+ for (Uint32 i = 1; i < NDB_ARRAY_SIZE(selfptr->m_send_buffers); i++)
+ {
+ pack_send_buffer(selfptr, i, thr_no);
+ }
+}
+
/**
* publish thread-locally prepared send-buffer
*/
@@ -2574,9 +2703,7 @@ flush_send_buffer(thr_data* selfptr, Uin
if (unlikely(next == ri))
{
- lock(&sb->m_send_lock);
- link_thread_send_buffers(sb, node);
- unlock(&sb->m_send_lock);
+ pack_send_buffer(selfptr, node, thr_no);
}
dst->m_buffers[wi] = src->m_first_page;
@@ -2594,6 +2721,7 @@ flush_send_buffer(thr_data* selfptr, Uin
bool
mt_send_handle::forceSend(NodeId nodeId)
{
+ int res;
struct thr_repository *rep = &g_thr_repository;
struct thr_data *selfptr = m_selfptr;
struct thr_repository::send_buffer * sb = rep->m_send_buffers + nodeId;
@@ -2603,13 +2731,12 @@ mt_send_handle::forceSend(NodeId nodeId)
sb->m_force_send = 0;
lock(&sb->m_send_lock);
sb->m_send_thread = selfptr->m_thr_no;
- globalTransporterRegistry.performSend(nodeId);
+ res = globalTransporterRegistry.performSend(nodeId);
sb->m_send_thread = NO_SEND_THREAD;
unlock(&sb->m_send_lock);
+ selfptr->m_send_buffer_pool.release_global(rep->m_mm,
+ RG_TRANSPORTER_BUFFERS);
} while (sb->m_force_send);
-
- selfptr->m_send_buffer_pool.release_global(rep->m_mm, RG_TRANSPORTER_BUFFERS);
-
return true;
}
@@ -2620,6 +2747,7 @@ static
void
try_send(thr_data * selfptr, Uint32 node)
{
+ int res;
struct thr_repository *rep = &g_thr_repository;
struct thr_repository::send_buffer * sb = rep->m_send_buffers + node;
@@ -2634,12 +2762,12 @@ try_send(thr_data * selfptr, Uint32 node
mb();
sb->m_send_thread = selfptr->m_thr_no;
- globalTransporterRegistry.performSend(node);
+ res = globalTransporterRegistry.performSend(node);
sb->m_send_thread = NO_SEND_THREAD;
unlock(&sb->m_send_lock);
+ selfptr->m_send_buffer_pool.release_global(rep->m_mm,
+ RG_TRANSPORTER_BUFFERS);
} while (sb->m_force_send);
-
- selfptr->m_send_buffer_pool.release_global(rep->m_mm, RG_TRANSPORTER_BUFFERS);
}
/**
@@ -2682,6 +2810,7 @@ static
Uint32
do_send(struct thr_data* selfptr, bool must_send)
{
+ int res = 1;
Uint32 i;
Uint32 count = selfptr->m_pending_send_count;
Uint8 *nodes = selfptr->m_pending_send_nodes;
@@ -2764,17 +2893,21 @@ do_send(struct thr_data* selfptr, bool m
* holds the send lock for this remote node.
*/
sb->m_send_thread = selfptr->m_thr_no;
- int res = globalTransporterRegistry.performSend(node);
+ res = globalTransporterRegistry.performSend(node);
sb->m_send_thread = NO_SEND_THREAD;
unlock(&sb->m_send_lock);
if (res)
{
register_pending_send(selfptr, node);
}
+ if (sb->m_force_send) /* Release memory if looping */
+ selfptr->m_send_buffer_pool.release_global(rep->m_mm,
+ RG_TRANSPORTER_BUFFERS);
} while (sb->m_force_send);
}
- selfptr->m_send_buffer_pool.release_global(rep->m_mm, RG_TRANSPORTER_BUFFERS);
+ selfptr->m_send_buffer_pool.release_global(rep->m_mm,
+ RG_TRANSPORTER_BUFFERS);
return selfptr->m_pending_send_count;
}
@@ -3406,6 +3539,11 @@ aligned_signal(unsigned char signal_buf[
TransporterReceiveHandleKernel *
g_trp_receive_handle_ptr[MAX_NDBMT_RECEIVE_THREADS];
+/**
+ * Array for mapping nodes to receiver threads and function to access it.
+ */
+static NodeId g_node_to_recv_thr_map[MAX_NODES];
+
extern "C"
void *
mt_receiver_thread_main(void *thr_arg)
@@ -3417,7 +3555,7 @@ mt_receiver_thread_main(void *thr_arg)
unsigned thr_no = selfptr->m_thr_no;
Uint32& watchDogCounter = selfptr->m_watchdog_counter;
Uint32 thrSignalId = 0;
- Uint32 recv_thread_idx = thr_no - first_receiver_thread_no;
+ const Uint32 recv_thread_idx = thr_no - first_receiver_thread_no;
bool has_received = false;
int cnt = 0;
@@ -3622,6 +3760,14 @@ mt_job_thread_main(void *thr_arg)
{
loops++;
+ watchDogCounter = 11;
+ if (!selfptr->m_send_buffer_pool.fill(g_thr_repository.m_mm,
+ RG_TRANSPORTER_BUFFERS,
+ THR_FREE_BUF_MAX))
+ {
+ pack_send_buffers(selfptr, thr_no);
+ }
+
watchDogCounter = 2;
scan_time_queues(selfptr, now);
@@ -4173,9 +4319,21 @@ setcpuaffinity(struct thr_repository* re
}
}
+/**
+ * return receiver thread handling a particular node
+ * returned number is indexed from 0 and upwards to #receiver threads
+ * (or MAX_NODES is none)
+ */
+Uint32
+mt_get_recv_thread_idx(NodeId nodeId)
+{
+ assert(nodeId < NDB_ARRAY_SIZE(g_node_to_recv_thr_map));
+ return g_node_to_recv_thr_map[nodeId];
+}
+
static
void
-mt_assign_receiver_threads(void)
+assign_receiver_threads(void)
{
Uint32 num_recv_threads = globalData.ndbMtReceiveThreads;
Uint32 recv_thread_idx = 0;
@@ -4221,6 +4379,11 @@ ThreadConfig::ipControlLoop(NdbThread* p
#endif
setcpuaffinity(rep);
+ /**
+ * assign nodes to receiver threads
+ */
+ assign_receiver_threads();
+
/* Start the send thread(s) */
if (g_send_threads)
g_send_threads->start_send_threads();
@@ -4228,10 +4391,7 @@ ThreadConfig::ipControlLoop(NdbThread* p
/*
* Start threads for all execution threads, except for the receiver
* thread, which runs in the main thread.
- *
- * Assign nodes to receiver threads before starting any threads.
*/
- mt_assign_receiver_threads();
for (thr_no = 0; thr_no < num_threads; thr_no++)
{
rep->m_thread[thr_no].m_time = NdbTick_CurrentMillisecond();
=== modified file 'storage/ndb/src/kernel/vm/mt.hpp'
--- a/storage/ndb/src/kernel/vm/mt.hpp revid:mikael.ronstrom@stripped
+++ b/storage/ndb/src/kernel/vm/mt.hpp revid:mikael.ronstrom@stripped
@@ -110,4 +110,12 @@ mt_get_thr_stat(class SimulatedBlock *,
class TransporterReceiveHandle *
mt_get_trp_receive_handle(unsigned instance);
+/**
+ * return receiver thread handling a particular node
+ * returned number is indexed from 0 and upwards to #receiver threads
+ * (or MAX_NODES is none)
+ */
+Uint32
+mt_get_recv_thread_idx(NodeId nodeId);
+
#endif
No bundle (reason: useless for push emails).
| Thread |
|---|
| • bzr push into mysql-5.5-cluster-7.2 branch (mikael.ronstrom:3722 to 3734) | Mikael Ronstrom | 25 Jan |