From: John David Duncan Date: April 6 2011 4:31am Subject: bzr commit into mysql-5.1-telco-7.2 branch (john.duncan:4150) List-Archive: http://lists.mysql.com/commits/134761 Message-Id: <201104060432.p364WDmi004073@acsmt356.oracle.com> MIME-Version: 1.0 Content-Type: multipart/mixed; boundary="===============1131761229==" --===============1131761229== MIME-Version: 1.0 Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit Content-Disposition: inline #At file:///Users/jdd/bzr-repo/working/5.1-telco-7.2-memcache/ based on revid:john.duncan@stripped 4150 John David Duncan 2011-04-05 Remove Dispatch scheduler; fix deadlock in Stockholm & Flex schedulers; implement flush_all removed: storage/ndb/memcache/src/schedulers/Dispatch.cc storage/ndb/memcache/src/schedulers/Dispatch.h storage/ndb/memcache/src/schedulers/flex.cc storage/ndb/memcache/src/schedulers/flex.h added: storage/ndb/memcache/src/schedulers/Flex_broker.cc storage/ndb/memcache/src/schedulers/Flex_broker.h storage/ndb/memcache/src/schedulers/Flex_cluster.cc storage/ndb/memcache/src/schedulers/Flex_cluster.h storage/ndb/memcache/src/schedulers/Flex_thread_spec.h modified: storage/ndb/memcache/Makefile.am storage/ndb/memcache/README storage/ndb/memcache/include/ClusterConnectionPool.h storage/ndb/memcache/include/NdbInstance.h storage/ndb/memcache/include/Operation.h storage/ndb/memcache/include/QueryPlan.h storage/ndb/memcache/include/Scheduler.h storage/ndb/memcache/include/ndb_pipeline.h storage/ndb/memcache/include/ndb_worker.h storage/ndb/memcache/include/ndbmemcache_global.h storage/ndb/memcache/include/workitem.h storage/ndb/memcache/scripts/metadata.sql storage/ndb/memcache/src/Configuration.cc storage/ndb/memcache/src/NdbInstance.cc storage/ndb/memcache/src/ndb_engine.c storage/ndb/memcache/src/ndb_pipeline.cc storage/ndb/memcache/src/ndb_worker.cc storage/ndb/memcache/src/schedulers/3thread.cc storage/ndb/memcache/src/schedulers/3thread.h storage/ndb/memcache/src/schedulers/Stockholm.cc storage/ndb/memcache/src/schedulers/Stockholm.h storage/ndb/memcache/src/workitem.c storage/ndb/memcache/unit/Makefile.am storage/ndb/memcache/xcode/ndbmemcache.xcodeproj/project.pbxproj === modified file 'storage/ndb/memcache/Makefile.am' --- a/storage/ndb/memcache/Makefile.am 2011-03-30 06:54:53 +0000 +++ b/storage/ndb/memcache/Makefile.am 2011-04-06 04:31:28 +0000 @@ -34,12 +34,14 @@ ndb_engine_la_SOURCES= \ src/workqueue.c \ src/schedulers/Stockholm.h \ src/schedulers/Stockholm.cc \ - src/schedulers/Dispatch.h \ - src/schedulers/Dispatch.cc \ src/schedulers/3thread.h \ src/schedulers/3thread.cc \ - src/schedulers/flex.h \ - src/schedulers/flex.cc \ + src/schedulers/Flex.h \ + src/schedulers/Flex.cc \ + src/schedulers/Flex_broker.h \ + src/schedulers/Flex_broker.cc \ + src/schedulers/Flex_cluster.h \ + src/schedulers/Flex_cluster.cc \ src/atomics.c \ src/debug.c \ src/hash_item_util.c \ === modified file 'storage/ndb/memcache/README' --- a/storage/ndb/memcache/README 2011-03-30 06:54:53 +0000 +++ b/storage/ndb/memcache/README 2011-04-06 04:31:28 +0000 @@ -19,11 +19,18 @@ QUICK START OBTAIN MEMCACHED -------------------------- -Obtain memcached 1.6 source from github: -git clone http://github.com/memcached/memcached -git checkout engine-pu +# Obtain memcached 1.6 source from github, and then checkout the +# "engine-pu" branch which contains the "storage engine API" code +git clone git://github.com/memcached/memcached +cd memcached +git checkout -t origin/engine-pu +DECIDE WHERE MEMCACHED WILL BE INSTALLED +-------------------------- +# You will have to configure memcached and the NDB engine separately, but +# it is important + BUILD AND INSTALL MEMCACHED -------------------------- Note that memcached requires libevent 1.3 or newer. === modified file 'storage/ndb/memcache/include/ClusterConnectionPool.h' --- a/storage/ndb/memcache/include/ClusterConnectionPool.h 2011-03-30 06:54:53 +0000 +++ b/storage/ndb/memcache/include/ClusterConnectionPool.h 2011-04-06 04:31:28 +0000 @@ -24,6 +24,8 @@ #include +#include "NdbInstance.h" + class ClusterConnectionPool { /* public instance variables */ @@ -35,7 +37,8 @@ public: private: Ndb_cluster_connection *conn; unsigned int pool_size; - Ndb_cluster_connection *pool_connections[MAX_CONNECT_POOL]; + Ndb_cluster_connection * pool_connections[MAX_CONNECT_POOL]; + NdbInstance * master_instances[MAX_CONNECT_POOL]; /* public class methods */ public: @@ -59,6 +62,12 @@ public: /** Get the connection numbered "my_id modulo pool_size" */ Ndb_cluster_connection *getPooledConnection(int my_id) const; // inlined + + /** Set an NdbInstance to be the master instance for a connection */ + void setNdbInstance(int my_id, NdbInstance *); // inlined + + /** Get the master NdbInstance for this connection */ + NdbInstance * getNdbInstance(int my_id) const; // inlined }; /* Inline functions */ @@ -73,4 +82,12 @@ inline Ndb_cluster_connection * ClusterC return pool_connections[i % pool_size]; } +inline void ClusterConnectionPool::setNdbInstance(int i, NdbInstance *inst) { + master_instances[i % pool_size] = inst; +} + +inline NdbInstance * ClusterConnectionPool::getNdbInstance(int i) const { + return master_instances[i % pool_size]; +} + #endif === modified file 'storage/ndb/memcache/include/NdbInstance.h' --- a/storage/ndb/memcache/include/NdbInstance.h 2011-03-30 06:54:53 +0000 +++ b/storage/ndb/memcache/include/NdbInstance.h 2011-04-06 04:31:28 +0000 @@ -42,6 +42,7 @@ public: /* Public Instance Variables */ Ndb *db; ndbmc_atomic32_t in_use; + NdbInstance *next; protected: int nplans; === modified file 'storage/ndb/memcache/include/Operation.h' --- a/storage/ndb/memcache/include/Operation.h 2011-03-30 06:54:53 +0000 +++ b/storage/ndb/memcache/include/Operation.h 2011-04-06 04:31:28 +0000 @@ -120,6 +120,7 @@ inline Operation::Operation(QueryPlan *p key_buffer(kbuf) { if(op == OP_READ) record = plan->val_record; + else if(op == OP_FLUSH) record = plan->key_record; // scanning delete else record = plan->row_record; } === modified file 'storage/ndb/memcache/include/QueryPlan.h' --- a/storage/ndb/memcache/include/QueryPlan.h 2011-03-30 06:54:53 +0000 +++ b/storage/ndb/memcache/include/QueryPlan.h 2011-04-06 04:31:28 +0000 @@ -46,6 +46,7 @@ class QueryPlan { public: QueryPlan() : initialized(0) {}; QueryPlan(Ndb *, const TableSpec *, PlanOpts opts = NoOptions); + bool keyIsPrimaryKey(); /* public instance variables */ bool initialized; @@ -71,7 +72,6 @@ class QueryPlan { private: /* Private methods */ - bool keyIsPrimaryKey(); const NdbDictionary::Index * chooseIndex(); /* Private instance variables */ === modified file 'storage/ndb/memcache/include/Scheduler.h' --- a/storage/ndb/memcache/include/Scheduler.h 2011-03-30 06:54:53 +0000 +++ b/storage/ndb/memcache/include/Scheduler.h 2011-04-06 04:31:28 +0000 @@ -52,6 +52,10 @@ public: /** schedule() is called from the NDB Engine thread when an operation is ready to be queued for further async processing */ virtual ENGINE_ERROR_CODE schedule(workitem *) = 0; + + /** io_completed() is called from the NDB Engine thread when an IO + completion notification has been received */ + virtual void io_completed(workitem *) = 0; /** add_stats() allows the engine to delegate certain statistics to the scheduler. */ === modified file 'storage/ndb/memcache/include/ndb_pipeline.h' --- a/storage/ndb/memcache/include/ndb_pipeline.h 2011-03-30 06:54:53 +0000 +++ b/storage/ndb/memcache/include/ndb_pipeline.h 2011-04-06 04:31:28 +0000 @@ -118,6 +118,11 @@ void * initialize_scheduler(const char * /** pass a workitem to the configured scheduler, for execution */ ENGINE_ERROR_CODE pipeline_schedule_operation(ndb_pipeline *, struct workitem *); +/** schedule a "flush_all" operation */ +ENGINE_ERROR_CODE pipeline_flush_all(ndb_pipeline *); + +/** notify scheduler of IO completion */ +void pipeline_io_completed(ndb_pipeline *, struct workitem *); /***** MEMORY MANAGEMENT APIS *****/ === modified file 'storage/ndb/memcache/include/ndb_worker.h' --- a/storage/ndb/memcache/include/ndb_worker.h 2011-03-30 06:54:53 +0000 +++ b/storage/ndb/memcache/include/ndb_worker.h 2011-04-06 04:31:28 +0000 @@ -29,4 +29,6 @@ bool worker_prepare_operation(workitem * bool build_hash_item(workitem *, Operation &); +ENGINE_ERROR_CODE ndb_flush_all(ndb_pipeline *); + #endif === modified file 'storage/ndb/memcache/include/ndbmemcache_global.h' --- a/storage/ndb/memcache/include/ndbmemcache_global.h 2011-03-30 06:54:53 +0000 +++ b/storage/ndb/memcache/include/ndbmemcache_global.h 2011-04-06 04:31:28 +0000 @@ -48,7 +48,8 @@ enum { OP_READ = 8, OP_DELETE, OP_ARITHMETIC, - OP_SCAN + OP_SCAN, + OP_FLUSH }; #endif === modified file 'storage/ndb/memcache/include/workitem.h' --- a/storage/ndb/memcache/include/workitem.h 2011-03-30 06:54:53 +0000 +++ b/storage/ndb/memcache/include/workitem.h 2011-04-06 04:31:28 +0000 @@ -29,9 +29,12 @@ /* struct workitem is used in both C and C++ code, requiring a small hack: */ #ifdef __cplusplus #include "QueryPlan.h" +#include "NdbInstance.h" #define C_OR_CPP_QUERYPLAN QueryPlan +#define C_OR_CPP_NDBINSTANCE NdbInstance #else #define C_OR_CPP_QUERYPLAN void +#define C_OR_CPP_NDBINSTANCE void #endif @@ -46,7 +49,8 @@ typedef struct workitem { unsigned has_value : 1; /*! are we able to use a no-copy value? */ unsigned retries : 3; /*! how many times this job has been retried */ unsigned complete : 1; /*! is this operation finished? */ - unsigned _unused : 4; /*! (32 bits total) */ + unsigned broker : 2; /*! for use by the flex scheduler */ + unsigned _unused : 2; /*! (32 bits total) */ } base; unsigned int id; struct workitem *previous; /*! used to chain workitems in multi-key get */ @@ -56,6 +60,8 @@ typedef struct workitem { uint64_t math_value; /*! IN: incr initial value; OUT: incr result */ hash_item * cache_item; /*! used for write requests */ ndb_pipeline *pipeline; /*! pointer back to request pipeline */ + C_OR_CPP_NDBINSTANCE *ndb_instance; + /*! pointer to ndb instance, if applicable */ const void *cookie; /*! memcached's connection cookie */ C_OR_CPP_QUERYPLAN *plan; /*! QueryPlan for resolving this request */ const char *key; /*! pointer to the key */ @@ -137,6 +143,9 @@ const char * workitem_get_key_suffix(wor */ size_t workitem_get_key_buf_size(int nkey); +/*! Set the workitem's NdbInstance +*/ +void workitem_set_NdbInstance(workitem*, C_OR_CPP_NDBINSTANCE *); END_FUNCTIONS_WITH_C_LINKAGE === modified file 'storage/ndb/memcache/scripts/metadata.sql' --- a/storage/ndb/memcache/scripts/metadata.sql 2011-03-30 06:54:53 +0000 +++ b/storage/ndb/memcache/scripts/metadata.sql 2011-04-06 04:31:28 +0000 @@ -209,6 +209,7 @@ INSERT INTO memcache_server_roles (role_ INSERT INTO memcache_server_roles (role_name, role_id) VALUES ("db-only", 1); INSERT INTO memcache_server_roles (role_name, role_id) VALUES ("mc-only", 2); INSERT INTO memcache_server_roles (role_name, role_id) VALUES ("ndb-caching", 3); +INSERT INTO memcache_server_roles (role_name, role_id) VALUES ("test", 4); -- ndb_clusters table -- Create an entry for the primary cluster. @@ -216,9 +217,10 @@ INSERT INTO ndb_clusters values (0, @@nd -- cache_policies table -- Create some sample policies. -INSERT INTO cache_policies(policy_name, get_policy, set_policy, delete_policy) +INSERT INTO cache_policies VALUES("memcache-only", "cache_only", "cache_only", "cache_only"), ("ndb-only", "ndb_only", "ndb_only", "ndb_only"), + ("ndb-test", "ndb_only", "ndb_only", "ndb_only", "true"), ("caching", "caching", "caching", "caching"), ("caching-with-local-deletes", "caching", "caching", "cache_only"), ("ndb-read-only", "ndb_only", "disabled", "disabled"); @@ -241,7 +243,8 @@ INSERT INTO key_prefixes (server_role_id (1, "", 0, "ndb-only", "demo_table"), (1, "t:", 0, "ndb-only", "demo_tabs"), (2, "", 0, "memcache-only", NULL), - (3, "", 0, "caching", "demo_table") + (3, "", 0, "caching", "demo_table"), + (4, "", 0, "ndb-test", "demo_table") ; === modified file 'storage/ndb/memcache/src/Configuration.cc' --- a/storage/ndb/memcache/src/Configuration.cc 2011-03-30 06:54:53 +0000 +++ b/storage/ndb/memcache/src/Configuration.cc 2011-04-06 04:31:28 +0000 @@ -513,7 +513,9 @@ bool config_v1::get_policies() { success = false; } DEBUG_PRINT("map size: %d", policies.size()); - + + tx->close(); + return success; } === modified file 'storage/ndb/memcache/src/NdbInstance.cc' --- a/storage/ndb/memcache/src/NdbInstance.cc 2011-03-30 06:54:53 +0000 +++ b/storage/ndb/memcache/src/NdbInstance.cc 2011-04-06 04:31:28 +0000 @@ -33,6 +33,7 @@ NdbInstance::NdbInstance(Ndb_cluster_con nplans = nprefixes; db = new Ndb(c); in_use = false; + next = 0; plans = new QueryPlan *[nplans]; memset(plans, 0, (nplans * sizeof(QueryPlan *))); db->init(ntransactions); === modified file 'storage/ndb/memcache/src/ndb_engine.c' --- a/storage/ndb/memcache/src/ndb_engine.c 2011-03-30 06:54:53 +0000 +++ b/storage/ndb/memcache/src/ndb_engine.c 2011-04-06 04:31:28 +0000 @@ -100,22 +100,27 @@ ENGINE_ERROR_CODE create_instance(uint64 ndb_eng->npipelines = 0; ndb_eng->engine.interface.interface = 1; - ndb_eng->engine.get_info = ndb_get_info; - ndb_eng->engine.initialize = ndb_initialize; - ndb_eng->engine.destroy = ndb_destroy; - ndb_eng->engine.allocate = ndb_allocate; - ndb_eng->engine.remove = ndb_remove; - ndb_eng->engine.release = ndb_release; - ndb_eng->engine.get = ndb_get; - ndb_eng->engine.get_stats = ndb_get_stats; - ndb_eng->engine.reset_stats = ndb_reset_stats; - ndb_eng->engine.store = ndb_store; - ndb_eng->engine.arithmetic = ndb_arithmetic; - ndb_eng->engine.flush = ndb_flush; - ndb_eng->engine.unknown_command = ndb_unknown_command; - ndb_eng->engine.item_set_cas = item_set_cas; /* reused */ - ndb_eng->engine.get_item_info = ndb_get_item_info; - + ndb_eng->engine.get_info = ndb_get_info; + ndb_eng->engine.initialize = ndb_initialize; + ndb_eng->engine.destroy = ndb_destroy; + ndb_eng->engine.allocate = ndb_allocate; + ndb_eng->engine.remove = ndb_remove; + ndb_eng->engine.release = ndb_release; + ndb_eng->engine.get = ndb_get; + ndb_eng->engine.get_stats = ndb_get_stats; + ndb_eng->engine.reset_stats = ndb_reset_stats; + ndb_eng->engine.store = ndb_store; + ndb_eng->engine.arithmetic = ndb_arithmetic; + ndb_eng->engine.flush = ndb_flush; + ndb_eng->engine.unknown_command = ndb_unknown_command; + ndb_eng->engine.item_set_cas = item_set_cas; /* reused */ + ndb_eng->engine.get_item_info = ndb_get_item_info; + ndb_eng->engine.get_stats_struct = NULL; + ndb_eng->engine.aggregate_stats = NULL; + ndb_eng->engine.tap_notify = NULL; + ndb_eng->engine.get_tap_iterator = NULL; + ndb_eng->engine.errinfo = NULL; + ndb_eng->server = *api; ndb_eng->get_server_api = get_server_api; @@ -127,8 +132,11 @@ ENGINE_ERROR_CODE create_instance(uint64 ndb_eng->info.info.description = "NDB Memcache " VERSION; ndb_eng->info.info.num_features = 3; ndb_eng->info.info.features[0].feature = ENGINE_FEATURE_CAS; + ndb_eng->info.info.features[0].description = NULL; ndb_eng->info.info.features[1].feature = ENGINE_FEATURE_PERSISTENT_STORAGE; + ndb_eng->info.info.features[1].description = NULL; ndb_eng->info.info.features[2].feature = ENGINE_FEATURE_LRU; + ndb_eng->info.info.features[2].description = NULL; /* Now call create_instace() for the default engine */ e = create_my_default_instance(interface, get_server_api, @@ -259,11 +267,10 @@ static ENGINE_ERROR_CODE ndb_allocate(EN const int flags, const rel_time_t exptime) { + DEBUG_ENTER(); struct ndb_engine* ndb_eng = ndb_handle(handle); struct default_engine *def_eng = default_handle(ndb_eng); - /* ndb_pipeline *pipeline = get_my_pipeline_config(ndb_eng); */ - /* DEBUG_ENTER(); */ return def_eng->engine.allocate(ndb_eng->m_default_engine, cookie, item, key, nkey, nbytes, flags, exptime); @@ -284,7 +291,6 @@ static ENGINE_ERROR_CODE ndb_remove(ENGI ENGINE_ERROR_CODE return_status = ENGINE_KEY_ENOENT; prefix_info_t prefix; workitem *wqitem; - DEBUG_ENTER(); /* Is this a callback after completed I/O? */ wqitem = ndb_eng->server.cookie->get_engine_specific(cookie); @@ -292,6 +298,7 @@ static ENGINE_ERROR_CODE ndb_remove(ENGI DEBUG_PRINT("Got callback: %s", wqitem->status->comment); ndb_eng->server.cookie->store_engine_specific(cookie, wqitem->previous); return_status = wqitem->status->status; + pipeline_io_completed(pipeline, wqitem); workitem_free(wqitem); return return_status; } @@ -380,6 +387,7 @@ static ENGINE_ERROR_CODE ndb_get(ENGINE_ pipeline->id, wqitem->id, wqitem->status->comment); *item = wqitem->cache_item; wqitem->base.complete = 1; + pipeline_io_completed(pipeline, wqitem); if(wqitem->status->status != ENGINE_SUCCESS) { DEBUG_PRINT("pop and free the workitem."); ndb_eng->server.cookie->store_engine_specific(cookie, wqitem->previous); @@ -458,6 +466,7 @@ static ENGINE_ERROR_CODE ndb_store(ENGIN if(wqitem) { // fixme: chaining DEBUG_PRINT("Got callback: %s", wqitem->status->comment); + pipeline_io_completed(pipeline, wqitem); return wqitem->status->status; } @@ -513,6 +522,7 @@ static ENGINE_ERROR_CODE ndb_arithmetic( DEBUG_PRINT("Got arithmetic callback: %s", wqitem->status->comment); wqitem->base.complete = 1; *result = wqitem->math_value; + pipeline_io_completed(pipeline, wqitem); if(wqitem->status->status != ENGINE_SUCCESS) { DEBUG_PRINT("pop and free the workitem."); ndb_eng->server.cookie->store_engine_specific(cookie, wqitem->previous); @@ -552,11 +562,22 @@ static ENGINE_ERROR_CODE ndb_arithmetic( static ENGINE_ERROR_CODE ndb_flush(ENGINE_HANDLE* handle, const void* cookie, time_t when) { +/* + Notes on flush: + The server will call *only* into ndb_flush (not to allocate or release). + The NDB engine ignores the "when" parameter. + Flush operations have special handling, outside of the scheduler. + They are performed synchronously. + And we always send the flush command to the cache engine. +*/ + + DEBUG_ENTER(); struct ndb_engine* ndb_eng = ndb_handle(handle); struct default_engine *def_eng = default_handle(ndb_eng); - /* ndb_pipeline *pipeline = get_my_pipeline_config(ndb_eng); */ - /* DEBUG_ENTER(); */ - return def_eng->engine.flush(ndb_eng->m_default_engine, cookie, when); + ndb_pipeline *pipeline = get_my_pipeline_config(ndb_eng); + + (void) def_eng->engine.flush(ndb_eng->m_default_engine, cookie, when); + return pipeline_flush_all(pipeline); } @@ -566,9 +587,9 @@ static ENGINE_ERROR_CODE ndb_unknown_com protocol_binary_request_header *request, ADD_RESPONSE response) { + DEBUG_ENTER(); struct ndb_engine* ndb_eng = ndb_handle(handle); struct default_engine *def_eng = default_handle(ndb_eng); - /* ndb_pipeline *pipeline = get_my_pipeline_config(ndb_eng); */ return def_eng->engine.unknown_command(ndb_eng->m_default_engine, cookie, request, response); === modified file 'storage/ndb/memcache/src/ndb_pipeline.cc' --- a/storage/ndb/memcache/src/ndb_pipeline.cc 2011-03-30 06:54:53 +0000 +++ b/storage/ndb/memcache/src/ndb_pipeline.cc 2011-04-06 04:31:28 +0000 @@ -33,11 +33,11 @@ #include "ndb_engine.h" #include "debug.h" #include "thread_identifier.h" +#include "ndb_worker.h" #include "schedulers/3thread.h" #include "schedulers/Stockholm.h" -#include "schedulers/Dispatch.h" -#include "schedulers/flex.h" +#include "schedulers/Flex.h" #define DEFAULT_SCHEDULER Scheduler_flex @@ -96,7 +96,7 @@ ndb_pipeline * ndb_pipeline_initialize(s self->engine_thread_id = pthread_self(); /* Get the configuration */ - Configuration Conf = get_Configuration(); + const Configuration & Conf = get_Configuration(); /* Create and set a thread identity */ tid = (thread_identifier *) memory_pool_alloc(self->pool, sizeof(thread_identifier)); @@ -181,8 +181,6 @@ void * initialize_scheduler(const char * s = new Scheduler_flex; if(!strncasecmp(cf, "stockholm", 10)) s = new Scheduler_stockholm; - else if(!strncasecmp(cf, "dispatch", 9)) - s = new Scheduler_dispatch; else if(!strncasecmp(cf, "3-thread", 9)) s = new Scheduler_3thread; else { @@ -205,6 +203,16 @@ ENGINE_ERROR_CODE pipeline_schedule_oper } +ENGINE_ERROR_CODE pipeline_flush_all(ndb_pipeline *self) { + return ndb_flush_all(self); +} + + +void pipeline_io_completed(ndb_pipeline *self, struct workitem *item) { + self->scheduler->io_completed(item); +} + + /* The slab allocator API */ int pipeline_get_size_class_id(size_t object_size) { === modified file 'storage/ndb/memcache/src/ndb_worker.cc' --- a/storage/ndb/memcache/src/ndb_worker.cc 2011-03-30 06:54:53 +0000 +++ b/storage/ndb/memcache/src/ndb_worker.cc 2011-04-06 04:31:28 +0000 @@ -46,11 +46,11 @@ #include "status_block.h" #include "Operation.h" #include "ndb_engine.h" -#include "debug.h" #include "hash_item_util.h" #include "ndb_worker.h" void incrCallback(int, NdbTransaction *, void *); // callback for incr/decr +void flush_scan(int result, NdbTransaction *tx, void *itemptr); // callback for flush void DBcallback(int, NdbTransaction *, void *); // callback for all others bool worker_do_read(workitem *, bool with_cas); @@ -62,6 +62,7 @@ void worker_set_cas(ndb_pipeline *, uint bool finalize_read(workitem *); bool finalize_write(workitem *, bool); int build_cas_routine(NdbInterpretedCode *r, int cas_col, uint64_t cas_val); +bool scan_delete(NdbInstance *, QueryPlan *); extern EXTENSION_LOGGER_DESCRIPTOR *logger; @@ -96,7 +97,7 @@ status_block status_block_bad_add = status_block status_block_bad_replace = { ENGINE_NOT_STORED, "Tuple not found" }; - + void worker_set_cas(ndb_pipeline *p, uint64_t *cas) { /* Be careful here -- ndbmc_atomic32_t might be a signed type. @@ -143,6 +144,7 @@ bool worker_prepare_operation(workitem * default: return false; /* not supported */ } + return r; /* fixme: distinguish "not supported" from "failed" */ } @@ -651,7 +653,7 @@ bool finalize_read(workitem *wqitem) { && op.getStringValueNoCopy(COL_STORE_VALUE, & wqitem->value_ptr, & wqitem->value_size) && op.appendCRLF(COL_STORE_VALUE, wqitem->value_size)) { - /* The workiten's value_ptr and value_size were set above. */ + /* The workitem's value_ptr and value_size were set above. */ DEBUG_PRINT("using no-copy buffer."); wqitem->base.has_value = true; need_hash_item = false; @@ -766,7 +768,70 @@ int build_cas_routine(NdbInterpretedCode return r->finalise(); // resolve the label/branch } + +/* Flush all is a fully synchronous operation -- + the memcache server is waiting for a response, and the thread is blocked. +*/ +ENGINE_ERROR_CODE ndb_flush_all(ndb_pipeline *pipeline) { + DEBUG_ENTER(); + const Configuration &conf = get_Configuration(); + + DEBUG_PRINT(" %d prefixes", conf.nprefixes); + for(int i = 0 ; i < conf.nprefixes ; i++) { + const KeyPrefix *p = conf.getPrefix(i); + if(p->info.use_ndb && p->info.do_db_flush) { + NdbInstance *inst = conf.getConnectionById(p->info.cluster_id)-> + getNdbInstance(pipeline->id); + QueryPlan *plan = inst->getPlanForPrefix(p); + if(plan->keyIsPrimaryKey()) { + /* To flush, scan the table and delete every row */ + /* ToDo: ensure against duplicates (don't scan the same table twice) */ + DEBUG_PRINT("deleting from %s", p->table->table_name); + scan_delete(inst, plan); + } + else DEBUG_PRINT(" not scanning table %s -- access path is not primary key", + p->table->table_name); + } + else DEBUG_PRINT(" Not scanning table %s -- use_ndb:%d flush:%d", + p->table->table_name, p->info.use_ndb, p->info.do_db_flush); + } + + return ENGINE_SUCCESS; +} + + +bool scan_delete(NdbInstance *inst, QueryPlan *plan) { + DEBUG_ENTER(); + int res = 0; + int check; + NdbTransaction *tx = inst->db->startTransaction(); + NdbScanOperation *scan = tx->getNdbScanOperation(plan->table); + scan->readTuplesExclusive(); + + /* execute NoCommit */ + if((res = tx->execute(NdbTransaction::NoCommit)) != 0) + logger->log(LOG_WARNING, 0, "execute(NoCommit): %s\n", tx->getNdbError().message); + + /* scan and delete */ + while(scan->nextResult(true) == 0) { + do { + if((res = scan->deleteCurrentTuple()) != 0) { + logger->log(LOG_WARNING, 0, "deleteCurrentTuple(): %s\n", + tx->getNdbError().message); + } + } while((check = scan->nextResult(false)) == 0); + + /* execute a batch */ + if(check != -1) + if((res = tx->execute(NdbTransaction::NoCommit)) != 0) + logger->log(LOG_WARNING, 0, "execute(NoCommit): %s\n", + tx->getNdbError().message); + } + tx->execute(NdbTransaction::Commit); // execute & commit + tx->close(); + return (res == 0); +} === modified file 'storage/ndb/memcache/src/schedulers/3thread.cc' --- a/storage/ndb/memcache/src/schedulers/3thread.cc 2011-03-30 06:54:53 +0000 +++ b/storage/ndb/memcache/src/schedulers/3thread.cc 2011-04-06 04:31:28 +0000 @@ -22,6 +22,7 @@ #define __STDC_FORMAT_MACROS #include #include +#include /* Memcache headers */ #include "memcached/types.h" @@ -43,7 +44,7 @@ extern "C" { status_block status_block_unsupported = { ENGINE_ENOTSUP, "Not supported." }; void Scheduler_3thread::init(int my_thread, int, const char *) { - const Configuration conf = get_Configuration(); + const Configuration & conf = get_Configuration(); ClusterConnectionPool *pool; Ndb_cluster_connection *conn; LockableNdbInstance *inst; @@ -175,7 +176,7 @@ void * run_3thd_worker_thread(void *s) { void * Scheduler_3thread::run_ndb_worker_thread() { DEBUG_ENTER(); workitem *newitem; - const Configuration conf = get_Configuration(); // see note + const Configuration & conf = get_Configuration(); // see note int lockerr; unsigned int ncycles = 0; === modified file 'storage/ndb/memcache/src/schedulers/3thread.h' --- a/storage/ndb/memcache/src/schedulers/3thread.h 2011-03-30 06:54:53 +0000 +++ b/storage/ndb/memcache/src/schedulers/3thread.h 2011-04-06 04:31:28 +0000 @@ -42,6 +42,7 @@ public: void init(int threadnum, int nthreads, const char *config_string); void attach_thread(thread_identifier *); ENGINE_ERROR_CODE schedule(workitem *); + void io_completed(workitem *) {}; void add_stats(ADD_STAT, const void *); void * run_ndb_worker_thread(); void * run_ndb_commit_thread(); === removed file 'storage/ndb/memcache/src/schedulers/Dispatch.cc' --- a/storage/ndb/memcache/src/schedulers/Dispatch.cc 2011-03-30 06:54:53 +0000 +++ b/storage/ndb/memcache/src/schedulers/Dispatch.cc 1970-01-01 00:00:00 +0000 @@ -1,187 +0,0 @@ -/* - Copyright (c) 2011, Oracle and/or its affiliates. All rights - reserved. - - This program is free software; you can redistribute it and/or - modify it under the terms of the GNU General Public License - as published by the Free Software Foundation; version 2 of - the License. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program; if not, write to the Free Software - Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA - 02110-1301 USA - */ -/* System headers */ -/* C++ files must define __STDC_FORMAT_MACROS in order to get PRIu64 */ -#include "config.h" -#ifdef HAVE_DISPATCH_DISPATCH_H - -#define __STDC_FORMAT_MACROS -#include -#include -#include - -/* Memcache headers */ -#include "memcached/types.h" -#include - -/* NDB Memcache headers */ -#include "Dispatch.h" -#include "workitem.h" -#include "ndb_worker.h" - -extern EXTENSION_LOGGER_DESCRIPTOR *logger; - -extern "C" { - void dispatch_pollNdb(void *arg); -} - - -void Scheduler_dispatch::init(int, int nthreads, const char *) { - const Configuration conf = get_Configuration(); - - /* Get the NDB instances. - - Each pipeline has 75 NDB instances per cluster. - Each instance can handle 1 transaction. - - With 75 instances, a thread can handle 25,000 ops/sec (25 ops per ms) - and all operations can be in-flight for 3 milliseconds. - - We maintain the instances in an array of lists, so - instances[n] is the head list of instances for cluster n. - - TODO: This needs to work with all defined clusters - */ - - for(int j = 0 ; j < 75 ; j++) { - NdbInstance *inst = new NdbInstance(conf.getConnectionById(0), - conf.nprefixes, - 1); - inst->db->init(1); - instances[j] = inst; - } - - /* Now hoarde some transactions (API connect records). startTransaction() - will send TC_SEIZEREQ and wait for a reply. This really needs to be done - at initialization time when nobody is waiting for it. - TODO: this should be one array per cluster - */ - NdbTransaction * txlist[75]; - for(int j = 0 ; j < 75 ; j++) { - txlist[j] = instances[j]->db->startTransaction(); - } - - for(int j = 0 ; j < 75 ; j++) { - txlist[j]->close(); - } - - /* Get the dispatch queue */ - asyncqueue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_HIGH, 0); -} - - -void Scheduler_dispatch::attach_thread(thread_identifier * p) { - - pipeline = parent->pipeline; - logger->log(LOG_WARNING, 0, "Pipeline %d attached to libdispatch scheduler.\n", - pipeline->id); - - /* Initialize counters */ - instance_counter = 0; -} - - -ENGINE_ERROR_CODE Scheduler_dispatch::schedule(workitem *newitem) { - NdbInstance *inst; - const Configuration conf = get_Configuration(); // see note - - /* Fetch the config for its key prefix */ - const KeyPrefix *pfx = conf.getPrefixByInfo(newitem->prefix_info); - - /* From here on we will work mainly with the suffix part of the key. */ - newitem->base.nsuffix = newitem->base.nkey - pfx->prefix_len; - // FIXME: runtime error back to client? - DEBUG_ASSERT(newitem->base.nsuffix > 0); - - /* Get an NDB instance. But if the instance is in use, go forward and - get the next one. If something goes wrong (cluster crash, disconnected...) - and the commit thread can no longer release instances, it's possible that - there will be no free instances and this will loop forever. - FIXME: catch this condition somehow. - */ - do { - inst = instances[instance_counter]; - instance_counter = (++ instance_counter % 75); - } while(inst->in_use == true); - - // Now we've got an NDB Instance. - inst->in_use = true; - - // Fetch the query plan for this prefix. - newitem->plan = inst->getPlanForPrefix(pfx); - - // Build the NDB transaction - if(worker_prepare_operation(newitem)) { - // Queue the operation for a dispatch thread. - dispatch_async_f(asyncqueue, inst, dispatch_pollNdb); - return ENGINE_EWOULDBLOCK; - } - else return ENGINE_ENOTSUP; -} - - -void Scheduler_dispatch::add_stats(ADD_STAT add_stat, - const void * cookie) { - return; -} - - -/* - libdispatch version of the commit thread: - Take an NdbInstance, call pollNdb() on it, then mark it as free: -*/ -void dispatch_pollNdb(void *arg) { - NdbInstance *inst = (NdbInstance *) arg; - - /* Poll */ - inst->db->pollNdb(); - - /* Now we are done with the instance */ - inst->in_use = false; -} - - -#else - -/* HAVE_DISPATCH_DISPATCH_H */ -/* Stub version of the dispatch scheduler. */ -/* This allows the C++ compiler to create a vtable, so that the engine can be - linked into memcached, but the stub constructor in dispatch.h will produce - an error message and exit. -*/ - -/* System headers */ -#include - -/* Memcache headers */ -#include "memcached/types.h" -#include - -/* NDB Memcache headers */ -#include "Dispatch.h" - - -void Scheduler_dispatch::init(int, int, const char *) { return; } -void Scheduler_dispatch::attach_thread(thread_identifier *) { return; } -void Scheduler_dispatch::add_stats(ADD_STAT,const void *) { return; } -ENGINE_ERROR_CODE Scheduler_dispatch::schedule(workitem *) { return ENGINE_FAILED; } - - -#endif === removed file 'storage/ndb/memcache/src/schedulers/Dispatch.h' --- a/storage/ndb/memcache/src/schedulers/Dispatch.h 2011-03-30 06:54:53 +0000 +++ b/storage/ndb/memcache/src/schedulers/Dispatch.h 1970-01-01 00:00:00 +0000 @@ -1,83 +0,0 @@ -/* - Copyright (c) 2011, Oracle and/or its affiliates. All rights - reserved. - - This program is free software; you can redistribute it and/or - modify it under the terms of the GNU General Public License - as published by the Free Software Foundation; version 2 of - the License. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program; if not, write to the Free Software - Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA - 02110-1301 USA - */ -#ifndef NDBMEMCACHE_DISPATCH_SCHEDULER_H -#define NDBMEMCACHE_DISPATCH_SCHEDULER_H - -#ifndef __cplusplus -#error "This file is for C++ only" -#endif - -#include - -#include "config.h" -#include "Scheduler.h" - -#ifdef HAVE_DISPATCH_DISPATCH_H -#include - -/* - * Dispatch Scheduler - * - * The dispatch scheduler runs only on platforms that support libdispatch. - * It uses a large number of Ndb objects. - */ - - -class Scheduler_dispatch : public Scheduler { -public: - Scheduler_dispatch() {}; - ~Scheduler_dispatch(); - void init(int threadnum, int nthreads, const char *config_string); - void attach_thread(thread_identifier *); - ENGINE_ERROR_CODE schedule(workitem *); - void add_stats(ADD_STAT, const void *); - -private: - dispatch_queue_t asyncqueue; - int instance_counter; - NdbInstance *instances[75]; -}; - - -#else - /* Crippled version of the libdispatch scheduler. */ - -class Scheduler_dispatch : public Scheduler { -public: - Scheduler_dispatch(); - void init(int threadnum, int nthreads, const char *config_string); - void attach_thread(thread_identifier *); - ENGINE_ERROR_CODE schedule(workitem *); - void add_stats(ADD_STAT, const void *); -}; - - -inline Scheduler_dispatch::Scheduler_dispatch() { - fprintf(stderr, "ndbmemcache was compiled without libdispatch. " - "The dispatch scheduler is not available.\n " - "memcached will now exit.\n"); - exit(99); -} - -#endif - - -#endif - === added file 'storage/ndb/memcache/src/schedulers/Flex_broker.cc' --- a/storage/ndb/memcache/src/schedulers/Flex_broker.cc 1970-01-01 00:00:00 +0000 +++ b/storage/ndb/memcache/src/schedulers/Flex_broker.cc 2011-04-06 04:31:28 +0000 @@ -0,0 +1,92 @@ +/* + Copyright (c) 2011, Oracle and/or its affiliates. All rights + reserved. + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA + 02110-1301 USA + */ + +#include +#include + +#include "workitem.h" +#include "KeyPrefix.h" +#include "Configuration.h" +#include "debug.h" +#include "Flex.h" +#include "Flex_thread_spec.h" +#include "Flex_broker.h" +#include "Flex_cluster.h" + + +Scheduler_flex::Broker::Broker(Scheduler_flex *s, int my_id) : + sched(s), + id(my_id), + conf(get_Configuration()), + queue(s->broker_queue) +{ + clusters = (Cluster **) calloc(conf.nclusters, sizeof(Cluster *)); + + for(int c = 0 ; c < conf.nclusters ; c++) { + clusters[c] = new Cluster(sched, id, c); + } +} + + +ENGINE_ERROR_CODE Scheduler_flex::Broker::schedule(workitem *item) { + const KeyPrefix *pfx = conf.getPrefixByInfo(item->prefix_info); + + if(item->prefix_info.prefix_id) { + DEBUG_PRINT("prefix %d: \"%s\" Table: %s Value Cols: %d", + item->prefix_info.prefix_id, pfx->prefix, + pfx->table->table_name, pfx->table->nvaluecols); + } + + /* Record the suffix length */ + item->base.nsuffix = item->base.nkey - pfx->prefix_len; + if(item->base.nsuffix == 0) return ENGINE_EINVAL; // key too short + + /* Set my broker id in the workitem */ + item->base.broker = id; + + clusters[item->prefix_info.cluster_id]->schedule(item, pfx); +} + + +void Scheduler_flex::Broker::contribute_stats() { + for(int i = 0 ; i < conf.nclusters ; i++) + clusters[i]->contribute_stats(); +} + + +/* + Broker thread: get a workitem off the workqueue, and build its operations. +*/ +void * Scheduler_flex::Broker::run_broker_thread() { + workitem *newitem; + + DEBUG_ENTER(); + + while(1) { + /* Wait for something to appear on the queue */ + newitem = (workitem *) workqueue_consumer_wait(queue); + + if(newitem == NULL) return 0; /* queue has been shut down and emptied */ + + (void) schedule(newitem); + } +} + + === added file 'storage/ndb/memcache/src/schedulers/Flex_broker.h' --- a/storage/ndb/memcache/src/schedulers/Flex_broker.h 1970-01-01 00:00:00 +0000 +++ b/storage/ndb/memcache/src/schedulers/Flex_broker.h 2011-04-06 04:31:28 +0000 @@ -0,0 +1,54 @@ +/* + Copyright (c) 2011, Oracle and/or its affiliates. All rights + reserved. + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA + 02110-1301 USA + */ + +#ifndef NDBMEMCACHE_FLEX_BROKER_H +#define NDBMEMCACHE_FLEX_BROKER_H + +#ifndef __cplusplus +#error "This file is for C++ only" +#endif + + +class Scheduler_flex::Broker { + friend class Scheduler_flex; + friend class thread_spec; + +public: + Broker(Scheduler_flex *, int my_id); + ~Broker(); + ENGINE_ERROR_CODE schedule(workitem *); /*< schedule item on its cluster */ + + pthread_t thread_id; + Cluster ** clusters; + +protected: + void * run_broker_thread(); + void contribute_stats(); + +private: + int id; + const Configuration conf; + Scheduler_flex *sched; + struct workqueue *queue; +}; + + +#endif + === added file 'storage/ndb/memcache/src/schedulers/Flex_cluster.cc' --- a/storage/ndb/memcache/src/schedulers/Flex_cluster.cc 1970-01-01 00:00:00 +0000 +++ b/storage/ndb/memcache/src/schedulers/Flex_cluster.cc 2011-04-06 04:31:28 +0000 @@ -0,0 +1,248 @@ +/* + Copyright (c) 2011, Oracle and/or its affiliates. All rights + reserved. + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA + 02110-1301 USA + */ + + +#include "workitem.h" +#include "KeyPrefix.h" +#include "Configuration.h" +#include "debug.h" +#include "Flex.h" +#include "Flex_thread_spec.h" +#include "Flex_broker.h" +#include "Flex_cluster.h" +#include "ndb_worker.h" + +extern EXTENSION_LOGGER_DESCRIPTOR *logger; + +extern "C" { + void * run_flex_commit_thread(void *); +} + + +Scheduler_flex::Cluster::Cluster(Scheduler_flex *s, + int broker, + int cluster) : sched(s), + broker_id(broker), + id(cluster), + conf(get_Configuration()) { + DEBUG_PRINT("thread: %d, broker: %d, cluster: %d", sched->thread_id, + broker, cluster); + /* Get the connection pool for my cluster */ + ClusterConnectionPool *pool = conf.getConnectionById(id); + + /* Get a number of NDB instances. + That number is determined here, using max_tps and usec_rtt. + We allow a transaction to be in-flight for 5 * RTT, + and we need enough NDB objects to meet one thread's share of max_tps. + I go step-by-step through the math here (the compiler may optimize it). + */ + double tx_time_in_usec = pool->usec_rtt * 5; + double tx_per_ndb_per_sec = 1000000 / tx_time_in_usec; + double total_ndb_objects = conf.max_tps / tx_per_ndb_per_sec; + double ndbs_per_pipeline = total_ndb_objects / sched->config.n_engine_threads; + nInst = (int) (ndbs_per_pipeline / sched->nbrokers); + + /* For now, the queue size is fixed */ + queue_size = 8192; + + /* Do we need more connections? */ + for(int n = pool->getPoolSize(); n < sched->config.n_connections; n++) + (void) pool->addPooledConnection(); // Maybe not all were added; that's OK. + + /* Now obtain the appropriate one for this thread. */ + Ndb_cluster_connection *conn = pool->getPooledConnection(sched->thread_id); + DEBUG_PRINT("broker %d, cluster %d, node %d: " + "%d TPS @ %d usec RTT ==> %d NDB instances.", broker_id, id, + conn->node_id(), conf.max_tps, pool->usec_rtt, nInst); + + // Get (and store) the master NDB instance for the connection + pool->setNdbInstance(sched->thread_id, + new NdbInstance(conn, conf.nprefixes, 128)); + + // Get the NDB instances + instances = (NdbInstance**) calloc(nInst, sizeof(NdbInstance *)); + for(int i = 0; i < nInst ; i++) { + NdbInstance *inst = new NdbInstance(conn, conf.nprefixes, 1); + instances[i] = inst; + inst->next = nextFree; + nextFree = inst; + } + + /* Hoard a transaction (an API connect record) for each Ndb object. This + first call to startTransaction() will send TC_SEIZEREQ and wait for a + reply, but later at runtime startTransaction() should return immediately. + Also, for each NDB instance, ore-build the QueryPlan for the default key prefix. + TODO? Start one tx *per data node*. + */ + QueryPlan *plan; + const KeyPrefix *default_prefix = conf.getDefaultPrefix(); + NdbTransaction ** txlist; + txlist = ( NdbTransaction **) calloc(nInst, sizeof(NdbTransaction *)); + + // Open them all. + for(int i = 0 ; i < nInst ; i++) { + NdbTransaction *tx; + plan = instances[i]->getPlanForPrefix(default_prefix); + tx = instances[i]->db->startTransaction(); + if(! tx) logger->log(LOG_WARNING, 0, instances[i]->db->getNdbError().message); + txlist[i] = tx; + } + + // Close them all. + for(int i = 0 ; i < nInst ; i++) { + txlist[i]->close(); + } + + // Free the list. + free(txlist); + + /* Allocate thread ids */ + commit_thread_ids = (pthread_t *) calloc(sched->config.n_commit_threads, + sizeof(pthread_t)); + + /* Allocate and initialize a workqueue */ + queue = (struct workqueue *) malloc(sizeof(struct workqueue)); + workqueue_init(queue, queue_size, sched->config.n_commit_threads); + + /* Randomly take samples of the workqueue depth */ + do_queue_sample = random() % FLEX_STATS_INITIAL_INTERVAL; +} + + +void Scheduler_flex::Cluster::attach_thread(thread_identifier *parent) { + /* Adjust the thread stack size */ + pthread_attr_t commit_thd_attr; + size_t thd_stack_size; + pthread_attr_init(& commit_thd_attr); + pthread_attr_getstacksize(& commit_thd_attr, & thd_stack_size); + pthread_attr_setstacksize(& commit_thd_attr, thd_stack_size / 4); + + /* Start the commit threads */ + for(int t = 0 ; t < sched->config.n_commit_threads ; t++) { + thread_spec * spec = new thread_spec(sched, parent, broker_id, id, t); + pthread_create(& commit_thread_ids[t], & commit_thd_attr, + run_flex_commit_thread, (void *) spec); + } +} + + +ENGINE_ERROR_CODE Scheduler_flex::Cluster::schedule(workitem *newitem, + const KeyPrefix *pfx) { + NdbInstance *inst; + + if (nextFree) + { + inst = nextFree; + nextFree = inst->next; + } + else + { + return ENGINE_TMPFAIL; + } + + workitem_set_NdbInstance(newitem, inst); + + // Fetch the query plan for this prefix. + newitem->plan = inst->getPlanForPrefix(pfx); + if(! newitem->plan) return ENGINE_FAILED; + + // Build the NDB transaction + if(worker_prepare_operation(newitem)) { + + if(do_queue_sample-- == 0) { // Sample for statistics. + /* To simulate sampling the depth immediately after the item is added, + sample it now and add one. */ + int depth = queue->depth + 1; + /* Queue depth can be erroneous because of thread races. + Use it only if it looks valid. */ + if(depth > 0 && depth <= queue_size) { + stats.queue_total_depth += depth; + stats.queue_samples++; + } + do_queue_sample = random() % FLEX_STATS_SAMPLE_INTERVAL; + } + + // Put the NdbInstance on the queue for the commit thread. + workqueue_add(queue, inst); + return ENGINE_EWOULDBLOCK; + } + else return ENGINE_ENOTSUP; +} + + +void Scheduler_flex::Cluster::io_completed(workitem *item) { + DEBUG_ENTER(); + NdbInstance* inst = item->ndb_instance; + item->ndb_instance = NULL; + if(inst) { + inst->next = nextFree; + nextFree = inst; + } +} + + +void Scheduler_flex::Cluster::contribute_stats() { + if(stats.cycles) + sched->stats.total_avg_ndb_depth += (double) stats.ndb_depth / stats.cycles; + if(stats.queue_samples) + sched->stats.total_avg_commit_queue_depth += + (double) stats.queue_total_depth / stats.queue_samples; +} + + +/* + Commit thread: Get an NdbInstance off the workqueue, and call pollNdb() on it. +*/ +void * Scheduler_flex::Cluster::run_commit_thread() { + DEBUG_ENTER(); + + NdbInstance *inst; + + while(1) { + /* Wait for something to appear on the queue */ + inst = (NdbInstance *) workqueue_consumer_wait(queue); + + if(inst == NULL) return 0; /* queue has been shut down and emptied */ + + /* Poll */ + inst->db->pollNdb(); + + /* Now we are done with the instance */ + atomic_cmp_swap_int(& inst->in_use, true, false); // in_use = false + } +} + + +void * run_flex_commit_thread(void *s) { + Scheduler_flex::thread_spec *spec = (Scheduler_flex::thread_spec *) s; + if(spec->has_broker_threads()) + set_child_thread_id(spec->parent, "br%d.cl%d.commit%d", + spec->broker, spec->cluster_id, spec->number); + else + set_child_thread_id(spec->parent, "cl%d.commit%d", + spec->cluster_id, spec->number); + + spec->run_commit_thread(); + + delete spec; + return 0; +} + + === added file 'storage/ndb/memcache/src/schedulers/Flex_cluster.h' --- a/storage/ndb/memcache/src/schedulers/Flex_cluster.h 1970-01-01 00:00:00 +0000 +++ b/storage/ndb/memcache/src/schedulers/Flex_cluster.h 2011-04-06 04:31:28 +0000 @@ -0,0 +1,69 @@ +/* + Copyright (c) 2011, Oracle and/or its affiliates. All rights + reserved. + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA + 02110-1301 USA + */ + +#ifndef NDBMEMCACHE_FLEX_CLUSTER_H +#define NDBMEMCACHE_FLEX_CLUSTER_H + +#ifndef __cplusplus +#error "This file is for C++ only" +#endif + +#include "src/schedulers/Flex.h" +#include "src/schedulers/Flex_broker.h" + +class Scheduler_flex::Cluster { + friend class Broker; + friend class thread_spec; + +public: + Cluster(Scheduler_flex *, int broker, int cluster_id); + ~Cluster(); + + void attach_thread(thread_identifier *); /*< start commit threads */ + ENGINE_ERROR_CODE schedule(workitem *, const KeyPrefix *); + void io_completed(workitem *); + +protected: + void * run_commit_thread(); + void contribute_stats(); + struct cluster_stats { + uint64_t cycles; + uint64_t ndb_depth; + uint64_t queue_total_depth; + uint64_t queue_samples; + } stats; + +private: + const Configuration conf; + Scheduler_flex *sched; + int do_queue_sample; + int broker_id; + int id; + int queue_size; + struct workqueue *queue; + pthread_t * commit_thread_ids; + NdbInstance **instances; + NdbInstance *nextFree; + int nInst; +}; + + + +#endif === added file 'storage/ndb/memcache/src/schedulers/Flex_thread_spec.h' --- a/storage/ndb/memcache/src/schedulers/Flex_thread_spec.h 1970-01-01 00:00:00 +0000 +++ b/storage/ndb/memcache/src/schedulers/Flex_thread_spec.h 2011-04-06 04:31:28 +0000 @@ -0,0 +1,51 @@ +/* + Copyright (c) 2011, Oracle and/or its affiliates. All rights + reserved. + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA + 02110-1301 USA + */ + +#ifndef NDBMEMCACHE_FLEX_THREAD_SPEC_H +#define NDBMEMCACHE_FLEX_THREAD_SPEC_H + +#ifndef __cplusplus +#error "This file is for C++ only" +#endif + +#include "Flex.h" +#include "Flex_broker.h" +#include "Flex_cluster.h" + +class Scheduler_flex::thread_spec { +public: + thread_spec(Scheduler_flex *s, thread_identifier *p, + int b, int c, int n) : sched(s), parent(p), + broker(b), cluster_id(c), number(n) {}; + + Broker * get_broker() { return sched->brokers[broker]; }; + bool has_broker_threads() { return (sched->config.n_broker_threads > 0); }; + void * run_broker_thread() { return get_broker()->run_broker_thread(); }; + Cluster * get_cluster() { return get_broker()->clusters[cluster_id]; }; + void * run_commit_thread() { return get_cluster()->run_commit_thread(); }; + + Scheduler_flex *sched; + thread_identifier *parent; + int broker; + int cluster_id; + int number; +}; + +#endif === modified file 'storage/ndb/memcache/src/schedulers/Stockholm.cc' --- a/storage/ndb/memcache/src/schedulers/Stockholm.cc 2011-03-30 06:54:53 +0000 +++ b/storage/ndb/memcache/src/schedulers/Stockholm.cc 2011-04-06 04:31:28 +0000 @@ -47,7 +47,7 @@ extern "C" { void Scheduler_stockholm::init(int my_thread, int nthreads, const char *config_string) { - const Configuration conf = get_Configuration(); + const Configuration & conf = get_Configuration(); /* For each cluster, we need a number of NDB instances; that number is determined here, using max_tps and usec_rtt. @@ -64,6 +64,7 @@ void Scheduler_stockholm::init(int my_th DEBUG_PRINT("cluster %d: %d TPS @ %d usec RTT ==> %d NDB instances.", c, conf.max_tps, pool->usec_rtt, cluster[c].nInst); } + // Get the NDB instances. for(int c = 0 ; c < conf.nclusters ; c++) { @@ -72,11 +73,21 @@ void Scheduler_stockholm::init(int my_th ClusterConnectionPool *pool = conf.getConnectionById(c); Ndb_cluster_connection *conn = pool->getPooledConnection(my_thread); - + + /* Set the master NdbInstance for the connection */ + pool->setNdbInstance(my_thread, + new NdbInstance(conn, conf.nprefixes, 128)); + + cluster[c].nextFree = NULL; for(int i = 0; i < cluster[c].nInst ; i++) { NdbInstance *inst = new NdbInstance(conn, conf.nprefixes, 1); cluster[c].instances[i] = inst; + inst->next = cluster[c].nextFree; + cluster[c].nextFree = inst; } + + logger->log(LOG_WARNING, 0, "Pipeline %d using %u Ndb instances for Cluster %u.\n", + my_thread, cluster[c].nInst, c); } /* Hoard a transaction (an API connect record) for each Ndb object. This @@ -116,7 +127,7 @@ void Scheduler_stockholm::init(int my_th void Scheduler_stockholm::attach_thread(thread_identifier * parent) { pipeline = parent->pipeline; - const Configuration conf = get_Configuration(); + const Configuration & conf = get_Configuration(); logger->log(LOG_WARNING, 0, "Pipeline %d attached to Stockholm scheduler; " "launching %d commit thread%s.\n", pipeline->id, conf.nclusters, @@ -137,7 +148,7 @@ void Scheduler_stockholm::attach_thread( ENGINE_ERROR_CODE Scheduler_stockholm::schedule(workitem *newitem) { NdbInstance *inst; int c; - const Configuration conf = get_Configuration(); + const Configuration & conf = get_Configuration(); /* Fetch the config for its key prefix */ const KeyPrefix *pfx = conf.getPrefixByInfo(newitem->prefix_info); @@ -154,21 +165,18 @@ ENGINE_ERROR_CODE Scheduler_stockholm::s c = newitem->prefix_info.cluster_id; - /* Get an NDB instance. But if the instance is in use, go forward and - get the next one. If something goes wrong (cluster crash, disconnected...) - and the commit thread can no longer release instances, it's possible that - there will be no free instances and this will loop forever. - FIXME: catch this condition somehow. - */ - int i = 0; - inst = cluster[c].instances[i]; - while(inst->in_use == 1) { - i = ( ++i % cluster[c].nInst); - inst = cluster[c].instances[i]; - }; - - // Now we've got an NDB Instance. - atomic_cmp_swap_int(& inst->in_use, false, true); // in_use = true + if (cluster[c].nextFree) + { + inst = cluster[c].nextFree; + cluster[c].nextFree = inst->next; + } + else + { + return ENGINE_TMPFAIL; + } + + + workitem_set_NdbInstance(newitem, inst); // Fetch the query plan for this prefix. newitem->plan = inst->getPlanForPrefix(pfx); @@ -176,7 +184,8 @@ ENGINE_ERROR_CODE Scheduler_stockholm::s // Build the NDB transaction if(worker_prepare_operation(newitem)) { - // Put the NdbInstance on the queue for the commit thread. + // Put the NdbInstance on the queue for the commit thread. + // Should probably be the workitem? workqueue_add(cluster[c].queue, inst); return ENGINE_EWOULDBLOCK; } @@ -184,12 +193,25 @@ ENGINE_ERROR_CODE Scheduler_stockholm::s } +void Scheduler_stockholm::io_completed(workitem *item) { + DEBUG_ENTER(); + NdbInstance* inst = item->ndb_instance; + item->ndb_instance = NULL; + + if(inst) { + int c = item->prefix_info.cluster_id; + inst->next = cluster[c].nextFree; + cluster[c].nextFree = inst; + } +} + + void Scheduler_stockholm::add_stats(ADD_STAT add_stat, const void * cookie) { char key[128]; char val[128]; int klen, vlen, p; - const Configuration conf = get_Configuration(); + const Configuration & conf = get_Configuration(); for(int c = 0 ; c < conf.nclusters; c++) { klen = sprintf(key, "pipeline_%d_cluster_%d_commit_cycles", pipeline->id, c); @@ -228,9 +250,6 @@ void * Scheduler_stockholm::run_ndb_comm /* Poll */ inst->db->pollNdb(); - /* Now we are done with the instance */ - atomic_cmp_swap_int(& inst->in_use, true, false); // in_use = false - cluster[c].stats.cycles++; if(! (cluster[c].stats.cycles % STAT_INTERVAL)) cluster[c].stats.commit_thread_vtime = get_thread_vtime(); === modified file 'storage/ndb/memcache/src/schedulers/Stockholm.h' --- a/storage/ndb/memcache/src/schedulers/Stockholm.h 2011-03-30 06:54:53 +0000 +++ b/storage/ndb/memcache/src/schedulers/Stockholm.h 2011-04-06 04:31:28 +0000 @@ -45,6 +45,7 @@ public: void init(int threadnum, int nthreads, const char *config_string); void attach_thread(thread_identifier *); ENGINE_ERROR_CODE schedule(workitem *); + void io_completed(workitem *); void add_stats(ADD_STAT, const void *); void * run_ndb_commit_thread(int cluster_id); @@ -58,6 +59,7 @@ private: pthread_t commit_thread_id; NdbInstance **instances; int nInst; + NdbInstance *nextFree; } cluster[MAX_CLUSTERS]; }; === removed file 'storage/ndb/memcache/src/schedulers/flex.cc' --- a/storage/ndb/memcache/src/schedulers/flex.cc 2011-03-30 06:54:53 +0000 +++ b/storage/ndb/memcache/src/schedulers/flex.cc 1970-01-01 00:00:00 +0000 @@ -1,464 +0,0 @@ -/* - Copyright (c) 2011, Oracle and/or its affiliates. All rights - reserved. - - This program is free software; you can redistribute it and/or - modify it under the terms of the GNU General Public License - as published by the Free Software Foundation; version 2 of - the License. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program; if not, write to the Free Software - Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA - 02110-1301 USA - */ -/* System headers */ -/* C++ files must define __STDC_FORMAT_MACROS in order to get PRIu64 */ -#define __STDC_FORMAT_MACROS -#include -#include -#include - -/* Memcache headers */ -#include "memcached/types.h" -#include - -/* NDB Memcache headers */ -#include "flex.h" -#include "workitem.h" -#include "ndb_worker.h" -#include "thread_identifier.h" -#include "ClusterConnectionPool.h" - -/* Random stat samples will on average be taken twice as often as - STATS_SAMPLE_INTERVAL (based on uniform random distribution). - STATS_INITIAL_INTERVAL is lower so as to get stats quickly on startup. -*/ -#define STATS_INITIAL_INTERVAL 20 -#define STATS_SAMPLE_INTERVAL 400 - -extern EXTENSION_LOGGER_DESCRIPTOR *logger; - - -extern "C" { - void * run_flex_commit_thread(void *); - void * run_flex_broker_thread(void *); -} - - -void Scheduler_flex::init(int t, int nthreads, const char *user_config) { - thread_id = t; - const char * active_cf; - - /* Set some baseline default values for the configuration */ - config.n_engine_threads = nthreads; - config.n_broker_threads = 0; - config.n_commit_threads = 2; - config.n_db_pending = 0; - config.n_connections = 1; - - /* The default config parameters */ - static const char * default_config = "b0,c2,p1"; - - /* Choose which string to parse */ - if(user_config && *user_config) active_cf = user_config; - else active_cf = default_config; - - /* disregard the return value from sscanf(), but test the config for validity, - and write it to the logger when the scheduler starts running */ - sscanf(active_cf, "b%d,c%d,p%d", - & config.n_broker_threads, & config.n_commit_threads, - & config.n_connections); - - /* Test validity of configuration */ - if(config.n_broker_threads < 0 || config.n_broker_threads > 2) { - logger->log(LOG_WARNING, 0, "Invalid scheduler configuration.\n"); - assert(config.n_broker_threads >= 0 && config.n_broker_threads <= 2); - } - if(config.n_commit_threads < 1 || config.n_commit_threads > 16) { - logger->log(LOG_WARNING, 0, "Invalid scheduler configuration.\n"); - assert(config.n_commit_threads >= 1 && config.n_commit_threads <= 16); - } - if(config.n_db_pending < 0 || config.n_db_pending > 16) { - logger->log(LOG_WARNING, 0, "Invalid scheduler configuration.\n"); - assert(config.n_db_pending >= 0 && config.n_db_pending <= 16); - } - if(config.n_connections < 1 || config.n_connections > 4) { - logger->log(LOG_WARNING, 0, "Invalid scheduler configuration.\n"); - assert(config.n_connections >= 1 && config.n_connections <= 4); - } - - /* There is always a broker, even if there is no broker thread */ - nbrokers = config.n_broker_threads > 1 ? config.n_broker_threads : 1; - - /* Create a workqueue for the broker thread */ - broker_queue_size = 8192; - if(config.n_broker_threads) { - do_queue_sample = random() % STATS_INITIAL_INTERVAL; - broker_queue = (struct workqueue *) malloc(sizeof(struct workqueue)); - workqueue_init(broker_queue, broker_queue_size, config.n_broker_threads); - } - else broker_queue = 0; - - /* Initialize the brokers. - If there are no broker threads, then broker[0]'s methods run in the engine - thread. Otherwise each broker will run mostly within a broker thread. - */ - for(int i = 0 ; i < nbrokers ; i++) - brokers[i] = new Broker(this, i); -} - - -Scheduler_flex::Broker::Broker(Scheduler_flex *s, - int my_id) : sched(s), - id(my_id), - conf(get_Configuration()), - queue(s->broker_queue) { - clusters = (Cluster **) calloc(conf.nclusters, sizeof(Cluster *)); - - for(int c = 0 ; c < conf.nclusters ; c++) { - clusters[c] = new Cluster(sched, id, c); - } -} - - -Scheduler_flex::Cluster::Cluster(Scheduler_flex *s, - int broker, - int cluster) : sched(s), - broker_id(broker), - id(cluster), - conf(get_Configuration()) { - DEBUG_PRINT("thread: %d, broker: %d, cluster: %d", sched->thread_id, - broker, cluster); - /* Get the connection pool for my cluster */ - ClusterConnectionPool *pool = conf.getConnectionById(id); - - /* Get a number of NDB instances. - That number is determined here, using max_tps and usec_rtt. - We allow a transaction to be in-flight for 5 * RTT, - and we need enough NDB objects to meet one thread's share of max_tps. - I go step-by-step through the math here (the compiler may optimize it). - */ - double tx_time_in_usec = pool->usec_rtt * 5; - double tx_per_ndb_per_sec = 1000000 / tx_time_in_usec; - double total_ndb_objects = conf.max_tps / tx_per_ndb_per_sec; - double ndbs_per_pipeline = total_ndb_objects / sched->config.n_engine_threads; - nInst = (int) (ndbs_per_pipeline / sched->nbrokers); - - /* For now, the queue size is fixed */ - queue_size = 8192; - - /* Do we need more connections? */ - for(int n = pool->getPoolSize(); n < sched->config.n_connections; n++) - (void) pool->addPooledConnection(); - /* Maybe not all of them were added; that's OK. */ - - /* Now obtain the appropriate one for this thread. */ - Ndb_cluster_connection *conn = pool->getPooledConnection(sched->thread_id); - - DEBUG_PRINT("broker %d, cluster %d, node %d: " - "%d TPS @ %d usec RTT ==> %d NDB instances.", broker_id, id, - conn->node_id(), conf.max_tps, pool->usec_rtt, nInst); - - // Get the NDB instances. - instances = (NdbInstance**) calloc(nInst, sizeof(NdbInstance *)); - for(int i = 0; i < nInst ; i++) { - instances[i] = new NdbInstance(conn, conf.nprefixes, 1); - } - - /* Hoard a transaction (an API connect record) for each Ndb object. This - first call to startTransaction() will send TC_SEIZEREQ and wait for a - reply, but later at runtime startTransaction() should return immediately. - Also, for each NDB instance, ore-build the QueryPlan for the default key prefix. - TODO? Start one tx *per data node*. - */ - QueryPlan *plan; - const KeyPrefix *default_prefix = conf.getDefaultPrefix(); - NdbTransaction ** txlist; - txlist = ( NdbTransaction **) calloc(nInst, sizeof(NdbTransaction *)); - - // Open them all. - for(int i = 0 ; i < nInst ; i++) { - NdbTransaction *tx; - plan = instances[i]->getPlanForPrefix(default_prefix); - tx = instances[i]->db->startTransaction(); - if(! tx) logger->log(LOG_WARNING, 0, instances[i]->db->getNdbError().message); - txlist[i] = tx; - } - - // Close them all. - for(int i = 0 ; i < nInst ; i++) { - txlist[i]->close(); - } - - // Free the list. - free(txlist); - - /* Allocate thread ids */ - commit_thread_ids = (pthread_t *) calloc(sched->config.n_commit_threads, - sizeof(pthread_t)); - - /* Allocate and initialize a workqueue */ - queue = (struct workqueue *) malloc(sizeof(struct workqueue)); - workqueue_init(queue, queue_size, sched->config.n_commit_threads); - - /* Randomly take samples of the workqueue depth */ - do_queue_sample = random() % STATS_INITIAL_INTERVAL; -} - - -void Scheduler_flex::attach_thread(thread_identifier * parent) { - const Configuration & conf = get_Configuration(); - pipeline = parent->pipeline; - - logger->log(LOG_WARNING, 0, "Pipeline %d attached to flex scheduler: " - "config \"b%d,c%d,p%d\"; %d cluster%s.\n", pipeline->id, - config.n_broker_threads, config.n_commit_threads, - config.n_connections, conf.nclusters, - conf.nclusters == 1 ? "" : "s"); - - // Launch commit threads for each cluster - for(int i = 0 ; i < nbrokers; i++) - for(int j = 0 ; j < conf.nclusters ; j++) - brokers[i]->clusters[j]->attach_thread(parent); - - // Adjust the thread stack size for the broker threads - pthread_attr_t broker_thd_attr; - size_t thd_stack_size; - pthread_attr_init(& broker_thd_attr); - pthread_attr_getstacksize(& broker_thd_attr, & thd_stack_size); - pthread_attr_setstacksize(& broker_thd_attr, thd_stack_size / 2); - - // Launch the broker threads - for(int i = 0; i < config.n_broker_threads ; i++) { - thread_spec *spec = new thread_spec(this, parent, i, 0, 0); - pthread_create(& brokers[i]->thread_id, & broker_thd_attr, - run_flex_broker_thread, (void *) spec); - } -} - - -void Scheduler_flex::Cluster::attach_thread(thread_identifier *parent) { - /* Adjust the thread stack size */ - pthread_attr_t commit_thd_attr; - size_t thd_stack_size; - pthread_attr_init(& commit_thd_attr); - pthread_attr_getstacksize(& commit_thd_attr, & thd_stack_size); - pthread_attr_setstacksize(& commit_thd_attr, thd_stack_size / 4); - - /* Start the commit threads */ - for(int t = 0 ; t < sched->config.n_commit_threads ; t++) { - thread_spec * spec = new thread_spec(sched, parent, broker_id, id, t); - pthread_create(& commit_thread_ids[t], & commit_thd_attr, - run_flex_commit_thread, (void *) spec); - } -} - - -ENGINE_ERROR_CODE Scheduler_flex::schedule(workitem *newitem) { - if(config.n_broker_threads) { - - if(do_queue_sample-- == 0) { - /* See comments in Cluster::schedule() about queue depth sampling */ - int depth = broker_queue->depth + 1; - if(depth > 0 && depth <= broker_queue_size) { - stats.broker_queue_total_depth += depth ; - stats.broker_queue_samples++; - } - do_queue_sample = random() % STATS_SAMPLE_INTERVAL; - } - - workqueue_add(broker_queue, newitem); - - return ENGINE_EWOULDBLOCK; - } - else { - return brokers[0]->schedule(newitem); - } -} - - -ENGINE_ERROR_CODE Scheduler_flex::Broker::schedule(workitem *newitem) { - const KeyPrefix *pfx = conf.getPrefixByInfo(newitem->prefix_info); - - if(newitem->prefix_info.prefix_id) { - DEBUG_PRINT("prefix %d: \"%s\" Table: %s Value Cols: %d", - newitem->prefix_info.prefix_id, pfx->prefix, - pfx->table->table_name, pfx->table->nvaluecols); - } - - /* Record the suffix length */ - newitem->base.nsuffix = newitem->base.nkey - pfx->prefix_len; - if(newitem->base.nsuffix == 0) return ENGINE_EINVAL; // key too short - - clusters[newitem->prefix_info.cluster_id]->schedule(newitem, pfx); -} - - -ENGINE_ERROR_CODE Scheduler_flex::Cluster::schedule(workitem *newitem, - const KeyPrefix *pfx) { - bool did_cas; - NdbInstance *inst; - int i = 0; - - /* Get an NDB instance. But if the instance is in use, go forward and - get the next one. If something goes wrong (cluster crash, disconnected...) - and the commit thread can no longer release instances, it's possible that - there will be no free instances and this will loop forever. - FIXME: catch this condition somehow. - */ - inst = instances[i]; - while(inst->in_use == 1) { - i = ( ++i % nInst); - inst = instances[i]; - }; - stats.cycles++; - stats.ndb_depth += i; - - // Now we've got an NDB Instance. - atomic_cmp_swap_int(& inst->in_use, false, true); // in_use = true - - // Fetch the query plan for this prefix. - newitem->plan = inst->getPlanForPrefix(pfx); - if(! newitem->plan) return ENGINE_FAILED; - - // Build the NDB transaction - if(worker_prepare_operation(newitem)) { - - if(do_queue_sample-- == 0) { // Sample for statistics. - /* To simulate sampling the depth immediately after the item is added, - sample it now and add one. */ - int depth = queue->depth + 1; - /* Queue depth can be erroneous because of thread races. - Use it only if it looks valid. */ - if(depth > 0 && depth <= queue_size) { - stats.queue_total_depth += depth; - stats.queue_samples++; - } - do_queue_sample = random() % STATS_SAMPLE_INTERVAL; - } - - // Put the NdbInstance on the queue for the commit thread. - workqueue_add(queue, inst); - return ENGINE_EWOULDBLOCK; - } - else return ENGINE_ENOTSUP; -} - - -void Scheduler_flex::add_stats(ADD_STAT add_stat, const void * cookie) { - char key[128]; - char val[128]; - int klen, vlen, p; - - stats.total_avg_ndb_depth = 0; - stats.total_avg_commit_queue_depth = 0; - for(int i = 0 ; i < nbrokers ; i++) - brokers[i]->contribute_stats(); - - klen = sprintf(key, "t%d_avg_ndb_depth", pipeline->id); - vlen = sprintf(val, "%.3f", stats.total_avg_ndb_depth / nbrokers); - add_stat(key, klen, val, vlen, cookie); - - klen = sprintf(key, "t%d_avg_commit_queue_depth", pipeline->id); - vlen = sprintf(val, "%.3f", stats.total_avg_commit_queue_depth / nbrokers); - add_stat(key, klen, val, vlen, cookie); - - if(config.n_broker_threads && stats.broker_queue_samples) { - klen = sprintf(key, "t%d_broker_queue_avg_depth", pipeline->id); - vlen = sprintf(val, "%.3f", - (double) stats.broker_queue_total_depth / stats.broker_queue_samples); - add_stat(key, klen, val, vlen, cookie); - } -} - - -void Scheduler_flex::Broker::contribute_stats() { - for(int i = 0 ; i < conf.nclusters ; i++) - clusters[i]->contribute_stats(); -} - - -void Scheduler_flex::Cluster::contribute_stats() { - if(stats.cycles) - sched->stats.total_avg_ndb_depth += (double) stats.ndb_depth / stats.cycles; - if(stats.queue_samples) - sched->stats.total_avg_commit_queue_depth += - (double) stats.queue_total_depth / stats.queue_samples; -} - - -/* - Broker thread: get a workitem off the workqueue, and build its operations. -*/ -void * Scheduler_flex::Broker::run_broker_thread() { - workitem *newitem; - - DEBUG_ENTER(); - - while(1) { - /* Wait for something to appear on the queue */ - newitem = (workitem *) workqueue_consumer_wait(queue); - - if(newitem == NULL) return 0; /* queue has been shut down and emptied */ - - (void) schedule(newitem); - } -} - - -/* - Commit thread: Get an NdbInstance off the workqueue, and call pollNdb() on it. -*/ -void * Scheduler_flex::Cluster::run_commit_thread() { - DEBUG_ENTER(); - - NdbInstance *inst; - - while(1) { - /* Wait for something to appear on the queue */ - inst = (NdbInstance *) workqueue_consumer_wait(queue); - - if(inst == NULL) return 0; /* queue has been shut down and emptied */ - - /* Poll */ - inst->db->pollNdb(); - - /* Now we are done with the instance */ - atomic_cmp_swap_int(& inst->in_use, true, false); // in_use = false - } -} - - -void * run_flex_broker_thread(void *s) { - Scheduler_flex::thread_spec *spec = (Scheduler_flex::thread_spec *) s; - set_child_thread_id(spec->parent, "broker%d", spec->broker); - - spec->run_broker_thread(); - - delete spec; - return 0; -} - - -void * run_flex_commit_thread(void *s) { - Scheduler_flex::thread_spec *spec = (Scheduler_flex::thread_spec *) s; - if(spec->has_broker_threads()) - set_child_thread_id(spec->parent, "br%d.cl%d.commit%d", - spec->broker, spec->cluster_id, spec->number); - else - set_child_thread_id(spec->parent, "cl%d.commit%d", - spec->cluster_id, spec->number); - - spec->run_commit_thread(); - - delete spec; - return 0; -} - - === removed file 'storage/ndb/memcache/src/schedulers/flex.h' --- a/storage/ndb/memcache/src/schedulers/flex.h 2011-03-30 06:54:53 +0000 +++ b/storage/ndb/memcache/src/schedulers/flex.h 1970-01-01 00:00:00 +0000 @@ -1,167 +0,0 @@ -/* - Copyright (c) 2011, Oracle and/or its affiliates. All rights - reserved. - - This program is free software; you can redistribute it and/or - modify it under the terms of the GNU General Public License - as published by the Free Software Foundation; version 2 of - the License. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program; if not, write to the Free Software - Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA - 02110-1301 USA - */ - -#ifndef NDBMEMCACHE_FLEX_SCHEDULER_H -#define NDBMEMCACHE_FLEX_SCHEDULER_H - -#ifndef __cplusplus -#error "This file is for C++ only" -#endif - -#include - -#include "config.h" -#include "Scheduler.h" -#include "KeyPrefix.h" - - -/* - * Flex Scheduler - * - * The scheduler is configurable: - * - 0, 1, or 2 "middleman" broker threads per engine thread - * - some number of commit threads per {broker, cluster id} tuple - * - some max number of Ndb objects to poll concurrently in a commit thread - * - */ - - -class Scheduler_flex : public Scheduler { -public: - class Broker; - class Cluster; - class thread_spec; - - friend class Broker; - friend class Cluster; - friend class thread_spec; - - Scheduler_flex() {}; - ~Scheduler_flex(); - void init(int threadnum, int nthreads, const char *config_string); - void attach_thread(thread_identifier *); - ENGINE_ERROR_CODE schedule(workitem *); - void add_stats(ADD_STAT, const void *); - -protected: - int thread_id; - int do_queue_sample; - int nbrokers; - int broker_queue_size; - struct workqueue * broker_queue; - struct flex_cf { - int n_engine_threads; //< no. of libevent worker threads in memcached - int n_broker_threads; //< no. of separate broker threads, 0 - 2 - int n_commit_threads; //< no. of commit threads per {broker,cluster} pair - int n_db_pending; //< no. of pollable Ndbs inside a commit thread - int n_connections; //< size of Ndb_cluster_connection pool - } config; - struct flex_stats { - double total_avg_ndb_depth; - double total_avg_commit_queue_depth; - uint64_t broker_queue_total_depth; - uint64_t broker_queue_samples; - } stats; - - Broker * brokers[2]; -}; - - -class Scheduler_flex::Cluster { -friend class Broker; -friend class thread_spec; - -public: - Cluster(Scheduler_flex *, int broker, int cluster_id); - ~Cluster(); - - void attach_thread(thread_identifier *); /*< start commit threads */ - ENGINE_ERROR_CODE schedule(workitem *, const KeyPrefix *); - -protected: - void * run_commit_thread(); - void contribute_stats(); - struct cluster_stats { - uint64_t cycles; - uint64_t ndb_depth; - uint64_t queue_total_depth; - uint64_t queue_samples; - } stats; - -private: - const Configuration conf; - Scheduler_flex *sched; - int do_queue_sample; - int broker_id; - int id; - int queue_size; - struct workqueue *queue; - pthread_t * commit_thread_ids; - NdbInstance **instances; - int nInst; -}; - - -class Scheduler_flex::Broker { -friend class Scheduler_flex; -friend class thread_spec; - -public: - Broker(Scheduler_flex *, int my_id); - ~Broker(); - ENGINE_ERROR_CODE schedule(workitem *); /*< schedule item on its cluster */ - - pthread_t thread_id; - Cluster ** clusters; - -protected: - void * run_broker_thread(); - void contribute_stats(); - -private: - int id; - const Configuration conf; - Scheduler_flex *sched; - struct workqueue *queue; -}; - - -class Scheduler_flex::thread_spec { -public: - thread_spec(Scheduler_flex *s, thread_identifier *p, - int b, int c, int n) : sched(s), parent(p), - broker(b), cluster_id(c), number(n) {}; - - Broker * get_broker() { return sched->brokers[broker]; }; - bool has_broker_threads() { return (sched->config.n_broker_threads > 0); }; - void * run_broker_thread() { return get_broker()->run_broker_thread(); }; - Cluster * get_cluster() { return get_broker()->clusters[cluster_id]; }; - void * run_commit_thread() { return get_cluster()->run_commit_thread(); }; - - Scheduler_flex *sched; - thread_identifier *parent; - int broker; - int cluster_id; - int number; -}; - - -#endif - === modified file 'storage/ndb/memcache/src/workitem.c' --- a/storage/ndb/memcache/src/workitem.c 2011-03-30 06:54:53 +0000 +++ b/storage/ndb/memcache/src/workitem.c 2011-04-06 04:31:28 +0000 @@ -184,6 +184,13 @@ const char * workitem_get_operation(work } +void workitem_set_NdbInstance(workitem *item, C_OR_CPP_NDBINSTANCE * _Ndb_instance) +{ + assert(item->ndb_instance == NULL); + item->ndb_instance = _Ndb_instance; +} + + void workitem_free(workitem *item) { /* It's OK to free all of these; a free() with class_id 0 is a no-op */ @@ -192,6 +199,11 @@ void workitem_free(workitem *item) pipeline_free(item->pipeline, item->ndb_key_buffer, item->keybuf1_cls); pipeline_free(item->pipeline, item->key_buffer_2, item->keybuf2_cls); + if (item->ndb_instance != NULL) + { + assert(false); // TODO Not thread safe, investigate path + } + pipeline_free(item->pipeline, item, workitem_class_id); } === modified file 'storage/ndb/memcache/unit/Makefile.am' --- a/storage/ndb/memcache/unit/Makefile.am 2011-03-30 06:54:53 +0000 +++ b/storage/ndb/memcache/unit/Makefile.am 2011-04-06 04:31:28 +0000 @@ -38,7 +38,6 @@ run_unit_tests_LDADD = ../ndb_engine_la- ../ndb_engine_la-ClusterConnectionPool.lo \ ../ndb_engine_la-Configuration.lo \ ../ndb_engine_la-DataTypeHandler.lo \ - ../ndb_engine_la-Dispatch.lo \ ../ndb_engine_la-KeyPrefix.lo \ ../ndb_engine_la-NdbInstance.lo \ ../ndb_engine_la-QueryPlan.lo \ @@ -49,7 +48,9 @@ run_unit_tests_LDADD = ../ndb_engine_la- ../ndb_engine_la-atomics.lo \ ../ndb_engine_la-debug.lo \ ../ndb_engine_la-embedded_default_engine.lo \ - ../ndb_engine_la-flex.lo \ + ../ndb_engine_la-Flex.lo \ + ../ndb_engine_la-Flex_broker.lo \ + ../ndb_engine_la-Flex_cluster.lo \ ../ndb_engine_la-hash_item_util.lo \ ../ndb_engine_la-items.lo \ ../ndb_engine_la-ndb_configuration.lo \ === modified file 'storage/ndb/memcache/xcode/ndbmemcache.xcodeproj/project.pbxproj' --- a/storage/ndb/memcache/xcode/ndbmemcache.xcodeproj/project.pbxproj 2011-03-30 06:54:53 +0000 +++ b/storage/ndb/memcache/xcode/ndbmemcache.xcodeproj/project.pbxproj 2011-04-06 04:31:28 +0000 @@ -9,12 +9,22 @@ /* Begin PBXFileReference section */ 65278D6612BC452E00189195 /* Stockholm.cc */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; path = Stockholm.cc; sourceTree = ""; }; 65278D6C12BC456100189195 /* Scheduler.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = Scheduler.h; sourceTree = ""; }; + 652DC977134A8F55000C5787 /* dtrace.flush */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = text; path = dtrace.flush; sourceTree = ""; }; + 652DCB4E134BE87D000C5787 /* 3thread.cc */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; path = 3thread.cc; sourceTree = ""; }; + 652DCB4F134BE87D000C5787 /* 3thread.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = 3thread.h; sourceTree = ""; }; + 652DCB5D134BF512000C5787 /* Flex_broker.cc */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; path = Flex_broker.cc; sourceTree = ""; }; + 652DCB5E134BF512000C5787 /* Flex_broker.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = Flex_broker.h; sourceTree = ""; }; + 652DCB5F134BF512000C5787 /* Flex_cluster.cc */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; path = Flex_cluster.cc; sourceTree = ""; }; + 652DCB60134BF512000C5787 /* Flex_cluster.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = Flex_cluster.h; sourceTree = ""; }; + 652DCB61134BF637000C5787 /* Flex_thread_spec.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = Flex_thread_spec.h; sourceTree = ""; }; + 652E5A371306533A004F8795 /* binary-spec.txt */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = text; name = "binary-spec.txt"; path = "../binary-spec.txt"; sourceTree = SOURCE_ROOT; }; 652E896F12EDF39300CA79EE /* dtrace.mget.default */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = text; path = dtrace.mget.default; sourceTree = ""; }; 652E897012EDF49500CA79EE /* ndb_engine.d */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.dtrace; path = ndb_engine.d; sourceTree = ""; }; 652E897112EDF49D00CA79EE /* dtrace.mget.ndb */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = text; path = dtrace.mget.ndb; sourceTree = ""; }; 652E8C4B12F1046700CA79EE /* trace.cas.coord */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = text; path = trace.cas.coord; sourceTree = ""; }; 652E8C5512F20CE700CA79EE /* trace.binary */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = text; path = trace.binary; sourceTree = ""; }; 652E8C7C12F2795B00CA79EE /* run-tests.sh */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = text.script.sh; name = "run-tests.sh"; path = "../run-tests.sh"; sourceTree = SOURCE_ROOT; }; + 652E8C8512F284EF00CA79EE /* protocol.txt */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = text; path = protocol.txt; sourceTree = ""; }; 65570AAD1298A3060043D9B9 /* assoc.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = assoc.h; sourceTree = ""; }; 65570AC41298A7800043D9B9 /* visibility.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = visibility.h; sourceTree = ""; }; 65570AC51298A7800043D9B9 /* vbucket.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = vbucket.h; sourceTree = ""; }; @@ -101,8 +111,8 @@ 65BB2F01128B9F82003031C5 /* early.close */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = text; path = early.close; sourceTree = ""; }; 65BB2F02128B9F82003031C5 /* engine.d */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.dtrace; path = engine.d; sourceTree = ""; }; 65BB2F03128B9F82003031C5 /* userfunc.d */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.dtrace; path = userfunc.d; sourceTree = ""; }; - 65BF5E221334286D0043FAB2 /* flex.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = flex.h; sourceTree = ""; }; - 65BF5E231334286D0043FAB2 /* flex.cc */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; path = flex.cc; sourceTree = ""; }; + 65BF5E221334286D0043FAB2 /* Flex.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = Flex.h; sourceTree = ""; }; + 65BF5E231334286D0043FAB2 /* Flex.cc */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; path = Flex.cc; sourceTree = ""; }; 65BF5F1D13346C120043FAB2 /* thread_identifier.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = thread_identifier.h; sourceTree = ""; }; 65BF5F1E13346C640043FAB2 /* status_block.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = status_block.h; sourceTree = ""; }; 65BF5F2F13346E1A0043FAB2 /* thread_identifier.c */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.c; path = thread_identifier.c; sourceTree = ""; }; @@ -111,11 +121,7 @@ 65BF6524133F108F0043FAB2 /* pmpstack.awk */ = {isa = PBXFileReference; explicitFileType = sourcecode; fileEncoding = 4; indentWidth = 4; name = pmpstack.awk; path = ../scripts/pmpstack.awk; sourceTree = ""; tabWidth = 4; }; 65C1872B12D6AF00009E5AE1 /* config.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; name = config.h; path = ../config.h; sourceTree = SOURCE_ROOT; }; 65C1875612D6EC7D009E5AE1 /* ndb_worker.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = ndb_worker.h; sourceTree = ""; }; - 65C187B812D94D21009E5AE1 /* Dispatch.cc */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; path = Dispatch.cc; sourceTree = ""; }; 65C1886612DA682D009E5AE1 /* stats_settings.txt */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = text; path = stats_settings.txt; sourceTree = ""; }; - 65C1886712DAA9BB009E5AE1 /* 3thread.cc */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; path = 3thread.cc; sourceTree = ""; }; - 65C1887012DAAD56009E5AE1 /* 3thread.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = 3thread.h; sourceTree = ""; }; - 65C1887112DAAD5D009E5AE1 /* Dispatch.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = Dispatch.h; sourceTree = ""; }; 65C1887212DAAD62009E5AE1 /* Stockholm.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = Stockholm.h; sourceTree = ""; }; 65E51D7412A0A1D60039115A /* timing.c */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.c; path = timing.c; sourceTree = ""; }; 65E51D7512A0A49C0039115A /* timing.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = timing.h; sourceTree = ""; }; @@ -126,6 +132,8 @@ 08FB7794FE84155DC02AAC07 /* ndbmemcache */ = { isa = PBXGroup; children = ( + 652E8C8512F284EF00CA79EE /* protocol.txt */, + 652E5A371306533A004F8795 /* binary-spec.txt */, 65AA54F7127C8705003FE674 /* ToDoList.rtf */, 652E8C7C12F2795B00CA79EE /* run-tests.sh */, 65BB2EFA128B9F82003031C5 /* traces */, @@ -278,8 +286,8 @@ 65570AD21298A7800043D9B9 /* util.h */, ); name = memcached; - path = ../../../../../../../Desktop/ndbmem/builds/t5/include/memcached; - sourceTree = SOURCE_ROOT; + path = ../../../builds/t5/include/memcached; + sourceTree = ""; }; 65AA5868127F3EFF003FE674 /* scripts */ = { isa = PBXGroup; @@ -310,6 +318,7 @@ 652E8C5512F20CE700CA79EE /* trace.binary */, 65B977C112F3984E00FEBB0D /* trace.incr */, 6569FC85130A4A5E00DFAC09 /* dtrace.binary.set */, + 652DC977134A8F55000C5787 /* dtrace.flush */, ); path = traces; sourceTree = ""; @@ -317,14 +326,17 @@ 65C1871312D6A944009E5AE1 /* schedulers */ = { isa = PBXGroup; children = ( - 65BF5E221334286D0043FAB2 /* flex.h */, - 65BF5E231334286D0043FAB2 /* flex.cc */, - 65C1887012DAAD56009E5AE1 /* 3thread.h */, - 65C1886712DAA9BB009E5AE1 /* 3thread.cc */, - 65C1887112DAAD5D009E5AE1 /* Dispatch.h */, - 65C187B812D94D21009E5AE1 /* Dispatch.cc */, + 65BF5E221334286D0043FAB2 /* Flex.h */, + 65BF5E231334286D0043FAB2 /* Flex.cc */, + 652DCB61134BF637000C5787 /* Flex_thread_spec.h */, + 652DCB5E134BF512000C5787 /* Flex_broker.h */, + 652DCB5D134BF512000C5787 /* Flex_broker.cc */, + 652DCB60134BF512000C5787 /* Flex_cluster.h */, + 652DCB5F134BF512000C5787 /* Flex_cluster.cc */, 65C1887212DAAD62009E5AE1 /* Stockholm.h */, 65278D6612BC452E00189195 /* Stockholm.cc */, + 652DCB4E134BE87D000C5787 /* 3thread.cc */, + 652DCB4F134BE87D000C5787 /* 3thread.h */, ); path = schedulers; sourceTree = ""; --===============1131761229== MIME-Version: 1.0 Content-Type: text/bzr-bundle; charset="us-ascii"; name="bzr/john.duncan@stripped" Content-Transfer-Encoding: 7bit Content-Disposition: inline # Bazaar merge directive format 2 (Bazaar 0.90) # revision_id: john.duncan@stripped # target_branch: file:///Users/jdd/bzr-repo/working/5.1-telco-7.2-\ # memcache/ # testament_sha1: e8d1162a14d088c225321edbd35f36f717787d69 # timestamp: 2011-04-05 21:31:57 -0700 # base_revision_id: john.duncan@stripped\ # w8ygi4r7zm1ximtw # # Begin bundle IyBCYXphYXIgcmV2aXNpb24gYnVuZGxlIHY0CiMKQlpoOTFBWSZTWUM/9AsAI2n/gH/87wp7//// /////r////tgNr72+vPr29vprm4cr73vvvne9S6fe77vbffKB3Ocej23d8d4eM5fdvq2d3vvblsu vvdzlAUqQ0AyCIJClPvnOWd9ijXXQMEFWfdu754OPoW+vvZ9PX2D773Yyb7bWOhUWNbD3s9HsYBs ySgol1nsM8j7g6XT67kobX17m8LtZgXMAJaPhiQQIyZTAaj0mqeamiflT9JpP1BHqPUeoeoGQBoA 8oAAASgg0EAiampmUUflNNJoZoIDI0AAMgAADBABKAIhEEhtQaaYj1ME0aEwAaAAACNGBGCYASaU RNCZJpPJNU/0nop+powqe2pNI009IaA2poNAAANMRpkP1IESSBAIxGQCMhqNDEyRHpiNMqenqnps pptFHlBoeo9DTUPU/VBIkCAEBMgTQmU8TSTT8oaBkR6jQNAHqBoAaANDnV0RXQDKIP76aBA9EEG4 ELCMKEkpPX7PlkfC37H0IGmLqEfzkfdT/HjT0ps3WwQPM7lX7O5nT0BmPSZh4oGSLw4ndgpsTO78 tjQvtJikkiVYpTLI4MfGEZs/P1iT/3pb43VqoilzO3rVP+iBdjyLgf1uAgpE1F17ggKiTBn3h+d2 e3HnyvWeLmcnb9rb8HTrT1LXvS+umLXWz8NsJ+kiJ2tgVCkmZQXswHSiUWS6MySUTvNlWX/rE/SE xig/K5ckpelh7KXeeEZoFYdVqohLBWUo0k5zwKKk689zVTamPfkfFLFqWZj0t8esUFio0U8bAyr6 +5Qcfu47iooiKHRKddrPuJe98nRYckMIIiw15ZL0xw0tuoKL0peyGl0GxOzNELQtO6w6JBlYIhN8 SjnWeRnO0qFC/rWVXCdH/rrAaZx7WrhqzA54u9gYno5XjmautGu4otQ32XLhc5walXwl1ct/O9RX hHbELRPrE+hLCeKZlaE4Q/LDshpDSdP2vVT2zxeiO0SUUJ5tKK4qbd+b5Zz+96vN693Hh7U9L368 tww3WOuDFpUUZydHZ9tb7zApReqLZRdd4XxK1ZLPzF95MbzaQquTEsmjDDe6pDeysmWQ3pOCSHtZ wQA/5TQJQUEJ48ZTXTXgYzm+G5Ged7rSo9W8LGJUjOWoWNsQXmnxL6K71iLu7tycNpTVjN6VgaYm M3K4nJ20tKBefHYmjG+UsY3Hr8htMawZDSgBfVAQT+k3dfTYP0Zg8olp6J2eBX8bwS0dkbohrgaI 1HlV2VrtSnJAVSm+1rAyIWgc0M0LRS4YMBVKqtMUC2FwGBfcWRznrpFNcELvswo7e/bhdCei6yNx H+v8uWyXwCtJpdh+Zpp1sqCMJPGjOrFxJOODGOdhSTAokLVHcceeDRYQRJFkYa+aGmDIrIjzxQCo CkzFKFEP52c27edCPxXIGOyZswM+6nmQwRZsfkPb5/v6dE4rffin5rPy6dOYXnArX+7sNnZ42Afc DuzvFrsa4XyRTU03nYKIa9A+V74KaRPA7apeGgmnR7WFnm09NKqw8Dc8OJug94I4l8V4Utt+nCfC lIviNtJ5YjMYa9k0bgtDiEyIbhz2oJCEf58uSTimyKjkJJiJJJUcu6XDVPmfY77LFpiV9HrDMHxh QlIcSitpBfTCRk8yL5+QCfGBFVgqwiwRBYpEZERYsRBUVFFBSAcyTHdDq7oZljt6okjT4/CSe21M /jb03dlJadtKv7dfuawTw0zcTLnoy52YZrermwrTTrozQ25YVMzh9FpSFGutYCVpfk0REYyStDvw halPghR6RFVklc6PMy0yjBaFmr0oXeRzMN1jIkPKtRh1VBl0hh3wsQJvdKZmcGl9U4UW5U4+xGFJ KEKXFQyqZ2YopgEwP75JSTZQVIw2u+j5hhlR10VVfMbVSua3dgrZlptWJjVq11pbvCJms6Yqd+br R7M5p8TJlU1V1y4eLud7qhvdHOlXqj50HyC29vKGez0P1pWCPSrTM/fzNhRm3iTwgTDImU7wzyNP hgikJyry/su7SuuAuk6jp/9UV3XJv3S7qnRofQwa+MyCMTLpANG94VHFfmr/G/HvbSHkUycfdIQw js5CGM9mQvD5Q+cl8Hd0rL694j3okImM0i1pExi/qwR/OcEZ6Iz4HuRRRRRR7IXi84c4eyCQ3pDE PxxbX+qqcfdxNvfsTmTvV1JZMdSaLnfyVm57WLSSWEnofNmjgF5JATip4WYvA+lRQhu9t2ITtSvT smI9gkgnQQx1ERKKMbTzVTFSmvcTzHdHeII6KEp516CG9PnvrG/NW6m+HSJTSeM/mmv6+1MC/FOG 76uwqRhpkx8JwaMQTEZaXO0Ogtb4vRHqNTHYKX1kLNAUcAc9RcOY0/472Qqhg3Y20QpGI+fS5HyI aB2P0WwemKvibmxzjRDhGv46EyIl6s6z8svIOn2EUOFgjK2spLuThbcHvx+XcVcBFKynugV4r/7E w0Of+Fx02gyEVU6NuSVUN/P69bfnQXfiGOFQ0EnJ2Bjom2CSFxX4Dj0RVD63l+2K7gVUJYxKWT48 s8m6g6bYbvuzR31IRe2/doBXvxRJGNcuzjw4AWyJOVvT27UmRgjKO2U0c0bxND58llIrop77IkK5 g78pyRldwYpdkM3GKzXVgV16/UzWILVHMbbBphdDwDfxjrhJGINHHhOaPsemDmiqI5s01YMLkTyr wu05gp+GO0nAbs8Zd2unhPzbWl6ESqjJsNGPyWXC67younmLpgqWr3zVSXRNTGu+U5++/yaNqWzb 7+BSzNdd2rSNREoocRSQyXCmtdAPM4yrpNkulT6yJcEq4kaJmCIJhaAZmeO4rRkqF9+a3zI4XjfU be+JqaOlZESZWBHQ2RlJCypJrOVcxRDE3Q5h/kQ9Gf9papCs6XZGz3EROENd5CXta6SlTYxY+hM+ lx74WZF8suZ69dO+V+mnOobo+T3Iy2Je21+5QjqfT4guxByxsrrjTlGWO65v8s7XXjLuU8+lY9xU q0artEJQilfPdND5mSL0e0R5kerSzY+luJNje19aiO+mXG4aNVoOBd2Wyg2HXEo7RJaCuTTIpvWF xzDwZIPwPUyvKX+EkZ7U5z9vpNZHq2VSbJ9zc/DZGnr0LId+ZHci/0tFKwtzCtmLk+2lZW6eN60R p6O+/qjZFZO3OSgyJzZNssUyQWRIUmmDNnqwWQzKHdPXAxeANFKGZGBzv4pKTcs6mlC3wdr02X0u 12p6nn16Za+plles4vCqae73vla0tGrPhnfbTax5FbRlXOzuz8/s2Qe/1f9yXa2N1YqYZPYyZJuU SlGjuijrwzj+5BEV/y5097XnyX97yO1e3e9S1YtFv8vaerqzvH0dkKJ3nN4y9n5fEwx0ecEfsxIp YGpYiLEYooxEWDETvtGRlSsZFhINpYkRAqSVkRGjVETnCffZIGIcvtWO6rD295SaIG9Yk/TrQ+Pd zuOb5H1I/jcnbmm6LM4YGA/oxImNxbnE7JTkvqdHA3VpeXq6mBvMYmyWabZoT4967N3SCtAzMhYd CEbDILPMZ5pNqIOTiHItkrIiguy4JiXvsvM1obdXkKUXTGuUcfXana3Cou2sld6QOlG20LiMvmxS lVjNO4u8l3VIFFDk9Hj6XV9mjwVKivG41UFRTXeu6GnHP1p2m4fvsl+8mf+Lg2xSdGujMX5J8BCn ibxxb1Hxr1Mp4q6uRrEyQtqMd5LQmvkbYOxr9v1x4pV0zhkGMW9rvJRWX6i7CrJhlqr3aK3ut9Lq VJJ+EaNYPfYjt6XnOhTPfIwmh6Vl1nTm5rW0t+8piSjbHs3Srh0tN5RD2jTOG7/wgeDBtNpBZwqJ bAqW0E0O/OFzBI2tC2JIUWJlOlhX1MkBUVITIgaISHdIfKs+i4vV8q2V6De1wjw4NQWlC2/R50WO 27qyTdr9TPHNozWJJ4RQHkqn/Aigw2kQIrzTkZQRNkdpIg3uT4UxL1Fcwp0TwlrHvojaC8zzfd/S mltH+Fm1lPKythB6HaUzKpl6Yl9Fbrn3NF6PJfSDY5Mj5P07h5lIhvSD2gN5kcw5+c3A7yB5S8Tl QxQooVgqMZ98rUqkKhQYwgSSxO9h84h5DWECEBfxGigfgFXq15g0aungd9xjcNjbJ28+qCEBuqUo ZL1f6CQinrcVB8aolBJpBCkjOUIdM6PPbQuMS12ieITVT7q6pCCdbGn3ulD0C7/TodqojRHYj9EQ fJywlwcuuvEdLqythakTcFEfhQspGDXqXWe2Tpz8+TT0VEjSggiMGHeDJkEnuYPHA0KCixcUFQvr wb0FOuWIHcHMd7bAziMLYk+ODDdv4Hwq3DZlrE9JpXEpyN5jGsLEd4yu9KYqCggYUogkUcKRHYa3 N+qYK0eJ4BnmKS0kBaSGd6vKIkvSmQZ6jTlQfRAJ3IT4ghRQ9gCSCw+yf0WVfpw0wkgfuhfP4xow /fmB7y8DIkgf9iF/5dxZF2oYljsibPNb3tqLedvdaxkwC4SBwAhnh58/ufr/u+sw5/U0LjLeqARd EJUNH2pnWkPn11wIK2pJANlmK9l7QXtCm72TaVWe66CjEzRTDPLwLCcYyUAJNFSfsDIDI0lg/aZ0 QdxkAfyNZmvGIinmNBrD0lHLHDYmBUsVIKoQLcWORMzHRIQbFlIBBoDOUlxxtAdEEBtEsVTrWRWA ECx+AjYgQWBAyk6vgXkipI7C8ri1QcihVJK5bJYiD/rg4oxcwZNzgqXg2qVBSZBguLmKlxShsbHC Y6dx+rbyxpCGUnBJsyFxYoO7GJbUdWdadjVBeJZvhKMaXWVHu6ps5uKQxbxjQikYPdpH0mNEd0Cv KC+FqozSEFTzzxCVABIrD/M37zh89A2YPLgiEMASNDdnmcC5MqQF8N1djgDBIcTcXdUcRERbY89f GPF99CMTl4bDRaqz1nXDYzLJM51FVFQ7wQSLHJubHHeD7J7j1IIORkZG4awO40lRuJvpPmklmgXA ctmfHrkeRM7HsfRdbG4vnYXg2GJFWqeVjg5TkoT58ixU1PZlfNlmZHstlTQ4LGC8i2r+3YsmFRWI H+vwbha2DbR44UlvpwQTwgietzg3I93kbkzUyN4FkKeRyVLkyQTNCciCDsWGHPSxYU7AJ8E5C47I nS6qGAEXuoKtm6cl3CmtGOYpaknk+KvRpzsgw08V38+vZKxi64d3Au6FwBEHCw+2zKtjjWJIYecp FUF0tUUc2JjXwwugx6C+O00I/gnZZX5F5Q1DoRZ2agv69ZhlTjdebEkhA9OoL+QxIqLtNVG0MmWG dKjz5C86soAEn9SYM7TnDTYU1JiCpgxg7Gp7tY89AAXBwCJHfM6HQ2NjM4lTqTGbMqYG8vMi5mNT oZOwh8oCbttmzyEibTUTlpMUWbQ8nwLNtzl4wK5eeAFLKijBCBQiT3DOzriSOJmM1lcOkDHIgSSz gu1NSgVWuHSWA7DnI4kFjCj5IQEoD4s3JyyLLkKCBIVmLG5FyXVWTQjc0PYMUKKux7yYxuS6N+2o 5nmKgyyNSIkVyNseDY5LDlToPLuUGDkoQdg11Y1JHFiRcWC2DkqObFzc0+ZWRWVh99b9T7LYpMzK KKOs4OCyZeWgo3LY0MrEPo1btDmqYUjJwoggapKIiY1528aa5FsVr5G2S1Bk0e53NUBGyvHNgwKc BxkQ+J7kzmYiIncLkhxOtDKph7eCpAKKajHJypuVuLQyYKgJUg8HJaFVZEwrEtxUvtVDcWxksLOq VUgqVOJAxBBuda2NzcyVBEcREuCYAIKimxgNyxBk4NRlyVNjg2MHBMyGSpUYcgwXJGCmSC5OCY9C pQ+RFphm63fkXS5JCk+TSrqPvmrtFVMSJOSdbjSpfC8Qnf5exjZKnAbD6liRuSqokkQQPQB5aGpR cCxHI5GZmWJFamEOKpCDiCKmZwoDkApnEmbnKZ0zkAtjbI9CQqV52NxlnlrHAfoFkUnVejvYrudh uKoHQ59wAsWfsoujXGQqdsN3QSnYWtyhwVJ/IcBoHQKanRBqeDUcGMlhzogobkyB+IFLkzUXaSun Zb1ZcXdoo0Q7wzTq8ok72/Jlm3cQpmz7lChg3H1AQFKnY8eMxltMikXLhMOD2mTXbC5uX4FGOhUu cFEFd2JyKk65mRdrtlaB5FjtDQwN+P9fsbbbbbbbbbxJMBcCJzb1O04lLtI5Y6muhLsLt1151KnA 3ZaHMsdlxzND2iSWV95brfY4mBUxPDikk0CYpudiD2GmlyCRyclKSNjUqdCnc5NSpsYNSRqAnVC/ U2kUq1lSqlgCcTacT0mlmtOGQG3iXlitsGxqYMTaxfE4JEHJYmZg1G000fYmbkzQ6KV3sUktDQqM cGg1SZwVPLYgympY1Mmk1plcGhifNJK+MliewA5i8iFtwMWNx6C8vGc4SSoGuY7jMmXnBcTIkcSx hbiQVgfPncYGZzJGRxMjA2SEFwklMSS8lv413773loyq3dnr1N4oCkQSacSkMsdq2qzPUlWNqrNw AKykZIHPLJdpFiZUXrxUxItqPpAlC1PIgd/THJsWMF9ELX+Md4MMqjGx3HSwMbFwB/Ybg3qfGEEs N1TzOB7GieRaPPB7TU4KCm5oXEzMqwkcjQ6lSwyYQZmhgMuOzukJJRH207a0O3ntak1vJW0YlVs2 lVItJ4ZlaKOre3feBElnO5gwew6L30LnMoMim5oPthfBwSvKuDHUEilcGMDcMSGamwyhpTrkVNC9 JPEtgaljDYsTO58Rih3EREoXMFSxqkw552Nih0XPM4KHAxcmUNzsTDY8ixMLHcJGTQoHBNEEBzzC AWQbwV1zKHiO3mgPApaKNVmPGAGQI5aUQkD3Wnp9VJVz1q4wmJXdXZYoTj2WrTRAXssK30qzG8Kl 4rkAGNnKtv3KcAkhD62FQPJ4ffifGBTa7nJnC6DNi+LV3KD4ZwX3QEPxbfmGGQytIkuyAr2l0jm6 s+p0eeG+8mHnKRTv0PRM17Ntc3270wlZC0r9aQ2Gs/5ESKkuLSVYYur1LbEkh20Vj9RmUWTYU1sw 6/19NT8LEFdN5aL1OHRGw22OEiHmdpBRl3yF4LOCzS7UY+QK6MvD5+xttssThJJGVdmNfKn7k2Je J+iacQPmJ86LcDKDuihIH7IsPtJGlGqaf8oH+ULRUviE8vkhHrmf6XVGPVulnIPlRAAyYSRsQSJS KKypMKiIIijAGUQEk8sJGGFVVZAkJFSSS8Tb080+tX5aoW687Ab1HyRAWd38L9IinpT+aWT5j2Hz noPzh50sv52z/s3T/jowuT70/amlfWn9BPxJgcheJ8wnY+++9BZBofvgknIEEg4QVi6/AE6wTWGw T/05gDUDMV+n8KpIdB8/hZ/GAfSaCjcvRYX+P705ukdq8nk3IGax2JDWnTH/CTfzWqQZuj+aQLUp nEufSfCfOF6dppP0B/cHcW51fCD+YA5jfAzkHw8bcdmaIQfWhiVppHu77riAtFxGoYgd4SgISuQB xnMec4eM+w0HNq1rynOMW7viGukL0z/1vAuKH9P5Yb9e7cjndnuAmCyFFJLgCp+8H2upxOp0OgD0 kG49gdRyaEgQgRT+oPqBp/HGHpW/ZS4spQuExiy0LhLGBmMk+ORkETcnV5qVuV6QCyUiaHsjOI/H B7Fv5JKG3nd6YaVdKZ/Az4J4LcWOmGq7MmXuTuTH9Jkk3odwqUevfthYwTgcAc4m+J8BVTiNW1gc aG5W/m5vjS2JrNZbUg/ebl9Fs4xOow5S1DBNKQecvTYCZleKVpLUSI5JSFBSdSuStrJfY+45gpNp Cfl9VrRqCXJrIQgnqETlAUhQUH1JEIx37Q46cgJECpvNShboTordiJAZIoi7l3+HtRgeJxhHe0BQ 951HkZo3IhLbdcjU2Q8vEXRU1CPi5ofxbII+UqgPM/Bq94C3oog3I4wKvIjVLhZHkcbwL0TRYQ0Y kGKE0Qe8F47E8ugr0Yk70R2kx4SKDYaLicW/wNvclg80CRKgvfBC4wyTaZ1tppNQGPjZSx2JrJkl FJcYHJMt5iYJgnOYLqYjyjuCPMjR10Dirr8vEAzprvVvrYmhMQsn59hjecqQHjHQDSbSBem8S8d3 WvahA3EG5CT/Gn79/6qwic/y/T8eCHOZacesOQDGjEQ958qBgHUl3NPunjzqi4OJZHMwXLNjf6TS XwhgmTs+kDtORDymt+dQ99OksNhBU50PraJpr74lj0JYSPOsqn46qmNmxoP4WIOSTTgBGWGqjDFB N54YehyuZaYZhMwN0DdSiApEARTBmSnDOsG6Z2kGTtOJIzIgFOwG/Yoq0Js4R0CRRNpyRUrWF17h fy8rkBVitQ1mRf8bmSSkXUbFCmNGFl4cOepn6Q3YOZsTRFVegTmuhZ0gOiZaQ0wx47wqSAkZgmBZ i/Ifh2U1gek808Ls8pjJxoiJ3RxjIImBz0DznOI9I6pqWUUWTQ5qTPQyfeLjmBTBvuxQ3JjDEFj2 jCmwwxgyDH/zEiZqXLnYZRD6Shv4U7kxjJ0E+gOAZGIhj22G9VTpAMBQwMxRUES9y7/+tFyBnDWb DWbzp4mvMuykgAevWB5v3fOR+lRC76D3gyV938L9zF/6XBw48kJSwoc2y10HlD/fk20iYDE97dBA rTY90Q98NUZ9GZosQCMhGJBFIxMBUMTXeBhGoLPTcfO0WTALNHOCweJ4+c+zURTBwiYIDdyki9LP eDM71eiIMjsB7kv16wCsBUUnHkFtLzq/G+MehE6cXWcIsJYN0QpsFghgNDNdIhEinZYQKTaRNWS8 brxF2LnqBt0Uu7wIQLsNR5GHGPYVPMd57zj7DmULHvYypcYq8qd0iC89piR7z84klobFDU+0Xmps XlSqQg1BJBYyPWzA9FxvKL5g7MTcaG8uNxqfaJmhBBuMD4tLmcSNhrI8RcjU2Mip8P2ahLiGIdw2 MY0hpIlEwYdIdI7Um2NqK0qRZ3kgMOrsYveTtasEk3x62DaSIeUzfJvpfQr62huteZwJyIp4kJAU MLKppJHSLJ8C0+L1XlFOwmxhPrk2DYQSAXIbki2SJcBD0HgaCFgrRvew8CHYHKQWWdFIJIGGGLOk McBkcg5/faG4sUYMByTBuYeQiUlobEFgnY4+lJsDG5SQcvgUMWhFPcUPedKZzSbzSUQ5D+wVelQL z0FBT2cTE4kIYEN5yIvXFDVAMzE1HSXOxgU8GDBABhBJFRkD5ZlxMghVMqIcaSxpOo0mw9t5mozF WB7joaDiIQivRGIMUK6CYmzKFjolrU2Qoo49OfoY3Kqe814zhJA+nE6B6WkBPSfU5xMj0Oxa999f PiYXfulZfFXIhqxB5W2lRTHCRKKpIBGPmqhVUiPtWz7GIFiJjUDXW/5onG4SIpcQZkgDhXaQcPSJ xSdQZnUA9odvyGiRJ5lWIMWCMSQ13PfrmHlpCwR50/3YZuSZxctYMKIXOL5oO9JjRDGfk020N2mq 4d2rkdmaqmYJpqZhZlMMGfVipUNmYMhSUNyQrMDqpbhxaCQMLWTESd5nYh+kerwPYWcnMY6ZJxHN 6hg8U72NSzsUYCByq64zdcwKbD8by4tPfEe8vOCSVjAzNdTeklpqcCh5jpjsYFDQaS44aCSty5U8 ZZRYZ77HypQgRgdRpNQ7szeE+6D3gnM8hcwkiIhA7xJKixAbhiOfEqcDkV7zgTHmbO8lzvJkj3eg Qm1ziEDQOp4kuPgWwHQ3CocPV6v8kTCa6YBoGVJ0RBZkDvRr6gnMW82Ow5G47D0iO4yF8RJ5E4AM BuTSn2ejHlfe2mHxb6wapIQw54SJ5aqFnTpBXMIpbR28geCeCUJbffd7av+jHFxAVhmz1Lq1XFlR p0iRg6yQUJDJuyzBfbpSRXK0hXnMFKjVqmeSMxJUBmbwEYSxqfk/FSjDdjhQNXiNJ9N7dtoSYOe0 MbGJyWmkNMWFVzVIQWjydUS7YK6S4jKXSVhNkJt+nNtFKa3QpdvqNo3ooLMxtQwMr5o1JssiNnZL x/U81tXY32cOiZwZk6IoVWa28rKtHC4PFry20b14F4VZ1ehCdVHhpLo1h1tS/nAY4MIEFpzQqxEE 6UyRwzdB4xMkJDAWYFSIsQymK+iEqewqecg+zwG9uPRkHFXDZQ9h7yReM8wyZkXLMbYDYGLIZY+Q zIO0ZBoe4yNTM+UkTGbBid1Q1O81wSczp39OtpdrrDuYKyLlTqIMUdIJjEMJRWcBxRgOBl1Ri7ln QSWBSgtAAamjRZ3VQNoyFhaSyMjyLhINhiEX6QkcTvPE1U9659hmdpgXBwbH16wAvA7ic7F4zUJF i4bVC2vAkairuRNBxn+NnP8QdSARC+cCC4z845+M6QiIaOfWWeCJ+OdLAYsHjSuZO+ua4vLBWg7z iZzlTAFYJLZo/Ahh0OhkSAv+Hed5yMdTv+R881tQXmJHker3dRhcT80dZ4HkB0mg6xwTmQDoSbtI 5xV6822cfZRdk5xM4eAged9XqoxROkgi9CvlzGffMTDE0vemmxmxB1eAT4x+pX2iOln9ndbe3MEG GEugIfDISDgGyhJL3o7eJtMqAdxMbPGzobMUOu3v46lxuhpdGrEVUzZbWhp4Qc4VEWCKqqIoNpaU KKOvRgwIoq6U11zCYSgS2IxSbBTalrrl1GpW8S95/AncDdyXgezb0NnRNIhJVByhuNkDwwgfB2a7 e7Hkg4zjOBWZMwcKGZCXNVkBLgiqd9TwK8+8ecfKeMENzQTbG2IIEYEWQkMzkn6tapa0Cm6/MCcg qTeMADoVwAeBAALnse0vA2sUIovpOPVH7SEWbOI+4SOQFHONiokB1klJTG4iFRk1tKISyvQeownk PaXHA9J7bjSSMP2RjJIp1jAodgcs7JfYNxuNTBqdMBN5MmZJmg8/zRm+c1Pb88GhoQYHNzc6KpqU MmhuO35JMV5T8qiHLy7TI2Ggx2TcdIDvOAr8TrM5AeB0nMWCHUK6U8Gk4oB+xF4OI+VGWBDlDEKy UrQhUn0AVAPseVVWyq+m7lvhUoplVdYa0DoVKVDo8aRgq7L05BOVXUmOPkPYBiucFxSqUdxQ+s0V n6lELCh8A8unkp9IbLvdYbjvLkawkqqhNg5tDNI9CewIOpedL1RSsIvSWvaCvqIXv97PGDphhBbI TG5xM5cWW3NsHIEMgkJaYjiCgIGFEJJwUOrSH1OJEhCsA0YSNsgoUDrYGsEIQwehTU1uSBcVaoyC QfwEhrUU+EDFMh9kvRUyJqUhYyAfqOYAHJTjB6sTJwXNZ8h5D1D6Syt1hCeNHo3/qBPjxv6sucXz RDliHc/V9VJZjzdlBhCRJn9NWJCRFemBUbdekwTWG0Oe7nHMp+TxKqoUEkqgsK3J8o/jSrJMlmZM 8HWZXiPoeBKACgiqZgGIvwROkD0pmdCaQGZ0PldNakmJF8k5+c9gbUX1ofDxADOpm5UqmMiRCMEj AIJAZ5/Vgb3kxnuTUDGbcAXo4koMOidS7hGJBx7j9rA30ZxTMtzgHuSzIw3RRFPKFCwxZnE299m2 aYTZzIxUYJq+cO/OPn8OGTs2JQuN4jvSABIBIBEWDIv2+VAOIdXjxzBIMYpECfyB6EuAD81D1Gjr E7DJPOd0oYgkE1wNMMtzgIcoegA+Q0qnnDHzg9ublMaRRVKW+F/NqBo/oCHp3bZ0JwImk1OvFfDZ idPrHoFDmsyXx9kbRUJ5XAVcqY9qj6DxFYi14a1EOn4FO0S1Qcyw8USKsVEAUh4/HPSuoGfIAGsA NV8ol5YG9SjtAfOohubjQrQ1UokGEKhdeUNr0uL1uJcx3BvUbq8FEI3FiXkiQ0vmDBkWRjnXYDom Waye2ETg5FqxS5FujLrltYhYkCzaJYbhFIi6UtdYENERbBWLlRfoz58ooN95nkI5IJIp2wPdJEAQ RgIkRARGMISwYpzaRDAy2UmXkTJTeRUhdlknwmuOZM80LD60qlPfuKAukhyFgPB8rvE06elPh/aT Z5+GpLcE/Xc6Ne1JzuUCO3aBhiqFwczxTQrAM4jxgiiCSCQQA039Xw7/lsmdnzc9cBCuxBLSlGAU LAkLCGIxRK+tq1yXVzQDzeeTfpd7S+Uk6qkOYcldGEHpFQRGKwVLHINhUU+e+DIfLi1VzsfAmgI2 uqIFrBCKBoNSkcefQK32HCJQBSVL5tCh9w2D5k+S/NvNzARN6AxhtiWG7NjL2056AWak435hu0pp Ko6NXOYYpkjCAgd3X/wMAQqZY1lngQ0R0FUtw0EEQjaELUFYUIFCKQzg04puzFWLw3hfgUlBcym7 IMNaRFoOP6UwUUyQA0AxX8hgqGd/WDdfGBFkHwil4ATETRonElEzUbIqS7okbCa182Jewwb42whJ ZGoKqxcY9JN8OROIdBA6hiKyChIdcIJ6x2Mmskp7/+L7OieJMFeoKnjNFVpOCicULwSWAc2AiEPF sEuTQB5arcUG1huRIkFwgJLIx3QtGlyeViD8oL5iRkh8UOIBuklYdB4posmZIrCoqMgNSxAWAghB YijGQNTBprkkqSCsEEAMWhrbMQtkkzDfOQ63QOwPlO4HoRnZq0D4O3IPNkHFyPJMUPGRPeBMwPcH lmpNeUQoHYpq6SISZJAccBHmT61EKP7RNyZ1MFfw+0RT5VA9twgHA7RIrqYNiEu/th8QF/yXvOWU U1G+1gvhIESChwBDlkIQJdFS9IKNuSF0+TNimUBYiQhMEvDg+NyN4ETAzLqsg6KKza7HyBFoGmSR b17IzM/m+aRUeKNEURNeWjCqGKaGP4rUDeEA5kuf6CKYoBmCdozKd1e0UcwbNxg7Q+9MnzEG4CbC 2yzZ9yf+h6xuu0LtEQC8EkfJQ81UPfwO93brajyGJTPHbLvJPEb+D79PIHJD3BPCY1NB3+Bft4sJ LJJ2U/IhPomwku0FsjMsID1iA+vH1irJjGGDmcJB+DGwFG6ChKiPnDySEFN0pxKKt6Erg4jAbANm YTZPgna2i4n5l5BoolsXA8kyvzZMwdxhMuwnwl9BtAB9Kv9r7dbFVzC5JQG982+Bl1Wko+8Z3FMB PtE+4B+APuAepw3bvGxGSJ+nZbkIQz3GJlQ7TKgj+sQ6YdjGSIsBjPJSxBgsifFLApGIs9rCgyIC BmLSU1FF/WxWoNmAhURkBkJEI4+CXKK3K5K5ZGB8KZH5BkX30w8AMzAUYJOwkskKccsYNsHGCYqY wpSeEBPNBQ4cKQ8+sDfE1A7JAD2JJBPD3JIkCMF38McqnIDsXcQFccDpDbubJmzUh5oQiHn76FdP yirNHQrltD30xTVhtOXmDvTcXmAi6V2gyAWNUROs9sIRjBGIwDLltstOpK4kk0S7AVsDRGRmj6Qa QHs20mBC16JndIbfC+dtqnvdgNt9LQUsDZ3IxU4X1zPgoJbbilIiHNhHo/+GGBRFsMFMmxEwqHhq dhBypxIBaHO0ziaLOnUPO/c4qo51O1bUn9x0lCjdSGgtayPNaz0YqiLIL/ASSunOc17q0hjmcD/c FBnRkVUWmoULkGwJABfmD2n+SVB8bKnjZ95nzvryVRZS11lO5nnEr7vdWBRSXsmHzEPKHiIdfdgc Qv+eL9YGjKTaJIhb2NiwaNNXINeJmiRZzexIW4oVmQcET3RCyLjzpEOGsrE6kvACrYEm4gIUHdyZ hHriY7sAPxxEz5ndamfgTPCEsByBCjM8vqegf/ZISHFLoRIb3VAHfMpexLoISBIN4NGbO8phgKhc 8CXoa4H4Q3Mc1cfm2nkBkDSEmmCpj4bN396I7QA6vOBCBnTZRdxpIxVZyOfpmk7QfTQ7WJfoBXl7 nTGXeNqIkIAXpndsMk0gXyiOOvQWMiioNpVGQFyh7pveaJD0eVpd81ouWI9ZjzvdS8LN1xVWQxGA qbkJNakTMnjpiRJSgspGDUBs4iZ+witJPe1mi0JE5xLMMkJdlQnpUJa0WIkDZZpAsDSBohY2BgXn 3XqmFyLSUZYiLYaUMg2ByHSJsNA4geg0Y8oF6qGvN0W3EBdJnpxq2+oVVJQVNaSiySnZRVkvIlhd 5v58HJFDMjUu2UJjBDligYQtyJpLIZaYmc6cEV33e/6otIw1pKk0jTgO5IAPXAR1kAvWLcAR+0HG CyJnRSubcl6/FSJAMqDISd5jtCyN1jrTsBM3ecDXbbyHABkbSscxmMBYCKiwjGCAjHiBsyeU4Czj NLDNoQNJjoNLR0qIUGjokA87e5JkkSQTfuu7k0uw5VkodER3Tr0Ku4ubjCIV8wD0CWbwSlfU6T0d FVB85Kg8kuOC6k2fD0KVXDeXD7lbwOO4DyKhiA7VqLaEiSCVEJBn3f/RgkiIdT2GgyEr5IcofYoh xTjiusFbHRnYG8aOQH+/ARTETgnwHk6PLsM1Yg/M5tHig3KlVvVP7ji3mWvlrUWqTxK29NJkgwmU eeKocLROwpKnYedskSCW6DJz+YuNi1RB501Je04Ja2DawAdglfMRJ3lSUkZAhIU5rwjIelv3E48F aTdAphuAe4ruSZQOBBkOABvA4hhBRYg6+CC4hPOhPgNhN6Uj14KGBhcVGkKFMAQipaSqASAUUBDa Hy9VwF6/S4JxWE0MYmwBF2sb29x0iiIqj1IzlZ0qXinTIwjclqK+QBz+dwLgIugZDN1uKiH8HoNy eYHMbEy+gVZ8wq71Rs6sgX9kO4gB/rC0FskgyIROks0DUQsx+8t+Fk/8XckU4UJBDP/QLA== --===============1131761229==--