List:Commits« Previous MessageNext Message »
From:John David Duncan Date:April 6 2011 4:32am
Subject:bzr commit into mysql-5.1-telco-7.2 branch (john.duncan:4151)
View as plain text  
#At file:///Users/jdd/bzr-repo/working/5.1-telco-7.2-memcache/ based on revid:john.duncan@stripped

 4151 John David Duncan	2011-04-05
      Add files missing from last commit

    added:
      storage/ndb/memcache/src/schedulers/Flex.cc
      storage/ndb/memcache/src/schedulers/Flex.h
=== added file 'storage/ndb/memcache/src/schedulers/Flex.cc'
--- a/storage/ndb/memcache/src/schedulers/Flex.cc	1970-01-01 00:00:00 +0000
+++ b/storage/ndb/memcache/src/schedulers/Flex.cc	2011-04-06 04:32:39 +0000
@@ -0,0 +1,208 @@
+/*
+ Copyright (c) 2011, Oracle and/or its affiliates. All rights
+ reserved.
+ 
+ This program is free software; you can redistribute it and/or
+ modify it under the terms of the GNU General Public License
+ as published by the Free Software Foundation; version 2 of
+ the License.
+ 
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+ 
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ 02110-1301  USA
+ */
+/* System headers */
+/* C++ files must define __STDC_FORMAT_MACROS in order to get PRIu64 */
+#define __STDC_FORMAT_MACROS 
+#include <inttypes.h>
+#include <stdlib.h>
+#include <stdio.h>
+
+/* Memcache headers */
+#include "memcached/types.h"
+#include <memcached/extension_loggers.h>
+
+/* NDB Memcache headers */
+#include "workitem.h"
+#include "thread_identifier.h"
+#include "ClusterConnectionPool.h"
+#include "Flex.h"
+#include "Flex_thread_spec.h"
+#include "Flex_broker.h"
+#include "Flex_cluster.h"
+
+
+extern EXTENSION_LOGGER_DESCRIPTOR *logger;
+
+
+extern "C" {
+  void * run_flex_broker_thread(void *);
+}
+
+
+void Scheduler_flex::init(int t, int nthreads, const char *user_config) {
+  thread_id = t;
+  const char * active_cf;
+  
+  /* Set some baseline default values for the configuration */
+  config.n_engine_threads = nthreads;
+  config.n_broker_threads = 0;
+  config.n_commit_threads = 2;
+  config.n_db_pending = 0;
+  config.n_connections = 1;
+  
+  /* The default config parameters */
+  static const char * default_config = "b0,c2,p1";
+ 
+  /* Choose which string to parse */
+  if(user_config && *user_config) active_cf = user_config;
+  else active_cf = default_config;
+
+  /* disregard the return value from sscanf(), but test the config for validity,
+     and write it to the logger when the scheduler starts running */
+  sscanf(active_cf, "b%d,c%d,p%d", 
+         & config.n_broker_threads, & config.n_commit_threads, 
+         & config.n_connections);
+
+  /* Test validity of configuration */
+  if(config.n_broker_threads < 0 || config.n_broker_threads > 2) {
+    logger->log(LOG_WARNING, 0, "Invalid scheduler configuration.\n");
+    assert(config.n_broker_threads >= 0 && config.n_broker_threads <= 2);
+  }
+  if(config.n_commit_threads < 1 || config.n_commit_threads > 16) {
+    logger->log(LOG_WARNING, 0, "Invalid scheduler configuration.\n");
+    assert(config.n_commit_threads >= 1 && config.n_commit_threads <= 16);
+  }
+  if(config.n_db_pending < 0 || config.n_db_pending > 16) {
+    logger->log(LOG_WARNING, 0, "Invalid scheduler configuration.\n");
+    assert(config.n_db_pending >= 0 && config.n_db_pending <= 16);
+  }
+  if(config.n_connections < 1 || config.n_connections > 4) {
+    logger->log(LOG_WARNING, 0, "Invalid scheduler configuration.\n");
+    assert(config.n_connections >= 1 && config.n_connections <= 4);
+  }
+  
+  /* There is always a broker, even if there is no broker thread */
+  nbrokers = config.n_broker_threads > 1 ? config.n_broker_threads : 1;
+   
+  /* Create a workqueue for the broker thread */
+  broker_queue_size = 8192;
+  if(config.n_broker_threads) {
+    do_queue_sample = random() % FLEX_STATS_INITIAL_INTERVAL;
+    broker_queue = (struct workqueue *) malloc(sizeof(struct workqueue));
+    workqueue_init(broker_queue, broker_queue_size, config.n_broker_threads); 
+  }
+  else broker_queue = 0;
+  
+  /* Initialize the brokers.
+     If there are no broker threads, then broker[0]'s methods run in the engine 
+     thread.  Otherwise each broker will run mostly within a broker thread.
+   */
+  for(int i = 0 ; i < nbrokers ; i++) 
+    brokers[i] = new Broker(this, i);
+}
+
+
+void Scheduler_flex::attach_thread(thread_identifier * parent) {
+  const Configuration & conf = get_Configuration();
+  pipeline = parent->pipeline;
+  
+  logger->log(LOG_WARNING, 0, "Pipeline %d attached to flex scheduler: "
+              "config \"b%d,c%d,p%d\"; %d cluster%s.\n", pipeline->id,
+              config.n_broker_threads, config.n_commit_threads,
+              config.n_connections, conf.nclusters,
+              conf.nclusters == 1 ? "" : "s");
+
+  // Launch commit threads for each cluster
+  for(int i = 0 ; i < nbrokers; i++) 
+    for(int j = 0 ; j < conf.nclusters ; j++) 
+      brokers[i]->clusters[j]->attach_thread(parent);
+
+  // Adjust the thread stack size for the broker threads
+  pthread_attr_t broker_thd_attr;
+  size_t thd_stack_size;
+  pthread_attr_init(& broker_thd_attr);
+  pthread_attr_getstacksize(& broker_thd_attr, & thd_stack_size);
+  pthread_attr_setstacksize(& broker_thd_attr, thd_stack_size / 2);
+  
+  // Launch the broker threads
+  for(int i = 0; i < config.n_broker_threads ; i++) {
+    thread_spec *spec = new thread_spec(this, parent, i, 0, 0);
+    pthread_create(& brokers[i]->thread_id, & broker_thd_attr, 
+                   run_flex_broker_thread, (void *) spec);
+  }
+}
+
+
+ENGINE_ERROR_CODE Scheduler_flex::schedule(workitem *newitem) {
+  if(config.n_broker_threads) {
+
+    if(do_queue_sample-- == 0) {
+      /* See comments in Cluster::schedule() about queue depth sampling */
+      int depth = broker_queue->depth + 1;
+      if(depth > 0 && depth <= broker_queue_size) {
+        stats.broker_queue_total_depth += depth ;
+        stats.broker_queue_samples++;
+      }
+      do_queue_sample = random() % FLEX_STATS_SAMPLE_INTERVAL;
+    }
+
+    workqueue_add(broker_queue, newitem);
+
+    return ENGINE_EWOULDBLOCK;
+  }
+  else {
+    return brokers[0]->schedule(newitem);
+  }
+}
+
+
+void Scheduler_flex::add_stats(ADD_STAT add_stat, const void * cookie) {
+  char key[128];
+  char val[128];
+  int klen, vlen, p;
+  
+  stats.total_avg_ndb_depth = 0;
+  stats.total_avg_commit_queue_depth = 0;
+  for(int i = 0 ; i < nbrokers ; i++) 
+    brokers[i]->contribute_stats();
+  
+  klen = sprintf(key, "t%d_avg_ndb_depth", pipeline->id);
+  vlen = sprintf(val, "%.3f", stats.total_avg_ndb_depth / nbrokers);
+  add_stat(key, klen, val, vlen, cookie);
+  
+  klen = sprintf(key, "t%d_avg_commit_queue_depth", pipeline->id);
+  vlen = sprintf(val, "%.3f", stats.total_avg_commit_queue_depth / nbrokers);
+  add_stat(key, klen, val, vlen, cookie);
+  
+  if(config.n_broker_threads && stats.broker_queue_samples) {
+    klen = sprintf(key, "t%d_broker_queue_avg_depth", pipeline->id);
+    vlen = sprintf(val, "%.3f", 
+           (double) stats.broker_queue_total_depth / stats.broker_queue_samples);
+    add_stat(key, klen, val, vlen, cookie);
+  }
+}
+
+
+void Scheduler_flex::io_completed(workitem *item) {
+  int b = item->base.broker;
+  int c = item->prefix_info.cluster_id;
+  brokers[b]->clusters[c]->io_completed(item);
+}
+
+
+void * run_flex_broker_thread(void *s) {
+  Scheduler_flex::thread_spec *spec = (Scheduler_flex::thread_spec *) s;
+  set_child_thread_id(spec->parent, "broker%d", spec->broker);
+  
+  spec->run_broker_thread();
+  
+  delete spec;
+  return 0;
+}

=== added file 'storage/ndb/memcache/src/schedulers/Flex.h'
--- a/storage/ndb/memcache/src/schedulers/Flex.h	1970-01-01 00:00:00 +0000
+++ b/storage/ndb/memcache/src/schedulers/Flex.h	2011-04-06 04:32:39 +0000
@@ -0,0 +1,95 @@
+/*
+ Copyright (c) 2011, Oracle and/or its affiliates. All rights
+ reserved.
+ 
+ This program is free software; you can redistribute it and/or
+ modify it under the terms of the GNU General Public License
+ as published by the Free Software Foundation; version 2 of
+ the License.
+ 
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+ 
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ 02110-1301  USA
+ */
+
+#ifndef NDBMEMCACHE_FLEX_SCHEDULER_H
+#define NDBMEMCACHE_FLEX_SCHEDULER_H
+
+#ifndef __cplusplus
+#error "This file is for C++ only"
+#endif
+
+#include <memcached/types.h>
+
+#include "config.h"
+#include "Scheduler.h"
+#include "KeyPrefix.h"
+
+
+/* 
+ *              Flex Scheduler 
+ *
+ *  The scheduler is configurable:
+ *     - 0, 1, or 2 "middleman" broker threads per engine thread
+ *     - some number of commit threads per {broker, cluster id} tuple
+ *     - some max number of Ndb objects to poll concurrently in a commit thread
+ * 
+ */
+class Scheduler_flex : public Scheduler {  
+public:
+  class Broker;
+  class Cluster;
+  class thread_spec;
+
+  friend class Broker;
+  friend class Cluster;
+  friend class thread_spec;
+    
+  Scheduler_flex() {};
+  ~Scheduler_flex();
+  void init(int threadnum, int nthreads, const char *config_string);
+  void attach_thread(thread_identifier *);
+  ENGINE_ERROR_CODE schedule(workitem *);
+  void io_completed(workitem *);
+  void add_stats(ADD_STAT, const void *);  
+
+protected:
+  int thread_id;
+  int do_queue_sample;
+  int nbrokers;
+  int broker_queue_size;
+  struct workqueue * broker_queue;  
+  struct flex_cf {
+    int n_engine_threads;    //< no. of libevent worker threads in memcached
+    int n_broker_threads;    //< no. of separate broker threads, 0 - 2
+    int n_commit_threads;    //< no. of commit threads per {broker,cluster} pair
+    int n_db_pending;        //< no. of pollable Ndbs inside a commit thread
+    int n_connections;       //< size of Ndb_cluster_connection pool
+  } config;  
+  struct flex_stats {
+    double total_avg_ndb_depth;
+    double total_avg_commit_queue_depth;
+    uint64_t broker_queue_total_depth;
+    uint64_t broker_queue_samples;
+  } stats;
+
+  Broker * brokers[2];
+};
+
+
+/* Random stat samples will on average be taken twice as often as 
+   FLEX_STATS_SAMPLE_INTERVAL (based on uniform random distribution). 
+   FLEX_STATS_INITIAL_INTERVAL is lower so as to get stats quickly on startup.
+*/
+#define FLEX_STATS_INITIAL_INTERVAL 20
+#define FLEX_STATS_SAMPLE_INTERVAL 400
+
+
+#endif
+


Attachment: [text/bzr-bundle] bzr/john.duncan@oracle.com-20110406043239-uh19ec62n20vlf7r.bundle
Thread
bzr commit into mysql-5.1-telco-7.2 branch (john.duncan:4151) John David Duncan6 Apr