From: John David Duncan Date: April 6 2012 12:17am Subject: bzr push into mysql-5.5-cluster-7.2 branch (john.duncan:3873 to 3877) List-Archive: http://lists.mysql.com/commits/143404 Message-Id: <201204060018.q360I1ko001307@acsmt356.oracle.com> MIME-Version: 1.0 Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit 3877 John David Duncan 2012-04-05 On-demand growth of the NdbInstance pool is enabled by default, and can be explicitly disabled or enabled by setting scheduler option g0 or g1. modified: storage/ndb/memcache/src/schedulers/S_sched.cc storage/ndb/memcache/src/schedulers/S_sched.h 3876 John David Duncan 2012-04-05 bug#13890064 -- memcached aborts with "Unknown error code: 13" when binary GET request meets ENGINE_TMPFAIL. modified: storage/ndb/memcache/extra/memcached/daemon/memcached.c 3875 John David Duncan 2012-04-05 Error 9002 was used elsewhere in NDB. Error codes in the memcached engine now start at 29000. modified: storage/ndb/memcache/include/ndb_engine_errors.h storage/ndb/memcache/src/ndb_engine_errors.cc storage/ndb/memcache/src/ndb_worker.cc storage/ndb/memcache/src/schedulers/S_sched.cc 3874 John David Duncan 2012-04-05 [merge] merge modified: storage/ndb/memcache/include/Scheduler.h storage/ndb/memcache/include/ndb_engine.h storage/ndb/memcache/include/ndb_engine_errors.h storage/ndb/memcache/include/ndb_pipeline.h storage/ndb/memcache/src/ExternalValue.cc storage/ndb/memcache/src/ndb_engine.c storage/ndb/memcache/src/ndb_engine_errors.cc storage/ndb/memcache/src/ndb_error_logger.cc storage/ndb/memcache/src/ndb_pipeline.cc storage/ndb/memcache/src/schedulers/S_sched.cc storage/ndb/memcache/src/schedulers/S_sched.h storage/ndb/memcache/src/schedulers/Stockholm.cc storage/ndb/memcache/src/schedulers/Stockholm.h storage/ndb/memcache/unit/alloc.cc 3873 John David Duncan 2012-03-31 [merge] local merge modified: storage/ndb/memcache/include/ndbmemcache_config.in === modified file 'storage/ndb/memcache/extra/memcached/daemon/memcached.c' --- a/storage/ndb/memcache/extra/memcached/daemon/memcached.c 2011-09-20 05:13:01 +0000 +++ b/storage/ndb/memcache/extra/memcached/daemon/memcached.c 2012-04-06 00:06:41 +0000 @@ -1761,6 +1761,10 @@ static void process_bin_get(conn *c) { case ENGINE_NOT_MY_VBUCKET: write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET, 0); break; + case ENGINE_TMPFAIL: + write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_ETMPFAIL, 0); + break; + default: /* @todo add proper error handling! */ settings.extensions.logger->log(EXTENSION_LOG_WARNING, c, === modified file 'storage/ndb/memcache/include/Scheduler.h' --- a/storage/ndb/memcache/include/Scheduler.h 2012-03-07 01:22:53 +0000 +++ b/storage/ndb/memcache/include/Scheduler.h 2012-04-03 00:34:18 +0000 @@ -23,13 +23,21 @@ #include "ndbmemcache_global.h" #include -#include "ndb_pipeline.h" +#include "thread_identifier.h" + + +typedef struct { + int nthreads; /* number of worker threads */ + int max_clients; /* maximum number of client connections */ + const char * config_string; /* scheduler-specific configuration string */ +} scheduler_options; + #ifdef __cplusplus -#include "NdbInstance.h" -#include "Configuration.h" -#include "thread_identifier.h" +/* Forward declarations */ +class Configuration; +class workitem; /* Scheduler is an interface */ @@ -44,10 +52,9 @@ public: /** init() is the called from the main thread, after configuration has been read. threadnum: which thread this scheduler will eventually attach to - nthreads: how many total threads will be initialized - config_string: additional configuration string for scheduler + options: struct specifying run-time options */ - virtual void init(int threadnum, int nthreads, const char *config_string) = 0; + virtual void init(int threadnum, const scheduler_options * options) = 0; /** attach_thread() is called from each thread at pipeline initialization time. */ @@ -87,9 +94,6 @@ public: */ virtual bool global_reconfigure(Configuration *new_config) = 0; - /** each scheduler instance serves a single NDB pipeline - */ - ndb_pipeline *pipeline; }; #endif === modified file 'storage/ndb/memcache/include/ndb_engine.h' --- a/storage/ndb/memcache/include/ndb_engine.h 2012-03-07 01:22:53 +0000 +++ b/storage/ndb/memcache/include/ndb_engine.h 2012-04-03 00:34:18 +0000 @@ -62,6 +62,7 @@ struct ndb_engine { } startup_options; struct { + size_t maxconns; size_t nthreads; bool cas_enabled; size_t verbose; === modified file 'storage/ndb/memcache/include/ndb_engine_errors.h' --- a/storage/ndb/memcache/include/ndb_engine_errors.h 2012-03-23 19:01:07 +0000 +++ b/storage/ndb/memcache/include/ndb_engine_errors.h 2012-04-05 21:37:02 +0000 @@ -22,11 +22,22 @@ #include -/* Errors 9000 - 9099 are reported as "Scheduler Error" */ -extern ndberror_struct AppError9001_ReconfLock; -extern ndberror_struct AppError9002_NoNDBs; -extern ndberror_struct AppError9003_SyncClose; +/* + The NDB Engine for Memcached uses error codes 29000 - 29999 +*/ -/* Errors 9100 and up are reported as "Memcached Error" */ + + +/*** Errors 290xx and 291xx are reported as "Scheduler Error" ***/ + +/* 2900x: general scheduler error codes */ +extern ndberror_struct AppError29001_ReconfLock; +extern ndberror_struct AppError29002_NoNDBs; + +/* 2902x: blocking NDB operations in worker thread */ +extern ndberror_struct AppError29023_SyncClose; +extern ndberror_struct AppError29024_autogrow; + +/*** Errors 29200 and up are reported as "Memcached Error" ***/ #endif === modified file 'storage/ndb/memcache/include/ndb_pipeline.h' --- a/storage/ndb/memcache/include/ndb_pipeline.h 2012-03-23 23:33:32 +0000 +++ b/storage/ndb/memcache/include/ndb_pipeline.h 2012-04-05 21:00:05 +0000 @@ -26,14 +26,13 @@ #include "workqueue.h" #include "ndb_engine.h" +#include "Scheduler.h" /* This structure is used in both C and C++ code, requiring a small hack: */ #ifdef __cplusplus -/* forward declaration: */ -class Scheduler; -#define C_OR_CPP_SCHEDULER Scheduler +#define CPP_SCHEDULER Scheduler #else -#define C_OR_CPP_SCHEDULER void +#define CPP_SCHEDULER void #endif /* In each pipeline there lives an allocator, which is used for workitems, @@ -83,9 +82,9 @@ typedef struct request_pipeline { unsigned int id; /*! each pipeline has an id */ unsigned int nworkitems; /*! counter used to give each workitem an id */ struct ndb_engine *engine; - pthread_t engine_thread_id; + pthread_t worker_thread_id; allocator_slab_class alligator[ALLIGATOR_ARRAY_SIZE]; /*! an allocator */ - C_OR_CPP_SCHEDULER *scheduler; + CPP_SCHEDULER *scheduler; memory_pool *pool; /*! has the whole lifetime of the pipeline */ } ndb_pipeline; @@ -99,7 +98,7 @@ DECLARE_FUNCTIONS_WITH_C_LINKAGE ndb_pipeline * ndb_pipeline_initialize(struct ndb_engine *); /** create a generic request pipeline */ -ndb_pipeline * get_request_pipeline(int thd_id); +ndb_pipeline * get_request_pipeline(int thd_id, struct ndb_engine *); /** call into a pipeline for its own statistics */ void pipeline_add_stats(ndb_pipeline *, const char *key, ADD_STAT, const void *); @@ -111,7 +110,7 @@ ENGINE_ERROR_CODE pipeline_flush_all(ndb /***** SCHEDULER APIS *****/ /** Global initialization of scheduler, at startup time */ -void * scheduler_initialize(const char *name, int nthreads, int threadnum); +void * scheduler_initialize(ndb_pipeline *, scheduler_options *); /** shutdown a scheduler */ void scheduler_shutdown(ndb_pipeline *); === modified file 'storage/ndb/memcache/src/ExternalValue.cc' --- a/storage/ndb/memcache/src/ExternalValue.cc 2012-03-23 19:01:07 +0000 +++ b/storage/ndb/memcache/src/ExternalValue.cc 2012-04-05 21:00:05 +0000 @@ -22,6 +22,7 @@ #include #include "workitem.h" +#include "NdbInstance.h" #include "Operation.h" #include "Scheduler.h" #include "status_block.h" === modified file 'storage/ndb/memcache/src/ndb_engine.c' --- a/storage/ndb/memcache/src/ndb_engine.c 2012-03-23 23:33:32 +0000 +++ b/storage/ndb/memcache/src/ndb_engine.c 2012-04-05 21:00:05 +0000 @@ -166,6 +166,7 @@ static ENGINE_ERROR_CODE ndb_initialize( ENGINE_ERROR_CODE return_status; struct ndb_engine *ndb_eng = ndb_handle(handle); struct default_engine *def_eng = default_handle(ndb_eng); + scheduler_options sched_opts; /* Process options for both the ndb engine and the default engine: */ read_cmdline_options(ndb_eng, def_eng, config_str); @@ -212,6 +213,10 @@ static ENGINE_ERROR_CODE ndb_initialize( /* prefetch data dictionary objects */ prefetch_dictionary_objects(); + /* Build the scheduler options structure */ + sched_opts.nthreads = ndb_eng->server_options.nthreads; + sched_opts.max_clients = ndb_eng->server_options.maxconns; + /* Allocate and initailize the pipelines, and their schedulers. This will take some time; each pipeline creates slab and pool allocators, and each scheduler may preallocate a large number of Ndb objects and @@ -224,9 +229,9 @@ static ENGINE_ERROR_CODE ndb_initialize( ndb_eng->pipelines = malloc(nthreads * sizeof(void *)); ndb_eng->schedulers = malloc(nthreads * sizeof(void *)); for(i = 0 ; i < nthreads ; i++) { - ndb_eng->pipelines[i] = get_request_pipeline(i); - ndb_eng->schedulers[i] = - scheduler_initialize(ndb_eng->startup_options.scheduler, nthreads, i); + ndb_eng->pipelines[i] = get_request_pipeline(i, ndb_eng); + ndb_eng->schedulers[i] = scheduler_initialize(ndb_eng->pipelines[i], + & sched_opts); if(ndb_eng->schedulers[i] == 0) { logger->log(LOG_WARNING, NULL, "Illegal scheduler: \"%s\"\n", ndb_eng->startup_options.scheduler); @@ -819,6 +824,9 @@ int fetch_core_settings(struct ndb_engin { .key = "cas_enabled", .datatype = DT_BOOL, .value.dt_bool = &engine->server_options.cas_enabled }, + { .key = "maxconns", + .datatype = DT_SIZE, + .value.dt_size = &engine->server_options.maxconns }, { .key = "num_threads", .datatype = DT_SIZE, .value.dt_size = &engine->server_options.nthreads }, === modified file 'storage/ndb/memcache/src/ndb_engine_errors.cc' --- a/storage/ndb/memcache/src/ndb_engine_errors.cc 2012-03-23 19:01:07 +0000 +++ b/storage/ndb/memcache/src/ndb_engine_errors.cc 2012-04-05 21:37:02 +0000 @@ -20,17 +20,22 @@ #include -ndberror_struct AppError9001_ReconfLock = - { ndberror_st_temporary , ndberror_cl_application , 9001, -1, +ndberror_struct AppError29001_ReconfLock = + { ndberror_st_temporary , ndberror_cl_application , 29001, -1, "Could not obtain configuration read lock", 0 }; -ndberror_struct AppError9002_NoNDBs = - { ndberror_st_temporary , ndberror_cl_application , 9002, -1, +ndberror_struct AppError29002_NoNDBs = + { ndberror_st_temporary , ndberror_cl_application , 29002, -1, "No Ndb Instances in freelist", 0 }; -ndberror_struct AppError9003_SyncClose = - { ndberror_st_temporary , ndberror_cl_application , 9003, -1, +ndberror_struct AppError29023_SyncClose = + { ndberror_st_temporary , ndberror_cl_application , 29023, -1, "Waited for synchronous close of NDB transaction", 0 }; + +ndberror_struct AppError29024_autogrow = + { ndberror_st_temporary , ndberror_cl_application , 29024, -1, + "Out of Ndb instances, growing freelist", 0 + }; === modified file 'storage/ndb/memcache/src/ndb_error_logger.cc' --- a/storage/ndb/memcache/src/ndb_error_logger.cc 2012-03-07 03:57:36 +0000 +++ b/storage/ndb/memcache/src/ndb_error_logger.cc 2012-04-05 05:29:06 +0000 @@ -209,7 +209,7 @@ void ndb_error_logger_stats(ADD_STAT add for(i = 0 ; i < ERROR_HASH_TABLE_SIZE; i++) { for(sym = error_hash_table[i] ; sym != 0 ; sym = sym->next) { klen = sprintf(key, "NDB_Error_%d", sym->error_code); - vlen = sprintf(val, "%du", sym->count); + vlen = sprintf(val, "%lu", sym->count); add_stat(key, klen, val, vlen, cookie); } } === modified file 'storage/ndb/memcache/src/ndb_pipeline.cc' --- a/storage/ndb/memcache/src/ndb_pipeline.cc 2012-03-07 01:22:53 +0000 +++ b/storage/ndb/memcache/src/ndb_pipeline.cc 2012-04-03 00:34:18 +0000 @@ -71,7 +71,12 @@ void init_pool_header(allocation_referen /* The public API */ -/* initialize a new pipeline for an NDB engine thread */ +/* Attach a new pipeline to an NDB worker thread. + Some initialization has already occured when the main single-thread startup + called get_request_pipeline(). But this is the first call into a pipeline + from its worker thread. It will initialize the thread's identifier, and + attach the pipeline to its scheduler. +*/ ndb_pipeline * ndb_pipeline_initialize(struct ndb_engine *engine) { bool did_inc; unsigned int id; @@ -85,14 +90,13 @@ ndb_pipeline * ndb_pipeline_initialize(s /* Fetch the partially initialized pipeline */ ndb_pipeline * self = (ndb_pipeline *) engine->pipelines[id]; - + + /* Sanity checks */ assert(self->id == id); + assert(self->engine == engine); - /* Set the pointer back to the engine */ - self->engine = engine; - - /* And the thread id */ - self->engine_thread_id = pthread_self(); + /* Set the pthread id */ + self->worker_thread_id = pthread_self(); /* Create and set a thread identity */ tid = (thread_identifier *) memory_pool_alloc(self->pool, sizeof(thread_identifier)); @@ -108,16 +112,20 @@ ndb_pipeline * ndb_pipeline_initialize(s } -/* Allocate and initialize a generic request pipeline */ -ndb_pipeline * get_request_pipeline(int thd_id) { +/* Allocate and initialize a generic request pipeline. + In unit test code, this can be called with a NULL engine pointer -- + it will still initialize a usable slab allocator and memory pool + which can be tested. +*/ +ndb_pipeline * get_request_pipeline(int thd_id, struct ndb_engine *engine) { /* Allocate the pipeline */ ndb_pipeline *self = (ndb_pipeline *) malloc(sizeof(ndb_pipeline)); /* Initialize */ - self->engine = 0; + self->engine = engine; self->id = thd_id; self->nworkitems = 0; - + /* Say hi to the alligator */ init_allocator(self); @@ -160,27 +168,27 @@ ENGINE_ERROR_CODE pipeline_flush_all(ndb /* The scheduler API */ -void * scheduler_initialize(const char *cf, int nthreads, int athread) { +void * scheduler_initialize(ndb_pipeline *self, scheduler_options *options) { Scheduler *s = 0; - const char *sched_options = 0; + const char *cf = self->engine->startup_options.scheduler; + options->config_string = 0; if(cf == 0 || *cf == 0) { s = new DEFAULT_SCHEDULER; } else if(!strncasecmp(cf, "stockholm", 9)) { s = new Scheduler_stockholm; - sched_options = & cf[9]; + options->config_string = & cf[9]; } else if(!strncasecmp(cf,"S", 1)) { s = new S::SchedulerWorker; - sched_options = & cf[1]; + options->config_string = & cf[1]; } else { return NULL; } - - s->init(athread, nthreads, sched_options); - s->pipeline = 0; + + s->init(self->id, options); return (void *) s; } === modified file 'storage/ndb/memcache/src/ndb_worker.cc' --- a/storage/ndb/memcache/src/ndb_worker.cc 2012-03-23 19:01:07 +0000 +++ b/storage/ndb/memcache/src/ndb_worker.cc 2012-04-05 21:37:02 +0000 @@ -880,7 +880,7 @@ void worker_close(NdbTransaction *tx, wo nwaits_post = ndb->getClientStat(Ndb::WaitExecCompleteCount); if(nwaits_post > nwaits_pre) - log_app_error(& AppError9003_SyncClose); + log_app_error(& AppError29023_SyncClose); if(wqitem->ext_val) delete wqitem->ext_val; === modified file 'storage/ndb/memcache/src/schedulers/S_sched.cc' --- a/storage/ndb/memcache/src/schedulers/S_sched.cc 2012-03-22 19:24:32 +0000 +++ b/storage/ndb/memcache/src/schedulers/S_sched.cc 2012-04-06 00:12:54 +0000 @@ -64,13 +64,14 @@ S::SchedulerGlobal::SchedulerGlobal(Conf } -void S::SchedulerGlobal::init(int _nthreads, const char *_config_string) { +void S::SchedulerGlobal::init(const scheduler_options *sched_opts) { DEBUG_ENTER_METHOD("S::SchedulerGlobal::init"); /* Set member variables */ - nthreads = _nthreads; - config_string = _config_string; + nthreads = sched_opts->nthreads; + config_string = sched_opts->config_string; parse_config_string(nthreads, config_string); + options.max_clients = sched_opts->max_clients; /* Fetch or initialize clusters */ nclusters = conf->nclusters; @@ -104,8 +105,9 @@ void S::SchedulerGlobal::init(int _nthre /* Log message for startup */ logger->log(LOG_WARNING, 0, "Scheduler: starting for %d cluster%s; " - "c%d,f%d,t%d", nclusters, nclusters == 1 ? "" : "s", - options.n_connections, options.force_send, options.send_timer); + "c%d,f%d,g%d,t%d", nclusters, nclusters == 1 ? "" : "s", + options.n_connections, options.force_send, + options.auto_grow, options.send_timer); /* Now Running */ running = true; @@ -171,6 +173,7 @@ void S::SchedulerGlobal::parse_config_st options.n_connections = 0; // 0 = n_connections based on db-stored config options.force_send = 0; // 0 = force send always off options.send_timer = 1; // 1 = 1 ms. timer in send thread + options.auto_grow = 1; // 1 = allow NDB instance pool to grow on demand if(str) { const char *s = str; @@ -188,6 +191,9 @@ void S::SchedulerGlobal::parse_config_st case 'f': options.force_send = value; break; + case 'g': + options.auto_grow = value; + break; case 't': options.send_timer = value; break; @@ -214,6 +220,10 @@ void S::SchedulerGlobal::parse_config_st logger->log(LOG_WARNING, 0, "Invalid scheduler configuration.\n"); assert(options.send_timer >= 1 && options.send_timer <= 10); } + if(options.auto_grow < 0 || options.auto_grow > 1) { + logger->log(LOG_WARNING, 0, "Invalid scheduler configuration.\n"); + assert(options.auto_grow == 0 || options.auto_grow == 1); + } } @@ -249,15 +259,14 @@ void S::SchedulerGlobal::add_stats(const /* SchedulerWorker methods */ void S::SchedulerWorker::init(int my_thread, - int nthreads, - const char * config_string) { + const scheduler_options * options) { /* On the first call in, initialize the SchedulerGlobal. * This will start the send & poll threads for each connection. */ if(my_thread == 0) { sched_generation_number = 1; s_global = new SchedulerGlobal(& get_Configuration()); - s_global->init(nthreads, config_string); + s_global->init(options); } /* Initialize member variables */ @@ -286,7 +295,7 @@ void S::SchedulerWorker::attach_thread(t ENGINE_ERROR_CODE S::SchedulerWorker::schedule(workitem *item) { int c = item->prefix_info.cluster_id; ENGINE_ERROR_CODE response_code; - NdbInstance *inst; + NdbInstance *inst = 0; S::WorkerConnection *wc; const KeyPrefix *pfx; @@ -299,29 +308,41 @@ ENGINE_ERROR_CODE S::SchedulerWorker::sc pthread_rwlock_unlock(& reconf_lock); } else { - log_app_error(& AppError9001_ReconfLock); + log_app_error(& AppError29001_ReconfLock); return ENGINE_TMPFAIL; } /* READ LOCK RELEASED */ item->base.nsuffix = item->base.nkey - pfx->prefix_len; if(item->base.nsuffix == 0) return ENGINE_EINVAL; // key too short - - if(wc && wc->freelist) { + + if(wc == 0) return ENGINE_FAILED; + + if(wc->freelist) { /* Get the next NDB from the freelist. */ inst = wc->freelist; wc->freelist = inst->next; } - else { - /* No more free NDBs. - Eventually Scheduler::io_completed() will run _in this thread_ and return - an NDB to the freelist. But no other thread can free one, so - all we can do is return an error. - (Or, alternately, the scheduler may be shutting down.) - */ - log_app_error(& AppError9002_NoNDBs); - return ENGINE_TMPFAIL; + else { /* No free NDBs. */ + if(wc->sendqueue->is_aborted()) { + return ENGINE_TMPFAIL; + } + else { /* Try to make an NdbInstance on the fly */ + inst = wc->newNdbInstance(); + if(inst) { + log_app_error(& AppError29024_autogrow); + } + else { + /* We have hit a hard maximum. Eventually Scheduler::io_completed() + will run _in this thread_ and return an NDB to the freelist. + But no other thread can free one, so here we return an error. + */ + log_app_error(& AppError29002_NoNDBs); + return ENGINE_TMPFAIL; + } + } } + assert(inst); inst->link_workitem(item); // Fetch the query plan for this prefix. @@ -520,6 +541,18 @@ void S::Cluster::add_stats(const char *s /* WorkerConnection methods */ + +NdbInstance * S::WorkerConnection::newNdbInstance() { + NdbInstance *inst = 0; + if(instances.current < instances.max) { + inst = new NdbInstance(conn->conn, 2); + instances.current++; + inst->id = ((id.thd + 1) * 10000) + instances.current; + } + return inst; +} + + S::WorkerConnection::WorkerConnection(SchedulerGlobal *global, int thd_id, int cluster_id) { S::Cluster *cl = global->clusters[cluster_id]; @@ -536,21 +569,25 @@ S::WorkerConnection::WorkerConnection(Sc plan_set = new ConnQueryPlanSet(conn->conn, conf->nprefixes); plan_set->buildSetForConfiguration(conf, cluster_id); + /* How many NDB instances to start initially */ + instances.initial = conn->instances.initial / conn->n_workers; + + /* Maximum size of send queue, and upper bound on NDB instances */ + instances.max = conn->instances.max / conn->n_workers; + /* Build the freelist */ freelist = 0; - int my_ndb_inst = conn->nInst / conn->n_workers; - for(int j = 0 ; j < my_ndb_inst ; j++ ) { - NdbInstance *inst = new NdbInstance(conn->conn, 2); - inst->id = ((id.thd + 1) * 10000) + j + 1; + for(instances.current = 0; instances.current < instances.initial; ) { + NdbInstance *inst = newNdbInstance(); inst->next = freelist; freelist = inst; } DEBUG_PRINT("Cluster %d, connection %d (node %d), worker %d: %d NDBs.", - id.cluster, id.conn, id.node, id.thd, my_ndb_inst); + id.cluster, id.conn, id.node, id.thd, instances.current); /* Initialize the sendqueue */ - sendqueue = new Queue(my_ndb_inst); + sendqueue = new Queue(instances.max); /* Hoard a transaction (an API connect record) for each Ndb object. This * first call to startTransaction() will send TC_SEIZEREQ and wait for a @@ -560,7 +597,7 @@ S::WorkerConnection::WorkerConnection(Sc QueryPlan *plan; const KeyPrefix *prefix = conf->getNextPrefixForCluster(id.cluster, NULL); if(prefix) { - NdbTransaction ** txlist = new NdbTransaction * [my_ndb_inst]; + NdbTransaction ** txlist = new NdbTransaction * [instances.current]; int i = 0; // Open them all. @@ -573,7 +610,7 @@ S::WorkerConnection::WorkerConnection(Sc } // Close them all. - for(i = 0 ; i < my_ndb_inst ; i++) { + for(i = 0 ; i < instances.current ; i++) { txlist[i]->close(); } @@ -642,17 +679,29 @@ S::Connection::Connection(S::Cluster & _ n_workers = global->options.n_worker_threads / cluster.nconnections; if(n_total_workers % cluster.nconnections > id) n_workers += 1; - /* How many NDB objects are needed? */ - /* Note that this is used to configure hard limits on the size of the - * waitgroup, the sentqueue, and the reschedulequeue -- and it will not be + /* How many NDB objects are needed for the desired performance? */ + double total_ndb_objects = conf->figureInFlightTransactions(cluster.cluster_id); + instances.initial = (int) (total_ndb_objects / cluster.nconnections); + while(instances.initial % n_workers) instances.initial++; // round up + + /* The maximum number of NDB objects. + * This is used to configure hard limits on the size of the waitgroup, + * the sentqueue, and the reschedulequeue -- and it will not be * possible to increase those limits during online reconfig. */ - double total_ndb_objects = conf->figureInFlightTransactions(cluster.cluster_id); - nInst = (int) (total_ndb_objects / cluster.nconnections); - while(nInst % n_workers) nInst++; // round up - + instances.max = instances.initial; + // allow the pool to grow on demand? + if(global->options.auto_grow) + instances.max *= 1.6; + // max_clients imposes a hard upper limit + if(instances.max > (global->options.max_clients / cluster.nconnections)) + instances.max = global->options.max_clients / cluster.nconnections; + // instances.initial might also be subject to the max_clients limit + if(instances.initial > instances.max) + instances.initial = instances.max; + /* Get a multi-wait Poll Group */ - pollgroup = conn->create_ndb_wait_group(nInst); + pollgroup = conn->create_ndb_wait_group(instances.max); /* Initialize the statistics */ stats.sent_operations = 0; @@ -665,8 +714,8 @@ S::Connection::Connection(S::Cluster & _ sem.counter = 0; /* Initialize the queues for sent and resceduled items */ - sentqueue = new Queue(nInst); - reschedulequeue = new Queue(nInst); + sentqueue = new Queue(instances.max); + reschedulequeue = new Queue(instances.max); } @@ -726,6 +775,14 @@ void S::Connection::add_stats(const char klen = sprintf(key, "cl%d.conn%d.timeout_races", cluster.cluster_id, id); vlen = sprintf(val, "%llu", stats.timeout_races); add_stat(key, klen, val, vlen, cookie); + + klen = sprintf(key, "cl%d.conn%d.instances.initial", cluster.cluster_id, id); + vlen = sprintf(val, "%d", instances.initial); + add_stat(key, klen, val, vlen, cookie); + + klen = sprintf(key, "cl%d.conn%d.instances.max", cluster.cluster_id, id); + vlen = sprintf(val, "%d", instances.max); + add_stat(key, klen, val, vlen, cookie); } === modified file 'storage/ndb/memcache/src/schedulers/S_sched.h' --- a/storage/ndb/memcache/src/schedulers/S_sched.h 2012-03-07 01:22:53 +0000 +++ b/storage/ndb/memcache/src/schedulers/S_sched.h 2012-04-06 00:12:54 +0000 @@ -61,7 +61,7 @@ class S::SchedulerGlobal { public: SchedulerGlobal(Configuration *); ~SchedulerGlobal() {}; - void init(int threads, const char *config_string); + void init(const scheduler_options *options); void add_stats(const char *, ADD_STAT, const void *); void reconfigure(Configuration *); void shutdown(); @@ -82,6 +82,8 @@ public: int n_connections; /** preferred number of NDB cluster connections */ int force_send; /** how to use NDB force-send */ int send_timer; /** milliseconds to set for adaptive send timer */ + int auto_grow; /** whether to allow NDB instance pool to grow */ + int max_clients; /** memcached max allowed connections */ } options; private: @@ -99,7 +101,7 @@ class S::SchedulerWorker : public Schedu public: SchedulerWorker() {}; ~SchedulerWorker() {}; - void init(int threadnum, int nthreads, const char *config_string); + void init(int threadnum, const scheduler_options * sched_opts); void attach_thread(thread_identifier *); ENGINE_ERROR_CODE schedule(workitem *); void yield(workitem *) const {}; @@ -111,6 +113,7 @@ public: private: int id; + ndb_pipeline *pipeline; SchedulerGlobal * m_global; }; @@ -159,9 +162,12 @@ private: Queue * reschedulequeue; int id; int node_id; - int nInst; - int n_total_workers; - int n_workers; + int n_total_workers; /* same as SchedulerGlobal::options.n_worker_threads */ + int n_workers; /* number of workers for this connection */ + struct { + int initial; /* start with this many NDB instances */ + int max; /* scale up to this many */ + } instances; pthread_t send_thread_id; pthread_t poll_thread_id; struct { @@ -187,6 +193,7 @@ public: ~WorkerConnection(); void shutdown(); void reconfigure(Configuration *); + NdbInstance * newNdbInstance(); struct { int thd : 8; @@ -194,6 +201,11 @@ public: int conn : 8; unsigned int node : 8; } id; + struct { + int initial; + int current; + int max; + } instances; S::Connection *conn; ConnQueryPlanSet *plan_set, *old_plan_set; NdbInstance *freelist; === modified file 'storage/ndb/memcache/src/schedulers/Stockholm.cc' --- a/storage/ndb/memcache/src/schedulers/Stockholm.cc 2012-03-07 01:22:53 +0000 +++ b/storage/ndb/memcache/src/schedulers/Stockholm.cc 2012-04-03 00:34:18 +0000 @@ -46,14 +46,15 @@ extern "C" { } -void Scheduler_stockholm::init(int my_thread, int nthreads, const char *config_string) { +void Scheduler_stockholm::init(int my_thread, + const scheduler_options *options) { const Configuration & conf = get_Configuration(); /* How many NDB instances are needed per cluster? */ for(unsigned int c = 0 ; c < conf.nclusters ; c++) { ClusterConnectionPool *pool = conf.getConnectionPoolById(c); double total_ndb_objects = conf.figureInFlightTransactions(c); - cluster[c].nInst = (int) total_ndb_objects / nthreads; + cluster[c].nInst = (int) total_ndb_objects / options->nthreads; DEBUG_PRINT("cluster %d: %d TPS @ %d usec RTT ==> %d NDB instances.", c, conf.max_tps, pool->usec_rtt, cluster[c].nInst); } === modified file 'storage/ndb/memcache/src/schedulers/Stockholm.h' --- a/storage/ndb/memcache/src/schedulers/Stockholm.h 2012-03-07 01:22:53 +0000 +++ b/storage/ndb/memcache/src/schedulers/Stockholm.h 2012-04-03 00:34:18 +0000 @@ -44,7 +44,7 @@ class Scheduler_stockholm : public Sched public: Scheduler_stockholm() {}; ~Scheduler_stockholm() {}; - void init(int threadnum, int nthreads, const char *config_string); + void init(int threadnum, const scheduler_options *options); void attach_thread(thread_identifier *); ENGINE_ERROR_CODE schedule(workitem *); void yield(workitem *) const; // inlined @@ -56,6 +56,7 @@ public: bool global_reconfigure(Configuration *) { return false; } ; private: + ndb_pipeline *pipeline; struct { struct workqueue *queue; struct sched_stats_stockholm { === modified file 'storage/ndb/memcache/unit/alloc.cc' --- a/storage/ndb/memcache/unit/alloc.cc 2011-12-18 23:26:44 +0000 +++ b/storage/ndb/memcache/unit/alloc.cc 2012-04-03 00:34:18 +0000 @@ -27,14 +27,16 @@ #include "all_tests.h" +#define TEST_ALLOC_BLOCKS 34 + int run_allocator_test(QueryPlan *, Ndb *, int v) { - struct request_pipeline *p = get_request_pipeline(0); + struct request_pipeline *p = get_request_pipeline(0, NULL); memory_pool *p1 = pipeline_create_memory_pool(p); int sz = 13; uint tot = 0; void *v1, *v2; - for(int i = 0 ; i < 25 ; i++) { + for(int i = 0 ; i < TEST_ALLOC_BLOCKS ; i++) { v1 = memory_pool_alloc(p1, sz); tot += sz; v2 = memory_pool_alloc(p1, sz + 1); tot += sz + 1; sz = (int) (sz * 1.25); No bundle (reason: useless for push emails).