3877 John David Duncan 2012-04-05
On-demand growth of the NdbInstance pool is enabled by default, and can be explicitly disabled or enabled by setting scheduler option g0 or g1.
modified:
storage/ndb/memcache/src/schedulers/S_sched.cc
storage/ndb/memcache/src/schedulers/S_sched.h
3876 John David Duncan 2012-04-05
bug#13890064 -- memcached aborts with "Unknown error code: 13" when binary GET request meets ENGINE_TMPFAIL.
modified:
storage/ndb/memcache/extra/memcached/daemon/memcached.c
3875 John David Duncan 2012-04-05
Error 9002 was used elsewhere in NDB.
Error codes in the memcached engine now start at 29000.
modified:
storage/ndb/memcache/include/ndb_engine_errors.h
storage/ndb/memcache/src/ndb_engine_errors.cc
storage/ndb/memcache/src/ndb_worker.cc
storage/ndb/memcache/src/schedulers/S_sched.cc
3874 John David Duncan 2012-04-05 [merge]
merge
modified:
storage/ndb/memcache/include/Scheduler.h
storage/ndb/memcache/include/ndb_engine.h
storage/ndb/memcache/include/ndb_engine_errors.h
storage/ndb/memcache/include/ndb_pipeline.h
storage/ndb/memcache/src/ExternalValue.cc
storage/ndb/memcache/src/ndb_engine.c
storage/ndb/memcache/src/ndb_engine_errors.cc
storage/ndb/memcache/src/ndb_error_logger.cc
storage/ndb/memcache/src/ndb_pipeline.cc
storage/ndb/memcache/src/schedulers/S_sched.cc
storage/ndb/memcache/src/schedulers/S_sched.h
storage/ndb/memcache/src/schedulers/Stockholm.cc
storage/ndb/memcache/src/schedulers/Stockholm.h
storage/ndb/memcache/unit/alloc.cc
3873 John David Duncan 2012-03-31 [merge]
local merge
modified:
storage/ndb/memcache/include/ndbmemcache_config.in
=== modified file 'storage/ndb/memcache/extra/memcached/daemon/memcached.c'
--- a/storage/ndb/memcache/extra/memcached/daemon/memcached.c 2011-09-20 05:13:01 +0000
+++ b/storage/ndb/memcache/extra/memcached/daemon/memcached.c 2012-04-06 00:06:41 +0000
@@ -1761,6 +1761,10 @@ static void process_bin_get(conn *c) {
case ENGINE_NOT_MY_VBUCKET:
write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET, 0);
break;
+ case ENGINE_TMPFAIL:
+ write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_ETMPFAIL, 0);
+ break;
+
default:
/* @todo add proper error handling! */
settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
=== modified file 'storage/ndb/memcache/include/Scheduler.h'
--- a/storage/ndb/memcache/include/Scheduler.h 2012-03-07 01:22:53 +0000
+++ b/storage/ndb/memcache/include/Scheduler.h 2012-04-03 00:34:18 +0000
@@ -23,13 +23,21 @@
#include "ndbmemcache_global.h"
#include <memcached/types.h>
-#include "ndb_pipeline.h"
+#include "thread_identifier.h"
+
+
+typedef struct {
+ int nthreads; /* number of worker threads */
+ int max_clients; /* maximum number of client connections */
+ const char * config_string; /* scheduler-specific configuration string */
+} scheduler_options;
+
#ifdef __cplusplus
-#include "NdbInstance.h"
-#include "Configuration.h"
-#include "thread_identifier.h"
+/* Forward declarations */
+class Configuration;
+class workitem;
/* Scheduler is an interface */
@@ -44,10 +52,9 @@ public:
/** init() is the called from the main thread,
after configuration has been read.
threadnum: which thread this scheduler will eventually attach to
- nthreads: how many total threads will be initialized
- config_string: additional configuration string for scheduler
+ options: struct specifying run-time options
*/
- virtual void init(int threadnum, int nthreads, const char *config_string) = 0;
+ virtual void init(int threadnum, const scheduler_options * options) = 0;
/** attach_thread() is called from each thread
at pipeline initialization time. */
@@ -87,9 +94,6 @@ public:
*/
virtual bool global_reconfigure(Configuration *new_config) = 0;
- /** each scheduler instance serves a single NDB pipeline
- */
- ndb_pipeline *pipeline;
};
#endif
=== modified file 'storage/ndb/memcache/include/ndb_engine.h'
--- a/storage/ndb/memcache/include/ndb_engine.h 2012-03-07 01:22:53 +0000
+++ b/storage/ndb/memcache/include/ndb_engine.h 2012-04-03 00:34:18 +0000
@@ -62,6 +62,7 @@ struct ndb_engine {
} startup_options;
struct {
+ size_t maxconns;
size_t nthreads;
bool cas_enabled;
size_t verbose;
=== modified file 'storage/ndb/memcache/include/ndb_engine_errors.h'
--- a/storage/ndb/memcache/include/ndb_engine_errors.h 2012-03-23 19:01:07 +0000
+++ b/storage/ndb/memcache/include/ndb_engine_errors.h 2012-04-05 21:37:02 +0000
@@ -22,11 +22,22 @@
#include <ndberror.h>
-/* Errors 9000 - 9099 are reported as "Scheduler Error" */
-extern ndberror_struct AppError9001_ReconfLock;
-extern ndberror_struct AppError9002_NoNDBs;
-extern ndberror_struct AppError9003_SyncClose;
+/*
+ The NDB Engine for Memcached uses error codes 29000 - 29999
+*/
-/* Errors 9100 and up are reported as "Memcached Error" */
+
+
+/*** Errors 290xx and 291xx are reported as "Scheduler Error" ***/
+
+/* 2900x: general scheduler error codes */
+extern ndberror_struct AppError29001_ReconfLock;
+extern ndberror_struct AppError29002_NoNDBs;
+
+/* 2902x: blocking NDB operations in worker thread */
+extern ndberror_struct AppError29023_SyncClose;
+extern ndberror_struct AppError29024_autogrow;
+
+/*** Errors 29200 and up are reported as "Memcached Error" ***/
#endif
=== modified file 'storage/ndb/memcache/include/ndb_pipeline.h'
--- a/storage/ndb/memcache/include/ndb_pipeline.h 2012-03-23 23:33:32 +0000
+++ b/storage/ndb/memcache/include/ndb_pipeline.h 2012-04-05 21:00:05 +0000
@@ -26,14 +26,13 @@
#include "workqueue.h"
#include "ndb_engine.h"
+#include "Scheduler.h"
/* This structure is used in both C and C++ code, requiring a small hack: */
#ifdef __cplusplus
-/* forward declaration: */
-class Scheduler;
-#define C_OR_CPP_SCHEDULER Scheduler
+#define CPP_SCHEDULER Scheduler
#else
-#define C_OR_CPP_SCHEDULER void
+#define CPP_SCHEDULER void
#endif
/* In each pipeline there lives an allocator, which is used for workitems,
@@ -83,9 +82,9 @@ typedef struct request_pipeline {
unsigned int id; /*! each pipeline has an id */
unsigned int nworkitems; /*! counter used to give each workitem an id */
struct ndb_engine *engine;
- pthread_t engine_thread_id;
+ pthread_t worker_thread_id;
allocator_slab_class alligator[ALLIGATOR_ARRAY_SIZE]; /*! an allocator */
- C_OR_CPP_SCHEDULER *scheduler;
+ CPP_SCHEDULER *scheduler;
memory_pool *pool; /*! has the whole lifetime of the pipeline */
} ndb_pipeline;
@@ -99,7 +98,7 @@ DECLARE_FUNCTIONS_WITH_C_LINKAGE
ndb_pipeline * ndb_pipeline_initialize(struct ndb_engine *);
/** create a generic request pipeline */
-ndb_pipeline * get_request_pipeline(int thd_id);
+ndb_pipeline * get_request_pipeline(int thd_id, struct ndb_engine *);
/** call into a pipeline for its own statistics */
void pipeline_add_stats(ndb_pipeline *, const char *key, ADD_STAT, const void *);
@@ -111,7 +110,7 @@ ENGINE_ERROR_CODE pipeline_flush_all(ndb
/***** SCHEDULER APIS *****/
/** Global initialization of scheduler, at startup time */
-void * scheduler_initialize(const char *name, int nthreads, int threadnum);
+void * scheduler_initialize(ndb_pipeline *, scheduler_options *);
/** shutdown a scheduler */
void scheduler_shutdown(ndb_pipeline *);
=== modified file 'storage/ndb/memcache/src/ExternalValue.cc'
--- a/storage/ndb/memcache/src/ExternalValue.cc 2012-03-23 19:01:07 +0000
+++ b/storage/ndb/memcache/src/ExternalValue.cc 2012-04-05 21:00:05 +0000
@@ -22,6 +22,7 @@
#include <assert.h>
#include "workitem.h"
+#include "NdbInstance.h"
#include "Operation.h"
#include "Scheduler.h"
#include "status_block.h"
=== modified file 'storage/ndb/memcache/src/ndb_engine.c'
--- a/storage/ndb/memcache/src/ndb_engine.c 2012-03-23 23:33:32 +0000
+++ b/storage/ndb/memcache/src/ndb_engine.c 2012-04-05 21:00:05 +0000
@@ -166,6 +166,7 @@ static ENGINE_ERROR_CODE ndb_initialize(
ENGINE_ERROR_CODE return_status;
struct ndb_engine *ndb_eng = ndb_handle(handle);
struct default_engine *def_eng = default_handle(ndb_eng);
+ scheduler_options sched_opts;
/* Process options for both the ndb engine and the default engine: */
read_cmdline_options(ndb_eng, def_eng, config_str);
@@ -212,6 +213,10 @@ static ENGINE_ERROR_CODE ndb_initialize(
/* prefetch data dictionary objects */
prefetch_dictionary_objects();
+ /* Build the scheduler options structure */
+ sched_opts.nthreads = ndb_eng->server_options.nthreads;
+ sched_opts.max_clients = ndb_eng->server_options.maxconns;
+
/* Allocate and initailize the pipelines, and their schedulers.
This will take some time; each pipeline creates slab and pool allocators,
and each scheduler may preallocate a large number of Ndb objects and
@@ -224,9 +229,9 @@ static ENGINE_ERROR_CODE ndb_initialize(
ndb_eng->pipelines = malloc(nthreads * sizeof(void *));
ndb_eng->schedulers = malloc(nthreads * sizeof(void *));
for(i = 0 ; i < nthreads ; i++) {
- ndb_eng->pipelines[i] = get_request_pipeline(i);
- ndb_eng->schedulers[i] =
- scheduler_initialize(ndb_eng->startup_options.scheduler, nthreads, i);
+ ndb_eng->pipelines[i] = get_request_pipeline(i, ndb_eng);
+ ndb_eng->schedulers[i] = scheduler_initialize(ndb_eng->pipelines[i],
+ & sched_opts);
if(ndb_eng->schedulers[i] == 0) {
logger->log(LOG_WARNING, NULL, "Illegal scheduler: \"%s\"\n",
ndb_eng->startup_options.scheduler);
@@ -819,6 +824,9 @@ int fetch_core_settings(struct ndb_engin
{ .key = "cas_enabled",
.datatype = DT_BOOL,
.value.dt_bool = &engine->server_options.cas_enabled },
+ { .key = "maxconns",
+ .datatype = DT_SIZE,
+ .value.dt_size = &engine->server_options.maxconns },
{ .key = "num_threads",
.datatype = DT_SIZE,
.value.dt_size = &engine->server_options.nthreads },
=== modified file 'storage/ndb/memcache/src/ndb_engine_errors.cc'
--- a/storage/ndb/memcache/src/ndb_engine_errors.cc 2012-03-23 19:01:07 +0000
+++ b/storage/ndb/memcache/src/ndb_engine_errors.cc 2012-04-05 21:37:02 +0000
@@ -20,17 +20,22 @@
#include <ndberror.h>
-ndberror_struct AppError9001_ReconfLock =
- { ndberror_st_temporary , ndberror_cl_application , 9001, -1,
+ndberror_struct AppError29001_ReconfLock =
+ { ndberror_st_temporary , ndberror_cl_application , 29001, -1,
"Could not obtain configuration read lock", 0
};
-ndberror_struct AppError9002_NoNDBs =
- { ndberror_st_temporary , ndberror_cl_application , 9002, -1,
+ndberror_struct AppError29002_NoNDBs =
+ { ndberror_st_temporary , ndberror_cl_application , 29002, -1,
"No Ndb Instances in freelist", 0
};
-ndberror_struct AppError9003_SyncClose =
- { ndberror_st_temporary , ndberror_cl_application , 9003, -1,
+ndberror_struct AppError29023_SyncClose =
+ { ndberror_st_temporary , ndberror_cl_application , 29023, -1,
"Waited for synchronous close of NDB transaction", 0
};
+
+ndberror_struct AppError29024_autogrow =
+ { ndberror_st_temporary , ndberror_cl_application , 29024, -1,
+ "Out of Ndb instances, growing freelist", 0
+ };
=== modified file 'storage/ndb/memcache/src/ndb_error_logger.cc'
--- a/storage/ndb/memcache/src/ndb_error_logger.cc 2012-03-07 03:57:36 +0000
+++ b/storage/ndb/memcache/src/ndb_error_logger.cc 2012-04-05 05:29:06 +0000
@@ -209,7 +209,7 @@ void ndb_error_logger_stats(ADD_STAT add
for(i = 0 ; i < ERROR_HASH_TABLE_SIZE; i++) {
for(sym = error_hash_table[i] ; sym != 0 ; sym = sym->next) {
klen = sprintf(key, "NDB_Error_%d", sym->error_code);
- vlen = sprintf(val, "%du", sym->count);
+ vlen = sprintf(val, "%lu", sym->count);
add_stat(key, klen, val, vlen, cookie);
}
}
=== modified file 'storage/ndb/memcache/src/ndb_pipeline.cc'
--- a/storage/ndb/memcache/src/ndb_pipeline.cc 2012-03-07 01:22:53 +0000
+++ b/storage/ndb/memcache/src/ndb_pipeline.cc 2012-04-03 00:34:18 +0000
@@ -71,7 +71,12 @@ void init_pool_header(allocation_referen
/* The public API */
-/* initialize a new pipeline for an NDB engine thread */
+/* Attach a new pipeline to an NDB worker thread.
+ Some initialization has already occured when the main single-thread startup
+ called get_request_pipeline(). But this is the first call into a pipeline
+ from its worker thread. It will initialize the thread's identifier, and
+ attach the pipeline to its scheduler.
+*/
ndb_pipeline * ndb_pipeline_initialize(struct ndb_engine *engine) {
bool did_inc;
unsigned int id;
@@ -85,14 +90,13 @@ ndb_pipeline * ndb_pipeline_initialize(s
/* Fetch the partially initialized pipeline */
ndb_pipeline * self = (ndb_pipeline *) engine->pipelines[id];
-
+
+ /* Sanity checks */
assert(self->id == id);
+ assert(self->engine == engine);
- /* Set the pointer back to the engine */
- self->engine = engine;
-
- /* And the thread id */
- self->engine_thread_id = pthread_self();
+ /* Set the pthread id */
+ self->worker_thread_id = pthread_self();
/* Create and set a thread identity */
tid = (thread_identifier *) memory_pool_alloc(self->pool, sizeof(thread_identifier));
@@ -108,16 +112,20 @@ ndb_pipeline * ndb_pipeline_initialize(s
}
-/* Allocate and initialize a generic request pipeline */
-ndb_pipeline * get_request_pipeline(int thd_id) {
+/* Allocate and initialize a generic request pipeline.
+ In unit test code, this can be called with a NULL engine pointer --
+ it will still initialize a usable slab allocator and memory pool
+ which can be tested.
+*/
+ndb_pipeline * get_request_pipeline(int thd_id, struct ndb_engine *engine) {
/* Allocate the pipeline */
ndb_pipeline *self = (ndb_pipeline *) malloc(sizeof(ndb_pipeline));
/* Initialize */
- self->engine = 0;
+ self->engine = engine;
self->id = thd_id;
self->nworkitems = 0;
-
+
/* Say hi to the alligator */
init_allocator(self);
@@ -160,27 +168,27 @@ ENGINE_ERROR_CODE pipeline_flush_all(ndb
/* The scheduler API */
-void * scheduler_initialize(const char *cf, int nthreads, int athread) {
+void * scheduler_initialize(ndb_pipeline *self, scheduler_options *options) {
Scheduler *s = 0;
- const char *sched_options = 0;
+ const char *cf = self->engine->startup_options.scheduler;
+ options->config_string = 0;
if(cf == 0 || *cf == 0) {
s = new DEFAULT_SCHEDULER;
}
else if(!strncasecmp(cf, "stockholm", 9)) {
s = new Scheduler_stockholm;
- sched_options = & cf[9];
+ options->config_string = & cf[9];
}
else if(!strncasecmp(cf,"S", 1)) {
s = new S::SchedulerWorker;
- sched_options = & cf[1];
+ options->config_string = & cf[1];
}
else {
return NULL;
}
-
- s->init(athread, nthreads, sched_options);
- s->pipeline = 0;
+
+ s->init(self->id, options);
return (void *) s;
}
=== modified file 'storage/ndb/memcache/src/ndb_worker.cc'
--- a/storage/ndb/memcache/src/ndb_worker.cc 2012-03-23 19:01:07 +0000
+++ b/storage/ndb/memcache/src/ndb_worker.cc 2012-04-05 21:37:02 +0000
@@ -880,7 +880,7 @@ void worker_close(NdbTransaction *tx, wo
nwaits_post = ndb->getClientStat(Ndb::WaitExecCompleteCount);
if(nwaits_post > nwaits_pre)
- log_app_error(& AppError9003_SyncClose);
+ log_app_error(& AppError29023_SyncClose);
if(wqitem->ext_val)
delete wqitem->ext_val;
=== modified file 'storage/ndb/memcache/src/schedulers/S_sched.cc'
--- a/storage/ndb/memcache/src/schedulers/S_sched.cc 2012-03-22 19:24:32 +0000
+++ b/storage/ndb/memcache/src/schedulers/S_sched.cc 2012-04-06 00:12:54 +0000
@@ -64,13 +64,14 @@ S::SchedulerGlobal::SchedulerGlobal(Conf
}
-void S::SchedulerGlobal::init(int _nthreads, const char *_config_string) {
+void S::SchedulerGlobal::init(const scheduler_options *sched_opts) {
DEBUG_ENTER_METHOD("S::SchedulerGlobal::init");
/* Set member variables */
- nthreads = _nthreads;
- config_string = _config_string;
+ nthreads = sched_opts->nthreads;
+ config_string = sched_opts->config_string;
parse_config_string(nthreads, config_string);
+ options.max_clients = sched_opts->max_clients;
/* Fetch or initialize clusters */
nclusters = conf->nclusters;
@@ -104,8 +105,9 @@ void S::SchedulerGlobal::init(int _nthre
/* Log message for startup */
logger->log(LOG_WARNING, 0, "Scheduler: starting for %d cluster%s; "
- "c%d,f%d,t%d", nclusters, nclusters == 1 ? "" : "s",
- options.n_connections, options.force_send, options.send_timer);
+ "c%d,f%d,g%d,t%d", nclusters, nclusters == 1 ? "" : "s",
+ options.n_connections, options.force_send,
+ options.auto_grow, options.send_timer);
/* Now Running */
running = true;
@@ -171,6 +173,7 @@ void S::SchedulerGlobal::parse_config_st
options.n_connections = 0; // 0 = n_connections based on db-stored config
options.force_send = 0; // 0 = force send always off
options.send_timer = 1; // 1 = 1 ms. timer in send thread
+ options.auto_grow = 1; // 1 = allow NDB instance pool to grow on demand
if(str) {
const char *s = str;
@@ -188,6 +191,9 @@ void S::SchedulerGlobal::parse_config_st
case 'f':
options.force_send = value;
break;
+ case 'g':
+ options.auto_grow = value;
+ break;
case 't':
options.send_timer = value;
break;
@@ -214,6 +220,10 @@ void S::SchedulerGlobal::parse_config_st
logger->log(LOG_WARNING, 0, "Invalid scheduler configuration.\n");
assert(options.send_timer >= 1 && options.send_timer <= 10);
}
+ if(options.auto_grow < 0 || options.auto_grow > 1) {
+ logger->log(LOG_WARNING, 0, "Invalid scheduler configuration.\n");
+ assert(options.auto_grow == 0 || options.auto_grow == 1);
+ }
}
@@ -249,15 +259,14 @@ void S::SchedulerGlobal::add_stats(const
/* SchedulerWorker methods */
void S::SchedulerWorker::init(int my_thread,
- int nthreads,
- const char * config_string) {
+ const scheduler_options * options) {
/* On the first call in, initialize the SchedulerGlobal.
* This will start the send & poll threads for each connection.
*/
if(my_thread == 0) {
sched_generation_number = 1;
s_global = new SchedulerGlobal(& get_Configuration());
- s_global->init(nthreads, config_string);
+ s_global->init(options);
}
/* Initialize member variables */
@@ -286,7 +295,7 @@ void S::SchedulerWorker::attach_thread(t
ENGINE_ERROR_CODE S::SchedulerWorker::schedule(workitem *item) {
int c = item->prefix_info.cluster_id;
ENGINE_ERROR_CODE response_code;
- NdbInstance *inst;
+ NdbInstance *inst = 0;
S::WorkerConnection *wc;
const KeyPrefix *pfx;
@@ -299,29 +308,41 @@ ENGINE_ERROR_CODE S::SchedulerWorker::sc
pthread_rwlock_unlock(& reconf_lock);
}
else {
- log_app_error(& AppError9001_ReconfLock);
+ log_app_error(& AppError29001_ReconfLock);
return ENGINE_TMPFAIL;
}
/* READ LOCK RELEASED */
item->base.nsuffix = item->base.nkey - pfx->prefix_len;
if(item->base.nsuffix == 0) return ENGINE_EINVAL; // key too short
-
- if(wc && wc->freelist) {
+
+ if(wc == 0) return ENGINE_FAILED;
+
+ if(wc->freelist) { /* Get the next NDB from the freelist. */
inst = wc->freelist;
wc->freelist = inst->next;
}
- else {
- /* No more free NDBs.
- Eventually Scheduler::io_completed() will run _in this thread_ and return
- an NDB to the freelist. But no other thread can free one, so
- all we can do is return an error.
- (Or, alternately, the scheduler may be shutting down.)
- */
- log_app_error(& AppError9002_NoNDBs);
- return ENGINE_TMPFAIL;
+ else { /* No free NDBs. */
+ if(wc->sendqueue->is_aborted()) {
+ return ENGINE_TMPFAIL;
+ }
+ else { /* Try to make an NdbInstance on the fly */
+ inst = wc->newNdbInstance();
+ if(inst) {
+ log_app_error(& AppError29024_autogrow);
+ }
+ else {
+ /* We have hit a hard maximum. Eventually Scheduler::io_completed()
+ will run _in this thread_ and return an NDB to the freelist.
+ But no other thread can free one, so here we return an error.
+ */
+ log_app_error(& AppError29002_NoNDBs);
+ return ENGINE_TMPFAIL;
+ }
+ }
}
+ assert(inst);
inst->link_workitem(item);
// Fetch the query plan for this prefix.
@@ -520,6 +541,18 @@ void S::Cluster::add_stats(const char *s
/* WorkerConnection methods */
+
+NdbInstance * S::WorkerConnection::newNdbInstance() {
+ NdbInstance *inst = 0;
+ if(instances.current < instances.max) {
+ inst = new NdbInstance(conn->conn, 2);
+ instances.current++;
+ inst->id = ((id.thd + 1) * 10000) + instances.current;
+ }
+ return inst;
+}
+
+
S::WorkerConnection::WorkerConnection(SchedulerGlobal *global,
int thd_id, int cluster_id) {
S::Cluster *cl = global->clusters[cluster_id];
@@ -536,21 +569,25 @@ S::WorkerConnection::WorkerConnection(Sc
plan_set = new ConnQueryPlanSet(conn->conn, conf->nprefixes);
plan_set->buildSetForConfiguration(conf, cluster_id);
+ /* How many NDB instances to start initially */
+ instances.initial = conn->instances.initial / conn->n_workers;
+
+ /* Maximum size of send queue, and upper bound on NDB instances */
+ instances.max = conn->instances.max / conn->n_workers;
+
/* Build the freelist */
freelist = 0;
- int my_ndb_inst = conn->nInst / conn->n_workers;
- for(int j = 0 ; j < my_ndb_inst ; j++ ) {
- NdbInstance *inst = new NdbInstance(conn->conn, 2);
- inst->id = ((id.thd + 1) * 10000) + j + 1;
+ for(instances.current = 0; instances.current < instances.initial; ) {
+ NdbInstance *inst = newNdbInstance();
inst->next = freelist;
freelist = inst;
}
DEBUG_PRINT("Cluster %d, connection %d (node %d), worker %d: %d NDBs.",
- id.cluster, id.conn, id.node, id.thd, my_ndb_inst);
+ id.cluster, id.conn, id.node, id.thd, instances.current);
/* Initialize the sendqueue */
- sendqueue = new Queue<NdbInstance>(my_ndb_inst);
+ sendqueue = new Queue<NdbInstance>(instances.max);
/* Hoard a transaction (an API connect record) for each Ndb object. This
* first call to startTransaction() will send TC_SEIZEREQ and wait for a
@@ -560,7 +597,7 @@ S::WorkerConnection::WorkerConnection(Sc
QueryPlan *plan;
const KeyPrefix *prefix = conf->getNextPrefixForCluster(id.cluster, NULL);
if(prefix) {
- NdbTransaction ** txlist = new NdbTransaction * [my_ndb_inst];
+ NdbTransaction ** txlist = new NdbTransaction * [instances.current];
int i = 0;
// Open them all.
@@ -573,7 +610,7 @@ S::WorkerConnection::WorkerConnection(Sc
}
// Close them all.
- for(i = 0 ; i < my_ndb_inst ; i++) {
+ for(i = 0 ; i < instances.current ; i++) {
txlist[i]->close();
}
@@ -642,17 +679,29 @@ S::Connection::Connection(S::Cluster & _
n_workers = global->options.n_worker_threads / cluster.nconnections;
if(n_total_workers % cluster.nconnections > id) n_workers += 1;
- /* How many NDB objects are needed? */
- /* Note that this is used to configure hard limits on the size of the
- * waitgroup, the sentqueue, and the reschedulequeue -- and it will not be
+ /* How many NDB objects are needed for the desired performance? */
+ double total_ndb_objects = conf->figureInFlightTransactions(cluster.cluster_id);
+ instances.initial = (int) (total_ndb_objects / cluster.nconnections);
+ while(instances.initial % n_workers) instances.initial++; // round up
+
+ /* The maximum number of NDB objects.
+ * This is used to configure hard limits on the size of the waitgroup,
+ * the sentqueue, and the reschedulequeue -- and it will not be
* possible to increase those limits during online reconfig.
*/
- double total_ndb_objects = conf->figureInFlightTransactions(cluster.cluster_id);
- nInst = (int) (total_ndb_objects / cluster.nconnections);
- while(nInst % n_workers) nInst++; // round up
-
+ instances.max = instances.initial;
+ // allow the pool to grow on demand?
+ if(global->options.auto_grow)
+ instances.max *= 1.6;
+ // max_clients imposes a hard upper limit
+ if(instances.max > (global->options.max_clients / cluster.nconnections))
+ instances.max = global->options.max_clients / cluster.nconnections;
+ // instances.initial might also be subject to the max_clients limit
+ if(instances.initial > instances.max)
+ instances.initial = instances.max;
+
/* Get a multi-wait Poll Group */
- pollgroup = conn->create_ndb_wait_group(nInst);
+ pollgroup = conn->create_ndb_wait_group(instances.max);
/* Initialize the statistics */
stats.sent_operations = 0;
@@ -665,8 +714,8 @@ S::Connection::Connection(S::Cluster & _
sem.counter = 0;
/* Initialize the queues for sent and resceduled items */
- sentqueue = new Queue<NdbInstance>(nInst);
- reschedulequeue = new Queue<NdbInstance>(nInst);
+ sentqueue = new Queue<NdbInstance>(instances.max);
+ reschedulequeue = new Queue<NdbInstance>(instances.max);
}
@@ -726,6 +775,14 @@ void S::Connection::add_stats(const char
klen = sprintf(key, "cl%d.conn%d.timeout_races", cluster.cluster_id, id);
vlen = sprintf(val, "%llu", stats.timeout_races);
add_stat(key, klen, val, vlen, cookie);
+
+ klen = sprintf(key, "cl%d.conn%d.instances.initial", cluster.cluster_id, id);
+ vlen = sprintf(val, "%d", instances.initial);
+ add_stat(key, klen, val, vlen, cookie);
+
+ klen = sprintf(key, "cl%d.conn%d.instances.max", cluster.cluster_id, id);
+ vlen = sprintf(val, "%d", instances.max);
+ add_stat(key, klen, val, vlen, cookie);
}
=== modified file 'storage/ndb/memcache/src/schedulers/S_sched.h'
--- a/storage/ndb/memcache/src/schedulers/S_sched.h 2012-03-07 01:22:53 +0000
+++ b/storage/ndb/memcache/src/schedulers/S_sched.h 2012-04-06 00:12:54 +0000
@@ -61,7 +61,7 @@ class S::SchedulerGlobal {
public:
SchedulerGlobal(Configuration *);
~SchedulerGlobal() {};
- void init(int threads, const char *config_string);
+ void init(const scheduler_options *options);
void add_stats(const char *, ADD_STAT, const void *);
void reconfigure(Configuration *);
void shutdown();
@@ -82,6 +82,8 @@ public:
int n_connections; /** preferred number of NDB cluster connections */
int force_send; /** how to use NDB force-send */
int send_timer; /** milliseconds to set for adaptive send timer */
+ int auto_grow; /** whether to allow NDB instance pool to grow */
+ int max_clients; /** memcached max allowed connections */
} options;
private:
@@ -99,7 +101,7 @@ class S::SchedulerWorker : public Schedu
public:
SchedulerWorker() {};
~SchedulerWorker() {};
- void init(int threadnum, int nthreads, const char *config_string);
+ void init(int threadnum, const scheduler_options * sched_opts);
void attach_thread(thread_identifier *);
ENGINE_ERROR_CODE schedule(workitem *);
void yield(workitem *) const {};
@@ -111,6 +113,7 @@ public:
private:
int id;
+ ndb_pipeline *pipeline;
SchedulerGlobal * m_global;
};
@@ -159,9 +162,12 @@ private:
Queue<NdbInstance> * reschedulequeue;
int id;
int node_id;
- int nInst;
- int n_total_workers;
- int n_workers;
+ int n_total_workers; /* same as SchedulerGlobal::options.n_worker_threads */
+ int n_workers; /* number of workers for this connection */
+ struct {
+ int initial; /* start with this many NDB instances */
+ int max; /* scale up to this many */
+ } instances;
pthread_t send_thread_id;
pthread_t poll_thread_id;
struct {
@@ -187,6 +193,7 @@ public:
~WorkerConnection();
void shutdown();
void reconfigure(Configuration *);
+ NdbInstance * newNdbInstance();
struct {
int thd : 8;
@@ -194,6 +201,11 @@ public:
int conn : 8;
unsigned int node : 8;
} id;
+ struct {
+ int initial;
+ int current;
+ int max;
+ } instances;
S::Connection *conn;
ConnQueryPlanSet *plan_set, *old_plan_set;
NdbInstance *freelist;
=== modified file 'storage/ndb/memcache/src/schedulers/Stockholm.cc'
--- a/storage/ndb/memcache/src/schedulers/Stockholm.cc 2012-03-07 01:22:53 +0000
+++ b/storage/ndb/memcache/src/schedulers/Stockholm.cc 2012-04-03 00:34:18 +0000
@@ -46,14 +46,15 @@ extern "C" {
}
-void Scheduler_stockholm::init(int my_thread, int nthreads, const char *config_string) {
+void Scheduler_stockholm::init(int my_thread,
+ const scheduler_options *options) {
const Configuration & conf = get_Configuration();
/* How many NDB instances are needed per cluster? */
for(unsigned int c = 0 ; c < conf.nclusters ; c++) {
ClusterConnectionPool *pool = conf.getConnectionPoolById(c);
double total_ndb_objects = conf.figureInFlightTransactions(c);
- cluster[c].nInst = (int) total_ndb_objects / nthreads;
+ cluster[c].nInst = (int) total_ndb_objects / options->nthreads;
DEBUG_PRINT("cluster %d: %d TPS @ %d usec RTT ==> %d NDB instances.",
c, conf.max_tps, pool->usec_rtt, cluster[c].nInst);
}
=== modified file 'storage/ndb/memcache/src/schedulers/Stockholm.h'
--- a/storage/ndb/memcache/src/schedulers/Stockholm.h 2012-03-07 01:22:53 +0000
+++ b/storage/ndb/memcache/src/schedulers/Stockholm.h 2012-04-03 00:34:18 +0000
@@ -44,7 +44,7 @@ class Scheduler_stockholm : public Sched
public:
Scheduler_stockholm() {};
~Scheduler_stockholm() {};
- void init(int threadnum, int nthreads, const char *config_string);
+ void init(int threadnum, const scheduler_options *options);
void attach_thread(thread_identifier *);
ENGINE_ERROR_CODE schedule(workitem *);
void yield(workitem *) const; // inlined
@@ -56,6 +56,7 @@ public:
bool global_reconfigure(Configuration *) { return false; } ;
private:
+ ndb_pipeline *pipeline;
struct {
struct workqueue *queue;
struct sched_stats_stockholm {
=== modified file 'storage/ndb/memcache/unit/alloc.cc'
--- a/storage/ndb/memcache/unit/alloc.cc 2011-12-18 23:26:44 +0000
+++ b/storage/ndb/memcache/unit/alloc.cc 2012-04-03 00:34:18 +0000
@@ -27,14 +27,16 @@
#include "all_tests.h"
+#define TEST_ALLOC_BLOCKS 34
+
int run_allocator_test(QueryPlan *, Ndb *, int v) {
- struct request_pipeline *p = get_request_pipeline(0);
+ struct request_pipeline *p = get_request_pipeline(0, NULL);
memory_pool *p1 = pipeline_create_memory_pool(p);
int sz = 13;
uint tot = 0;
void *v1, *v2;
- for(int i = 0 ; i < 25 ; i++) {
+ for(int i = 0 ; i < TEST_ALLOC_BLOCKS ; i++) {
v1 = memory_pool_alloc(p1, sz); tot += sz;
v2 = memory_pool_alloc(p1, sz + 1); tot += sz + 1;
sz = (int) (sz * 1.25);
No bundle (reason: useless for push emails).
| Thread |
|---|
| • bzr push into mysql-5.5-cluster-7.2 branch (john.duncan:3873 to 3877) | John David Duncan | 10 Apr |