From: Mikael Ronstrom Date: January 25 2012 3:59pm Subject: bzr push into mysql-5.5-cluster-7.2 branch (mikael.ronstrom:3722 to 3734) List-Archive: http://lists.mysql.com/commits/142544 Message-Id: <201201251600.q0PG07O2013255@dator6> MIME-Version: 1.0 Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit 3734 Mikael Ronstrom 2012-01-25 Add pack send buffers before starting a new loop to minimize risk of send buffer overload modified: storage/ndb/src/kernel/vm/mt.cpp 3733 Mikael Ronstrom 2012-01-25 Fix overload handling of scans modified: storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp 3732 Mikael Ronstrom 2012-01-25 Regulate scan already on slowdown limit, add some stuff for debugging modified: storage/ndb/include/transporter/TransporterRegistry.hpp storage/ndb/src/common/transporter/TCP_Transporter.cpp storage/ndb/src/common/transporter/Transporter.cpp storage/ndb/src/common/transporter/Transporter.hpp storage/ndb/src/common/transporter/TransporterRegistry.cpp storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp 3731 Mikael Ronstrom 2012-01-25 compilation fix modified: storage/ndb/src/kernel/vm/mt.cpp 3730 Mikael Ronstrom 2012-01-25 Don't send so immediately, only when absolutely necessary from flush_send_buffer modified: storage/ndb/src/kernel/vm/mt.cpp 3729 Mikael Ronstrom 2012-01-25 Need to send when grabbing send node lock modified: storage/ndb/src/kernel/vm/mt.cpp 3728 Mikael Ronstrom 2012-01-24 More to fix send buffer overload issue modified: storage/ndb/src/kernel/vm/mt.cpp 3727 Mikael Ronstrom 2012-01-24 Improved pack page patch modified: storage/ndb/src/kernel/vm/mt.cpp 3726 Mikael Ronstrom 2012-01-24 Release memory and add a debug printout to see how much it is used modified: storage/ndb/src/kernel/vm/mt.cpp 3725 Mikael Ronstrom 2012-01-24 pack sb_pages modified: storage/ndb/src/kernel/vm/mt.cpp 3724 Mikael Ronstrom 2012-01-24 [merge] merge added: storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/DbImplForNdbRecord.java renamed: mysql-test/suite/ndb_rpl/r/ndb_rpl_conflict.result => mysql-test/suite/ndb_rpl/r/ndb_rpl_conflict_basic.result mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict.test => mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict_basic.test modified: storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ClusterConnectionImpl.java storage/ndb/src/common/transporter/TransporterRegistry.cpp storage/ndb/src/kernel/blocks/trpman.cpp storage/ndb/src/kernel/vm/CMakeLists.txt storage/ndb/src/kernel/vm/TransporterCallback.cpp storage/ndb/src/kernel/vm/TransporterCallbackKernel.hpp storage/ndb/src/kernel/vm/mt.cpp storage/ndb/src/kernel/vm/mt.hpp 3723 Mikael Ronstrom 2012-01-24 Fix modified: storage/ndb/src/kernel/vm/mt.cpp 3722 Mikael Ronstrom 2012-01-23 [merge] merge added: storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/StressTest.java storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordBlobImpl.java storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordImpl.java storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordOperationImpl.java storage/ndb/clusterj/clusterj-tie/src/test/java/testsuite/clusterj/tie/StressTest.java storage/ndb/src/kernel/vm/CountingPool.cpp storage/ndb/src/kernel/vm/CountingPool.hpp modified: mysql-test/suite/ndb/t/clusterj.test storage/ndb/clusterj/clusterj-api/pom.xml storage/ndb/clusterj/clusterj-bindings/pom.xml storage/ndb/clusterj/clusterj-core/pom.xml storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/SessionImpl.java storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/metadata/DomainFieldHandlerImpl.java storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/store/Operation.java storage/ndb/clusterj/clusterj-jdbc/pom.xml storage/ndb/clusterj/clusterj-jpatest/pom.xml storage/ndb/clusterj/clusterj-openjpa/pom.xml storage/ndb/clusterj/clusterj-test/pom.xml storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/DynamicObjectTest.java storage/ndb/clusterj/clusterj-test/src/main/resources/schema.sql storage/ndb/clusterj/clusterj-tie/pom.xml storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/BlobImpl.java storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ClusterConnectionImpl.java storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ClusterConnectionServiceImpl.java storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ClusterTransactionImpl.java storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ColumnImpl.java storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/DbImpl.java storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/DictionaryImpl.java storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/OperationImpl.java storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/Utility.java storage/ndb/clusterj/clusterj-tie/src/main/resources/com/mysql/clusterj/tie/Bundle.properties storage/ndb/clusterj/pom.xml storage/ndb/include/ndbapi/Ndb.hpp storage/ndb/include/ndbapi/NdbDictionary.hpp storage/ndb/include/ndbapi/NdbOperation.hpp storage/ndb/include/ndbapi/NdbScanOperation.hpp storage/ndb/src/common/transporter/TransporterRegistry.cpp storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp storage/ndb/src/kernel/blocks/dbdict/Dbdict.hpp storage/ndb/src/kernel/blocks/dbtup/tuppage.hpp storage/ndb/src/kernel/vm/CMakeLists.txt storage/ndb/src/kernel/vm/Pool.cpp storage/ndb/src/kernel/vm/Pool.hpp storage/ndb/src/ndbjtie/NdbApiWrapper.hpp storage/ndb/src/ndbjtie/com/mysql/ndbjtie/ndbapi/Ndb.java storage/ndb/src/ndbjtie/com/mysql/ndbjtie/ndbapi/NdbDictionary.java storage/ndb/src/ndbjtie/com/mysql/ndbjtie/ndbapi/NdbOperation.java storage/ndb/src/ndbjtie/com/mysql/ndbjtie/ndbapi/NdbOperationConst.java storage/ndb/src/ndbjtie/com/mysql/ndbjtie/ndbapi/NdbScanOperation.java storage/ndb/src/ndbjtie/com/mysql/ndbjtie/ndbapi/issues.txt storage/ndb/src/ndbjtie/ndbapi_jtie.hpp storage/ndb/test/ndbapi/testSystemRestart.cpp === renamed file 'mysql-test/suite/ndb_rpl/r/ndb_rpl_conflict.result' => 'mysql-test/suite/ndb_rpl/r/ndb_rpl_conflict_basic.result' === renamed file 'mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict.test' => 'mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict_basic.test' === modified file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ClusterConnectionImpl.java' --- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ClusterConnectionImpl.java revid:mikael.ronstrom@stripped +++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ClusterConnectionImpl.java revid:mikael.ronstrom@stripped @@ -60,7 +60,7 @@ public class ClusterConnectionImpl final int nodeId; /** All dbs given out by this cluster connection */ - private Map dbs = new IdentityHashMap(); + private Map dbs = new IdentityHashMap(); /** The map of table name to NdbRecordImpl */ private ConcurrentMap ndbRecordImplMap = new ConcurrentHashMap(); @@ -98,7 +98,7 @@ public class ClusterConnectionImpl // create a dictionary for NdbRecord Ndb ndbForNdbRecord = Ndb.create(clusterConnection, database, "def"); handleError(ndbForNdbRecord, clusterConnection, connectString, nodeId); - DbImpl dbForNdbRecord = new DbImpl(this, ndbForNdbRecord, maxTransactions); + DbImplForNdbRecord dbForNdbRecord = new DbImplForNdbRecord(this, ndbForNdbRecord); dbs.put(dbForNdbRecord, null); dictionaryForNdbRecord = dbForNdbRecord.getNdbDictionary(); } @@ -167,7 +167,7 @@ public class ClusterConnectionImpl } ndbRecordImplMap.clear(); if (dbs.size() != 0) { - Map dbsToClose = new IdentityHashMap(dbs); + Map dbsToClose = new IdentityHashMap(dbs); for (Db db: dbsToClose.keySet()) { db.close(); } === added file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/DbImplForNdbRecord.java' --- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/DbImplForNdbRecord.java 1970-01-01 00:00:00 +0000 +++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/DbImplForNdbRecord.java revid:mikael.ronstrom@stripped @@ -0,0 +1,133 @@ +/* + * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; version 2 of the License. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + */ + +package com.mysql.clusterj.tie; + +import java.nio.ByteBuffer; + +import java.util.List; + +import com.mysql.clusterj.ClusterJDatastoreException; +import com.mysql.clusterj.ClusterJFatalInternalException; + +import com.mysql.clusterj.core.store.ClusterTransaction; + +import com.mysql.clusterj.core.util.I18NHelper; +import com.mysql.clusterj.core.util.Logger; +import com.mysql.clusterj.core.util.LoggerFactoryService; + +import com.mysql.ndbjtie.ndbapi.Ndb; +import com.mysql.ndbjtie.ndbapi.NdbErrorConst; +import com.mysql.ndbjtie.ndbapi.NdbTransaction; +import com.mysql.ndbjtie.ndbapi.NdbDictionary.Dictionary; + +/** + * This class is used to hold the ndb Dictionary used for NdbRecord. It has the minimum + * implementation needed to implement the life cycle of the Ndb. In particular, it omits the + * buffer manager and partition key scratch buffer used in the standard DbImpl. + */ +class DbImplForNdbRecord implements com.mysql.clusterj.core.store.Db { + + /** My message translator */ + static final I18NHelper local = I18NHelper.getInstance(DbImplForNdbRecord.class); + + /** My logger */ + static final Logger logger = LoggerFactoryService.getFactory() + .getInstance(DbImplForNdbRecord.class); + + /** The Ndb instance that this instance is wrapping */ + private Ndb ndb; + + /** A "big enough" size for error information */ + private int errorBufferSize = 300; + + /** The ndb error detail buffer */ + private ByteBuffer errorBuffer = ByteBuffer.allocateDirect(errorBufferSize); + + /** The NdbDictionary for this Ndb */ + private Dictionary ndbDictionary; + + /** The ClusterConnection */ + private ClusterConnectionImpl clusterConnection; + + public DbImplForNdbRecord(ClusterConnectionImpl clusterConnection, Ndb ndb) { + this.clusterConnection = clusterConnection; + this.ndb = ndb; + int returnCode = ndb.init(1); + handleError(returnCode, ndb); + ndbDictionary = ndb.getDictionary(); + handleError(ndbDictionary, ndb); + } + + public void close() { + Ndb.delete(ndb); + clusterConnection.close(this); + } + + public com.mysql.clusterj.core.store.Dictionary getDictionary() { + throw new ClusterJFatalInternalException(local.message("ERR_Implementation_Should_Not_Occur")); + } + + public Dictionary getNdbDictionary() { + return ndbDictionary; + } + + public ClusterTransaction startTransaction(String joinTransactionId) { + throw new ClusterJFatalInternalException(local.message("ERR_Implementation_Should_Not_Occur")); + } + + protected void handleError(int returnCode, Ndb ndb) { + if (returnCode == 0) { + return; + } else { + NdbErrorConst ndbError = ndb.getNdbError(); + String detail = getNdbErrorDetail(ndbError); + Utility.throwError(returnCode, ndbError, detail); + } + } + + protected void handleError(Object object, Ndb ndb) { + if (object != null) { + return; + } else { + NdbErrorConst ndbError = ndb.getNdbError(); + String detail = getNdbErrorDetail(ndbError); + Utility.throwError(null, ndbError, detail); + } + } + + public boolean isRetriable(ClusterJDatastoreException ex) { + throw new ClusterJFatalInternalException(local.message("ERR_Implementation_Should_Not_Occur")); + } + + public String getNdbErrorDetail(NdbErrorConst ndbError) { + return ndb.getNdbErrorDetail(ndbError, errorBuffer, errorBuffer.capacity()); + } + + public NdbTransaction enlist(String tableName, List keyParts) { + throw new ClusterJFatalInternalException(local.message("ERR_Implementation_Should_Not_Occur")); + } + + public NdbTransaction enlist(String tableName, int partitionId) { + throw new ClusterJFatalInternalException(local.message("ERR_Implementation_Should_Not_Occur")); + } + + public NdbTransaction joinTransaction(String coordinatedTransactionId) { + throw new ClusterJFatalInternalException(local.message("ERR_Implementation_Should_Not_Occur")); + } + +} === modified file 'storage/ndb/include/transporter/TransporterRegistry.hpp' --- a/storage/ndb/include/transporter/TransporterRegistry.hpp revid:mikael.ronstrom@stripped +++ b/storage/ndb/include/transporter/TransporterRegistry.hpp revid:mikael.ronstrom@stripped @@ -292,6 +292,13 @@ public: const NodeBitmask& get_status_overloaded() const; /** + * Set or clear slowdown bit. + * Query if any slowdown bit is set. + */ + void set_status_slowdown(Uint32 nodeId, bool val); + const NodeBitmask& get_status_slowdown() const; + + /** * prepareSend * * When IOState is HaltOutput or HaltIO do not send or insert any @@ -431,8 +438,10 @@ private: /** * Overloaded bits, for fast check. + * Similarly slowdown bits for fast check. */ NodeBitmask m_status_overloaded; + NodeBitmask m_status_slowdown; /** * Unpack signal data. @@ -599,4 +608,18 @@ TransporterRegistry::get_status_overload return m_status_overloaded; } +inline void +TransporterRegistry::set_status_slowdown(Uint32 nodeId, bool val) +{ + assert(nodeId < MAX_NODES); + if (val != m_status_slowdown.get(nodeId)) + m_status_slowdown.set(nodeId, val); +} + +inline const NodeBitmask& +TransporterRegistry::get_status_slowdown() const +{ + return m_status_slowdown; +} + #endif // Define of TransporterRegistry_H === modified file 'storage/ndb/src/common/transporter/TCP_Transporter.cpp' --- a/storage/ndb/src/common/transporter/TCP_Transporter.cpp revid:mikael.ronstrom@stripped +++ b/storage/ndb/src/common/transporter/TCP_Transporter.cpp revid:mikael.ronstrom@stripped @@ -112,6 +112,7 @@ TCP_Transporter::TCP_Transporter(Transpo setIf(sockOptTcpMaxSeg, conf->tcp.tcpMaxsegSize, 0); m_overload_limit = overload_limit(conf); + m_slowdown_limit = conf->tcp.sendBufferSize/2; } === modified file 'storage/ndb/src/common/transporter/Transporter.cpp' --- a/storage/ndb/src/common/transporter/Transporter.cpp revid:mikael.ronstrom@stripped +++ b/storage/ndb/src/common/transporter/Transporter.cpp revid:mikael.ronstrom@stripped @@ -43,7 +43,8 @@ Transporter::Transporter(TransporterRegi : m_s_port(s_port), remoteNodeId(rNodeId), localNodeId(lNodeId), isServer(lNodeId==serverNodeId), m_packer(_signalId, _checksum), m_max_send_buffer(max_send_buffer), - m_overload_limit(0xFFFFFFFF), isMgmConnection(_isMgmConnection), + m_overload_limit(0xFFFFFFFF), m_slowdown_limit(0xFFFFFFFF), + isMgmConnection(_isMgmConnection), m_connected(false), m_type(_type), m_transporter_registry(t_reg) === modified file 'storage/ndb/src/common/transporter/Transporter.hpp' --- a/storage/ndb/src/common/transporter/Transporter.hpp revid:mikael.ronstrom@stripped +++ b/storage/ndb/src/common/transporter/Transporter.hpp revid:mikael.ronstrom@stripped @@ -90,6 +90,8 @@ public: { m_transporter_registry.set_status_overloaded(remoteNodeId, used >= m_overload_limit); + m_transporter_registry.set_status_slowdown(remoteNodeId, + used >= m_slowdown_limit); } virtual int doSend() = 0; @@ -159,6 +161,7 @@ protected: Uint32 m_max_send_buffer; /* Overload limit, as configured with the OverloadLimit config parameter. */ Uint32 m_overload_limit; + Uint32 m_slowdown_limit; private: @@ -226,6 +229,10 @@ Transporter::iovec_data_sent(int nBytesS { Uint32 used_bytes = get_callback_obj()->bytes_sent(remoteNodeId, nBytesSent); + if (used_bytes > 256 * 1024) + { + ndbout_c("%u bytes in send buffer", used_bytes); + } update_status_overloaded(used_bytes); } === modified file 'storage/ndb/src/common/transporter/TransporterRegistry.cpp' --- a/storage/ndb/src/common/transporter/TransporterRegistry.cpp revid:mikael.ronstrom@stripped +++ b/storage/ndb/src/common/transporter/TransporterRegistry.cpp revid:mikael.ronstrom@stripped @@ -763,6 +763,7 @@ TransporterRegistry::prepareSend(Transpo } set_status_overloaded(nodeId, true); + set_status_slowdown(nodeId, true); int sleepTime = 2; /** @@ -851,6 +852,7 @@ TransporterRegistry::prepareSend(Transpo * so sleepTime = 2 generates a 10 ms sleep. */ set_status_overloaded(nodeId, true); + set_status_slowdown(nodeId, true); int sleepTime = 2; for(int i = 0; i<50; i++){ if((nSHMTransporters+nSCITransporters) == 0) @@ -934,6 +936,7 @@ TransporterRegistry::prepareSend(Transpo * so sleepTime = 2 generates a 10 ms sleep. */ set_status_overloaded(nodeId, true); + set_status_slowdown(nodeId, true); int sleepTime = 2; for(int i = 0; i<50; i++){ if((nSHMTransporters+nSCITransporters) == 0) @@ -1242,7 +1245,7 @@ TransporterRegistry::poll_TCP(Uint32 tim if (extra_socket && recvdata.m_transporters.get(0)) { const NDB_SOCKET_TYPE socket = m_extra_wakeup_sockets[0]; - assert(receiveHandle); + assert(&recvdata == receiveHandle); // not used by ndbmtd... // Poll the wakup-socket for read recvdata.m_socket_poller.add(socket, true, false, false); @@ -1318,7 +1321,7 @@ TransporterRegistry::performReceive(Tran if (recvdata.m_has_data_transporters.get(0)) { assert(recvdata.m_transporters.get(0)); - assert(receiveHandle); + assert(&recvdata == receiveHandle); // not used by ndbmtd recvdata.m_has_data_transporters.clear(Uint32(0)); consume_extra_sockets(); } @@ -2293,6 +2296,8 @@ TransporterRegistry::updateWritePtr(Tran Uint32 used = handle->updateWritePtr(node, lenBytes, prio); t->update_status_overloaded(used); + if (used > 32768) + abort(); if(t->send_limit_reached(used)) { //------------------------------------------------- === modified file 'storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp' --- a/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp revid:mikael.ronstrom@stripped +++ b/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp revid:mikael.ronstrom@stripped @@ -574,7 +574,8 @@ public: Uint8 m_last_row; Uint8 m_reserved; Uint8 statScan; - Uint8 dummy[3]; // align? + Uint8 m_stop_batch; + Uint8 dummy[2]; // align? }; // Size 272 bytes typedef Ptr ScanRecordPtr; @@ -3300,7 +3301,8 @@ Dblqh::ScanRecord::check_scan_batch_comp Uint32 max_rows = m_max_batch_size_rows; Uint32 max_bytes = m_max_batch_size_bytes; - return (max_rows > 0 && (m_curr_batch_size_rows >= max_rows)) || + return m_stop_batch || + (max_rows > 0 && (m_curr_batch_size_rows >= max_rows)) || (max_bytes > 0 && (m_curr_batch_size_bytes >= max_bytes)); } === modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp' --- a/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp revid:mikael.ronstrom@stripped +++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp revid:mikael.ronstrom@stripped @@ -11284,17 +11284,17 @@ void Dblqh::scanTupkeyConfLab(Signal* si scanptr.p->m_curr_batch_size_rows = rows + 1; scanptr.p->m_last_row = tdata5; - const NodeBitmask& all = globalTransporterRegistry.get_status_overloaded(); + const NodeBitmask& all = globalTransporterRegistry.get_status_slowdown(); if (unlikely(!all.isclear())) { if (all.get(refToNode(scanptr.p->scanApiBlockref))) { /** - * End scan batch if transporter-buffer are overloaded + * End scan batch if transporter-buffer are in slowdown state * * TODO: We should have counters for this... */ - tdata5 = 1; + scanptr.p->m_stop_batch = 1; } } @@ -11604,6 +11604,7 @@ Uint32 Dblqh::initScanrec(const ScanFrag scanptr.p->scanTcrec = tcConnectptr.i; scanptr.p->scanSchemaVersion = scanFragReq->schemaVersion; + scanptr.p->m_stop_batch = 0; scanptr.p->m_curr_batch_size_rows = 0; scanptr.p->m_curr_batch_size_bytes= 0; scanptr.p->m_max_batch_size_rows = max_rows; @@ -12091,6 +12092,8 @@ void Dblqh::sendScanFragConf(Signal* sig scanptr.p->m_curr_batch_size_rows = 0; scanptr.p->m_curr_batch_size_bytes= 0; } + + scanptr.p->m_stop_batch = 0; }//Dblqh::sendScanFragConf() /* ######################################################################### */ === modified file 'storage/ndb/src/kernel/blocks/trpman.cpp' --- a/storage/ndb/src/kernel/blocks/trpman.cpp revid:mikael.ronstrom@stripped +++ b/storage/ndb/src/kernel/blocks/trpman.cpp revid:mikael.ronstrom@stripped @@ -23,6 +23,8 @@ #include #include +#include + Trpman::Trpman(Block_context & ctx, Uint32 instanceno) : SimulatedBlock(TRPMAN, ctx, instanceno) { @@ -50,11 +52,7 @@ BLOCK_FUNCTIONS(Trpman) #ifdef ERROR_INSERT static NodeBitmask c_error_9000_nodes_mask; extern Uint32 MAX_RECEIVED_SIGNALS; - -class TransporterReceiveHandle * -mt_get_trp_receive_handle(unsigned instance); #endif -Uint32 mt_get_recv_thread_idx(NodeId nodeId); bool Trpman::handles_this_node(Uint32 nodeId) @@ -64,7 +62,7 @@ Trpman::handles_this_node(Uint32 nodeId) #else if (globalData.ndbMtReceiveThreads <= (Uint32)1) return true; - return (instance()== (mt_get_recv_thread_idx(nodeId) + 1)); + return (instance()== (mt_get_recv_thread_idx(nodeId) + /* proxy */ 1)); #endif } === modified file 'storage/ndb/src/kernel/vm/CMakeLists.txt' --- a/storage/ndb/src/kernel/vm/CMakeLists.txt revid:mikael.ronstrom@stripped +++ b/storage/ndb/src/kernel/vm/CMakeLists.txt revid:mikael.ronstrom@stripped @@ -72,7 +72,7 @@ SET_TARGET_PROPERTIES(mt_thr_config-t PROPERTIES COMPILE_FLAGS "-DTEST_MT_THR_CONFIG") TARGET_LINK_LIBRARIES(mt_thr_config-t ndbgeneral) -FOREACH(testprog IN ITEMS CountingPool DynArr256) +FOREACH(testprog CountingPool DynArr256) ADD_EXECUTABLE(${testprog}-t ${testprog}.cpp) SET_TARGET_PROPERTIES(${testprog}-t PROPERTIES COMPILE_FLAGS "-DTAP_TEST") TARGET_LINK_LIBRARIES(${testprog}-t ndbtest ndbkernel ndbsched ndberror === modified file 'storage/ndb/src/kernel/vm/TransporterCallback.cpp' --- a/storage/ndb/src/kernel/vm/TransporterCallback.cpp revid:mikael.ronstrom@stripped +++ b/storage/ndb/src/kernel/vm/TransporterCallback.cpp revid:mikael.ronstrom@stripped @@ -488,10 +488,10 @@ TransporterReceiveHandleKernel::checkJob #endif } +#ifdef NDBD_MULTITHREADED void TransporterReceiveHandleKernel::assign_nodes(NodeId *recv_thread_idx_array) { -#ifdef NDBD_MULTITHREADED m_transporters.clear(); /* Clear all first */ for (Uint32 nodeId = 1; nodeId < MAX_NODES; nodeId++) { @@ -499,8 +499,8 @@ TransporterReceiveHandleKernel::assign_n m_transporters.set(nodeId); /* Belongs to our receive thread */ } return; -#endif } +#endif void TransporterReceiveHandleKernel::transporter_recv_from(NodeId nodeId) === modified file 'storage/ndb/src/kernel/vm/TransporterCallbackKernel.hpp' --- a/storage/ndb/src/kernel/vm/TransporterCallbackKernel.hpp revid:mikael.ronstrom@stripped +++ b/storage/ndb/src/kernel/vm/TransporterCallbackKernel.hpp revid:mikael.ronstrom@stripped @@ -36,6 +36,11 @@ public: * instance() - 1(proxy) */ Uint32 m_receiver_thread_idx; + + /** + * Assign nodes to this TransporterReceiveHandle + */ + void assign_nodes(NodeId *recv_thread_idx_array); #endif /* TransporterCallback interface. */ @@ -49,7 +54,6 @@ public: void reportError(NodeId nodeId, TransporterError errorCode, const char *info = 0); void transporter_recv_from(NodeId node); - void assign_nodes(NodeId *recv_thread_idx_array); int checkJobBuffer(); virtual ~TransporterReceiveHandleKernel() { } }; === modified file 'storage/ndb/src/kernel/vm/mt.cpp' --- a/storage/ndb/src/kernel/vm/mt.cpp revid:mikael.ronstrom@stripped +++ b/storage/ndb/src/kernel/vm/mt.cpp revid:mikael.ronstrom@stripped @@ -37,16 +37,6 @@ #include "mt-asm.h" -/** - * Array for mapping nodes to receiver threads and function to access it. - */ -static NodeId g_node_to_recv_thr_map[MAX_NODES]; - -Uint32 mt_get_recv_thread_idx(NodeId nodeId) -{ - return g_node_to_recv_thr_map[nodeId]; -} - inline SimulatedBlock* GlobalData::mt_getBlock(BlockNumber blockNo, Uint32 instanceNo) @@ -555,6 +545,49 @@ public: return tmp; } + T* find_last(T* first) + { + T* next = first; + if (!first) + return first; + while (next->m_next) + { + next = next->m_next; + } + return next; + } + + + bool fill(Ndbd_mem_manager *mm, Uint32 rg, Uint32 fill_level) + { + Uint32 alloced = 0; + if (m_free >= fill_level) + { + return true; + } + do + { + T *save_last = find_last(m_freelist); + T *new_list = m_global_pool->seize_list(mm, rg, + fill_level - m_free, + &alloced); + m_free += alloced; + if (save_last) + { + save_last->m_next = new_list; + } + else + { + m_freelist = new_list; + } + } while ((m_free < fill_level) && alloced); + if (m_free >= fill_level) + { + return true; + } + return false; + } + void release(Ndbd_mem_manager *mm, Uint32 rg, T *t) { unsigned free = m_free; if (free < m_max_free) @@ -1049,21 +1082,36 @@ struct thr_repository m_sb_pool("sendbufferpool") {} + /** + * m_receive_lock, m_section_lock, m_mem_manager_lock, m_jb_pool + * and m_sb_pool are allvariables globally shared among the threads + * and also heavily updated. + */ struct thr_spin_lock<64> m_receive_lock[MAX_NDBMT_RECEIVE_THREADS]; struct thr_spin_lock<64> m_section_lock; struct thr_spin_lock<64> m_mem_manager_lock; + /* thr_safe_pool is aligned to be also 64 bytes in size */ struct thr_safe_pool m_jb_pool; struct thr_safe_pool m_sb_pool; + /* m_mm and m_thread_count are globally shared and read only variables */ Ndbd_mem_manager * m_mm; unsigned m_thread_count; - /* Protect m_mm and m_thread_count from CPU cache misses */ - char protection_unused[NDB_CL]; + /** + * Protect m_mm and m_thread_count from CPU cache misses, first + * part of m_thread (struct thr_data) is globally shared variables. + * So sharing cache line with these for these read only variables + * isn't a good idea + */ + char protection_unused[NDB_CL - sizeof(void*) - sizeof(unsigned)]; struct thr_data m_thread[MAX_BLOCK_THREADS]; /** * send buffer handling + * Put protection cacheline to avoid sharing with last m_thread + * instance. */ + char protection_unused2[NDB_CL]; /* The buffers that are to be sent */ struct send_buffer { @@ -2404,6 +2452,45 @@ release_list(thread_local_poolrelease_local(tail); } +static +void +pack_sb_pages(thread_local_pool* pool, + thr_repository::send_buffer* sb) +{ + Uint32 release_count = 0; + assert(sb->m_buffer.m_first_page != 0); + assert(sb->m_buffer.m_last_page != 0); + assert(sb->m_buffer.m_last_page->m_next == 0); + + thr_send_page* curr = sb->m_buffer.m_first_page; + Uint32 curr_free = curr->max_bytes() - (curr->m_bytes + curr->m_start); + while (curr->m_next != 0) + { + thr_send_page* next = curr->m_next; + assert(next->m_start == 0); // only first page should have half sent bytes + if (next->m_bytes <= curr_free) + { + thr_send_page * save = next; + memcpy(curr->m_data + (curr->m_bytes + curr->m_start), + next->m_data, + next->m_bytes); + + curr_free -= next->m_bytes; + + curr->m_bytes += next->m_bytes; + curr->m_next = next->m_next; + release_count++; + + pool->release_local(save); + } + else + { + curr = next; + curr_free = curr->max_bytes() - (curr->m_bytes + curr->m_start); + } + } + sb->m_buffer.m_last_page = curr; +} static Uint32 @@ -2467,6 +2554,14 @@ bytes_sent(thread_local_poolm_buffer.m_first_page = curr; assert(sb->m_bytes > bytes); sb->m_bytes -= bytes; + + /** + * Since not all bytes were sent... + * spend the time to try to pack the pages + * possibly releasing send-buffer + */ + pack_sb_pages(pool, sb); + return sb->m_bytes; } @@ -2548,6 +2643,40 @@ register_pending_send(thr_data *selfptr, } } +static void try_send(thr_data * selfptr, Uint32 node); + +void +pack_send_buffer(thr_data *selfptr, Uint32 node, Uint32 thr_no) +{ + thr_repository* rep = &g_thr_repository; + thr_repository::send_buffer* sb = rep->m_send_buffers+node; + thread_local_pool* pool = + &rep->m_thread[thr_no].m_send_buffer_pool; + + lock(&sb->m_send_lock); + int bytes = link_thread_send_buffers(sb, node); + if (bytes) + { + pack_sb_pages(pool, sb); + } + unlock(&sb->m_send_lock); + selfptr->m_send_buffer_pool.release_global(rep->m_mm, + RG_TRANSPORTER_BUFFERS); + if (sb->m_force_send) + { + try_send(selfptr, node); + } +} + +void +pack_send_buffers(thr_data *selfptr, Uint32 thr_no) +{ + for (Uint32 i = 1; i < NDB_ARRAY_SIZE(selfptr->m_send_buffers); i++) + { + pack_send_buffer(selfptr, i, thr_no); + } +} + /** * publish thread-locally prepared send-buffer */ @@ -2574,9 +2703,7 @@ flush_send_buffer(thr_data* selfptr, Uin if (unlikely(next == ri)) { - lock(&sb->m_send_lock); - link_thread_send_buffers(sb, node); - unlock(&sb->m_send_lock); + pack_send_buffer(selfptr, node, thr_no); } dst->m_buffers[wi] = src->m_first_page; @@ -2594,6 +2721,7 @@ flush_send_buffer(thr_data* selfptr, Uin bool mt_send_handle::forceSend(NodeId nodeId) { + int res; struct thr_repository *rep = &g_thr_repository; struct thr_data *selfptr = m_selfptr; struct thr_repository::send_buffer * sb = rep->m_send_buffers + nodeId; @@ -2603,13 +2731,12 @@ mt_send_handle::forceSend(NodeId nodeId) sb->m_force_send = 0; lock(&sb->m_send_lock); sb->m_send_thread = selfptr->m_thr_no; - globalTransporterRegistry.performSend(nodeId); + res = globalTransporterRegistry.performSend(nodeId); sb->m_send_thread = NO_SEND_THREAD; unlock(&sb->m_send_lock); + selfptr->m_send_buffer_pool.release_global(rep->m_mm, + RG_TRANSPORTER_BUFFERS); } while (sb->m_force_send); - - selfptr->m_send_buffer_pool.release_global(rep->m_mm, RG_TRANSPORTER_BUFFERS); - return true; } @@ -2620,6 +2747,7 @@ static void try_send(thr_data * selfptr, Uint32 node) { + int res; struct thr_repository *rep = &g_thr_repository; struct thr_repository::send_buffer * sb = rep->m_send_buffers + node; @@ -2634,12 +2762,12 @@ try_send(thr_data * selfptr, Uint32 node mb(); sb->m_send_thread = selfptr->m_thr_no; - globalTransporterRegistry.performSend(node); + res = globalTransporterRegistry.performSend(node); sb->m_send_thread = NO_SEND_THREAD; unlock(&sb->m_send_lock); + selfptr->m_send_buffer_pool.release_global(rep->m_mm, + RG_TRANSPORTER_BUFFERS); } while (sb->m_force_send); - - selfptr->m_send_buffer_pool.release_global(rep->m_mm, RG_TRANSPORTER_BUFFERS); } /** @@ -2682,6 +2810,7 @@ static Uint32 do_send(struct thr_data* selfptr, bool must_send) { + int res = 1; Uint32 i; Uint32 count = selfptr->m_pending_send_count; Uint8 *nodes = selfptr->m_pending_send_nodes; @@ -2764,17 +2893,21 @@ do_send(struct thr_data* selfptr, bool m * holds the send lock for this remote node. */ sb->m_send_thread = selfptr->m_thr_no; - int res = globalTransporterRegistry.performSend(node); + res = globalTransporterRegistry.performSend(node); sb->m_send_thread = NO_SEND_THREAD; unlock(&sb->m_send_lock); if (res) { register_pending_send(selfptr, node); } + if (sb->m_force_send) /* Release memory if looping */ + selfptr->m_send_buffer_pool.release_global(rep->m_mm, + RG_TRANSPORTER_BUFFERS); } while (sb->m_force_send); } - selfptr->m_send_buffer_pool.release_global(rep->m_mm, RG_TRANSPORTER_BUFFERS); + selfptr->m_send_buffer_pool.release_global(rep->m_mm, + RG_TRANSPORTER_BUFFERS); return selfptr->m_pending_send_count; } @@ -3406,6 +3539,11 @@ aligned_signal(unsigned char signal_buf[ TransporterReceiveHandleKernel * g_trp_receive_handle_ptr[MAX_NDBMT_RECEIVE_THREADS]; +/** + * Array for mapping nodes to receiver threads and function to access it. + */ +static NodeId g_node_to_recv_thr_map[MAX_NODES]; + extern "C" void * mt_receiver_thread_main(void *thr_arg) @@ -3417,7 +3555,7 @@ mt_receiver_thread_main(void *thr_arg) unsigned thr_no = selfptr->m_thr_no; Uint32& watchDogCounter = selfptr->m_watchdog_counter; Uint32 thrSignalId = 0; - Uint32 recv_thread_idx = thr_no - first_receiver_thread_no; + const Uint32 recv_thread_idx = thr_no - first_receiver_thread_no; bool has_received = false; int cnt = 0; @@ -3622,6 +3760,14 @@ mt_job_thread_main(void *thr_arg) { loops++; + watchDogCounter = 11; + if (!selfptr->m_send_buffer_pool.fill(g_thr_repository.m_mm, + RG_TRANSPORTER_BUFFERS, + THR_FREE_BUF_MAX)) + { + pack_send_buffers(selfptr, thr_no); + } + watchDogCounter = 2; scan_time_queues(selfptr, now); @@ -4173,9 +4319,21 @@ setcpuaffinity(struct thr_repository* re } } +/** + * return receiver thread handling a particular node + * returned number is indexed from 0 and upwards to #receiver threads + * (or MAX_NODES is none) + */ +Uint32 +mt_get_recv_thread_idx(NodeId nodeId) +{ + assert(nodeId < NDB_ARRAY_SIZE(g_node_to_recv_thr_map)); + return g_node_to_recv_thr_map[nodeId]; +} + static void -mt_assign_receiver_threads(void) +assign_receiver_threads(void) { Uint32 num_recv_threads = globalData.ndbMtReceiveThreads; Uint32 recv_thread_idx = 0; @@ -4221,6 +4379,11 @@ ThreadConfig::ipControlLoop(NdbThread* p #endif setcpuaffinity(rep); + /** + * assign nodes to receiver threads + */ + assign_receiver_threads(); + /* Start the send thread(s) */ if (g_send_threads) g_send_threads->start_send_threads(); @@ -4228,10 +4391,7 @@ ThreadConfig::ipControlLoop(NdbThread* p /* * Start threads for all execution threads, except for the receiver * thread, which runs in the main thread. - * - * Assign nodes to receiver threads before starting any threads. */ - mt_assign_receiver_threads(); for (thr_no = 0; thr_no < num_threads; thr_no++) { rep->m_thread[thr_no].m_time = NdbTick_CurrentMillisecond(); === modified file 'storage/ndb/src/kernel/vm/mt.hpp' --- a/storage/ndb/src/kernel/vm/mt.hpp revid:mikael.ronstrom@stripped +++ b/storage/ndb/src/kernel/vm/mt.hpp revid:mikael.ronstrom@stripped @@ -110,4 +110,12 @@ mt_get_thr_stat(class SimulatedBlock *, class TransporterReceiveHandle * mt_get_trp_receive_handle(unsigned instance); +/** + * return receiver thread handling a particular node + * returned number is indexed from 0 and upwards to #receiver threads + * (or MAX_NODES is none) + */ +Uint32 +mt_get_recv_thread_idx(NodeId nodeId); + #endif No bundle (reason: useless for push emails).