From: Mikael Ronstrom Date: January 11 2012 4:28pm Subject: bzr push into mysql-5.5-cluster-7.2 branch (mikael.ronstrom:3690 to 3701) List-Archive: http://lists.mysql.com/commits/142377 Message-Id: <201201111629.q0BGTe9D015818@dator6> MIME-Version: 1.0 Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit 3701 Mikael Ronstrom 2012-01-11 Fixed ndbd for new method modified: storage/ndb/src/kernel/vm/dummy_nonmt.cpp 3700 Mikael Ronstrom 2012-01-11 Fix wrong variable name modified: storage/ndb/src/kernel/vm/Configuration.cpp 3699 Mikael Ronstrom 2012-01-11 Fix wrong variable name modified: storage/ndb/src/kernel/vm/Configuration.cpp 3698 Mikael Ronstrom 2012-01-11 Fixes modified: storage/ndb/src/kernel/ndbd.cpp storage/ndb/src/kernel/vm/Configuration.cpp storage/ndb/src/kernel/vm/Emulator.hpp storage/ndb/src/kernel/vm/mt.cpp 3697 Mikael Ronstrom 2012-01-10 More error checks at allocation of lists of send buffer pools modified: storage/ndb/src/kernel/vm/mt.cpp 3696 Mikael Ronstrom 2012-01-10 New adaptive spin lock modified: storage/ndb/src/kernel/vm/mt.cpp 3695 Mikael Ronstrom 2012-01-10 New file for checksums added: storage/ndb/include/util/Checksum.hpp 3694 Mikael Ronstrom 2012-01-10 Try checksum patch modified: storage/ndb/src/common/transporter/TransporterInternalDefinitions.hpp storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp 3693 Mikael Ronstrom 2012-01-10 Pad diverify queue modified: storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp 3692 Mikael Ronstrom 2012-01-10 Compute job buffer including total number of threads modified: storage/ndb/src/kernel/vm/mt.cpp 3691 Mikael Ronstrom 2012-01-10 Fix use of max LQH threads where max TC threads should have been used modified: mysql-test/include/default_ndbd.cnf storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp 3690 Mikael Ronstrom 2012-01-09 [merge] merge modified: storage/ndb/memcache/src/ClusterConnectionPool.cc storage/ndb/src/ndbjtie/ndbapi_jtie.hpp === modified file 'mysql-test/include/default_ndbd.cnf' --- a/mysql-test/include/default_ndbd.cnf revid:mikael.ronstrom@stripped +++ b/mysql-test/include/default_ndbd.cnf revid:mikael.ronstrom@stripped @@ -1,5 +1,7 @@ [cluster_config] +ThreadConfig=ldm={count=16},tc={count=24},send={count=4},recv={count=4} +NoOfFragmentLogParts=16 MaxNoOfSavedMessages= 1000 MaxNoOfConcurrentTransactions= 2048 MaxNoOfConcurrentOperations= 10000 === added file 'storage/ndb/include/util/Checksum.hpp' --- a/storage/ndb/include/util/Checksum.hpp 1970-01-01 00:00:00 +0000 +++ b/storage/ndb/include/util/Checksum.hpp revid:mikael.ronstrom@stripped @@ -0,0 +1,126 @@ +/* + Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved. + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA +*/ + +#ifndef CHECKSUM_HPP +#define CHECKSUM_HPP + + +/** + Optimized XOR checksum calculation. Loop unrolling will + reduce relative loop overhead and encourace usage of parallel + arithmetic adders which are common on most modern CPUs. +*/ +inline +Uint32 +computeXorChecksumShort(const Uint32 *buf, Uint32 words, Uint32 sum = 0) +{ + const Uint32 *end_unroll = buf + (words & ~3); + const Uint32 *end = buf + words; + + /** + * Aggregate as chunks of 4*Uint32 words: + * Take care if rewriting this part, code has intentionally + * been unrolled in order to take advantage of HW parallelism + * where there are multiple adders in the CPU core. + */ + while (buf < end_unroll) + { + sum ^= buf[0] ^ buf[1] ^ buf[2] ^ buf[3]; + buf += 4; + } + // Wrap up remaining part + while (buf < end) + { + sum ^= buf[0]; + buf++; + } + return sum; +} + +/** + Optimized XOR checksum calculation intended for longer strings. + Temporary aggregate XOR-sums into Uint64 which are folded into + Uint32 in the final stage. + Also unrool loop as above to take advantage of HW parallelism. + Callee is responsible for checking that there are sufficient 'words' + to be checksumed to complete at least a chunk of 4*Uint64 words. +*/ +inline +Uint32 +computeXorChecksumLong(const Uint32 *buf, Uint32 words, Uint32 sum = 0) +{ + unsigned int i = 0; + + // Align to Uint64 boundary to optimize mem. access below + if (((size_t)(buf) % 8) != 0) + { + sum ^= buf[0]; + buf++; + words--; + } + + const Uint64 *p = reinterpret_cast(buf); + Uint64 sum64 = *p++; + + const Uint32 words64 = (words/2) - 1; // Rem. after init of sum64 + const Uint64 *end = p + (words64 & ~3); + + /** + * Aggregate as chunks of 4*Uint64 words: + * Take care if rewriting this part: code has intentionally + * been unrolled in order to take advantage of HW parallelism + * where there are multiple adders in the CPU core. + */ + do + { + sum64 ^= p[0] ^ p[1] ^ p[2] ^ p[3]; + p+=4; + } while (p < end); + + // Wrap up last part which didn't fit in a 4*Uint64 chunk + end += (words64 % 4); + while (p < end) + { + sum64 ^= p[0]; + p++; + } + + // Fold temp Uint64 sum into a final Uint32 sum + sum ^= (Uint32)(sum64 & 0xffffffff) ^ + (Uint32)(sum64 >> 32); + + // Append last odd Uint32 word + if ((words%2) != 0) + sum ^= buf[words-1]; + + return sum; +} + + +inline +Uint32 +computeXorChecksum(const Uint32 *buf, Uint32 words, Uint32 sum = 0) +{ + if (words < 16) // Decided by empirical experiments + return computeXorChecksumShort(buf,words,sum); + else + return computeXorChecksumLong(buf,words,sum); +} + + +#endif // CHECKSUM_HPP + === modified file 'storage/ndb/src/common/transporter/TransporterInternalDefinitions.hpp' --- a/storage/ndb/src/common/transporter/TransporterInternalDefinitions.hpp revid:mikael.ronstrom@stripped +++ b/storage/ndb/src/common/transporter/TransporterInternalDefinitions.hpp revid:mikael.ronstrom@stripped @@ -19,6 +19,8 @@ #ifndef TransporterInternalDefinitions_H #define TransporterInternalDefinitions_H +#include + #if defined DEBUG_TRANSPORTER || defined VM_TRACE #include #endif @@ -49,10 +51,7 @@ inline Uint32 computeChecksum(const Uint32 * const startOfData, int nWords) { - Uint32 chksum = startOfData[0]; - for (int i=1; i < nWords; i++) - chksum ^= startOfData[i]; - return chksum; + return computeXorChecksum(startOfData+1, nWords-1, startOfData[0]); } struct Protocol6 { === modified file 'storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp' --- a/storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp revid:mikael.ronstrom@stripped +++ b/storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp revid:mikael.ronstrom@stripped @@ -31,6 +31,7 @@ #include #include #include +#include #include #include #include @@ -20747,10 +20748,7 @@ Dbdict::validateChecksum(const XSchemaFi Uint32 Dbdict::computeChecksum(const Uint32 * src, Uint32 len){ - Uint32 ret = 0; - for(Uint32 i = 0; i #include #include +#include #include "../suma/Suma.hpp" #include "DblqhCommon.hpp" @@ -23694,8 +23695,10 @@ Dblqh::execDROP_TRIG_IMPL_REF(Signal* si Uint32 Dblqh::calcPageCheckSum(LogPageRecordPtr logP){ Uint32 checkSum = 37; #ifdef VM_TRACE - for (Uint32 i = (ZPOS_CHECKSUM+1); ilogPageWord[i] ^ checkSum; + checkSum = computeXorChecksum( + logP.p->logPageWord + (ZPOS_CHECKSUM+1), + ZPAGE_SIZE - (ZPOS_CHECKSUM+1), + checkSum); #endif return checkSum; } === modified file 'storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp' --- a/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp revid:mikael.ronstrom@stripped +++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp revid:mikael.ronstrom@stripped @@ -29,6 +29,7 @@ #include #include #include +#include // #define TRACE_INTERPRETER @@ -131,17 +132,14 @@ Dbtup::calculateChecksum(Tuple_header* t Tablerec* regTabPtr) { Uint32 checksum; - Uint32 i, rec_size, *tuple_header; + Uint32 rec_size, *tuple_header; rec_size= regTabPtr->m_offsets[MM].m_fix_header_size; tuple_header= tuple_ptr->m_data; - checksum= 0; // includes tupVersion //printf("%p - ", tuple_ptr); - - for (i= 0; i < rec_size-Tuple_header::HeaderSize; i++) { - checksum ^= tuple_header[i]; - //printf("%.8x ", tuple_header[i]); - } + + checksum = computeXorChecksum( + tuple_header, rec_size-Tuple_header::HeaderSize); //printf("-> %.8x\n", checksum); @@ -158,9 +156,7 @@ Dbtup::calculateChecksum(Tuple_header* t vsize_words= calculate_total_var_size(req_struct->var_len_array, regTabPtr->no_var_attr); ndbassert(req_struct->var_data_end >= &var_data_part[vsize_words]); - for (i= 0; i < vsize_words; i++) { - checksum ^= var_data_part[i]; - } + checksum = computeXorChecksum(var_data_part,vsize_words,checksum); } #endif return checksum; === modified file 'storage/ndb/src/kernel/ndbd.cpp' --- a/storage/ndb/src/kernel/ndbd.cpp revid:mikael.ronstrom@stripped +++ b/storage/ndb/src/kernel/ndbd.cpp revid:mikael.ronstrom@stripped @@ -215,6 +215,12 @@ init_global_memory_manager(EmulatorData Uint64 mem = globalTransporterRegistry.get_total_max_send_buffer(); sbpages = Uint32((mem + GLOBAL_PAGE_SIZE - 1) / GLOBAL_PAGE_SIZE); Resource_limit rl; + /* + Add one allocation of 32 pages per thread since each thread is + likely to seize at least one such in addition to the memory + needed to buffer in front of transporters + */ + sbpages += 32 * get_total_number_of_block_threads(); rl.m_min = sbpages; rl.m_max = sbpages; rl.m_resource_id = RG_TRANSPORTER_BUFFERS; === modified file 'storage/ndb/src/kernel/vm/Configuration.cpp' --- a/storage/ndb/src/kernel/vm/Configuration.cpp revid:mikael.ronstrom@stripped +++ b/storage/ndb/src/kernel/vm/Configuration.cpp revid:mikael.ronstrom@stripped @@ -681,6 +681,11 @@ Configuration::calcSizeAlt(ConfigValues { lqhInstances = globalData.ndbMtLqhWorkers; } + Uint32 tcInstances = 1; + if (globalData.ndbMtTcThreads) + { + tcInstances = globalData.ndbMtTcThreads; + } Uint64 indexMem = 0, dataMem = 0; ndb_mgm_get_int64_parameter(&db, CFG_DB_DATA_MEM, &dataMem); @@ -808,7 +813,8 @@ Configuration::calcSizeAlt(ConfigValues #if NDB_VERSION_D < NDB_MAKE_VERSION(7,2,0) noOfLocalScanRecords = (noOfDBNodes * noOfScanRecords) + #else - noOfLocalScanRecords = 4 * (noOfDBNodes * noOfScanRecords) + + noOfLocalScanRecords = tcInstances * lqhInstances * + (noOfDBNodes * noOfScanRecords) + #endif 1 /* NR */ + 1 /* LCP */; === modified file 'storage/ndb/src/kernel/vm/Emulator.hpp' --- a/storage/ndb/src/kernel/vm/Emulator.hpp revid:mikael.ronstrom@stripped +++ b/storage/ndb/src/kernel/vm/Emulator.hpp revid:mikael.ronstrom@stripped @@ -80,6 +80,11 @@ struct EmulatorData { extern struct EmulatorData globalEmulatorData; /** + * Compute total number of block threads in data node + */ +Uint32 get_total_number_of_block_threads(void); + +/** * Compute no of pages to be used as job-buffer */ Uint32 compute_jb_pages(struct EmulatorData* ed); === modified file 'storage/ndb/src/kernel/vm/dummy_nonmt.cpp' --- a/storage/ndb/src/kernel/vm/dummy_nonmt.cpp revid:mikael.ronstrom@stripped +++ b/storage/ndb/src/kernel/vm/dummy_nonmt.cpp revid:mikael.ronstrom@stripped @@ -44,6 +44,11 @@ mt_get_instance_count(Uint32 block) return 0; } +Uint32 get_total_number_of_block_threads(void) +{ + return 1; +} + Uint32 compute_jb_pages(struct EmulatorData*) { === modified file 'storage/ndb/src/kernel/vm/mt.cpp' --- a/storage/ndb/src/kernel/vm/mt.cpp revid:mikael.ronstrom@stripped +++ b/storage/ndb/src/kernel/vm/mt.cpp revid:mikael.ronstrom@stripped @@ -358,6 +358,212 @@ trylock(struct thr_spin_lock* sl) #define thr_spin_lock thr_mutex #endif +#if defined(HAVE_GCC_ATOMIC_BUILTINS) && defined(USE_FUTEX) + +/** + * Adaptive locks: Combine spinlocks with FUTEX locks + * + * Has these properties: + * 1) There is a 'fast track' not requiring any systems calls, + * where an atomic CAS (Compare And Swap) take or release + * the lock in the normal (?) uncontended case. + * + * 2) If this fails, we are allowed to spinlock upto + * LOCK_MAX_SPINS waiting for the lock to become free. + * The rational here is to try to avoid expensive context + * switches. + * + * 3) If even this fail, FUTEXes are used to suspend/resume + * the thread. + * + * In addition there is the 'adaptive' part where we detect + * in 2) that there are other lock waiters which failed to grab + * lock while spining. (Lock is 'contended'). In these cases + * other lock waiters will skip part 2), + * and go directly from 1) -> 3). + */ + +/** + * State transitions for lock is: + * LOCK_FREE -> LOCK_TAKEN -> LOCK_CONTENDED -> LOCK_FREE + * NOTE: It should never go directly from LOCK_CONTENDED -> LOCK_TAKEN + */ +#define LOCK_FREE 0 // Lock is free +#define LOCK_TAKEN 1 // Lock taken, there might be spinlock waiters +#define LOCK_CONTENDED 2 // Lock taken, and spinlock waiters might have suspended + +#define LOCK_MAX_SPINS (100*1000) + +class Adaptive_Lock +{ +public: + void init() + { + m_state = LOCK_FREE; + } + + void lock() + { + // Atomically set LOCK_TAKEN iff it was LOCK_FREE + Uint32 state = __sync_val_compare_and_swap(&m_state, LOCK_FREE, LOCK_TAKEN); + if (likely(state==LOCK_FREE)) + { + return; // Immediate success + } + /** + * Has to wait for LOCK_FREE, we either: + */ + if (likely(state==LOCK_TAKEN)) + { + /** + * Spinlock for a while until it's LOCK_FREE, or giving up + * and declare LOCK_CONTENDED where we suspend this thread. + */ + lock_wait(); + } + else + { + /** + * If lock already was in a LOCK_CONTENDED state, + * we suspend directly. + */ + lock_contended(); + } + } + + int trylock() + { + // Atomically set LOCK_TAKEN iff it was LOCK_FREE + if (likely(__sync_bool_compare_and_swap(&m_state, LOCK_FREE, LOCK_TAKEN))) + { + return 0; // success + } + return 1; + } + + void unlock() + { + // We own the lock, so it's either LOCK_TAKEN or LOCK_CONTENDED + int state = __sync_fetch_and_sub(&m_state,1); + assert(state != LOCK_FREE); + if (unlikely(state == LOCK_CONTENDED)) + { + // Important: we are still holding the lock! + m_state = LOCK_FREE; // not any more + // Wake up one thread (no fairness assumed) + futex_wake(&m_state); + } + // Else: There was no suspended waiters to wakeup + } + +private: + void lock_wait(); + void lock_contended(); + + volatile unsigned int m_state; // LOCK_xxx state +}; // class Adaptive_Lock + + +ATTRIBUTE_NOINLINE +void Adaptive_Lock::lock_wait() +{ + Uint32 spins = 0; + do + { + /** + * Inner part of loop is expected to run entirely in local L1 cache + * *not* writing or CAS'ing as this may steal memory bandwidth + * and invalidate cachelines in other L1 / L2 caches. + */ + do + { + cpu_pause(); + if (spins++ >= LOCK_MAX_SPINS) + { + // We give up waiting for it to become free, will suspend + lock_contended(); + return; + } + } while (m_state != LOCK_FREE); + + // Someone may steal the lock before our CAS! + } while(!__sync_bool_compare_and_swap(&m_state, LOCK_FREE, LOCK_TAKEN)); + + // LOCK_FREE -> LOCK_TAKEN succeeded -> Got lock +} + +ATTRIBUTE_NOINLINE +void Adaptive_Lock::lock_contended() +{ + /** + * Assume lock is still taken, try to make it LOCK_CONTENDED and wait + */ + do + { + if (m_state == LOCK_CONTENDED || + __sync_bool_compare_and_swap(&m_state, LOCK_TAKEN, LOCK_CONTENDED)) + { + // let's wait, but only if the value is still LOCK_CONTENDED + futex_wait(&m_state, LOCK_CONTENDED, NULL); + } + + /** + * Try (again) assuming the lock has become LOCK_FREE + * However, it could have been a spurious wakeup, or some + * other can steal the lock before we have CAS'ed it. + */ + } while (!__sync_bool_compare_and_swap(&m_state, LOCK_FREE, LOCK_CONTENDED)); + + // LOCK_FREE -> LOCK_CONTENDED succeeded -> Got lock +} + +template +struct thr_adaptive_lock +{ + thr_adaptive_lock(const char * name = 0) + { + m_mutex.init(); + register_lock(this, name); + } + + union { + Adaptive_Lock m_mutex; + char pad[SZ]; + }; +}; + +template +static +inline +void +lock(struct thr_adaptive_lock* sl) +{ + sl->m_mutex.lock(); +} + +template +static +inline +void +unlock(struct thr_adaptive_lock* sl) +{ + sl->m_mutex.unlock(); +} + +template +static +inline +int +trylock(struct thr_adaptive_lock * sl) +{ + return sl->m_mutex.trylock(); +} + +#else +#define thr_adaptive_lock thr_mutex +#endif // HAVE_GCC_ATOMIC_BUILTINS && USE_FUTEX + + template struct thr_mutex { @@ -411,19 +617,29 @@ struct thr_safe_pool Uint32 m_cnt; thr_spin_lock m_lock; - T* seize_list(Ndbd_mem_manager *mm, Uint32 rg, Uint32 alloc_cnt) { + T* seize_list(Ndbd_mem_manager *mm, + Uint32 rg, + Uint32 alloc_cnt, + Uint32 *alloced) { T* ret = 0; T* prev = 0; T* next = 0; Uint32 i; assert(alloc_cnt > 0); + *alloced = alloc_cnt; lock(&m_lock); if (alloc_cnt > m_cnt) { unlock(&m_lock); for (i = 0; i < alloc_cnt; i++) { ret = seize(mm, rg); + require(ret != 0 || i != 0); + if (!ret && i > 0) + { + *alloced = i; + return prev; + } assert(ret); ret->m_next = prev; prev = ret; @@ -509,6 +725,7 @@ public: T *seize(Ndbd_mem_manager *mm, Uint32 rg) { T *tmp; + Uint32 alloced = 0; while (1) { tmp = m_freelist; @@ -521,8 +738,8 @@ public: } else { - m_freelist = m_global_pool->seize_list(mm, rg, m_alloc_size); - m_free = m_alloc_size; + m_freelist = m_global_pool->seize_list(mm, rg, m_alloc_size, &alloced); + m_free = alloced; } } @@ -910,7 +1127,7 @@ struct thr_data * with this part. */ char unused_protection1[NDB_CL]; - struct thr_spin_lock<64> m_jba_write_lock; + struct thr_adaptive_lock<64> m_jba_write_lock; struct thr_job_queue m_jba; struct thr_job_queue_head m_jba_head; @@ -1046,7 +1263,7 @@ struct thr_repository /** * lock */ - struct thr_spin_lock<8> m_send_lock; + struct thr_adaptive_lock<8> m_send_lock; /** * pending data @@ -4010,9 +4227,18 @@ rep_init(struct thr_repository* rep, uns #include Uint32 +get_total_number_of_block_threads(void) +{ + return (NUM_MAIN_THREADS + + globalData.ndbMtLqhThreads + + globalData.ndbMtTcThreads + + globalData.ndbMtReceiveThreads); +} + +Uint32 compute_jb_pages(struct EmulatorData * ed) { - Uint32 cnt = NUM_MAIN_THREADS + globalData.ndbMtLqhThreads + 1; + Uint32 cnt = get_total_number_of_block_threads(); Uint32 perthread = 0; No bundle (reason: useless for push emails).