List: Commits
From: John David Duncan
Date: April 6, 2012, 12:17am
Subject: bzr push into mysql-5.5-cluster-7.2 branch (john.duncan:3873 to 3877)
 3877 John David Duncan	2012-04-05
      On-demand growth of the NdbInstance pool is enabled by default; it can be explicitly enabled or disabled by setting scheduler option g1 or g0 (see the startup example following the commit list).

    modified:
      storage/ndb/memcache/src/schedulers/S_sched.cc
      storage/ndb/memcache/src/schedulers/S_sched.h
 3876 John David Duncan	2012-04-05
      bug#13890064 -- memcached aborts with "Unknown error code: 13" when a binary GET request meets ENGINE_TMPFAIL.

    modified:
      storage/ndb/memcache/extra/memcached/daemon/memcached.c
 3875 John David Duncan	2012-04-05
      Error 9002 was used elsewhere in NDB. 
      Error codes in the memcached engine now start at 29000.

    modified:
      storage/ndb/memcache/include/ndb_engine_errors.h
      storage/ndb/memcache/src/ndb_engine_errors.cc
      storage/ndb/memcache/src/ndb_worker.cc
      storage/ndb/memcache/src/schedulers/S_sched.cc
 3874 John David Duncan	2012-04-05 [merge]
      merge

    modified:
      storage/ndb/memcache/include/Scheduler.h
      storage/ndb/memcache/include/ndb_engine.h
      storage/ndb/memcache/include/ndb_engine_errors.h
      storage/ndb/memcache/include/ndb_pipeline.h
      storage/ndb/memcache/src/ExternalValue.cc
      storage/ndb/memcache/src/ndb_engine.c
      storage/ndb/memcache/src/ndb_engine_errors.cc
      storage/ndb/memcache/src/ndb_error_logger.cc
      storage/ndb/memcache/src/ndb_pipeline.cc
      storage/ndb/memcache/src/schedulers/S_sched.cc
      storage/ndb/memcache/src/schedulers/S_sched.h
      storage/ndb/memcache/src/schedulers/Stockholm.cc
      storage/ndb/memcache/src/schedulers/Stockholm.h
      storage/ndb/memcache/unit/alloc.cc
 3873 John David Duncan	2012-03-31 [merge]
      local merge

    modified:
      storage/ndb/memcache/include/ndbmemcache_config.in
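
The g option added in revision 3877 is parsed from the same scheduler configuration string as the existing c (connections), f (force-send), and t (send-timer) options. As an illustration only -- the -E/-e memcached startup flags and the "scheduler" engine option name are assumed here from the standard NDB memcached setup, not from this changeset -- on-demand growth could be switched off at startup with something like:

  memcached -E ndb_engine.so -e "scheduler=S:c0,f0,g0,t1"

With g1 (the default) the per-worker NdbInstance pool may grow past its initial size up to a hard maximum; with g0 the pool stays at its initial size and an empty freelist is reported as ENGINE_TMPFAIL, as before.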
=== modified file 'storage/ndb/memcache/extra/memcached/daemon/memcached.c'
--- a/storage/ndb/memcache/extra/memcached/daemon/memcached.c	2011-09-20 05:13:01 +0000
+++ b/storage/ndb/memcache/extra/memcached/daemon/memcached.c	2012-04-06 00:06:41 +0000
@@ -1761,6 +1761,10 @@ static void process_bin_get(conn *c) {
     case ENGINE_NOT_MY_VBUCKET:
         write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET, 0);
         break;
+    case ENGINE_TMPFAIL:
+        write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_ETMPFAIL, 0);
+        break;
+
     default:
         /* @todo add proper error handling! */
         settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,

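In the engine interface used by this memcached, ENGINE_TMPFAIL has the numeric value 13 (0x0d), which is why an unhandled temporary failure fell through to the default branch and produced the "Unknown error code: 13" abort described in bug#13890064. The sketch below is a stand-alone illustration of the mapping, not the memcached source; the enum values and response codes are stand-ins.

  #include <stdio.h>
  #include <stdlib.h>

  /* Stand-in codes for illustration; the real values live in the memcached headers. */
  typedef enum { ENG_SUCCESS = 0x00, ENG_KEY_ENOENT = 0x01, ENG_TMPFAIL = 0x0d } engine_code;
  typedef enum { BIN_OK = 0x00, BIN_KEY_ENOENT = 0x01, BIN_ETMPFAIL = 0x86 } bin_status;

  static bin_status map_engine_code(engine_code rc) {
    switch (rc) {
      case ENG_SUCCESS:    return BIN_OK;
      case ENG_KEY_ENOENT: return BIN_KEY_ENOENT;
      case ENG_TMPFAIL:    return BIN_ETMPFAIL;   /* the new case: report, don't abort */
      default:
        /* before the fix, ENGINE_TMPFAIL (13) landed here */
        fprintf(stderr, "Unknown error code: %d\n", (int) rc);
        abort();
    }
  }

  int main(void) {
    printf("TMPFAIL maps to 0x%02x\n", map_engine_code(ENG_TMPFAIL));
    return 0;
  }
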
=== modified file 'storage/ndb/memcache/include/Scheduler.h'
--- a/storage/ndb/memcache/include/Scheduler.h	2012-03-07 01:22:53 +0000
+++ b/storage/ndb/memcache/include/Scheduler.h	2012-04-03 00:34:18 +0000
@@ -23,13 +23,21 @@
 #include "ndbmemcache_global.h"
 #include <memcached/types.h>
 
-#include "ndb_pipeline.h"
+#include "thread_identifier.h"
+
+
+typedef struct {
+  int nthreads;                /* number of worker threads */
+  int max_clients;             /* maximum number of client connections */
+  const char * config_string;  /* scheduler-specific configuration string */
+} scheduler_options;       
+
 
 #ifdef __cplusplus
-#include "NdbInstance.h"
-#include "Configuration.h"
-#include "thread_identifier.h"
 
+/* Forward declarations */
+class Configuration;
+class workitem;
 
 /* Scheduler is an interface */
 
@@ -44,10 +52,9 @@ public:
   /** init() is the called from the main thread, 
       after configuration has been read. 
       threadnum: which thread this scheduler will eventually attach to 
-      nthreads: how many total threads will be initialized 
-      config_string: additional configuration string for scheduler   
+      options: struct specifying run-time options   
   */
-  virtual void init(int threadnum, int nthreads, const char *config_string) = 0;
+  virtual void init(int threadnum, const scheduler_options * options) = 0;
                     
   /** attach_thread() is called from each thread 
       at pipeline initialization time. */
@@ -87,9 +94,6 @@ public:
    */
   virtual bool global_reconfigure(Configuration *new_config) = 0;
     
-  /** each scheduler instance serves a single NDB pipeline 
-  */
-  ndb_pipeline *pipeline;
 };
 #endif
 

=== modified file 'storage/ndb/memcache/include/ndb_engine.h'
--- a/storage/ndb/memcache/include/ndb_engine.h	2012-03-07 01:22:53 +0000
+++ b/storage/ndb/memcache/include/ndb_engine.h	2012-04-03 00:34:18 +0000
@@ -62,6 +62,7 @@ struct ndb_engine {
   } startup_options;
   
   struct {
+    size_t maxconns;
     size_t nthreads;
     bool cas_enabled;  
     size_t verbose;

=== modified file 'storage/ndb/memcache/include/ndb_engine_errors.h'
--- a/storage/ndb/memcache/include/ndb_engine_errors.h	2012-03-23 19:01:07 +0000
+++ b/storage/ndb/memcache/include/ndb_engine_errors.h	2012-04-05 21:37:02 +0000
@@ -22,11 +22,22 @@
 
 #include <ndberror.h>
 
-/* Errors 9000 - 9099 are reported as "Scheduler Error" */
-extern ndberror_struct AppError9001_ReconfLock;
-extern ndberror_struct AppError9002_NoNDBs;
-extern ndberror_struct AppError9003_SyncClose;
+/* 
+   The NDB Engine for Memcached uses error codes 29000 - 29999 
+*/
 
-/* Errors 9100 and up are reported as "Memcached Error" */
+
+
+/*** Errors 290xx and 291xx are reported as "Scheduler Error" ***/
+
+/* 2900x: general scheduler error codes */
+extern ndberror_struct AppError29001_ReconfLock;
+extern ndberror_struct AppError29002_NoNDBs;
+
+/* 2902x: blocking NDB operations in worker thread */
+extern ndberror_struct AppError29023_SyncClose;
+extern ndberror_struct AppError29024_autogrow;
+
+/*** Errors 29200 and up are reported as "Memcached Error" ***/
 
 #endif

=== modified file 'storage/ndb/memcache/include/ndb_pipeline.h'
--- a/storage/ndb/memcache/include/ndb_pipeline.h	2012-03-23 23:33:32 +0000
+++ b/storage/ndb/memcache/include/ndb_pipeline.h	2012-04-05 21:00:05 +0000
@@ -26,14 +26,13 @@
 
 #include "workqueue.h"
 #include "ndb_engine.h"
+#include "Scheduler.h"
 
 /* This structure is used in both C and C++ code, requiring a small hack: */
 #ifdef __cplusplus
-/* forward declaration: */
-class Scheduler;
-#define C_OR_CPP_SCHEDULER Scheduler
+#define CPP_SCHEDULER Scheduler
 #else 
-#define C_OR_CPP_SCHEDULER void
+#define CPP_SCHEDULER void
 #endif
 
 /* In each pipeline there lives an allocator, which is used for workitems, 
@@ -83,9 +82,9 @@ typedef struct request_pipeline {
   unsigned int id;              /*! each pipeline has an id */
   unsigned int nworkitems;      /*! counter used to give each workitem an id */
   struct ndb_engine *engine;    
-  pthread_t engine_thread_id;   
+  pthread_t worker_thread_id;   
   allocator_slab_class alligator[ALLIGATOR_ARRAY_SIZE];  /*! an allocator */
-  C_OR_CPP_SCHEDULER *scheduler;
+  CPP_SCHEDULER *scheduler;
   memory_pool *pool;            /*! has the whole lifetime of the pipeline */
 } ndb_pipeline;
 
@@ -99,7 +98,7 @@ DECLARE_FUNCTIONS_WITH_C_LINKAGE
 ndb_pipeline * ndb_pipeline_initialize(struct ndb_engine *);
 
 /** create a generic request pipeline */
-ndb_pipeline * get_request_pipeline(int thd_id);
+ndb_pipeline * get_request_pipeline(int thd_id, struct ndb_engine *);
 
 /** call into a pipeline for its own statistics */
 void pipeline_add_stats(ndb_pipeline *, const char *key, ADD_STAT, const void *);
@@ -111,7 +110,7 @@ ENGINE_ERROR_CODE pipeline_flush_all(ndb
 /***** SCHEDULER APIS *****/
 
 /** Global initialization of scheduler, at startup time */
-void * scheduler_initialize(const char *name, int nthreads, int threadnum);
+void * scheduler_initialize(ndb_pipeline *, scheduler_options *);
 
 /** shutdown a scheduler */
 void scheduler_shutdown(ndb_pipeline *);

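The CPP_SCHEDULER macro is the "small hack" that lets request_pipeline be shared between C and C++ translation units: C++ sees a typed Scheduler pointer, while C sees an opaque void pointer of identical size. A minimal stand-alone sketch of the pattern (the struct name here is illustrative, not from the tree):

  #ifdef __cplusplus
  class Scheduler;                 /* the real class is visible to C++ only */
  #define CPP_SCHEDULER Scheduler
  #else
  #define CPP_SCHEDULER void       /* C code treats it as an opaque pointer */
  #endif

  struct shared_pipeline_example {
    CPP_SCHEDULER *scheduler;      /* same size and layout from either language */
  };
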
=== modified file 'storage/ndb/memcache/src/ExternalValue.cc'
--- a/storage/ndb/memcache/src/ExternalValue.cc	2012-03-23 19:01:07 +0000
+++ b/storage/ndb/memcache/src/ExternalValue.cc	2012-04-05 21:00:05 +0000
@@ -22,6 +22,7 @@
 #include <assert.h>
 
 #include "workitem.h"
+#include "NdbInstance.h"
 #include "Operation.h"
 #include "Scheduler.h"
 #include "status_block.h"

=== modified file 'storage/ndb/memcache/src/ndb_engine.c'
--- a/storage/ndb/memcache/src/ndb_engine.c	2012-03-23 23:33:32 +0000
+++ b/storage/ndb/memcache/src/ndb_engine.c	2012-04-05 21:00:05 +0000
@@ -166,6 +166,7 @@ static ENGINE_ERROR_CODE ndb_initialize(
   ENGINE_ERROR_CODE return_status;
   struct ndb_engine *ndb_eng = ndb_handle(handle);
   struct default_engine *def_eng = default_handle(ndb_eng);
+  scheduler_options sched_opts;
   
   /* Process options for both the ndb engine and the default engine:  */
   read_cmdline_options(ndb_eng, def_eng, config_str);
@@ -212,6 +213,10 @@ static ENGINE_ERROR_CODE ndb_initialize(
   /* prefetch data dictionary objects */
   prefetch_dictionary_objects();
 
+  /* Build the scheduler options structure */
+  sched_opts.nthreads = ndb_eng->server_options.nthreads;
+  sched_opts.max_clients = ndb_eng->server_options.maxconns;
+
   /* Allocate and initailize the pipelines, and their schedulers.  
      This will take some time; each pipeline creates slab and pool allocators,
      and each scheduler may preallocate a large number of Ndb objects and 
@@ -224,9 +229,9 @@ static ENGINE_ERROR_CODE ndb_initialize(
   ndb_eng->pipelines  = malloc(nthreads * sizeof(void *));
   ndb_eng->schedulers = malloc(nthreads * sizeof(void *));
   for(i = 0 ; i < nthreads ; i++) {
-    ndb_eng->pipelines[i] = get_request_pipeline(i);
-    ndb_eng->schedulers[i] = 
-      scheduler_initialize(ndb_eng->startup_options.scheduler, nthreads, i);
+    ndb_eng->pipelines[i] = get_request_pipeline(i, ndb_eng);
+    ndb_eng->schedulers[i] = scheduler_initialize(ndb_eng->pipelines[i], 
+                                                  & sched_opts);
     if(ndb_eng->schedulers[i] == 0) {
       logger->log(LOG_WARNING, NULL, "Illegal scheduler: \"%s\"\n", 
                   ndb_eng->startup_options.scheduler); 
@@ -819,6 +824,9 @@ int fetch_core_settings(struct ndb_engin
     { .key = "cas_enabled",
       .datatype = DT_BOOL,
       .value.dt_bool = &engine->server_options.cas_enabled },
+    { .key = "maxconns",
+      .datatype = DT_SIZE,
+      .value.dt_size = &engine->server_options.maxconns },
     { .key = "num_threads",
       .datatype = DT_SIZE,
       .value.dt_size = &engine->server_options.nthreads },

=== modified file 'storage/ndb/memcache/src/ndb_engine_errors.cc'
--- a/storage/ndb/memcache/src/ndb_engine_errors.cc	2012-03-23 19:01:07 +0000
+++ b/storage/ndb/memcache/src/ndb_engine_errors.cc	2012-04-05 21:37:02 +0000
@@ -20,17 +20,22 @@
 
 #include <ndberror.h>
 
-ndberror_struct AppError9001_ReconfLock = 
-  { ndberror_st_temporary , ndberror_cl_application , 9001, -1,
+ndberror_struct AppError29001_ReconfLock = 
+  { ndberror_st_temporary , ndberror_cl_application , 29001, -1,
     "Could not obtain configuration read lock", 0
   };
 
-ndberror_struct AppError9002_NoNDBs =
-  { ndberror_st_temporary , ndberror_cl_application , 9002, -1,
+ndberror_struct AppError29002_NoNDBs =
+  { ndberror_st_temporary , ndberror_cl_application , 29002, -1,
     "No Ndb Instances in freelist", 0
   };
 
-ndberror_struct AppError9003_SyncClose =
-  { ndberror_st_temporary , ndberror_cl_application , 9003, -1,
+ndberror_struct AppError29023_SyncClose =
+  { ndberror_st_temporary , ndberror_cl_application , 29023, -1,
     "Waited for synchronous close of NDB transaction", 0 
   };
+
+ndberror_struct AppError29024_autogrow = 
+  { ndberror_st_temporary , ndberror_cl_application , 29024, -1,
+    "Out of Ndb instances, growing freelist", 0 
+  };

=== modified file 'storage/ndb/memcache/src/ndb_error_logger.cc'
--- a/storage/ndb/memcache/src/ndb_error_logger.cc	2012-03-07 03:57:36 +0000
+++ b/storage/ndb/memcache/src/ndb_error_logger.cc	2012-04-05 05:29:06 +0000
@@ -209,7 +209,7 @@ void ndb_error_logger_stats(ADD_STAT add
   for(i = 0 ; i < ERROR_HASH_TABLE_SIZE; i++) {
     for(sym = error_hash_table[i] ; sym != 0 ; sym = sym->next) { 
       klen = sprintf(key, "NDB_Error_%d", sym->error_code);
-      vlen = sprintf(val, "%du", sym->count);
+      vlen = sprintf(val, "%lu", sym->count);
       add_stat(key, klen, val, vlen, cookie);
     }
   }

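The statistics fix above is a printf format correction: "%du" is parsed as "%d" followed by a literal 'u', so it both mismatches the unsigned long counter (undefined behaviour on LP64 platforms) and appends a stray 'u' to the reported value. A minimal illustration, assuming sym->count is an unsigned long as the new format implies:

  #include <stdio.h>

  int main(void) {
    unsigned long count = 42;
    char val[32];

    /* Old: sprintf(val, "%du", count) would print "42u" at best, and passes an
       unsigned long where "%d" expects an int.  New: */
    int vlen = sprintf(val, "%lu", count);    /* val = "42", vlen = 2 */
    printf("%s (%d bytes)\n", val, vlen);
    return 0;
  }
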
=== modified file 'storage/ndb/memcache/src/ndb_pipeline.cc'
--- a/storage/ndb/memcache/src/ndb_pipeline.cc	2012-03-07 01:22:53 +0000
+++ b/storage/ndb/memcache/src/ndb_pipeline.cc	2012-04-03 00:34:18 +0000
@@ -71,7 +71,12 @@ void init_pool_header(allocation_referen
 
 /* The public API */
 
-/* initialize a new pipeline for an NDB engine thread */
+/* Attach a new pipeline to an NDB worker thread. 
+   Some initialization has already occurred when the main single-thread startup
+   called get_request_pipeline().  But this is the first call into a pipeline
+   from its worker thread.  It will initialize the thread's identifier, and 
+   attach the pipeline to its scheduler.
+*/
 ndb_pipeline * ndb_pipeline_initialize(struct ndb_engine *engine) {
   bool did_inc;
   unsigned int id;
@@ -85,14 +90,13 @@ ndb_pipeline * ndb_pipeline_initialize(s
   
   /* Fetch the partially initialized pipeline */
   ndb_pipeline * self = (ndb_pipeline *) engine->pipelines[id];
-  
+
+  /* Sanity checks */
   assert(self->id == id);
+  assert(self->engine == engine);
   
-  /* Set the pointer back to the engine */
-  self->engine = engine;
-  
-  /* And the thread id */
-  self->engine_thread_id = pthread_self(); 
+  /* Set the pthread id */
+  self->worker_thread_id = pthread_self(); 
 
   /* Create and set a thread identity */
   tid = (thread_identifier *) memory_pool_alloc(self->pool, sizeof(thread_identifier));
@@ -108,16 +112,20 @@ ndb_pipeline * ndb_pipeline_initialize(s
 }
 
 
-/* Allocate and initialize a generic request pipeline */
-ndb_pipeline * get_request_pipeline(int thd_id) { 
+/* Allocate and initialize a generic request pipeline.
+   In unit test code, this can be called with a NULL engine pointer -- 
+   it will still initialize a usable slab allocator and memory pool 
+   which can be tested.  
+*/
+ndb_pipeline * get_request_pipeline(int thd_id, struct ndb_engine *engine) { 
   /* Allocate the pipeline */
   ndb_pipeline *self = (ndb_pipeline *) malloc(sizeof(ndb_pipeline)); 
   
   /* Initialize */  
-  self->engine = 0;
+  self->engine = engine;
   self->id = thd_id;
   self->nworkitems = 0;
-
+    
   /* Say hi to the alligator */  
   init_allocator(self);
   
@@ -160,27 +168,27 @@ ENGINE_ERROR_CODE pipeline_flush_all(ndb
 
 /* The scheduler API */
 
-void * scheduler_initialize(const char *cf, int nthreads, int athread) {
+void * scheduler_initialize(ndb_pipeline *self, scheduler_options *options) {
   Scheduler *s = 0;
-  const char *sched_options = 0;
+  const char *cf = self->engine->startup_options.scheduler;
+  options->config_string = 0;
   
   if(cf == 0 || *cf == 0) {
     s = new DEFAULT_SCHEDULER;
   }
   else if(!strncasecmp(cf, "stockholm", 9)) {
     s = new Scheduler_stockholm;
-    sched_options = & cf[9];
+    options->config_string = & cf[9];
   }
   else if(!strncasecmp(cf,"S", 1)) {
     s = new S::SchedulerWorker;
-    sched_options = & cf[1];
+    options->config_string = & cf[1];
   }
   else {
     return NULL;
   }
-    
-  s->init(athread, nthreads, sched_options);
-  s->pipeline = 0;
+  
+  s->init(self->id, options);
 
   return (void *) s;
 }

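scheduler_initialize() now takes the pipeline and a scheduler_options struct instead of a name string plus thread counts: it reads the scheduler name from the engine's startup options, selects a scheduler by name prefix, and passes the remainder of the string to the scheduler through options->config_string. The pipeline pointer that used to be zeroed here is also gone from the Scheduler base class; the concrete schedulers now carry their own. The stand-alone sketch below mimics only the prefix dispatch (the helper and its return values are illustrative):

  #include <stdio.h>
  #include <string.h>
  #include <strings.h>                     /* strncasecmp */

  static const char * pick_scheduler(const char *cf, const char **config_string) {
    *config_string = "";
    if (cf == NULL || *cf == '\0')         return "default";
    if (!strncasecmp(cf, "stockholm", 9)) { *config_string = &cf[9]; return "stockholm"; }
    if (!strncasecmp(cf, "S", 1))         { *config_string = &cf[1]; return "S"; }
    return NULL;                           /* unknown name: engine startup fails */
  }

  int main(void) {
    const char *opts;
    const char *which = pick_scheduler("S:c0,f0,g1,t1", &opts);
    printf("scheduler=%s  config=\"%s\"\n", which, opts);  /* S  ":c0,f0,g1,t1" */
    return 0;
  }
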
=== modified file 'storage/ndb/memcache/src/ndb_worker.cc'
--- a/storage/ndb/memcache/src/ndb_worker.cc	2012-03-23 19:01:07 +0000
+++ b/storage/ndb/memcache/src/ndb_worker.cc	2012-04-05 21:37:02 +0000
@@ -880,7 +880,7 @@ void worker_close(NdbTransaction *tx, wo
   nwaits_post = ndb->getClientStat(Ndb::WaitExecCompleteCount);
 
   if(nwaits_post > nwaits_pre) 
-    log_app_error(& AppError9003_SyncClose);
+    log_app_error(& AppError29023_SyncClose);
  
   if(wqitem->ext_val) 
     delete wqitem->ext_val;

=== modified file 'storage/ndb/memcache/src/schedulers/S_sched.cc'
--- a/storage/ndb/memcache/src/schedulers/S_sched.cc	2012-03-22 19:24:32 +0000
+++ b/storage/ndb/memcache/src/schedulers/S_sched.cc	2012-04-06 00:12:54 +0000
@@ -64,13 +64,14 @@ S::SchedulerGlobal::SchedulerGlobal(Conf
 }
 
 
-void S::SchedulerGlobal::init(int _nthreads, const char *_config_string) {
+void S::SchedulerGlobal::init(const scheduler_options *sched_opts) {
   DEBUG_ENTER_METHOD("S::SchedulerGlobal::init");
 
   /* Set member variables */
-  nthreads = _nthreads;
-  config_string = _config_string;
+  nthreads = sched_opts->nthreads;
+  config_string = sched_opts->config_string;
   parse_config_string(nthreads, config_string);
+  options.max_clients = sched_opts->max_clients;
 
   /* Fetch or initialize clusters */
   nclusters = conf->nclusters;
@@ -104,8 +105,9 @@ void S::SchedulerGlobal::init(int _nthre
   
   /* Log message for startup */
   logger->log(LOG_WARNING, 0, "Scheduler: starting for %d cluster%s; "
-              "c%d,f%d,t%d", nclusters, nclusters == 1 ? "" : "s",
-              options.n_connections, options.force_send, options.send_timer);
+              "c%d,f%d,g%d,t%d", nclusters, nclusters == 1 ? "" : "s",
+              options.n_connections, options.force_send, 
+              options.auto_grow, options.send_timer);
 
   /* Now Running */
   running = true;
@@ -171,6 +173,7 @@ void S::SchedulerGlobal::parse_config_st
   options.n_connections = 0;   // 0 = n_connections based on db-stored config
   options.force_send = 0;      // 0 = force send always off
   options.send_timer = 1;      // 1 = 1 ms. timer in send thread
+  options.auto_grow = 1;       // 1 = allow NDB instance pool to grow on demand
 
   if(str) {
     const char *s = str;
@@ -188,6 +191,9 @@ void S::SchedulerGlobal::parse_config_st
         case 'f':
           options.force_send = value;
           break;
+        case 'g':
+          options.auto_grow = value;
+          break;
         case 't':
           options.send_timer = value;
           break;
@@ -214,6 +220,10 @@ void S::SchedulerGlobal::parse_config_st
     logger->log(LOG_WARNING, 0, "Invalid scheduler configuration.\n");
     assert(options.send_timer >= 1 && options.send_timer <= 10);
   }
+  if(options.auto_grow < 0 || options.auto_grow > 1) {
+    logger->log(LOG_WARNING, 0, "Invalid scheduler configuration.\n");
+    assert(options.auto_grow == 0 || options.auto_grow == 1);
+  }
 }
 
 
@@ -249,15 +259,14 @@ void S::SchedulerGlobal::add_stats(const
 /* SchedulerWorker methods */
 
 void S::SchedulerWorker::init(int my_thread, 
-                              int nthreads, 
-                              const char * config_string) {
+                              const scheduler_options * options) {
   /* On the first call in, initialize the SchedulerGlobal.
    * This will start the send & poll threads for each connection.
    */
   if(my_thread == 0) {
     sched_generation_number = 1;
     s_global = new SchedulerGlobal(& get_Configuration());
-    s_global->init(nthreads, config_string);
+    s_global->init(options);
   }
   
   /* Initialize member variables */
@@ -286,7 +295,7 @@ void S::SchedulerWorker::attach_thread(t
 ENGINE_ERROR_CODE S::SchedulerWorker::schedule(workitem *item) {
   int c = item->prefix_info.cluster_id;
   ENGINE_ERROR_CODE response_code;
-  NdbInstance *inst;
+  NdbInstance *inst = 0;
   S::WorkerConnection *wc;
   const KeyPrefix *pfx;
   
@@ -299,29 +308,41 @@ ENGINE_ERROR_CODE S::SchedulerWorker::sc
     pthread_rwlock_unlock(& reconf_lock);
   }
   else {
-    log_app_error(& AppError9001_ReconfLock);
+    log_app_error(& AppError29001_ReconfLock);
     return ENGINE_TMPFAIL;
   }
   /* READ LOCK RELEASED */
   
   item->base.nsuffix = item->base.nkey - pfx->prefix_len;
   if(item->base.nsuffix == 0) return ENGINE_EINVAL; // key too short
-  
-  if(wc && wc->freelist) {
+ 
+  if(wc == 0) return ENGINE_FAILED;
+    
+  if(wc->freelist) {                 /* Get the next NDB from the freelist. */
     inst = wc->freelist;
     wc->freelist = inst->next;
   }
-  else {
-    /* No more free NDBs. 
-     Eventually Scheduler::io_completed() will run _in this thread_ and return 
-     an NDB to the freelist.  But no other thread can free one, so 
-     all we can do is return an error. 
-     (Or, alternately, the scheduler may be shutting down.)
-     */
-    log_app_error(& AppError9002_NoNDBs);
-    return ENGINE_TMPFAIL;
+  else {                             /* No free NDBs. */
+    if(wc->sendqueue->is_aborted()) {
+      return ENGINE_TMPFAIL;
+    }
+    else {                           /* Try to make an NdbInstance on the fly */
+      inst = wc->newNdbInstance();
+      if(inst) {
+        log_app_error(& AppError29024_autogrow);
+      }
+      else {
+        /* We have hit a hard maximum.  Eventually Scheduler::io_completed() 
+           will run _in this thread_ and return an NDB to the freelist.  
+           But no other thread can free one, so here we return an error. 
+         */
+        log_app_error(& AppError29002_NoNDBs);
+        return ENGINE_TMPFAIL;
+      }
+    }
   }
   
+  assert(inst);
   inst->link_workitem(item);
   
   // Fetch the query plan for this prefix.
@@ -520,6 +541,18 @@ void S::Cluster::add_stats(const char *s
 
 /* WorkerConnection methods */
 
+
+NdbInstance * S::WorkerConnection::newNdbInstance() {
+  NdbInstance *inst = 0;
+  if(instances.current < instances.max) {
+    inst = new NdbInstance(conn->conn, 2);
+    instances.current++;
+    inst->id = ((id.thd + 1) * 10000) + instances.current;
+  }
+  return inst;
+}
+
+
 S::WorkerConnection::WorkerConnection(SchedulerGlobal *global,
                                       int thd_id, int cluster_id) {
   S::Cluster *cl = global->clusters[cluster_id];  
@@ -536,21 +569,25 @@ S::WorkerConnection::WorkerConnection(Sc
   plan_set = new ConnQueryPlanSet(conn->conn, conf->nprefixes);
   plan_set->buildSetForConfiguration(conf, cluster_id);
 
+  /* How many NDB instances to start initially */
+  instances.initial = conn->instances.initial / conn->n_workers;
+
+  /* Maximum size of send queue, and upper bound on NDB instances */
+  instances.max = conn->instances.max / conn->n_workers;
+
   /* Build the freelist */
   freelist = 0;
-  int my_ndb_inst = conn->nInst / conn->n_workers;
-  for(int j = 0 ; j < my_ndb_inst ; j++ ) {
-    NdbInstance *inst = new NdbInstance(conn->conn, 2);
-    inst->id = ((id.thd + 1) * 10000) + j + 1; 
+  for(instances.current = 0; instances.current < instances.initial; ) {
+    NdbInstance *inst = newNdbInstance();
     inst->next = freelist;
     freelist = inst;
   }
 
   DEBUG_PRINT("Cluster %d, connection %d (node %d), worker %d: %d NDBs.", 
-              id.cluster, id.conn, id.node, id.thd, my_ndb_inst);
+              id.cluster, id.conn, id.node, id.thd, instances.current);
   
   /* Initialize the sendqueue */
-  sendqueue = new Queue<NdbInstance>(my_ndb_inst);
+  sendqueue = new Queue<NdbInstance>(instances.max);
   
   /* Hoard a transaction (an API connect record) for each Ndb object.  This
    * first call to startTransaction() will send TC_SEIZEREQ and wait for a 
@@ -560,7 +597,7 @@ S::WorkerConnection::WorkerConnection(Sc
   QueryPlan *plan;
   const KeyPrefix *prefix = conf->getNextPrefixForCluster(id.cluster, NULL);
   if(prefix) {
-    NdbTransaction ** txlist = new NdbTransaction * [my_ndb_inst];
+    NdbTransaction ** txlist = new NdbTransaction * [instances.current];
     int i = 0;
 
     // Open them all.
@@ -573,7 +610,7 @@ S::WorkerConnection::WorkerConnection(Sc
     }
     
     // Close them all.
-    for(i = 0 ; i < my_ndb_inst ; i++) {
+    for(i = 0 ; i < instances.current ; i++) {
       txlist[i]->close();
     }    
     
@@ -642,17 +679,29 @@ S::Connection::Connection(S::Cluster & _
   n_workers = global->options.n_worker_threads / cluster.nconnections;
   if(n_total_workers % cluster.nconnections > id) n_workers += 1;  
 
-  /* How many NDB objects are needed? */
-  /* Note that this is used to configure hard limits on the size of the 
-   * waitgroup, the sentqueue, and the reschedulequeue -- and it will not be 
+  /* How many NDB objects are needed for the desired performance? */
+  double total_ndb_objects = conf->figureInFlightTransactions(cluster.cluster_id);
+  instances.initial = (int) (total_ndb_objects / cluster.nconnections);
+  while(instances.initial % n_workers) instances.initial++; // round up
+
+  /* The maximum number of NDB objects.
+   * This is used to configure hard limits on the size of the waitgroup, 
+   * the sentqueue, and the reschedulequeue -- and it will not be 
    * possible to increase those limits during online reconfig. 
    */
-  double total_ndb_objects = conf->figureInFlightTransactions(cluster.cluster_id);
-  nInst = (int) (total_ndb_objects / cluster.nconnections);
-  while(nInst % n_workers) nInst++; // round up
-    
+  instances.max = instances.initial;
+  // allow the pool to grow on demand? 
+  if(global->options.auto_grow)
+    instances.max *= 1.6;
+  // max_clients imposes a hard upper limit
+  if(instances.max > (global->options.max_clients / cluster.nconnections))
+    instances.max = global->options.max_clients / cluster.nconnections;
+  // instances.initial might also be subject to the max_clients limit
+  if(instances.initial > instances.max) 
+    instances.initial = instances.max;
+  
   /* Get a multi-wait Poll Group */
-  pollgroup = conn->create_ndb_wait_group(nInst);
+  pollgroup = conn->create_ndb_wait_group(instances.max);
       
   /* Initialize the statistics */
   stats.sent_operations = 0;
@@ -665,8 +714,8 @@ S::Connection::Connection(S::Cluster & _
   sem.counter = 0;
     
   /* Initialize the queues for sent and resceduled items */
-  sentqueue = new Queue<NdbInstance>(nInst);
-  reschedulequeue = new Queue<NdbInstance>(nInst);
+  sentqueue = new Queue<NdbInstance>(instances.max);
+  reschedulequeue = new Queue<NdbInstance>(instances.max);
 }
 
 
@@ -726,6 +775,14 @@ void S::Connection::add_stats(const char
   klen = sprintf(key, "cl%d.conn%d.timeout_races", cluster.cluster_id, id);
   vlen = sprintf(val, "%llu", stats.timeout_races);
   add_stat(key, klen, val, vlen, cookie);
+  
+  klen = sprintf(key, "cl%d.conn%d.instances.initial", cluster.cluster_id, id);
+  vlen = sprintf(val, "%d", instances.initial);
+  add_stat(key, klen, val, vlen, cookie);
+
+  klen = sprintf(key, "cl%d.conn%d.instances.max", cluster.cluster_id, id);
+  vlen = sprintf(val, "%d", instances.max);
+  add_stat(key, klen, val, vlen, cookie);
 }                              
   
 

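The new sizing logic in S::Connection can be followed with a small worked example. The stand-alone sketch below reproduces the arithmetic with made-up inputs; the real values come from Configuration::figureInFlightTransactions(), the number of cluster connections and workers, the g scheduler option, and the memcached maxconns setting:

  #include <stdio.h>

  int main(void) {
    double in_flight    = 250.0;   /* hypothetical figureInFlightTransactions() */
    int    nconnections = 2;       /* cluster connections */
    int    n_workers    = 4;       /* worker threads served by this connection */
    int    auto_grow    = 1;       /* scheduler option g1 (the default) */
    int    max_clients  = 1024;    /* memcached maxconns */

    int initial = (int)(in_flight / nconnections);   /* 125 */
    while (initial % n_workers) initial++;           /* round up to 128 */

    int max = initial;
    if (auto_grow)
      max = (int)(max * 1.6);                        /* 204: room to grow on demand */
    if (max > max_clients / nconnections)
      max = max_clients / nconnections;              /* hard cap at 512: no change here */
    if (initial > max)
      initial = max;

    printf("instances.initial=%d  instances.max=%d\n", initial, max);
    /* prints: instances.initial=128  instances.max=204 */
    return 0;
  }

With these numbers, each WorkerConnection starts with instances.initial / n_workers = 32 NDB instances and may build up to instances.max / n_workers = 51 on demand, logging error 29024 each time it grows.
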
=== modified file 'storage/ndb/memcache/src/schedulers/S_sched.h'
--- a/storage/ndb/memcache/src/schedulers/S_sched.h	2012-03-07 01:22:53 +0000
+++ b/storage/ndb/memcache/src/schedulers/S_sched.h	2012-04-06 00:12:54 +0000
@@ -61,7 +61,7 @@ class S::SchedulerGlobal {
 public:
   SchedulerGlobal(Configuration *);
   ~SchedulerGlobal() {};
-  void init(int threads, const char *config_string);
+  void init(const scheduler_options *options);
   void add_stats(const char *, ADD_STAT, const void *);
   void reconfigure(Configuration *);
   void shutdown();
@@ -82,6 +82,8 @@ public:
     int n_connections;     /** preferred number of NDB cluster connections */
     int force_send;        /** how to use NDB force-send */
     int send_timer;        /** milliseconds to set for adaptive send timer */
+    int auto_grow;         /** whether to allow NDB instance pool to grow */
+    int max_clients;       /** memcached max allowed connections */
   } options;
 
 private:
@@ -99,7 +101,7 @@ class S::SchedulerWorker : public Schedu
 public:  
   SchedulerWorker() {};
   ~SchedulerWorker() {};
-  void init(int threadnum, int nthreads, const char *config_string);
+  void init(int threadnum, const scheduler_options * sched_opts);
   void attach_thread(thread_identifier *);
   ENGINE_ERROR_CODE schedule(workitem *);
   void yield(workitem *) const {};
@@ -111,6 +113,7 @@ public:  
   
 private:
   int id;
+  ndb_pipeline *pipeline;
   SchedulerGlobal * m_global;
 };
 
@@ -159,9 +162,12 @@ private:
   Queue<NdbInstance> * reschedulequeue;
   int id;
   int node_id;
-  int nInst;
-  int n_total_workers;
-  int n_workers;
+  int n_total_workers;   /* same as SchedulerGlobal::options.n_worker_threads */
+  int n_workers;         /* number of workers for this connection */
+  struct {
+    int initial;         /* start with this many NDB instances */
+    int max;             /* scale up to this many */
+  } instances; 
   pthread_t send_thread_id;
   pthread_t poll_thread_id;  
   struct {
@@ -187,6 +193,7 @@ public:
   ~WorkerConnection();
   void shutdown();
   void reconfigure(Configuration *);
+  NdbInstance * newNdbInstance();
 
   struct { 
     int thd           : 8;
@@ -194,6 +201,11 @@ public:
     int conn          : 8;
     unsigned int node : 8;
   } id;
+  struct {
+    int initial;
+    int current;
+    int max;
+  } instances;
   S::Connection *conn;
   ConnQueryPlanSet *plan_set, *old_plan_set;
   NdbInstance *freelist;

=== modified file 'storage/ndb/memcache/src/schedulers/Stockholm.cc'
--- a/storage/ndb/memcache/src/schedulers/Stockholm.cc	2012-03-07 01:22:53 +0000
+++ b/storage/ndb/memcache/src/schedulers/Stockholm.cc	2012-04-03 00:34:18 +0000
@@ -46,14 +46,15 @@ extern "C" {
 }
 
 
-void Scheduler_stockholm::init(int my_thread, int nthreads, const char *config_string) {
+void Scheduler_stockholm::init(int my_thread, 
+                               const scheduler_options *options) {
   const Configuration & conf = get_Configuration();
 
   /* How many NDB instances are needed per cluster? */
   for(unsigned int c = 0 ; c < conf.nclusters ; c++) {
     ClusterConnectionPool *pool = conf.getConnectionPoolById(c);
     double total_ndb_objects = conf.figureInFlightTransactions(c);
-    cluster[c].nInst = (int) total_ndb_objects / nthreads;
+    cluster[c].nInst = (int) total_ndb_objects / options->nthreads;
     DEBUG_PRINT("cluster %d: %d TPS @ %d usec RTT ==> %d NDB instances.",
                 c, conf.max_tps, pool->usec_rtt, cluster[c].nInst);
   }

=== modified file 'storage/ndb/memcache/src/schedulers/Stockholm.h'
--- a/storage/ndb/memcache/src/schedulers/Stockholm.h	2012-03-07 01:22:53 +0000
+++ b/storage/ndb/memcache/src/schedulers/Stockholm.h	2012-04-03 00:34:18 +0000
@@ -44,7 +44,7 @@ class Scheduler_stockholm : public Sched
 public:
   Scheduler_stockholm() {};
   ~Scheduler_stockholm() {};
-  void init(int threadnum, int nthreads, const char *config_string);
+  void init(int threadnum, const scheduler_options *options);
   void attach_thread(thread_identifier *);
   ENGINE_ERROR_CODE schedule(workitem *);
   void yield(workitem *) const;                                       // inlined
@@ -56,6 +56,7 @@ public:
   bool global_reconfigure(Configuration *) { return false; } ;
 
 private:  
+  ndb_pipeline *pipeline;
   struct {
     struct workqueue *queue; 
     struct sched_stats_stockholm { 

=== modified file 'storage/ndb/memcache/unit/alloc.cc'
--- a/storage/ndb/memcache/unit/alloc.cc	2011-12-18 23:26:44 +0000
+++ b/storage/ndb/memcache/unit/alloc.cc	2012-04-03 00:34:18 +0000
@@ -27,14 +27,16 @@
 
 #include "all_tests.h"
 
+#define TEST_ALLOC_BLOCKS 34
+
 int run_allocator_test(QueryPlan *, Ndb *, int v) {
-  struct request_pipeline *p = get_request_pipeline(0);
+  struct request_pipeline *p = get_request_pipeline(0, NULL);
   
   memory_pool *p1 = pipeline_create_memory_pool(p);
   int sz = 13;
   uint tot = 0;
   void *v1, *v2;
-  for(int i = 0 ; i < 25 ; i++) {
+  for(int i = 0 ; i < TEST_ALLOC_BLOCKS ; i++) {
     v1 = memory_pool_alloc(p1, sz);     tot += sz;
     v2 = memory_pool_alloc(p1, sz + 1); tot += sz + 1;
     sz = (int) (sz * 1.25);

No bundle (reason: useless for push emails).