#At file:///Users/jdd/bzr-repo/working/5.1-telco-7.2-memcache/ based on revid:john.duncan@stripped
4151 John David Duncan 2011-04-05
Add files missing from last commit
added:
storage/ndb/memcache/src/schedulers/Flex.cc
storage/ndb/memcache/src/schedulers/Flex.h
=== added file 'storage/ndb/memcache/src/schedulers/Flex.cc'
--- a/storage/ndb/memcache/src/schedulers/Flex.cc 1970-01-01 00:00:00 +0000
+++ b/storage/ndb/memcache/src/schedulers/Flex.cc 2011-04-06 04:32:39 +0000
@@ -0,0 +1,208 @@
+/*
+ Copyright (c) 2011, Oracle and/or its affiliates. All rights
+ reserved.
+
+ This program is free software; you can redistribute it and/or
+ modify it under the terms of the GNU General Public License
+ as published by the Free Software Foundation; version 2 of
+ the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ 02110-1301 USA
+ */
+/* System headers */
+/* C++ files must define __STDC_FORMAT_MACROS in order to get PRIu64 */
+#define __STDC_FORMAT_MACROS
+#include <inttypes.h>
+#include <stdlib.h>
+#include <stdio.h>
+
+/* Memcache headers */
+#include "memcached/types.h"
+#include <memcached/extension_loggers.h>
+
+/* NDB Memcache headers */
+#include "workitem.h"
+#include "thread_identifier.h"
+#include "ClusterConnectionPool.h"
+#include "Flex.h"
+#include "Flex_thread_spec.h"
+#include "Flex_broker.h"
+#include "Flex_cluster.h"
+
+
+extern EXTENSION_LOGGER_DESCRIPTOR *logger;
+
+
+extern "C" {
+ void * run_flex_broker_thread(void *);
+}
+
+
+void Scheduler_flex::init(int t, int nthreads, const char *user_config) {
+ thread_id = t;
+ const char * active_cf;
+
+ /* Set some baseline default values for the configuration */
+ config.n_engine_threads = nthreads;
+ config.n_broker_threads = 0;
+ config.n_commit_threads = 2;
+ config.n_db_pending = 0;
+ config.n_connections = 1;
+
+ /* The default config parameters */
+ static const char * default_config = "b0,c2,p1";
+
+ /* Choose which string to parse */
+ if(user_config && *user_config) active_cf = user_config;
+ else active_cf = default_config;
+
+ /* disregard the return value from sscanf(), but test the config for validity,
+ and write it to the logger when the scheduler starts running */
+ sscanf(active_cf, "b%d,c%d,p%d",
+ & config.n_broker_threads, & config.n_commit_threads,
+ & config.n_connections);
+
+ /* Test validity of configuration */
+ if(config.n_broker_threads < 0 || config.n_broker_threads > 2) {
+ logger->log(LOG_WARNING, 0, "Invalid scheduler configuration.\n");
+ assert(config.n_broker_threads >= 0 && config.n_broker_threads <= 2);
+ }
+ if(config.n_commit_threads < 1 || config.n_commit_threads > 16) {
+ logger->log(LOG_WARNING, 0, "Invalid scheduler configuration.\n");
+ assert(config.n_commit_threads >= 1 && config.n_commit_threads <= 16);
+ }
+ if(config.n_db_pending < 0 || config.n_db_pending > 16) {
+ logger->log(LOG_WARNING, 0, "Invalid scheduler configuration.\n");
+ assert(config.n_db_pending >= 0 && config.n_db_pending <= 16);
+ }
+ if(config.n_connections < 1 || config.n_connections > 4) {
+ logger->log(LOG_WARNING, 0, "Invalid scheduler configuration.\n");
+ assert(config.n_connections >= 1 && config.n_connections <= 4);
+ }
+
+ /* There is always a broker, even if there is no broker thread */
+ nbrokers = config.n_broker_threads > 1 ? config.n_broker_threads : 1;
+
+ /* Create a workqueue for the broker thread */
+ broker_queue_size = 8192;
+ if(config.n_broker_threads) {
+ do_queue_sample = random() % FLEX_STATS_INITIAL_INTERVAL;
+ broker_queue = (struct workqueue *) malloc(sizeof(struct workqueue));
+ workqueue_init(broker_queue, broker_queue_size, config.n_broker_threads);
+ }
+ else broker_queue = 0;
+
+ /* Initialize the brokers.
+ If there are no broker threads, then broker[0]'s methods run in the engine
+ thread. Otherwise each broker will run mostly within a broker thread.
+ */
+ for(int i = 0 ; i < nbrokers ; i++)
+ brokers[i] = new Broker(this, i);
+}
+
+
+void Scheduler_flex::attach_thread(thread_identifier * parent) {
+ const Configuration & conf = get_Configuration();
+ pipeline = parent->pipeline;
+
+ logger->log(LOG_WARNING, 0, "Pipeline %d attached to flex scheduler: "
+ "config \"b%d,c%d,p%d\"; %d cluster%s.\n", pipeline->id,
+ config.n_broker_threads, config.n_commit_threads,
+ config.n_connections, conf.nclusters,
+ conf.nclusters == 1 ? "" : "s");
+
+ // Launch commit threads for each cluster
+ for(int i = 0 ; i < nbrokers; i++)
+ for(int j = 0 ; j < conf.nclusters ; j++)
+ brokers[i]->clusters[j]->attach_thread(parent);
+
+ // Adjust the thread stack size for the broker threads
+ pthread_attr_t broker_thd_attr;
+ size_t thd_stack_size;
+ pthread_attr_init(& broker_thd_attr);
+ pthread_attr_getstacksize(& broker_thd_attr, & thd_stack_size);
+ pthread_attr_setstacksize(& broker_thd_attr, thd_stack_size / 2);
+
+ // Launch the broker threads
+ for(int i = 0; i < config.n_broker_threads ; i++) {
+ thread_spec *spec = new thread_spec(this, parent, i, 0, 0);
+ pthread_create(& brokers[i]->thread_id, & broker_thd_attr,
+ run_flex_broker_thread, (void *) spec);
+ }
+}
+
+
+ENGINE_ERROR_CODE Scheduler_flex::schedule(workitem *newitem) {
+ if(config.n_broker_threads) {
+
+ if(do_queue_sample-- == 0) {
+ /* See comments in Cluster::schedule() about queue depth sampling */
+ int depth = broker_queue->depth + 1;
+ if(depth > 0 && depth <= broker_queue_size) {
+ stats.broker_queue_total_depth += depth ;
+ stats.broker_queue_samples++;
+ }
+ do_queue_sample = random() % FLEX_STATS_SAMPLE_INTERVAL;
+ }
+
+ workqueue_add(broker_queue, newitem);
+
+ return ENGINE_EWOULDBLOCK;
+ }
+ else {
+ return brokers[0]->schedule(newitem);
+ }
+}
+
+
+void Scheduler_flex::add_stats(ADD_STAT add_stat, const void * cookie) {
+ char key[128];
+ char val[128];
+ int klen, vlen, p;
+
+ stats.total_avg_ndb_depth = 0;
+ stats.total_avg_commit_queue_depth = 0;
+ for(int i = 0 ; i < nbrokers ; i++)
+ brokers[i]->contribute_stats();
+
+ klen = sprintf(key, "t%d_avg_ndb_depth", pipeline->id);
+ vlen = sprintf(val, "%.3f", stats.total_avg_ndb_depth / nbrokers);
+ add_stat(key, klen, val, vlen, cookie);
+
+ klen = sprintf(key, "t%d_avg_commit_queue_depth", pipeline->id);
+ vlen = sprintf(val, "%.3f", stats.total_avg_commit_queue_depth / nbrokers);
+ add_stat(key, klen, val, vlen, cookie);
+
+ if(config.n_broker_threads && stats.broker_queue_samples) {
+ klen = sprintf(key, "t%d_broker_queue_avg_depth", pipeline->id);
+ vlen = sprintf(val, "%.3f",
+ (double) stats.broker_queue_total_depth / stats.broker_queue_samples);
+ add_stat(key, klen, val, vlen, cookie);
+ }
+}
+
+
+void Scheduler_flex::io_completed(workitem *item) {
+ int b = item->base.broker;
+ int c = item->prefix_info.cluster_id;
+ brokers[b]->clusters[c]->io_completed(item);
+}
+
+
+void * run_flex_broker_thread(void *s) {
+ Scheduler_flex::thread_spec *spec = (Scheduler_flex::thread_spec *) s;
+ set_child_thread_id(spec->parent, "broker%d", spec->broker);
+
+ spec->run_broker_thread();
+
+ delete spec;
+ return 0;
+}
=== added file 'storage/ndb/memcache/src/schedulers/Flex.h'
--- a/storage/ndb/memcache/src/schedulers/Flex.h 1970-01-01 00:00:00 +0000
+++ b/storage/ndb/memcache/src/schedulers/Flex.h 2011-04-06 04:32:39 +0000
@@ -0,0 +1,95 @@
+/*
+ Copyright (c) 2011, Oracle and/or its affiliates. All rights
+ reserved.
+
+ This program is free software; you can redistribute it and/or
+ modify it under the terms of the GNU General Public License
+ as published by the Free Software Foundation; version 2 of
+ the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ 02110-1301 USA
+ */
+
+#ifndef NDBMEMCACHE_FLEX_SCHEDULER_H
+#define NDBMEMCACHE_FLEX_SCHEDULER_H
+
+#ifndef __cplusplus
+#error "This file is for C++ only"
+#endif
+
+#include <memcached/types.h>
+
+#include "config.h"
+#include "Scheduler.h"
+#include "KeyPrefix.h"
+
+
+/*
+ * Flex Scheduler
+ *
+ * The scheduler is configurable:
+ * - 0, 1, or 2 "middleman" broker threads per engine thread
+ * - some number of commit threads per {broker, cluster id} tuple
+ * - some max number of Ndb objects to poll concurrently in a commit thread
+ *
+ */
+class Scheduler_flex : public Scheduler {
+public:
+ class Broker;
+ class Cluster;
+ class thread_spec;
+
+ friend class Broker;
+ friend class Cluster;
+ friend class thread_spec;
+
+ Scheduler_flex() {};
+ ~Scheduler_flex();
+ void init(int threadnum, int nthreads, const char *config_string);
+ void attach_thread(thread_identifier *);
+ ENGINE_ERROR_CODE schedule(workitem *);
+ void io_completed(workitem *);
+ void add_stats(ADD_STAT, const void *);
+
+protected:
+ int thread_id;
+ int do_queue_sample;
+ int nbrokers;
+ int broker_queue_size;
+ struct workqueue * broker_queue;
+ struct flex_cf {
+ int n_engine_threads; //< no. of libevent worker threads in memcached
+ int n_broker_threads; //< no. of separate broker threads, 0 - 2
+ int n_commit_threads; //< no. of commit threads per {broker,cluster} pair
+ int n_db_pending; //< no. of pollable Ndbs inside a commit thread
+ int n_connections; //< size of Ndb_cluster_connection pool
+ } config;
+ struct flex_stats {
+ double total_avg_ndb_depth;
+ double total_avg_commit_queue_depth;
+ uint64_t broker_queue_total_depth;
+ uint64_t broker_queue_samples;
+ } stats;
+
+ Broker * brokers[2];
+};
+
+
+/* Random stat samples will on average be taken twice as often as
+ FLEX_STATS_SAMPLE_INTERVAL (based on uniform random distribution).
+ FLEX_STATS_INITIAL_INTERVAL is lower so as to get stats quickly on startup.
+*/
+#define FLEX_STATS_INITIAL_INTERVAL 20
+#define FLEX_STATS_SAMPLE_INTERVAL 400
+
+
+#endif
+
Attachment: [text/bzr-bundle] bzr/john.duncan@oracle.com-20110406043239-uh19ec62n20vlf7r.bundle
| Thread |
|---|
| • bzr commit into mysql-5.1-telco-7.2 branch (john.duncan:4151) | John David Duncan | 6 Apr |