From: John David Duncan Date: April 19 2011 1:32am Subject: bzr push into mysql-5.1-telco-7.2 branch (john.duncan:4160 to 4161) List-Archive: http://lists.mysql.com/commits/135686 Message-Id: <201104190132.p3J1Wcnp012844@acsmt358.oracle.com> MIME-Version: 1.0 Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit 4161 John David Duncan 2011-04-18 [merge] Merge removed: storage/ndb/memcache/src/schedulers/3thread.cc storage/ndb/memcache/src/schedulers/3thread.h added: storage/ndb/memcache/src/schedulers/Bulk.cc storage/ndb/memcache/src/schedulers/Bulk.h modified: storage/ndb/memcache/Makefile.am storage/ndb/memcache/include/NdbInstance.h storage/ndb/memcache/src/NdbInstance.cc storage/ndb/memcache/src/ndb_pipeline.cc storage/ndb/memcache/src/ndb_worker.cc storage/ndb/memcache/src/schedulers/Stockholm.cc storage/ndb/memcache/src/workqueue.c storage/ndb/memcache/unit/Makefile.am storage/ndb/memcache/unit/harness.cc 4160 John David Duncan 2011-04-18 Add Ndb Custom Data ptr modified: storage/ndb/include/ndbapi/Ndb.hpp storage/ndb/src/ndbapi/Ndb.cpp storage/ndb/src/ndbapi/NdbImpl.hpp storage/ndb/src/ndbapi/Ndbinit.cpp === modified file 'storage/ndb/memcache/Makefile.am' --- a/storage/ndb/memcache/Makefile.am 2011-04-09 07:36:24 +0000 +++ b/storage/ndb/memcache/Makefile.am 2011-04-19 01:23:01 +0000 @@ -34,8 +34,8 @@ ndb_engine_la_SOURCES= \ src/workqueue.c \ src/schedulers/Stockholm.h \ src/schedulers/Stockholm.cc \ - src/schedulers/3thread.h \ - src/schedulers/3thread.cc \ + src/schedulers/Bulk.h \ + src/schedulers/Bulk.cc \ src/schedulers/Flex.h \ src/schedulers/Flex.cc \ src/schedulers/Flex_broker.h \ === modified file 'storage/ndb/memcache/include/NdbInstance.h' --- a/storage/ndb/memcache/include/NdbInstance.h 2011-04-08 08:14:11 +0000 +++ b/storage/ndb/memcache/include/NdbInstance.h 2011-04-19 01:16:36 +0000 @@ -62,6 +62,7 @@ public: Uint64 batches; // number of pollNdb() calls that got results. Uint64 transactions; // total polled transactions from pollNdb(). Uint64 locks_failed; // number of failed lock attempts. + int npending; bool is_locked; private: @@ -74,6 +75,7 @@ inline void LockableNdbInstance::addPoll if(n) { batches++; transactions += n; + npending -= n; } } === modified file 'storage/ndb/memcache/src/NdbInstance.cc' --- a/storage/ndb/memcache/src/NdbInstance.cc 2011-04-08 08:14:11 +0000 +++ b/storage/ndb/memcache/src/NdbInstance.cc 2011-04-19 01:16:36 +0000 @@ -75,6 +75,7 @@ LockableNdbInstance::LockableNdbInstance batches = 0; transactions = 0; next = 0; + npending = 0; } === modified file 'storage/ndb/memcache/src/ndb_pipeline.cc' --- a/storage/ndb/memcache/src/ndb_pipeline.cc 2011-04-06 04:31:28 +0000 +++ b/storage/ndb/memcache/src/ndb_pipeline.cc 2011-04-19 01:16:36 +0000 @@ -35,9 +35,9 @@ #include "thread_identifier.h" #include "ndb_worker.h" -#include "schedulers/3thread.h" -#include "schedulers/Stockholm.h" #include "schedulers/Flex.h" +#include "schedulers/Bulk.h" +#include "schedulers/Stockholm.h" #define DEFAULT_SCHEDULER Scheduler_flex @@ -181,8 +181,8 @@ void * initialize_scheduler(const char * s = new Scheduler_flex; if(!strncasecmp(cf, "stockholm", 10)) s = new Scheduler_stockholm; - else if(!strncasecmp(cf, "3-thread", 9)) - s = new Scheduler_3thread; + else if(!strncasecmp(cf, "bulk", 5)) + s = new Scheduler_bulk; else { /* pass the config string to the default scheduler */ sched_options = cf; === modified file 'storage/ndb/memcache/src/ndb_worker.cc' --- a/storage/ndb/memcache/src/ndb_worker.cc 2011-04-09 07:36:24 +0000 +++ b/storage/ndb/memcache/src/ndb_worker.cc 2011-04-19 01:16:36 +0000 @@ -853,9 +853,13 @@ bool finalize_write(workitem *wqitem, bo se = (struct default_engine *) pipeline->engine->m_default_engine; /* If the write was succesful, update the local cache */ + /* Possible bugs here: + (1) store_item will store nbytes as length, which is wrong. + (2) The CAS may be incorrect. + */ if(wqitem->prefix_info.do_mc_write && tx_did_match) { return (store_item(se, wqitem->cache_item, - hash_item_get_cas_ptr(wqitem->cache_item), // is that right? + hash_item_get_cas_ptr(wqitem->cache_item), OPERATION_SET, wqitem->cookie) == ENGINE_SUCCESS); } return true; === removed file 'storage/ndb/memcache/src/schedulers/3thread.cc' --- a/storage/ndb/memcache/src/schedulers/3thread.cc 2011-04-06 04:31:28 +0000 +++ b/storage/ndb/memcache/src/schedulers/3thread.cc 1970-01-01 00:00:00 +0000 @@ -1,274 +0,0 @@ -/* - Copyright (c) 2011, Oracle and/or its affiliates. All rights - reserved. - - This program is free software; you can redistribute it and/or - modify it under the terms of the GNU General Public License - as published by the Free Software Foundation; version 2 of - the License. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program; if not, write to the Free Software - Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA - 02110-1301 USA - */ -/* System headers */ -/* C++ files must define __STDC_FORMAT_MACROS in order to get PRIu64 */ -#define __STDC_FORMAT_MACROS -#include -#include -#include - -/* Memcache headers */ -#include "memcached/types.h" -#include - -/* NDB Memcache headers */ -#include "3thread.h" -#include "workitem.h" -#include "ndb_worker.h" -#include "status_block.h" - -extern EXTENSION_LOGGER_DESCRIPTOR *logger; - -extern "C" { - void * run_3thd_worker_thread(void *); - void * run_3thd_commit_thread(void *); -} - -status_block status_block_unsupported = { ENGINE_ENOTSUP, "Not supported." }; - -void Scheduler_3thread::init(int my_thread, int, const char *) { - const Configuration & conf = get_Configuration(); - ClusterConnectionPool *pool; - Ndb_cluster_connection *conn; - LockableNdbInstance *inst; - - /* Allocate and initialize the workqueue. - The engine thread will add items to this queue, and the worker thread will - consume them. - */ - queue = (struct workqueue *) malloc(sizeof(struct workqueue)); - workqueue_init(queue, 8192, 1); - - /* Do initialization steps that are configuration-dependent */ - nclusters = conf.nclusters; - - /* Get the NDB instances. - For now, the plan is that we get 2 NDB instances for cluster 0, - and one instance for each additional cluster. - (This is open to refinement later). - - We maintain the instances in an array of lists, so - instances[n] is the head list of instances for cluster n. - - We also have to decide how many transactions to accommodate on each - instance. For now we'll use WORK_QUEUE_SIZE on *each* instance, though - actually WORK_QUEUE_SIZE is the upper bound on transactions for all - instances combined. - */ - instances = new LockableNdbInstance *[nclusters]; - for(int i = 0 ; i < nclusters ; i++) { - pool = conf.getConnectionById(i); - conn = pool->getPooledConnection(my_thread); - inst = new LockableNdbInstance(conn, conf.nprefixes, 8192); - instances[i] = inst; - } - - /* Plus the extra instance for cluster 0: */ - pool = conf.getConnectionById(0); - conn = pool->getPooledConnection(my_thread); - inst = new LockableNdbInstance(conn, conf.nprefixes, 8192); - instances[0]->next = inst; - - // Launch the worker thread - pthread_create(& worker_thread_id, NULL, run_3thd_worker_thread, (void *) this); - - // Launch the commit thread - pthread_create(& commit_thread_id, NULL, run_3thd_commit_thread, (void *) this); -} - - -void Scheduler_3thread::attach_thread(thread_identifier * parent) { - - pipeline = parent->pipeline; - logger->log(LOG_WARNING, 0, "Pipeline %d attached to 3-thread scheduler.\n", - pipeline->id); - stats.cycles = 0; - stats.worker_thread_vtime = 0; - stats.commit_thread_vtime = 0; -} - - -ENGINE_ERROR_CODE Scheduler_3thread::schedule(workitem *newitem) { - workqueue_add(queue, newitem); - return ENGINE_EWOULDBLOCK; -} - - -void Scheduler_3thread::add_stats(ADD_STAT add_stat, - const void * cookie) { - char key[128]; - char val[128]; - const char *units[4] = { "ns", "us", "ms", "s" }; - int klen, vlen, p; - char id; - LockableNdbInstance *inst; - - klen = sprintf(key, "pipeline_%d_commit_cycles", pipeline->id); - vlen = sprintf(val, "%"PRIu64, stats.cycles); - add_stat(key, klen, val, vlen, cookie); - - if(stats.worker_thread_vtime != 0) { - klen = sprintf(key, "pipeline_%d_ndb_worker_thread_time", pipeline->id); - vlen = sprintf(val, "%"PRIu64, stats.worker_thread_vtime); - add_stat(key, klen, val, vlen, cookie); - } - if(stats.commit_thread_vtime != 0) { - klen = sprintf(key, "pipeline_%d_commit_thread_time", pipeline->id); - vlen = sprintf(val, "%"PRIu64, stats.commit_thread_vtime); - add_stat(key, klen, val, vlen, cookie); - } - - for(int i = 0; i < nclusters; i++) { - for(id = 'a', inst = instances[i] ; inst ; id++, inst = inst->next) { - - /* transactions */ - klen = sprintf(key, "pipeline_%d_db%d%c_transactions", pipeline->id, i, id); - vlen = sprintf(val, "%"PRIu64, inst->transactions); - add_stat(key, klen, val, vlen, cookie); - - /* batches */ - klen = sprintf(key, "pipeline_%d_db%d%c_batches", pipeline->id, i, id); - vlen = sprintf(val, "%"PRIu64, inst->batches); - add_stat(key, klen, val, vlen, cookie); - - /* locks_failed */ - klen = sprintf(key, "pipeline_%d_db%d%c_locks_failed", pipeline->id, i, id); - vlen = sprintf(val, "%"PRIu64, inst->locks_failed); - add_stat(key, klen, val, vlen, cookie); - } - } -} - -// ------------------------------- The worker thread ---------------------- - -#define STAT_INTERVAL 50 - - -/* OPEN QUESTION: How does the worker thread get terminated? */ -/* NOTE: "conf = getConfigration()" will not be safe for hot-swapping; - we'll have to ensure that the worker thread and the pipeline use - the same config. -*/ - - -void * run_3thd_worker_thread(void *s) { - return ((Scheduler_3thread *) s)->run_ndb_worker_thread(); -} - - -void * Scheduler_3thread::run_ndb_worker_thread() { - DEBUG_ENTER(); - workitem *newitem; - const Configuration & conf = get_Configuration(); // see note - int lockerr; - unsigned int ncycles = 0; - - do { - /* Wait for something to appear on the queue */ - newitem = (workitem *) workqueue_consumer_wait(queue); - - /* Fetch the config for its key prefix */ - const KeyPrefix *pfx = conf.getPrefixByInfo(newitem->prefix_info); - - /* From here on we will work solely with the suffix part of the key. */ - newitem->base.nsuffix = newitem->base.nkey - pfx->prefix_len; - DEBUG_ASSERT(newitem->base.nsuffix > 0); - - /* Get the head NdbInstance for the correct cluster */ - LockableNdbInstance *inst = instances[pfx->info.cluster_id]; - - // A: Walk the list of instances trying to lock one. - lockerr = - 1; - while(lockerr && inst->next) { - lockerr = inst->trylock(); - if(lockerr) inst = inst->next; - } - // B: If you still don't have a lock, wait on the lock for the last instance. - if(lockerr) - lockerr = inst->lock(); - // C: If you *still* don't have a lock, that's either EINVAL or EDEADLK, - // and you will hit the assertion in getPlanForPrefix(). - - // Now we've got an NDB Instance. Fetch the query plan for this prefix. - newitem->plan = inst->getPlanForPrefix(pfx); - - // Build the NDB transaction - if(!worker_prepare_operation(newitem)) { - /******* Error handling. The operation is not supported. */ - if(newitem->base.is_sync) { - newitem->status = & status_block_unsupported; - pipeline->engine->server.cookie->store_engine_specific(newitem->cookie, newitem); - pipeline->engine->server.cookie->notify_io_complete(newitem->cookie, ENGINE_ENOTSUP); - } - } - - inst->unlock(); - - if(ncycles++ % STAT_INTERVAL == 0) - stats.worker_thread_vtime = get_thread_vtime(); - - } while(newitem); -} - - - -// ------------------------------- The commit thread ---------------------- - - -void * run_3thd_commit_thread(void *s) { - return ((Scheduler_3thread *) s)->run_ndb_commit_thread(); -} - - -//FIXME: On shutdown, the commit thread tends to segfault inside sendPollNdb(). - -/* Note: You could use gethrtime() to keep the pace here. */ -/* Note: latency will go up with the number of clusters unless you have - one commit thread per cluster */ - -void * Scheduler_3thread::run_ndb_commit_thread() { - DEBUG_ENTER(); - LockableNdbInstance *inst; - int transactions, sleep_usec, wait_msec; - - while(1) { - sleep_usec = 1000; - /* For all NDBs, send transactions and poll for results */ - for(int i = 0; i < nclusters ; i++) { - for(inst = instances[i] ; inst ; inst = inst->next) { - /* Wait 1 ms, but just poll the last instance. */ - wait_msec = inst->next ? 1 : 0; - inst->lock(); /* fixme: actually just pollNdb() now */ - transactions = inst->db->sendPollNdb( - /* wait time (0 = just poll) */ wait_msec, - /* minimum ops to wait for */ queue->depth, - /* force send */ 1); - inst->addPollStat(transactions); // update statistics - if(transactions > 0) sleep_usec = 500; - inst->unlock(); - } - } - usleep(sleep_usec); - - stats.cycles++; - if(! (stats.cycles % STAT_INTERVAL)) - stats.commit_thread_vtime = get_thread_vtime(); - } -} === removed file 'storage/ndb/memcache/src/schedulers/3thread.h' --- a/storage/ndb/memcache/src/schedulers/3thread.h 2011-04-07 11:20:56 +0000 +++ b/storage/ndb/memcache/src/schedulers/3thread.h 1970-01-01 00:00:00 +0000 @@ -1,66 +0,0 @@ -/* - Copyright (c) 2011, Oracle and/or its affiliates. All rights - reserved. - - This program is free software; you can redistribute it and/or - modify it under the terms of the GNU General Public License - as published by the Free Software Foundation; version 2 of - the License. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program; if not, write to the Free Software - Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA - 02110-1301 USA - */ -#ifndef NDBMEMCACHE_3THREAD_SCHEDULER_H -#define NDBMEMCACHE_3THREAD_SCHEDULER_H - -#ifndef __cplusplus -#error "This file is for C++ only" -#endif - -#include - -#include "ndbmemcache_config.h" -#include "Scheduler.h" -#include "NdbInstance.h" - - -/* - * 3-Thread Scheduler - * - * The 3-thread scheduler launches an NDB worker thread and a separate - * commit thread; it uses a small number of Ndb objects. - */ -class Scheduler_3thread : public Scheduler { -public: - void init(int threadnum, int nthreads, const char *config_string); - void attach_thread(thread_identifier *); - ENGINE_ERROR_CODE schedule(workitem *); - void io_completed(workitem *) {}; - void add_stats(ADD_STAT, const void *); - void * run_ndb_worker_thread(); - void * run_ndb_commit_thread(); - - -private: - pthread_t worker_thread_id; - pthread_t commit_thread_id; - int nclusters; - struct workqueue *queue; - LockableNdbInstance **instances; - struct sched_stats_3thread { - uint64_t cycles; /* total number of loops in the commit thread */ - uint64_t worker_thread_vtime; - uint64_t commit_thread_vtime; - } stats; -}; - - -#endif - === added file 'storage/ndb/memcache/src/schedulers/Bulk.cc' --- a/storage/ndb/memcache/src/schedulers/Bulk.cc 1970-01-01 00:00:00 +0000 +++ b/storage/ndb/memcache/src/schedulers/Bulk.cc 2011-04-19 01:16:36 +0000 @@ -0,0 +1,282 @@ +/* + Copyright (c) 2011, Oracle and/or its affiliates. All rights + reserved. + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA + 02110-1301 USA + */ +/* System headers */ +/* C++ files must define __STDC_FORMAT_MACROS in order to get PRIu64 */ +#define __STDC_FORMAT_MACROS +#include +#include +#include + +/* Memcache headers */ +#include "memcached/types.h" +#include + +/* NDB Memcache headers */ +#include "bulk.h" +#include "workitem.h" +#include "ndb_worker.h" +#include "status_block.h" + +extern EXTENSION_LOGGER_DESCRIPTOR *logger; + +class bulk_commit_thread_spec { +public: + bulk_commit_thread_spec(Scheduler_bulk *s, int i): sched(s), cluster_id(i) {}; + Scheduler_bulk *sched; + int cluster_id; +}; + +extern "C" { + void * run_bulk_commit_thread(void *); +} + +status_block status_block_unsupported = { ENGINE_ENOTSUP, "Not supported." }; + + +void Scheduler_bulk::init(int my_thread, int nthreads, const char *) { + const Configuration & conf = get_Configuration(); + ClusterConnectionPool *pool; + Ndb_cluster_connection *conn; + LockableNdbInstance *inst; + + /* Do initialization steps that are configuration-dependent */ + nclusters = conf.nclusters; + + /* Get the NDB instances -- 2 instances for each cluster. */ + const int tx_per_inst = 8192; // maximum transactions on each NDB Instance + for(int i = 0 ; i < nclusters ; i++) { + pool = conf.getConnectionById(i); + conn = pool->getPooledConnection(my_thread); + inst = new LockableNdbInstance(conn, conf.nprefixes, tx_per_inst); + inst->next = new LockableNdbInstance(conn, conf.nprefixes, tx_per_inst); + cluster[i].instance = inst; + + /* also get a master NdbInstance for the connection */ + pool->setNdbInstance(my_thread, new NdbInstance(conn, conf.nprefixes, 128)); + } + + /* Hoard a number of transactions for each NDB object. How many? Enough + to meet the performance requested in the configuration. */ + for(int c = 0 ; c < conf.nclusters ; c++) { + ClusterConnectionPool *pool = conf.getConnectionById(c); + cluster[c].usec_rtt = pool->usec_rtt; + double tx_time_in_usec = pool->usec_rtt * 5; + double tx_per_ndb_per_sec = 1000000 / tx_time_in_usec; + double prefetch = conf.max_tps / tx_per_ndb_per_sec; + cluster[c].nprefetch = (int) (prefetch / nthreads); + DEBUG_PRINT("cluster %d: prefetch %d transactions.", c, cluster[c].nprefetch); + + NdbTransaction ** txlist1 = (NdbTransaction **) + calloc(cluster[c].nprefetch, sizeof(NdbTransaction *)); + NdbTransaction ** txlist2 = (NdbTransaction **) + calloc(cluster[c].nprefetch, sizeof(NdbTransaction *)); + + const KeyPrefix *prefix = conf.getNextPrefixForCluster(c, NULL); + QueryPlan *plan = 0; + + /* open them all */ + for(int i = 0; i < cluster[c].nprefetch ; i++) { + plan = cluster[c].instance->getPlanForPrefix(prefix); + txlist1[i] = cluster[c].instance->db->startTransaction(); // on instance 1 + txlist2[i] = cluster[c].instance->next->db->startTransaction(); // on instance 2 + } + + /* close them all */ + for(int i = 0; i < cluster[c].nprefetch ; i++) { + txlist1[i]->close(); + txlist2[i]->close(); + } + + /* free the lists */ + delete[] txlist1; + delete[] txlist2; + + // Launch the commit thread + bulk_commit_thread_spec * spec = new bulk_commit_thread_spec(this, c); + pthread_create(& cluster[c].commit_thread_id, NULL, + run_bulk_commit_thread, spec); + } + +} + + +void Scheduler_bulk::attach_thread(thread_identifier * parent) { + + pipeline = parent->pipeline; + logger->log(LOG_WARNING, 0, "Pipeline %d attached to bulk scheduler.\n", + pipeline->id); +} + + +ENGINE_ERROR_CODE Scheduler_bulk::schedule(workitem *newitem) { + DEBUG_ENTER(); + const Configuration & conf = get_Configuration(); // see note + int lockerr; + ENGINE_ERROR_CODE response_code = ENGINE_EWOULDBLOCK; + /* Fetch the config for its key prefix */ + const KeyPrefix *pfx = conf.getPrefixByInfo(newitem->prefix_info); + + if(newitem->prefix_info.prefix_id) { + DEBUG_PRINT("prefix %d: \"%s\" Table: %s Value Cols: %d", + newitem->prefix_info.prefix_id, pfx->prefix, + pfx->table->table_name, pfx->table->nvaluecols); + } + + /* From here on we will work mainly with the suffix part of the key. */ + newitem->base.nsuffix = newitem->base.nkey - pfx->prefix_len; + DEBUG_ASSERT(newitem->base.nsuffix > 0); + + /* Get the head NdbInstance for the correct cluster */ + LockableNdbInstance *inst = cluster[newitem->prefix_info.cluster_id].instance; + + // A: Walk the list of instances trying to lock one. + lockerr = - 1; + while(lockerr && inst->next) { + lockerr = inst->trylock(); + if(lockerr) inst = inst->next; + } + // B: If you still don't have a lock, wait on the lock for the last instance. + if(lockerr) + lockerr = inst->lock(); + // C: If you *still* don't have a lock, that's either EINVAL or EDEADLK, + // and you will hit the assertion in getPlanForPrefix(). + + // Now we've got an NDB Instance. Fetch the query plan for this prefix. + newitem->plan = inst->getPlanForPrefix(pfx); + + workitem_set_NdbInstance(newitem, inst); + + // Build the NDB transaction + if(worker_prepare_operation(newitem)) { + inst->npending++; + } + else { + /******* Error handling. The operation is not supported. */ + if(newitem->base.is_sync) { + newitem->status = & status_block_unsupported; + pipeline->engine->server.cookie->store_engine_specific(newitem->cookie, newitem); + pipeline->engine->server.cookie->notify_io_complete(newitem->cookie, ENGINE_ENOTSUP); + } + } + + inst->unlock(); + + return response_code; +} + + +void Scheduler_bulk::io_completed(workitem *item) { + DEBUG_ENTER(); + item->ndb_instance = NULL; +} + + +void Scheduler_bulk::add_stats(ADD_STAT add_stat, + const void * cookie) { + char key[128]; + char val[128]; + const char *units[4] = { "ns", "us", "ms", "s" }; + int klen, vlen, p; + char id; + LockableNdbInstance *inst; + + for(int i = 0; i < nclusters; i++) { + /* per-cluster */ + klen = sprintf(key, "t%d_db%d_commit_cycles", pipeline->id, i); + vlen = sprintf(val, "%"PRIu64, cluster[i].stats.cycles); + add_stat(key, klen, val, vlen, cookie); + + if(cluster[i].stats.vtime != 0) { + klen = sprintf(key, "t%d_db%d_commit_thread_vtime", pipeline->id, i); + vlen = sprintf(val, "%"PRIu64, cluster[i].stats.vtime); + add_stat(key, klen, val, vlen, cookie); + } + + /* per-cluster, per-NdbInstance */ + for(id = 'a', inst = cluster[i].instance ; inst ; id++, inst = inst->next) { + /* transactions */ + klen = sprintf(key, "t%d_db%d%c_transactions", pipeline->id, i, id); + vlen = sprintf(val, "%"PRIu64, inst->transactions); + add_stat(key, klen, val, vlen, cookie); + + /* batches */ + klen = sprintf(key, "t%d_db%d%c_batches", pipeline->id, i, id); + vlen = sprintf(val, "%"PRIu64, inst->batches); + add_stat(key, klen, val, vlen, cookie); + + /* locks_failed */ + klen = sprintf(key, "t%d_db%d%c_locks_failed", pipeline->id, i, id); + vlen = sprintf(val, "%"PRIu64, inst->locks_failed); + add_stat(key, klen, val, vlen, cookie); + } + } +} + + +// ------------------------------- The commit thread ---------------------- + +#define STAT_INTERVAL 50 + + +void * run_bulk_commit_thread(void *s) { + bulk_commit_thread_spec *spec = (bulk_commit_thread_spec *) s; + + return spec->sched->run_ndb_commit_thread(spec->cluster_id); +} + +//FIXME: On shutdown, the commit thread tends to segfault inside sendPollNdb(). + +/* Note: You could use gethrtime() to keep the pace here. */ +/* Note: latency will go up with the number of clusters unless you have + one commit thread per cluster */ + +void * Scheduler_bulk::run_ndb_commit_thread(int c) { + DEBUG_ENTER(); + LockableNdbInstance *inst; + int transactions, sleep_usec, wait_msec; + int rtt = cluster[c].usec_rtt; + + cluster[c].stats.cycles = 0; + cluster[c].stats.vtime = 0; + + while(1) { + sleep_usec = rtt * 2; + /* For all NDBs, send transactions and poll for results */ + for(inst = cluster[c].instance ; inst ; inst = inst->next) { + /* Wait with 1 ms timeout, but just poll the last instance. */ + wait_msec = inst->next ? 1 : 0; + inst->lock(); + if(inst->npending) { // there are pending transactions + transactions = inst->db->sendPollNdb( + /* wait time (0 = just poll) */ wait_msec, + /* minimum ops to wait for */ (inst->npending + 1) / 2, + /* force send */ 1); + inst->addPollStat(transactions); // update statistics + if(transactions > 0) sleep_usec = rtt; + } + inst->unlock(); + } + usleep(sleep_usec); + + cluster[c].stats.cycles++; + if(! (cluster[c].stats.cycles % STAT_INTERVAL)) + cluster[c].stats.vtime = get_thread_vtime(); + } +} === added file 'storage/ndb/memcache/src/schedulers/Bulk.h' --- a/storage/ndb/memcache/src/schedulers/Bulk.h 1970-01-01 00:00:00 +0000 +++ b/storage/ndb/memcache/src/schedulers/Bulk.h 2011-04-19 01:16:36 +0000 @@ -0,0 +1,65 @@ +/* + Copyright (c) 2011, Oracle and/or its affiliates. All rights + reserved. + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA + 02110-1301 USA + */ +#ifndef NDBMEMCACHE_BULK_SCHEDULER_H +#define NDBMEMCACHE_BULK_SCHEDULER_H + +#ifndef __cplusplus +#error "This file is for C++ only" +#endif + +#include + +#include "config.h" +#include "Scheduler.h" +#include "NdbInstance.h" + + +/* + * Bulk Scheduler + * + * The bulk scheduler uses a small number of Ndb objects with many transactions + * on each. + */ +class Scheduler_bulk : public Scheduler { +public: + void init(int threadnum, int nthreads, const char *config_string); + void attach_thread(thread_identifier *); + ENGINE_ERROR_CODE schedule(workitem *); + void io_completed(workitem *); + void add_stats(ADD_STAT, const void *); + void * run_ndb_commit_thread(int thread_id); + +private: + int nclusters; + struct { + int usec_rtt; + int nprefetch; + pthread_t commit_thread_id; + LockableNdbInstance *instance; + struct sched_stats_bulk { + uint64_t cycles; /* total number of loops in the commit thread */ + uint64_t vtime; + } stats; + } cluster[MAX_CLUSTERS]; +}; + + +#endif + === modified file 'storage/ndb/memcache/src/schedulers/Stockholm.cc' --- a/storage/ndb/memcache/src/schedulers/Stockholm.cc 2011-04-09 07:36:24 +0000 +++ b/storage/ndb/memcache/src/schedulers/Stockholm.cc 2011-04-19 01:16:36 +0000 @@ -178,7 +178,6 @@ ENGINE_ERROR_CODE Scheduler_stockholm::s return ENGINE_TMPFAIL; } - workitem_set_NdbInstance(newitem, inst); // Fetch the query plan for this prefix. === modified file 'storage/ndb/memcache/src/workqueue.c' --- a/storage/ndb/memcache/src/workqueue.c 2011-04-07 11:20:56 +0000 +++ b/storage/ndb/memcache/src/workqueue.c 2011-04-19 01:16:36 +0000 @@ -50,7 +50,7 @@ int workqueue_init(struct workqueue *q, q->depth = 0; q->minfree = size / 16; assert(nconsumers > 0); - q->threads = nconsumers - 1; /* boolean 0 if there is just one consumer */ + q->threads = nconsumers - 1; /* boolean false if there is just one consumer */ pthread_cond_init(& q->not_empty, NULL); pthread_cond_init(& q->not_full, NULL); === modified file 'storage/ndb/memcache/unit/Makefile.am' --- a/storage/ndb/memcache/unit/Makefile.am 2011-04-08 08:01:14 +0000 +++ b/storage/ndb/memcache/unit/Makefile.am 2011-04-19 01:23:01 +0000 @@ -46,7 +46,7 @@ run_unit_tests_CFLAGS = -g -O0 \ run_unit_test_CXXFLAGS = -g -O0 -run_unit_tests_LDADD = ../ndb_engine_la-3thread.lo \ +run_unit_tests_LDADD = ../ndb_engine_la-Bulk.lo \ ../ndb_engine_la-ClusterConnectionPool.lo \ ../ndb_engine_la-Configuration.lo \ ../ndb_engine_la-DataTypeHandler.lo \ === modified file 'storage/ndb/memcache/unit/harness.cc' --- a/storage/ndb/memcache/unit/harness.cc 2011-04-08 08:01:14 +0000 +++ b/storage/ndb/memcache/unit/harness.cc 2011-04-19 01:16:36 +0000 @@ -40,7 +40,6 @@ extern EXTENSION_LOGGER_DESCRIPTOR *logg Ndb_cluster_connection * connect(const char *); int main(int argc, char *argv[]) { - int test_number = -1; char * test_name = 0; if(argc > 1) connect_string = argv[1]; No bundle (reason: useless for push emails).