#At file:///Users/jdd/bzr-repo/working/5.1-telco-7.2-memcache/ based on revid:john.duncan@stripped
4150 John David Duncan 2011-04-05
Remove Dispatch scheduler; fix deadlock in Stockholm & Flex schedulers; implement flush_all
removed:
storage/ndb/memcache/src/schedulers/Dispatch.cc
storage/ndb/memcache/src/schedulers/Dispatch.h
storage/ndb/memcache/src/schedulers/flex.cc
storage/ndb/memcache/src/schedulers/flex.h
added:
storage/ndb/memcache/src/schedulers/Flex_broker.cc
storage/ndb/memcache/src/schedulers/Flex_broker.h
storage/ndb/memcache/src/schedulers/Flex_cluster.cc
storage/ndb/memcache/src/schedulers/Flex_cluster.h
storage/ndb/memcache/src/schedulers/Flex_thread_spec.h
modified:
storage/ndb/memcache/Makefile.am
storage/ndb/memcache/README
storage/ndb/memcache/include/ClusterConnectionPool.h
storage/ndb/memcache/include/NdbInstance.h
storage/ndb/memcache/include/Operation.h
storage/ndb/memcache/include/QueryPlan.h
storage/ndb/memcache/include/Scheduler.h
storage/ndb/memcache/include/ndb_pipeline.h
storage/ndb/memcache/include/ndb_worker.h
storage/ndb/memcache/include/ndbmemcache_global.h
storage/ndb/memcache/include/workitem.h
storage/ndb/memcache/scripts/metadata.sql
storage/ndb/memcache/src/Configuration.cc
storage/ndb/memcache/src/NdbInstance.cc
storage/ndb/memcache/src/ndb_engine.c
storage/ndb/memcache/src/ndb_pipeline.cc
storage/ndb/memcache/src/ndb_worker.cc
storage/ndb/memcache/src/schedulers/3thread.cc
storage/ndb/memcache/src/schedulers/3thread.h
storage/ndb/memcache/src/schedulers/Stockholm.cc
storage/ndb/memcache/src/schedulers/Stockholm.h
storage/ndb/memcache/src/workitem.c
storage/ndb/memcache/unit/Makefile.am
storage/ndb/memcache/xcode/ndbmemcache.xcodeproj/project.pbxproj
=== modified file 'storage/ndb/memcache/Makefile.am'
--- a/storage/ndb/memcache/Makefile.am 2011-03-30 06:54:53 +0000
+++ b/storage/ndb/memcache/Makefile.am 2011-04-06 04:31:28 +0000
@@ -34,12 +34,14 @@ ndb_engine_la_SOURCES= \
src/workqueue.c \
src/schedulers/Stockholm.h \
src/schedulers/Stockholm.cc \
- src/schedulers/Dispatch.h \
- src/schedulers/Dispatch.cc \
src/schedulers/3thread.h \
src/schedulers/3thread.cc \
- src/schedulers/flex.h \
- src/schedulers/flex.cc \
+ src/schedulers/Flex.h \
+ src/schedulers/Flex.cc \
+ src/schedulers/Flex_broker.h \
+ src/schedulers/Flex_broker.cc \
+ src/schedulers/Flex_cluster.h \
+ src/schedulers/Flex_cluster.cc \
src/atomics.c \
src/debug.c \
src/hash_item_util.c \
=== modified file 'storage/ndb/memcache/README'
--- a/storage/ndb/memcache/README 2011-03-30 06:54:53 +0000
+++ b/storage/ndb/memcache/README 2011-04-06 04:31:28 +0000
@@ -19,11 +19,18 @@ QUICK START
OBTAIN MEMCACHED
--------------------------
-Obtain memcached 1.6 source from github:
-git clone http://github.com/memcached/memcached
-git checkout engine-pu
+# Obtain memcached 1.6 source from github, and then checkout the
+# "engine-pu" branch which contains the "storage engine API" code
+git clone git://github.com/memcached/memcached
+cd memcached
+git checkout -t origin/engine-pu
+DECIDE WHERE MEMCACHED WILL BE INSTALLED
+--------------------------
+# You will have to configure memcached and the NDB engine separately, but
+# it is important
+
BUILD AND INSTALL MEMCACHED
--------------------------
Note that memcached requires libevent 1.3 or newer.
=== modified file 'storage/ndb/memcache/include/ClusterConnectionPool.h'
--- a/storage/ndb/memcache/include/ClusterConnectionPool.h 2011-03-30 06:54:53 +0000
+++ b/storage/ndb/memcache/include/ClusterConnectionPool.h 2011-04-06 04:31:28 +0000
@@ -24,6 +24,8 @@
#include <NdbApi.hpp>
+#include "NdbInstance.h"
+
class ClusterConnectionPool {
/* public instance variables */
@@ -35,7 +37,8 @@ public:
private:
Ndb_cluster_connection *conn;
unsigned int pool_size;
- Ndb_cluster_connection *pool_connections[MAX_CONNECT_POOL];
+ Ndb_cluster_connection * pool_connections[MAX_CONNECT_POOL];
+ NdbInstance * master_instances[MAX_CONNECT_POOL];
/* public class methods */
public:
@@ -59,6 +62,12 @@ public:
/** Get the connection numbered "my_id modulo pool_size" */
Ndb_cluster_connection *getPooledConnection(int my_id) const; // inlined
+
+ /** Set an NdbInstance to be the master instance for a connection */
+ void setNdbInstance(int my_id, NdbInstance *); // inlined
+
+ /** Get the master NdbInstance for this connection */
+ NdbInstance * getNdbInstance(int my_id) const; // inlined
};
/* Inline functions */
@@ -73,4 +82,12 @@ inline Ndb_cluster_connection * ClusterC
return pool_connections[i % pool_size];
}
+inline void ClusterConnectionPool::setNdbInstance(int i, NdbInstance *inst) {
+ master_instances[i % pool_size] = inst;
+}
+
+inline NdbInstance * ClusterConnectionPool::getNdbInstance(int i) const {
+ return master_instances[i % pool_size];
+}
+
#endif
=== modified file 'storage/ndb/memcache/include/NdbInstance.h'
--- a/storage/ndb/memcache/include/NdbInstance.h 2011-03-30 06:54:53 +0000
+++ b/storage/ndb/memcache/include/NdbInstance.h 2011-04-06 04:31:28 +0000
@@ -42,6 +42,7 @@ public:
/* Public Instance Variables */
Ndb *db;
ndbmc_atomic32_t in_use;
+ NdbInstance *next;
protected:
int nplans;
=== modified file 'storage/ndb/memcache/include/Operation.h'
--- a/storage/ndb/memcache/include/Operation.h 2011-03-30 06:54:53 +0000
+++ b/storage/ndb/memcache/include/Operation.h 2011-04-06 04:31:28 +0000
@@ -120,6 +120,7 @@ inline Operation::Operation(QueryPlan *p
key_buffer(kbuf)
{
if(op == OP_READ) record = plan->val_record;
+ else if(op == OP_FLUSH) record = plan->key_record; // scanning delete
else record = plan->row_record;
}
=== modified file 'storage/ndb/memcache/include/QueryPlan.h'
--- a/storage/ndb/memcache/include/QueryPlan.h 2011-03-30 06:54:53 +0000
+++ b/storage/ndb/memcache/include/QueryPlan.h 2011-04-06 04:31:28 +0000
@@ -46,6 +46,7 @@ class QueryPlan {
public:
QueryPlan() : initialized(0) {};
QueryPlan(Ndb *, const TableSpec *, PlanOpts opts = NoOptions);
+ bool keyIsPrimaryKey();
/* public instance variables */
bool initialized;
@@ -71,7 +72,6 @@ class QueryPlan {
private:
/* Private methods */
- bool keyIsPrimaryKey();
const NdbDictionary::Index * chooseIndex();
/* Private instance variables */
=== modified file 'storage/ndb/memcache/include/Scheduler.h'
--- a/storage/ndb/memcache/include/Scheduler.h 2011-03-30 06:54:53 +0000
+++ b/storage/ndb/memcache/include/Scheduler.h 2011-04-06 04:31:28 +0000
@@ -52,6 +52,10 @@ public:
/** schedule() is called from the NDB Engine thread when an operation
is ready to be queued for further async processing */
virtual ENGINE_ERROR_CODE schedule(workitem *) = 0;
+
+ /** io_completed() is called from the NDB Engine thread when an IO
+ completion notification has been received */
+ virtual void io_completed(workitem *) = 0;
/** add_stats() allows the engine to delegate certain statistics
to the scheduler. */
=== modified file 'storage/ndb/memcache/include/ndb_pipeline.h'
--- a/storage/ndb/memcache/include/ndb_pipeline.h 2011-03-30 06:54:53 +0000
+++ b/storage/ndb/memcache/include/ndb_pipeline.h 2011-04-06 04:31:28 +0000
@@ -118,6 +118,11 @@ void * initialize_scheduler(const char *
/** pass a workitem to the configured scheduler, for execution */
ENGINE_ERROR_CODE pipeline_schedule_operation(ndb_pipeline *, struct workitem *);
+/** schedule a "flush_all" operation */
+ENGINE_ERROR_CODE pipeline_flush_all(ndb_pipeline *);
+
+/** notify scheduler of IO completion */
+void pipeline_io_completed(ndb_pipeline *, struct workitem *);
/***** MEMORY MANAGEMENT APIS *****/
=== modified file 'storage/ndb/memcache/include/ndb_worker.h'
--- a/storage/ndb/memcache/include/ndb_worker.h 2011-03-30 06:54:53 +0000
+++ b/storage/ndb/memcache/include/ndb_worker.h 2011-04-06 04:31:28 +0000
@@ -29,4 +29,6 @@ bool worker_prepare_operation(workitem *
bool build_hash_item(workitem *, Operation &);
+ENGINE_ERROR_CODE ndb_flush_all(ndb_pipeline *);
+
#endif
=== modified file 'storage/ndb/memcache/include/ndbmemcache_global.h'
--- a/storage/ndb/memcache/include/ndbmemcache_global.h 2011-03-30 06:54:53 +0000
+++ b/storage/ndb/memcache/include/ndbmemcache_global.h 2011-04-06 04:31:28 +0000
@@ -48,7 +48,8 @@ enum {
OP_READ = 8,
OP_DELETE,
OP_ARITHMETIC,
- OP_SCAN
+ OP_SCAN,
+ OP_FLUSH
};
#endif
=== modified file 'storage/ndb/memcache/include/workitem.h'
--- a/storage/ndb/memcache/include/workitem.h 2011-03-30 06:54:53 +0000
+++ b/storage/ndb/memcache/include/workitem.h 2011-04-06 04:31:28 +0000
@@ -29,9 +29,12 @@
/* struct workitem is used in both C and C++ code, requiring a small hack: */
#ifdef __cplusplus
#include "QueryPlan.h"
+#include "NdbInstance.h"
#define C_OR_CPP_QUERYPLAN QueryPlan
+#define C_OR_CPP_NDBINSTANCE NdbInstance
#else
#define C_OR_CPP_QUERYPLAN void
+#define C_OR_CPP_NDBINSTANCE void
#endif
@@ -46,7 +49,8 @@ typedef struct workitem {
unsigned has_value : 1; /*! are we able to use a no-copy value? */
unsigned retries : 3; /*! how many times this job has been retried */
unsigned complete : 1; /*! is this operation finished? */
- unsigned _unused : 4; /*! (32 bits total) */
+ unsigned broker : 2; /*! for use by the flex scheduler */
+ unsigned _unused : 2; /*! (32 bits total) */
} base;
unsigned int id;
struct workitem *previous; /*! used to chain workitems in multi-key get */
@@ -56,6 +60,8 @@ typedef struct workitem {
uint64_t math_value; /*! IN: incr initial value; OUT: incr result */
hash_item * cache_item; /*! used for write requests */
ndb_pipeline *pipeline; /*! pointer back to request pipeline */
+ C_OR_CPP_NDBINSTANCE *ndb_instance;
+ /*! pointer to ndb instance, if applicable */
const void *cookie; /*! memcached's connection cookie */
C_OR_CPP_QUERYPLAN *plan; /*! QueryPlan for resolving this request */
const char *key; /*! pointer to the key */
@@ -137,6 +143,9 @@ const char * workitem_get_key_suffix(wor
*/
size_t workitem_get_key_buf_size(int nkey);
+/*! Set the workitem's NdbInstance
+*/
+void workitem_set_NdbInstance(workitem*, C_OR_CPP_NDBINSTANCE *);
END_FUNCTIONS_WITH_C_LINKAGE
=== modified file 'storage/ndb/memcache/scripts/metadata.sql'
--- a/storage/ndb/memcache/scripts/metadata.sql 2011-03-30 06:54:53 +0000
+++ b/storage/ndb/memcache/scripts/metadata.sql 2011-04-06 04:31:28 +0000
@@ -209,6 +209,7 @@ INSERT INTO memcache_server_roles (role_
INSERT INTO memcache_server_roles (role_name, role_id) VALUES ("db-only", 1);
INSERT INTO memcache_server_roles (role_name, role_id) VALUES ("mc-only", 2);
INSERT INTO memcache_server_roles (role_name, role_id) VALUES ("ndb-caching", 3);
+INSERT INTO memcache_server_roles (role_name, role_id) VALUES ("test", 4);
-- ndb_clusters table
-- Create an entry for the primary cluster.
@@ -216,9 +217,10 @@ INSERT INTO ndb_clusters values (0, @@nd
-- cache_policies table
-- Create some sample policies.
-INSERT INTO cache_policies(policy_name, get_policy, set_policy, delete_policy)
+INSERT INTO cache_policies
VALUES("memcache-only", "cache_only", "cache_only", "cache_only"),
("ndb-only", "ndb_only", "ndb_only", "ndb_only"),
+ ("ndb-test", "ndb_only", "ndb_only", "ndb_only", "true"),
("caching", "caching", "caching", "caching"),
("caching-with-local-deletes", "caching", "caching", "cache_only"),
("ndb-read-only", "ndb_only", "disabled", "disabled");
@@ -241,7 +243,8 @@ INSERT INTO key_prefixes (server_role_id
(1, "", 0, "ndb-only", "demo_table"),
(1, "t:", 0, "ndb-only", "demo_tabs"),
(2, "", 0, "memcache-only", NULL),
- (3, "", 0, "caching", "demo_table")
+ (3, "", 0, "caching", "demo_table"),
+ (4, "", 0, "ndb-test", "demo_table")
;
=== modified file 'storage/ndb/memcache/src/Configuration.cc'
--- a/storage/ndb/memcache/src/Configuration.cc 2011-03-30 06:54:53 +0000
+++ b/storage/ndb/memcache/src/Configuration.cc 2011-04-06 04:31:28 +0000
@@ -513,7 +513,9 @@ bool config_v1::get_policies() {
success = false;
}
DEBUG_PRINT("map size: %d", policies.size());
-
+
+ tx->close();
+
return success;
}
=== modified file 'storage/ndb/memcache/src/NdbInstance.cc'
--- a/storage/ndb/memcache/src/NdbInstance.cc 2011-03-30 06:54:53 +0000
+++ b/storage/ndb/memcache/src/NdbInstance.cc 2011-04-06 04:31:28 +0000
@@ -33,6 +33,7 @@ NdbInstance::NdbInstance(Ndb_cluster_con
nplans = nprefixes;
db = new Ndb(c);
in_use = false;
+ next = 0;
plans = new QueryPlan *[nplans];
memset(plans, 0, (nplans * sizeof(QueryPlan *)));
db->init(ntransactions);
=== modified file 'storage/ndb/memcache/src/ndb_engine.c'
--- a/storage/ndb/memcache/src/ndb_engine.c 2011-03-30 06:54:53 +0000
+++ b/storage/ndb/memcache/src/ndb_engine.c 2011-04-06 04:31:28 +0000
@@ -100,22 +100,27 @@ ENGINE_ERROR_CODE create_instance(uint64
ndb_eng->npipelines = 0;
ndb_eng->engine.interface.interface = 1;
- ndb_eng->engine.get_info = ndb_get_info;
- ndb_eng->engine.initialize = ndb_initialize;
- ndb_eng->engine.destroy = ndb_destroy;
- ndb_eng->engine.allocate = ndb_allocate;
- ndb_eng->engine.remove = ndb_remove;
- ndb_eng->engine.release = ndb_release;
- ndb_eng->engine.get = ndb_get;
- ndb_eng->engine.get_stats = ndb_get_stats;
- ndb_eng->engine.reset_stats = ndb_reset_stats;
- ndb_eng->engine.store = ndb_store;
- ndb_eng->engine.arithmetic = ndb_arithmetic;
- ndb_eng->engine.flush = ndb_flush;
- ndb_eng->engine.unknown_command = ndb_unknown_command;
- ndb_eng->engine.item_set_cas = item_set_cas; /* reused */
- ndb_eng->engine.get_item_info = ndb_get_item_info;
-
+ ndb_eng->engine.get_info = ndb_get_info;
+ ndb_eng->engine.initialize = ndb_initialize;
+ ndb_eng->engine.destroy = ndb_destroy;
+ ndb_eng->engine.allocate = ndb_allocate;
+ ndb_eng->engine.remove = ndb_remove;
+ ndb_eng->engine.release = ndb_release;
+ ndb_eng->engine.get = ndb_get;
+ ndb_eng->engine.get_stats = ndb_get_stats;
+ ndb_eng->engine.reset_stats = ndb_reset_stats;
+ ndb_eng->engine.store = ndb_store;
+ ndb_eng->engine.arithmetic = ndb_arithmetic;
+ ndb_eng->engine.flush = ndb_flush;
+ ndb_eng->engine.unknown_command = ndb_unknown_command;
+ ndb_eng->engine.item_set_cas = item_set_cas; /* reused */
+ ndb_eng->engine.get_item_info = ndb_get_item_info;
+ ndb_eng->engine.get_stats_struct = NULL;
+ ndb_eng->engine.aggregate_stats = NULL;
+ ndb_eng->engine.tap_notify = NULL;
+ ndb_eng->engine.get_tap_iterator = NULL;
+ ndb_eng->engine.errinfo = NULL;
+
ndb_eng->server = *api;
ndb_eng->get_server_api = get_server_api;
@@ -127,8 +132,11 @@ ENGINE_ERROR_CODE create_instance(uint64
ndb_eng->info.info.description = "NDB Memcache " VERSION;
ndb_eng->info.info.num_features = 3;
ndb_eng->info.info.features[0].feature = ENGINE_FEATURE_CAS;
+ ndb_eng->info.info.features[0].description = NULL;
ndb_eng->info.info.features[1].feature = ENGINE_FEATURE_PERSISTENT_STORAGE;
+ ndb_eng->info.info.features[1].description = NULL;
ndb_eng->info.info.features[2].feature = ENGINE_FEATURE_LRU;
+ ndb_eng->info.info.features[2].description = NULL;
/* Now call create_instace() for the default engine */
e = create_my_default_instance(interface, get_server_api,
@@ -259,11 +267,10 @@ static ENGINE_ERROR_CODE ndb_allocate(EN
const int flags,
const rel_time_t exptime)
{
+ DEBUG_ENTER();
struct ndb_engine* ndb_eng = ndb_handle(handle);
struct default_engine *def_eng = default_handle(ndb_eng);
- /* ndb_pipeline *pipeline = get_my_pipeline_config(ndb_eng); */
- /* DEBUG_ENTER(); */
return def_eng->engine.allocate(ndb_eng->m_default_engine, cookie, item,
key, nkey, nbytes, flags, exptime);
@@ -284,7 +291,6 @@ static ENGINE_ERROR_CODE ndb_remove(ENGI
ENGINE_ERROR_CODE return_status = ENGINE_KEY_ENOENT;
prefix_info_t prefix;
workitem *wqitem;
- DEBUG_ENTER();
/* Is this a callback after completed I/O? */
wqitem = ndb_eng->server.cookie->get_engine_specific(cookie);
@@ -292,6 +298,7 @@ static ENGINE_ERROR_CODE ndb_remove(ENGI
DEBUG_PRINT("Got callback: %s", wqitem->status->comment);
ndb_eng->server.cookie->store_engine_specific(cookie, wqitem->previous);
return_status = wqitem->status->status;
+ pipeline_io_completed(pipeline, wqitem);
workitem_free(wqitem);
return return_status;
}
@@ -380,6 +387,7 @@ static ENGINE_ERROR_CODE ndb_get(ENGINE_
pipeline->id, wqitem->id, wqitem->status->comment);
*item = wqitem->cache_item;
wqitem->base.complete = 1;
+ pipeline_io_completed(pipeline, wqitem);
if(wqitem->status->status != ENGINE_SUCCESS) {
DEBUG_PRINT("pop and free the workitem.");
ndb_eng->server.cookie->store_engine_specific(cookie, wqitem->previous);
@@ -458,6 +466,7 @@ static ENGINE_ERROR_CODE ndb_store(ENGIN
if(wqitem) {
// fixme: chaining
DEBUG_PRINT("Got callback: %s", wqitem->status->comment);
+ pipeline_io_completed(pipeline, wqitem);
return wqitem->status->status;
}
@@ -513,6 +522,7 @@ static ENGINE_ERROR_CODE ndb_arithmetic(
DEBUG_PRINT("Got arithmetic callback: %s", wqitem->status->comment);
wqitem->base.complete = 1;
*result = wqitem->math_value;
+ pipeline_io_completed(pipeline, wqitem);
if(wqitem->status->status != ENGINE_SUCCESS) {
DEBUG_PRINT("pop and free the workitem.");
ndb_eng->server.cookie->store_engine_specific(cookie, wqitem->previous);
@@ -552,11 +562,22 @@ static ENGINE_ERROR_CODE ndb_arithmetic(
static ENGINE_ERROR_CODE ndb_flush(ENGINE_HANDLE* handle,
const void* cookie, time_t when)
{
+/*
+ Notes on flush:
+ The server will call *only* into ndb_flush (not to allocate or release).
+ The NDB engine ignores the "when" parameter.
+ Flush operations have special handling, outside of the scheduler.
+ They are performed synchronously.
+ And we always send the flush command to the cache engine.
+*/
+
+ DEBUG_ENTER();
struct ndb_engine* ndb_eng = ndb_handle(handle);
struct default_engine *def_eng = default_handle(ndb_eng);
- /* ndb_pipeline *pipeline = get_my_pipeline_config(ndb_eng); */
- /* DEBUG_ENTER(); */
- return def_eng->engine.flush(ndb_eng->m_default_engine, cookie, when);
+ ndb_pipeline *pipeline = get_my_pipeline_config(ndb_eng);
+
+ (void) def_eng->engine.flush(ndb_eng->m_default_engine, cookie, when);
+ return pipeline_flush_all(pipeline);
}
@@ -566,9 +587,9 @@ static ENGINE_ERROR_CODE ndb_unknown_com
protocol_binary_request_header *request,
ADD_RESPONSE response)
{
+ DEBUG_ENTER();
struct ndb_engine* ndb_eng = ndb_handle(handle);
struct default_engine *def_eng = default_handle(ndb_eng);
- /* ndb_pipeline *pipeline = get_my_pipeline_config(ndb_eng); */
return def_eng->engine.unknown_command(ndb_eng->m_default_engine, cookie,
request, response);
=== modified file 'storage/ndb/memcache/src/ndb_pipeline.cc'
--- a/storage/ndb/memcache/src/ndb_pipeline.cc 2011-03-30 06:54:53 +0000
+++ b/storage/ndb/memcache/src/ndb_pipeline.cc 2011-04-06 04:31:28 +0000
@@ -33,11 +33,11 @@
#include "ndb_engine.h"
#include "debug.h"
#include "thread_identifier.h"
+#include "ndb_worker.h"
#include "schedulers/3thread.h"
#include "schedulers/Stockholm.h"
-#include "schedulers/Dispatch.h"
-#include "schedulers/flex.h"
+#include "schedulers/Flex.h"
#define DEFAULT_SCHEDULER Scheduler_flex
@@ -96,7 +96,7 @@ ndb_pipeline * ndb_pipeline_initialize(s
self->engine_thread_id = pthread_self();
/* Get the configuration */
- Configuration Conf = get_Configuration();
+ const Configuration & Conf = get_Configuration();
/* Create and set a thread identity */
tid = (thread_identifier *) memory_pool_alloc(self->pool, sizeof(thread_identifier));
@@ -181,8 +181,6 @@ void * initialize_scheduler(const char *
s = new Scheduler_flex;
if(!strncasecmp(cf, "stockholm", 10))
s = new Scheduler_stockholm;
- else if(!strncasecmp(cf, "dispatch", 9))
- s = new Scheduler_dispatch;
else if(!strncasecmp(cf, "3-thread", 9))
s = new Scheduler_3thread;
else {
@@ -205,6 +203,16 @@ ENGINE_ERROR_CODE pipeline_schedule_oper
}
+ENGINE_ERROR_CODE pipeline_flush_all(ndb_pipeline *self) {
+ return ndb_flush_all(self);
+}
+
+
+void pipeline_io_completed(ndb_pipeline *self, struct workitem *item) {
+ self->scheduler->io_completed(item);
+}
+
+
/* The slab allocator API */
int pipeline_get_size_class_id(size_t object_size) {
=== modified file 'storage/ndb/memcache/src/ndb_worker.cc'
--- a/storage/ndb/memcache/src/ndb_worker.cc 2011-03-30 06:54:53 +0000
+++ b/storage/ndb/memcache/src/ndb_worker.cc 2011-04-06 04:31:28 +0000
@@ -46,11 +46,11 @@
#include "status_block.h"
#include "Operation.h"
#include "ndb_engine.h"
-#include "debug.h"
#include "hash_item_util.h"
#include "ndb_worker.h"
void incrCallback(int, NdbTransaction *, void *); // callback for incr/decr
+void flush_scan(int result, NdbTransaction *tx, void *itemptr); // callback for flush
void DBcallback(int, NdbTransaction *, void *); // callback for all others
bool worker_do_read(workitem *, bool with_cas);
@@ -62,6 +62,7 @@ void worker_set_cas(ndb_pipeline *, uint
bool finalize_read(workitem *);
bool finalize_write(workitem *, bool);
int build_cas_routine(NdbInterpretedCode *r, int cas_col, uint64_t cas_val);
+bool scan_delete(NdbInstance *, QueryPlan *);
extern EXTENSION_LOGGER_DESCRIPTOR *logger;
@@ -96,7 +97,7 @@ status_block status_block_bad_add =
status_block status_block_bad_replace =
{ ENGINE_NOT_STORED, "Tuple not found" };
-
+
void worker_set_cas(ndb_pipeline *p, uint64_t *cas) {
/* Be careful here -- ndbmc_atomic32_t might be a signed type.
@@ -143,6 +144,7 @@ bool worker_prepare_operation(workitem *
default:
return false; /* not supported */
}
+
return r; /* fixme: distinguish "not supported" from "failed" */
}
@@ -651,7 +653,7 @@ bool finalize_read(workitem *wqitem) {
&& op.getStringValueNoCopy(COL_STORE_VALUE, & wqitem->value_ptr, & wqitem->value_size)
&& op.appendCRLF(COL_STORE_VALUE, wqitem->value_size))
{
- /* The workiten's value_ptr and value_size were set above. */
+ /* The workitem's value_ptr and value_size were set above. */
DEBUG_PRINT("using no-copy buffer.");
wqitem->base.has_value = true;
need_hash_item = false;
@@ -766,7 +768,70 @@ int build_cas_routine(NdbInterpretedCode
return r->finalise(); // resolve the label/branch
}
+
+/* Flush all is a fully synchronous operation --
+ the memcache server is waiting for a response, and the thread is blocked.
+*/
+ENGINE_ERROR_CODE ndb_flush_all(ndb_pipeline *pipeline) {
+ DEBUG_ENTER();
+ const Configuration &conf = get_Configuration();
+
+ DEBUG_PRINT(" %d prefixes", conf.nprefixes);
+ for(int i = 0 ; i < conf.nprefixes ; i++) {
+ const KeyPrefix *p = conf.getPrefix(i);
+ if(p->info.use_ndb && p->info.do_db_flush) {
+ NdbInstance *inst = conf.getConnectionById(p->info.cluster_id)->
+ getNdbInstance(pipeline->id);
+ QueryPlan *plan = inst->getPlanForPrefix(p);
+ if(plan->keyIsPrimaryKey()) {
+ /* To flush, scan the table and delete every row */
+ /* ToDo: ensure against duplicates (don't scan the same table twice) */
+ DEBUG_PRINT("deleting from %s", p->table->table_name);
+ scan_delete(inst, plan);
+ }
+ else DEBUG_PRINT(" not scanning table %s -- access path is not primary key",
+ p->table->table_name);
+ }
+ else DEBUG_PRINT(" Not scanning table %s -- use_ndb:%d flush:%d",
+ p->table->table_name, p->info.use_ndb, p->info.do_db_flush);
+ }
+
+ return ENGINE_SUCCESS;
+}
+
+
+bool scan_delete(NdbInstance *inst, QueryPlan *plan) {
+ DEBUG_ENTER();
+ int res = 0;
+ int check;
+ NdbTransaction *tx = inst->db->startTransaction();
+ NdbScanOperation *scan = tx->getNdbScanOperation(plan->table);
+ scan->readTuplesExclusive();
+
+ /* execute NoCommit */
+ if((res = tx->execute(NdbTransaction::NoCommit)) != 0)
+ logger->log(LOG_WARNING, 0, "execute(NoCommit): %s\n", tx->getNdbError().message);
+
+ /* scan and delete */
+ while(scan->nextResult(true) == 0) {
+ do {
+ if((res = scan->deleteCurrentTuple()) != 0) {
+ logger->log(LOG_WARNING, 0, "deleteCurrentTuple(): %s\n",
+ tx->getNdbError().message);
+ }
+ } while((check = scan->nextResult(false)) == 0);
+
+ /* execute a batch */
+ if(check != -1)
+ if((res = tx->execute(NdbTransaction::NoCommit)) != 0)
+ logger->log(LOG_WARNING, 0, "execute(NoCommit): %s\n",
+ tx->getNdbError().message);
+ }
+ tx->execute(NdbTransaction::Commit); // execute & commit
+ tx->close();
+ return (res == 0);
+}
=== modified file 'storage/ndb/memcache/src/schedulers/3thread.cc'
--- a/storage/ndb/memcache/src/schedulers/3thread.cc 2011-03-30 06:54:53 +0000
+++ b/storage/ndb/memcache/src/schedulers/3thread.cc 2011-04-06 04:31:28 +0000
@@ -22,6 +22,7 @@
#define __STDC_FORMAT_MACROS
#include <inttypes.h>
#include <stdio.h>
+#include <unistd.h>
/* Memcache headers */
#include "memcached/types.h"
@@ -43,7 +44,7 @@ extern "C" {
status_block status_block_unsupported = { ENGINE_ENOTSUP, "Not supported." };
void Scheduler_3thread::init(int my_thread, int, const char *) {
- const Configuration conf = get_Configuration();
+ const Configuration & conf = get_Configuration();
ClusterConnectionPool *pool;
Ndb_cluster_connection *conn;
LockableNdbInstance *inst;
@@ -175,7 +176,7 @@ void * run_3thd_worker_thread(void *s) {
void * Scheduler_3thread::run_ndb_worker_thread() {
DEBUG_ENTER();
workitem *newitem;
- const Configuration conf = get_Configuration(); // see note
+ const Configuration & conf = get_Configuration(); // see note
int lockerr;
unsigned int ncycles = 0;
=== modified file 'storage/ndb/memcache/src/schedulers/3thread.h'
--- a/storage/ndb/memcache/src/schedulers/3thread.h 2011-03-30 06:54:53 +0000
+++ b/storage/ndb/memcache/src/schedulers/3thread.h 2011-04-06 04:31:28 +0000
@@ -42,6 +42,7 @@ public:
void init(int threadnum, int nthreads, const char *config_string);
void attach_thread(thread_identifier *);
ENGINE_ERROR_CODE schedule(workitem *);
+ void io_completed(workitem *) {};
void add_stats(ADD_STAT, const void *);
void * run_ndb_worker_thread();
void * run_ndb_commit_thread();
=== removed file 'storage/ndb/memcache/src/schedulers/Dispatch.cc'
--- a/storage/ndb/memcache/src/schedulers/Dispatch.cc 2011-03-30 06:54:53 +0000
+++ b/storage/ndb/memcache/src/schedulers/Dispatch.cc 1970-01-01 00:00:00 +0000
@@ -1,187 +0,0 @@
-/*
- Copyright (c) 2011, Oracle and/or its affiliates. All rights
- reserved.
-
- This program is free software; you can redistribute it and/or
- modify it under the terms of the GNU General Public License
- as published by the Free Software Foundation; version 2 of
- the License.
-
- This program is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with this program; if not, write to the Free Software
- Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- 02110-1301 USA
- */
-/* System headers */
-/* C++ files must define __STDC_FORMAT_MACROS in order to get PRIu64 */
-#include "config.h"
-#ifdef HAVE_DISPATCH_DISPATCH_H
-
-#define __STDC_FORMAT_MACROS
-#include <inttypes.h>
-#include <dispatch/dispatch.h>
-#include <stdio.h>
-
-/* Memcache headers */
-#include "memcached/types.h"
-#include <memcached/extension_loggers.h>
-
-/* NDB Memcache headers */
-#include "Dispatch.h"
-#include "workitem.h"
-#include "ndb_worker.h"
-
-extern EXTENSION_LOGGER_DESCRIPTOR *logger;
-
-extern "C" {
- void dispatch_pollNdb(void *arg);
-}
-
-
-void Scheduler_dispatch::init(int, int nthreads, const char *) {
- const Configuration conf = get_Configuration();
-
- /* Get the NDB instances.
-
- Each pipeline has 75 NDB instances per cluster.
- Each instance can handle 1 transaction.
-
- With 75 instances, a thread can handle 25,000 ops/sec (25 ops per ms)
- and all operations can be in-flight for 3 milliseconds.
-
- We maintain the instances in an array of lists, so
- instances[n] is the head list of instances for cluster n.
-
- TODO: This needs to work with all defined clusters
- */
-
- for(int j = 0 ; j < 75 ; j++) {
- NdbInstance *inst = new NdbInstance(conf.getConnectionById(0),
- conf.nprefixes,
- 1);
- inst->db->init(1);
- instances[j] = inst;
- }
-
- /* Now hoarde some transactions (API connect records). startTransaction()
- will send TC_SEIZEREQ and wait for a reply. This really needs to be done
- at initialization time when nobody is waiting for it.
- TODO: this should be one array per cluster
- */
- NdbTransaction * txlist[75];
- for(int j = 0 ; j < 75 ; j++) {
- txlist[j] = instances[j]->db->startTransaction();
- }
-
- for(int j = 0 ; j < 75 ; j++) {
- txlist[j]->close();
- }
-
- /* Get the dispatch queue */
- asyncqueue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_HIGH, 0);
-}
-
-
-void Scheduler_dispatch::attach_thread(thread_identifier * p) {
-
- pipeline = parent->pipeline;
- logger->log(LOG_WARNING, 0, "Pipeline %d attached to libdispatch scheduler.\n",
- pipeline->id);
-
- /* Initialize counters */
- instance_counter = 0;
-}
-
-
-ENGINE_ERROR_CODE Scheduler_dispatch::schedule(workitem *newitem) {
- NdbInstance *inst;
- const Configuration conf = get_Configuration(); // see note
-
- /* Fetch the config for its key prefix */
- const KeyPrefix *pfx = conf.getPrefixByInfo(newitem->prefix_info);
-
- /* From here on we will work mainly with the suffix part of the key. */
- newitem->base.nsuffix = newitem->base.nkey - pfx->prefix_len;
- // FIXME: runtime error back to client?
- DEBUG_ASSERT(newitem->base.nsuffix > 0);
-
- /* Get an NDB instance. But if the instance is in use, go forward and
- get the next one. If something goes wrong (cluster crash, disconnected...)
- and the commit thread can no longer release instances, it's possible that
- there will be no free instances and this will loop forever.
- FIXME: catch this condition somehow.
- */
- do {
- inst = instances[instance_counter];
- instance_counter = (++ instance_counter % 75);
- } while(inst->in_use == true);
-
- // Now we've got an NDB Instance.
- inst->in_use = true;
-
- // Fetch the query plan for this prefix.
- newitem->plan = inst->getPlanForPrefix(pfx);
-
- // Build the NDB transaction
- if(worker_prepare_operation(newitem)) {
- // Queue the operation for a dispatch thread.
- dispatch_async_f(asyncqueue, inst, dispatch_pollNdb);
- return ENGINE_EWOULDBLOCK;
- }
- else return ENGINE_ENOTSUP;
-}
-
-
-void Scheduler_dispatch::add_stats(ADD_STAT add_stat,
- const void * cookie) {
- return;
-}
-
-
-/*
- libdispatch version of the commit thread:
- Take an NdbInstance, call pollNdb() on it, then mark it as free:
-*/
-void dispatch_pollNdb(void *arg) {
- NdbInstance *inst = (NdbInstance *) arg;
-
- /* Poll */
- inst->db->pollNdb();
-
- /* Now we are done with the instance */
- inst->in_use = false;
-}
-
-
-#else
-
-/* HAVE_DISPATCH_DISPATCH_H */
-/* Stub version of the dispatch scheduler. */
-/* This allows the C++ compiler to create a vtable, so that the engine can be
- linked into memcached, but the stub constructor in dispatch.h will produce
- an error message and exit.
-*/
-
-/* System headers */
-#include <stdio.h>
-
-/* Memcache headers */
-#include "memcached/types.h"
-#include <memcached/extension_loggers.h>
-
-/* NDB Memcache headers */
-#include "Dispatch.h"
-
-
-void Scheduler_dispatch::init(int, int, const char *) { return; }
-void Scheduler_dispatch::attach_thread(thread_identifier *) { return; }
-void Scheduler_dispatch::add_stats(ADD_STAT,const void *) { return; }
-ENGINE_ERROR_CODE Scheduler_dispatch::schedule(workitem *) { return ENGINE_FAILED; }
-
-
-#endif
=== removed file 'storage/ndb/memcache/src/schedulers/Dispatch.h'
--- a/storage/ndb/memcache/src/schedulers/Dispatch.h 2011-03-30 06:54:53 +0000
+++ b/storage/ndb/memcache/src/schedulers/Dispatch.h 1970-01-01 00:00:00 +0000
@@ -1,83 +0,0 @@
-/*
- Copyright (c) 2011, Oracle and/or its affiliates. All rights
- reserved.
-
- This program is free software; you can redistribute it and/or
- modify it under the terms of the GNU General Public License
- as published by the Free Software Foundation; version 2 of
- the License.
-
- This program is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with this program; if not, write to the Free Software
- Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- 02110-1301 USA
- */
-#ifndef NDBMEMCACHE_DISPATCH_SCHEDULER_H
-#define NDBMEMCACHE_DISPATCH_SCHEDULER_H
-
-#ifndef __cplusplus
-#error "This file is for C++ only"
-#endif
-
-#include <memcached/types.h>
-
-#include "config.h"
-#include "Scheduler.h"
-
-#ifdef HAVE_DISPATCH_DISPATCH_H
-#include <dispatch/dispatch.h>
-
-/*
- * Dispatch Scheduler
- *
- * The dispatch scheduler runs only on platforms that support libdispatch.
- * It uses a large number of Ndb objects.
- */
-
-
-class Scheduler_dispatch : public Scheduler {
-public:
- Scheduler_dispatch() {};
- ~Scheduler_dispatch();
- void init(int threadnum, int nthreads, const char *config_string);
- void attach_thread(thread_identifier *);
- ENGINE_ERROR_CODE schedule(workitem *);
- void add_stats(ADD_STAT, const void *);
-
-private:
- dispatch_queue_t asyncqueue;
- int instance_counter;
- NdbInstance *instances[75];
-};
-
-
-#else
- /* Crippled version of the libdispatch scheduler. */
-
-class Scheduler_dispatch : public Scheduler {
-public:
- Scheduler_dispatch();
- void init(int threadnum, int nthreads, const char *config_string);
- void attach_thread(thread_identifier *);
- ENGINE_ERROR_CODE schedule(workitem *);
- void add_stats(ADD_STAT, const void *);
-};
-
-
-inline Scheduler_dispatch::Scheduler_dispatch() {
- fprintf(stderr, "ndbmemcache was compiled without libdispatch. "
- "The dispatch scheduler is not available.\n "
- "memcached will now exit.\n");
- exit(99);
-}
-
-#endif
-
-
-#endif
-
=== added file 'storage/ndb/memcache/src/schedulers/Flex_broker.cc'
--- a/storage/ndb/memcache/src/schedulers/Flex_broker.cc 1970-01-01 00:00:00 +0000
+++ b/storage/ndb/memcache/src/schedulers/Flex_broker.cc 2011-04-06 04:31:28 +0000
@@ -0,0 +1,92 @@
+/*
+ Copyright (c) 2011, Oracle and/or its affiliates. All rights
+ reserved.
+
+ This program is free software; you can redistribute it and/or
+ modify it under the terms of the GNU General Public License
+ as published by the Free Software Foundation; version 2 of
+ the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ 02110-1301 USA
+ */
+
+#include <stdlib.h>
+#include <unistd.h>
+
+#include "workitem.h"
+#include "KeyPrefix.h"
+#include "Configuration.h"
+#include "debug.h"
+#include "Flex.h"
+#include "Flex_thread_spec.h"
+#include "Flex_broker.h"
+#include "Flex_cluster.h"
+
+
+Scheduler_flex::Broker::Broker(Scheduler_flex *s, int my_id) :
+ sched(s),
+ id(my_id),
+ conf(get_Configuration()),
+ queue(s->broker_queue)
+{
+ clusters = (Cluster **) calloc(conf.nclusters, sizeof(Cluster *));
+
+ for(int c = 0 ; c < conf.nclusters ; c++) {
+ clusters[c] = new Cluster(sched, id, c);
+ }
+}
+
+
+ENGINE_ERROR_CODE Scheduler_flex::Broker::schedule(workitem *item) {
+ const KeyPrefix *pfx = conf.getPrefixByInfo(item->prefix_info);
+
+ if(item->prefix_info.prefix_id) {
+ DEBUG_PRINT("prefix %d: \"%s\" Table: %s Value Cols: %d",
+ item->prefix_info.prefix_id, pfx->prefix,
+ pfx->table->table_name, pfx->table->nvaluecols);
+ }
+
+ /* Record the suffix length */
+ item->base.nsuffix = item->base.nkey - pfx->prefix_len;
+ if(item->base.nsuffix == 0) return ENGINE_EINVAL; // key too short
+
+ /* Set my broker id in the workitem */
+ item->base.broker = id;
+
+ clusters[item->prefix_info.cluster_id]->schedule(item, pfx);
+}
+
+
+void Scheduler_flex::Broker::contribute_stats() {
+ for(int i = 0 ; i < conf.nclusters ; i++)
+ clusters[i]->contribute_stats();
+}
+
+
+/*
+ Broker thread: get a workitem off the workqueue, and build its operations.
+*/
+void * Scheduler_flex::Broker::run_broker_thread() {
+ workitem *newitem;
+
+ DEBUG_ENTER();
+
+ while(1) {
+ /* Wait for something to appear on the queue */
+ newitem = (workitem *) workqueue_consumer_wait(queue);
+
+ if(newitem == NULL) return 0; /* queue has been shut down and emptied */
+
+ (void) schedule(newitem);
+ }
+}
+
+
=== added file 'storage/ndb/memcache/src/schedulers/Flex_broker.h'
--- a/storage/ndb/memcache/src/schedulers/Flex_broker.h 1970-01-01 00:00:00 +0000
+++ b/storage/ndb/memcache/src/schedulers/Flex_broker.h 2011-04-06 04:31:28 +0000
@@ -0,0 +1,54 @@
+/*
+ Copyright (c) 2011, Oracle and/or its affiliates. All rights
+ reserved.
+
+ This program is free software; you can redistribute it and/or
+ modify it under the terms of the GNU General Public License
+ as published by the Free Software Foundation; version 2 of
+ the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ 02110-1301 USA
+ */
+
+#ifndef NDBMEMCACHE_FLEX_BROKER_H
+#define NDBMEMCACHE_FLEX_BROKER_H
+
+#ifndef __cplusplus
+#error "This file is for C++ only"
+#endif
+
+
+class Scheduler_flex::Broker {
+ friend class Scheduler_flex;
+ friend class thread_spec;
+
+public:
+ Broker(Scheduler_flex *, int my_id);
+ ~Broker();
+ ENGINE_ERROR_CODE schedule(workitem *); /*< schedule item on its cluster */
+
+ pthread_t thread_id;
+ Cluster ** clusters;
+
+protected:
+ void * run_broker_thread();
+ void contribute_stats();
+
+private:
+ int id;
+ const Configuration conf;
+ Scheduler_flex *sched;
+ struct workqueue *queue;
+};
+
+
+#endif
+
=== added file 'storage/ndb/memcache/src/schedulers/Flex_cluster.cc'
--- a/storage/ndb/memcache/src/schedulers/Flex_cluster.cc 1970-01-01 00:00:00 +0000
+++ b/storage/ndb/memcache/src/schedulers/Flex_cluster.cc 2011-04-06 04:31:28 +0000
@@ -0,0 +1,248 @@
+/*
+ Copyright (c) 2011, Oracle and/or its affiliates. All rights
+ reserved.
+
+ This program is free software; you can redistribute it and/or
+ modify it under the terms of the GNU General Public License
+ as published by the Free Software Foundation; version 2 of
+ the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ 02110-1301 USA
+ */
+
+
+#include "workitem.h"
+#include "KeyPrefix.h"
+#include "Configuration.h"
+#include "debug.h"
+#include "Flex.h"
+#include "Flex_thread_spec.h"
+#include "Flex_broker.h"
+#include "Flex_cluster.h"
+#include "ndb_worker.h"
+
+extern EXTENSION_LOGGER_DESCRIPTOR *logger;
+
+extern "C" {
+ void * run_flex_commit_thread(void *);
+}
+
+
+Scheduler_flex::Cluster::Cluster(Scheduler_flex *s,
+ int broker,
+ int cluster) : sched(s),
+ broker_id(broker),
+ id(cluster),
+ conf(get_Configuration()) {
+ DEBUG_PRINT("thread: %d, broker: %d, cluster: %d", sched->thread_id,
+ broker, cluster);
+ /* Get the connection pool for my cluster */
+ ClusterConnectionPool *pool = conf.getConnectionById(id);
+
+ /* Get a number of NDB instances.
+ That number is determined here, using max_tps and usec_rtt.
+ We allow a transaction to be in-flight for 5 * RTT,
+ and we need enough NDB objects to meet one thread's share of max_tps.
+ I go step-by-step through the math here (the compiler may optimize it).
+ */
+ double tx_time_in_usec = pool->usec_rtt * 5;
+ double tx_per_ndb_per_sec = 1000000 / tx_time_in_usec;
+ double total_ndb_objects = conf.max_tps / tx_per_ndb_per_sec;
+ double ndbs_per_pipeline = total_ndb_objects / sched->config.n_engine_threads;
+ nInst = (int) (ndbs_per_pipeline / sched->nbrokers);
+
+ /* For now, the queue size is fixed */
+ queue_size = 8192;
+
+ /* Do we need more connections? */
+ for(int n = pool->getPoolSize(); n < sched->config.n_connections; n++)
+ (void) pool->addPooledConnection(); // Maybe not all were added; that's OK.
+
+ /* Now obtain the appropriate one for this thread. */
+ Ndb_cluster_connection *conn = pool->getPooledConnection(sched->thread_id);
+ DEBUG_PRINT("broker %d, cluster %d, node %d: "
+ "%d TPS @ %d usec RTT ==> %d NDB instances.", broker_id, id,
+ conn->node_id(), conf.max_tps, pool->usec_rtt, nInst);
+
+ // Get (and store) the master NDB instance for the connection
+ pool->setNdbInstance(sched->thread_id,
+ new NdbInstance(conn, conf.nprefixes, 128));
+
+ // Get the NDB instances
+ instances = (NdbInstance**) calloc(nInst, sizeof(NdbInstance *));
+ for(int i = 0; i < nInst ; i++) {
+ NdbInstance *inst = new NdbInstance(conn, conf.nprefixes, 1);
+ instances[i] = inst;
+ inst->next = nextFree;
+ nextFree = inst;
+ }
+
+ /* Hoard a transaction (an API connect record) for each Ndb object. This
+ first call to startTransaction() will send TC_SEIZEREQ and wait for a
+ reply, but later at runtime startTransaction() should return immediately.
+ Also, for each NDB instance, ore-build the QueryPlan for the default key prefix.
+ TODO? Start one tx *per data node*.
+ */
+ QueryPlan *plan;
+ const KeyPrefix *default_prefix = conf.getDefaultPrefix();
+ NdbTransaction ** txlist;
+ txlist = ( NdbTransaction **) calloc(nInst, sizeof(NdbTransaction *));
+
+ // Open them all.
+ for(int i = 0 ; i < nInst ; i++) {
+ NdbTransaction *tx;
+ plan = instances[i]->getPlanForPrefix(default_prefix);
+ tx = instances[i]->db->startTransaction();
+ if(! tx) logger->log(LOG_WARNING, 0, instances[i]->db->getNdbError().message);
+ txlist[i] = tx;
+ }
+
+ // Close them all.
+ for(int i = 0 ; i < nInst ; i++) {
+ txlist[i]->close();
+ }
+
+ // Free the list.
+ free(txlist);
+
+ /* Allocate thread ids */
+ commit_thread_ids = (pthread_t *) calloc(sched->config.n_commit_threads,
+ sizeof(pthread_t));
+
+ /* Allocate and initialize a workqueue */
+ queue = (struct workqueue *) malloc(sizeof(struct workqueue));
+ workqueue_init(queue, queue_size, sched->config.n_commit_threads);
+
+ /* Randomly take samples of the workqueue depth */
+ do_queue_sample = random() % FLEX_STATS_INITIAL_INTERVAL;
+}
+
+
+void Scheduler_flex::Cluster::attach_thread(thread_identifier *parent) {
+ /* Adjust the thread stack size */
+ pthread_attr_t commit_thd_attr;
+ size_t thd_stack_size;
+ pthread_attr_init(& commit_thd_attr);
+ pthread_attr_getstacksize(& commit_thd_attr, & thd_stack_size);
+ pthread_attr_setstacksize(& commit_thd_attr, thd_stack_size / 4);
+
+ /* Start the commit threads */
+ for(int t = 0 ; t < sched->config.n_commit_threads ; t++) {
+ thread_spec * spec = new thread_spec(sched, parent, broker_id, id, t);
+ pthread_create(& commit_thread_ids[t], & commit_thd_attr,
+ run_flex_commit_thread, (void *) spec);
+ }
+}
+
+
+ENGINE_ERROR_CODE Scheduler_flex::Cluster::schedule(workitem *newitem,
+ const KeyPrefix *pfx) {
+ NdbInstance *inst;
+
+ if (nextFree)
+ {
+ inst = nextFree;
+ nextFree = inst->next;
+ }
+ else
+ {
+ return ENGINE_TMPFAIL;
+ }
+
+ workitem_set_NdbInstance(newitem, inst);
+
+ // Fetch the query plan for this prefix.
+ newitem->plan = inst->getPlanForPrefix(pfx);
+ if(! newitem->plan) return ENGINE_FAILED;
+
+ // Build the NDB transaction
+ if(worker_prepare_operation(newitem)) {
+
+ if(do_queue_sample-- == 0) { // Sample for statistics.
+ /* To simulate sampling the depth immediately after the item is added,
+ sample it now and add one. */
+ int depth = queue->depth + 1;
+ /* Queue depth can be erroneous because of thread races.
+ Use it only if it looks valid. */
+ if(depth > 0 && depth <= queue_size) {
+ stats.queue_total_depth += depth;
+ stats.queue_samples++;
+ }
+ do_queue_sample = random() % FLEX_STATS_SAMPLE_INTERVAL;
+ }
+
+ // Put the NdbInstance on the queue for the commit thread.
+ workqueue_add(queue, inst);
+ return ENGINE_EWOULDBLOCK;
+ }
+ else return ENGINE_ENOTSUP;
+}
+
+
+void Scheduler_flex::Cluster::io_completed(workitem *item) {
+ DEBUG_ENTER();
+ NdbInstance* inst = item->ndb_instance;
+ item->ndb_instance = NULL;
+ if(inst) {
+ inst->next = nextFree;
+ nextFree = inst;
+ }
+}
+
+
+void Scheduler_flex::Cluster::contribute_stats() {
+ if(stats.cycles)
+ sched->stats.total_avg_ndb_depth += (double) stats.ndb_depth / stats.cycles;
+ if(stats.queue_samples)
+ sched->stats.total_avg_commit_queue_depth +=
+ (double) stats.queue_total_depth / stats.queue_samples;
+}
+
+
+/*
+ Commit thread: Get an NdbInstance off the workqueue, and call pollNdb() on it.
+*/
+void * Scheduler_flex::Cluster::run_commit_thread() {
+ DEBUG_ENTER();
+
+ NdbInstance *inst;
+
+ while(1) {
+ /* Wait for something to appear on the queue */
+ inst = (NdbInstance *) workqueue_consumer_wait(queue);
+
+ if(inst == NULL) return 0; /* queue has been shut down and emptied */
+
+ /* Poll */
+ inst->db->pollNdb();
+
+ /* Now we are done with the instance */
+ atomic_cmp_swap_int(& inst->in_use, true, false); // in_use = false
+ }
+}
+
+
+void * run_flex_commit_thread(void *s) {
+ Scheduler_flex::thread_spec *spec = (Scheduler_flex::thread_spec *) s;
+ if(spec->has_broker_threads())
+ set_child_thread_id(spec->parent, "br%d.cl%d.commit%d",
+ spec->broker, spec->cluster_id, spec->number);
+ else
+ set_child_thread_id(spec->parent, "cl%d.commit%d",
+ spec->cluster_id, spec->number);
+
+ spec->run_commit_thread();
+
+ delete spec;
+ return 0;
+}
+
+
=== added file 'storage/ndb/memcache/src/schedulers/Flex_cluster.h'
--- a/storage/ndb/memcache/src/schedulers/Flex_cluster.h 1970-01-01 00:00:00 +0000
+++ b/storage/ndb/memcache/src/schedulers/Flex_cluster.h 2011-04-06 04:31:28 +0000
@@ -0,0 +1,69 @@
+/*
+ Copyright (c) 2011, Oracle and/or its affiliates. All rights
+ reserved.
+
+ This program is free software; you can redistribute it and/or
+ modify it under the terms of the GNU General Public License
+ as published by the Free Software Foundation; version 2 of
+ the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ 02110-1301 USA
+ */
+
+#ifndef NDBMEMCACHE_FLEX_CLUSTER_H
+#define NDBMEMCACHE_FLEX_CLUSTER_H
+
+#ifndef __cplusplus
+#error "This file is for C++ only"
+#endif
+
+#include "src/schedulers/Flex.h"
+#include "src/schedulers/Flex_broker.h"
+
+class Scheduler_flex::Cluster {
+ friend class Broker;
+ friend class thread_spec;
+
+public:
+ Cluster(Scheduler_flex *, int broker, int cluster_id);
+ ~Cluster();
+
+ void attach_thread(thread_identifier *); /*< start commit threads */
+ ENGINE_ERROR_CODE schedule(workitem *, const KeyPrefix *);
+ void io_completed(workitem *);
+
+protected:
+ void * run_commit_thread();
+ void contribute_stats();
+ struct cluster_stats {
+ uint64_t cycles;
+ uint64_t ndb_depth;
+ uint64_t queue_total_depth;
+ uint64_t queue_samples;
+ } stats;
+
+private:
+ const Configuration conf;
+ Scheduler_flex *sched;
+ int do_queue_sample;
+ int broker_id;
+ int id;
+ int queue_size;
+ struct workqueue *queue;
+ pthread_t * commit_thread_ids;
+ NdbInstance **instances;
+ NdbInstance *nextFree;
+ int nInst;
+};
+
+
+
+#endif
=== added file 'storage/ndb/memcache/src/schedulers/Flex_thread_spec.h'
--- a/storage/ndb/memcache/src/schedulers/Flex_thread_spec.h 1970-01-01 00:00:00 +0000
+++ b/storage/ndb/memcache/src/schedulers/Flex_thread_spec.h 2011-04-06 04:31:28 +0000
@@ -0,0 +1,51 @@
+/*
+ Copyright (c) 2011, Oracle and/or its affiliates. All rights
+ reserved.
+
+ This program is free software; you can redistribute it and/or
+ modify it under the terms of the GNU General Public License
+ as published by the Free Software Foundation; version 2 of
+ the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ 02110-1301 USA
+ */
+
+#ifndef NDBMEMCACHE_FLEX_THREAD_SPEC_H
+#define NDBMEMCACHE_FLEX_THREAD_SPEC_H
+
+#ifndef __cplusplus
+#error "This file is for C++ only"
+#endif
+
+#include "Flex.h"
+#include "Flex_broker.h"
+#include "Flex_cluster.h"
+
+class Scheduler_flex::thread_spec {
+public:
+ thread_spec(Scheduler_flex *s, thread_identifier *p,
+ int b, int c, int n) : sched(s), parent(p),
+ broker(b), cluster_id(c), number(n) {};
+
+ Broker * get_broker() { return sched->brokers[broker]; };
+ bool has_broker_threads() { return (sched->config.n_broker_threads > 0); };
+ void * run_broker_thread() { return get_broker()->run_broker_thread(); };
+ Cluster * get_cluster() { return get_broker()->clusters[cluster_id]; };
+ void * run_commit_thread() { return get_cluster()->run_commit_thread(); };
+
+ Scheduler_flex *sched;
+ thread_identifier *parent;
+ int broker;
+ int cluster_id;
+ int number;
+};
+
+#endif
=== modified file 'storage/ndb/memcache/src/schedulers/Stockholm.cc'
--- a/storage/ndb/memcache/src/schedulers/Stockholm.cc 2011-03-30 06:54:53 +0000
+++ b/storage/ndb/memcache/src/schedulers/Stockholm.cc 2011-04-06 04:31:28 +0000
@@ -47,7 +47,7 @@ extern "C" {
void Scheduler_stockholm::init(int my_thread, int nthreads, const char *config_string) {
- const Configuration conf = get_Configuration();
+ const Configuration & conf = get_Configuration();
/* For each cluster, we need a number of NDB instances;
that number is determined here, using max_tps and usec_rtt.
@@ -64,6 +64,7 @@ void Scheduler_stockholm::init(int my_th
DEBUG_PRINT("cluster %d: %d TPS @ %d usec RTT ==> %d NDB instances.",
c, conf.max_tps, pool->usec_rtt, cluster[c].nInst);
}
+
// Get the NDB instances.
for(int c = 0 ; c < conf.nclusters ; c++) {
@@ -72,11 +73,21 @@ void Scheduler_stockholm::init(int my_th
ClusterConnectionPool *pool = conf.getConnectionById(c);
Ndb_cluster_connection *conn = pool->getPooledConnection(my_thread);
-
+
+ /* Set the master NdbInstance for the connection */
+ pool->setNdbInstance(my_thread,
+ new NdbInstance(conn, conf.nprefixes, 128));
+
+ cluster[c].nextFree = NULL;
for(int i = 0; i < cluster[c].nInst ; i++) {
NdbInstance *inst = new NdbInstance(conn, conf.nprefixes, 1);
cluster[c].instances[i] = inst;
+ inst->next = cluster[c].nextFree;
+ cluster[c].nextFree = inst;
}
+
+ logger->log(LOG_WARNING, 0, "Pipeline %d using %u Ndb instances for Cluster %u.\n",
+ my_thread, cluster[c].nInst, c);
}
/* Hoard a transaction (an API connect record) for each Ndb object. This
@@ -116,7 +127,7 @@ void Scheduler_stockholm::init(int my_th
void Scheduler_stockholm::attach_thread(thread_identifier * parent) {
pipeline = parent->pipeline;
- const Configuration conf = get_Configuration();
+ const Configuration & conf = get_Configuration();
logger->log(LOG_WARNING, 0, "Pipeline %d attached to Stockholm scheduler; "
"launching %d commit thread%s.\n", pipeline->id, conf.nclusters,
@@ -137,7 +148,7 @@ void Scheduler_stockholm::attach_thread(
ENGINE_ERROR_CODE Scheduler_stockholm::schedule(workitem *newitem) {
NdbInstance *inst;
int c;
- const Configuration conf = get_Configuration();
+ const Configuration & conf = get_Configuration();
/* Fetch the config for its key prefix */
const KeyPrefix *pfx = conf.getPrefixByInfo(newitem->prefix_info);
@@ -154,21 +165,18 @@ ENGINE_ERROR_CODE Scheduler_stockholm::s
c = newitem->prefix_info.cluster_id;
- /* Get an NDB instance. But if the instance is in use, go forward and
- get the next one. If something goes wrong (cluster crash, disconnected...)
- and the commit thread can no longer release instances, it's possible that
- there will be no free instances and this will loop forever.
- FIXME: catch this condition somehow.
- */
- int i = 0;
- inst = cluster[c].instances[i];
- while(inst->in_use == 1) {
- i = ( ++i % cluster[c].nInst);
- inst = cluster[c].instances[i];
- };
-
- // Now we've got an NDB Instance.
- atomic_cmp_swap_int(& inst->in_use, false, true); // in_use = true
+ if (cluster[c].nextFree)
+ {
+ inst = cluster[c].nextFree;
+ cluster[c].nextFree = inst->next;
+ }
+ else
+ {
+ return ENGINE_TMPFAIL;
+ }
+
+
+ workitem_set_NdbInstance(newitem, inst);
// Fetch the query plan for this prefix.
newitem->plan = inst->getPlanForPrefix(pfx);
@@ -176,7 +184,8 @@ ENGINE_ERROR_CODE Scheduler_stockholm::s
// Build the NDB transaction
if(worker_prepare_operation(newitem)) {
- // Put the NdbInstance on the queue for the commit thread.
+ // Put the NdbInstance on the queue for the commit thread.
+ // Should probably be the workitem?
workqueue_add(cluster[c].queue, inst);
return ENGINE_EWOULDBLOCK;
}
@@ -184,12 +193,25 @@ ENGINE_ERROR_CODE Scheduler_stockholm::s
}
+void Scheduler_stockholm::io_completed(workitem *item) {
+ DEBUG_ENTER();
+ NdbInstance* inst = item->ndb_instance;
+ item->ndb_instance = NULL;
+
+ if(inst) {
+ int c = item->prefix_info.cluster_id;
+ inst->next = cluster[c].nextFree;
+ cluster[c].nextFree = inst;
+ }
+}
+
+
void Scheduler_stockholm::add_stats(ADD_STAT add_stat,
const void * cookie) {
char key[128];
char val[128];
int klen, vlen, p;
- const Configuration conf = get_Configuration();
+ const Configuration & conf = get_Configuration();
for(int c = 0 ; c < conf.nclusters; c++) {
klen = sprintf(key, "pipeline_%d_cluster_%d_commit_cycles", pipeline->id, c);
@@ -228,9 +250,6 @@ void * Scheduler_stockholm::run_ndb_comm
/* Poll */
inst->db->pollNdb();
- /* Now we are done with the instance */
- atomic_cmp_swap_int(& inst->in_use, true, false); // in_use = false
-
cluster[c].stats.cycles++;
if(! (cluster[c].stats.cycles % STAT_INTERVAL))
cluster[c].stats.commit_thread_vtime = get_thread_vtime();
=== modified file 'storage/ndb/memcache/src/schedulers/Stockholm.h'
--- a/storage/ndb/memcache/src/schedulers/Stockholm.h 2011-03-30 06:54:53 +0000
+++ b/storage/ndb/memcache/src/schedulers/Stockholm.h 2011-04-06 04:31:28 +0000
@@ -45,6 +45,7 @@ public:
void init(int threadnum, int nthreads, const char *config_string);
void attach_thread(thread_identifier *);
ENGINE_ERROR_CODE schedule(workitem *);
+ void io_completed(workitem *);
void add_stats(ADD_STAT, const void *);
void * run_ndb_commit_thread(int cluster_id);
@@ -58,6 +59,7 @@ private:
pthread_t commit_thread_id;
NdbInstance **instances;
int nInst;
+ NdbInstance *nextFree;
} cluster[MAX_CLUSTERS];
};
=== removed file 'storage/ndb/memcache/src/schedulers/flex.cc'
--- a/storage/ndb/memcache/src/schedulers/flex.cc 2011-03-30 06:54:53 +0000
+++ b/storage/ndb/memcache/src/schedulers/flex.cc 1970-01-01 00:00:00 +0000
@@ -1,464 +0,0 @@
-/*
- Copyright (c) 2011, Oracle and/or its affiliates. All rights
- reserved.
-
- This program is free software; you can redistribute it and/or
- modify it under the terms of the GNU General Public License
- as published by the Free Software Foundation; version 2 of
- the License.
-
- This program is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with this program; if not, write to the Free Software
- Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- 02110-1301 USA
- */
-/* System headers */
-/* C++ files must define __STDC_FORMAT_MACROS in order to get PRIu64 */
-#define __STDC_FORMAT_MACROS
-#include <inttypes.h>
-#include <stdlib.h>
-#include <stdio.h>
-
-/* Memcache headers */
-#include "memcached/types.h"
-#include <memcached/extension_loggers.h>
-
-/* NDB Memcache headers */
-#include "flex.h"
-#include "workitem.h"
-#include "ndb_worker.h"
-#include "thread_identifier.h"
-#include "ClusterConnectionPool.h"
-
-/* Random stat samples will on average be taken twice as often as
- STATS_SAMPLE_INTERVAL (based on uniform random distribution).
- STATS_INITIAL_INTERVAL is lower so as to get stats quickly on startup.
-*/
-#define STATS_INITIAL_INTERVAL 20
-#define STATS_SAMPLE_INTERVAL 400
-
-extern EXTENSION_LOGGER_DESCRIPTOR *logger;
-
-
-extern "C" {
- void * run_flex_commit_thread(void *);
- void * run_flex_broker_thread(void *);
-}
-
-
-void Scheduler_flex::init(int t, int nthreads, const char *user_config) {
- thread_id = t;
- const char * active_cf;
-
- /* Set some baseline default values for the configuration */
- config.n_engine_threads = nthreads;
- config.n_broker_threads = 0;
- config.n_commit_threads = 2;
- config.n_db_pending = 0;
- config.n_connections = 1;
-
- /* The default config parameters */
- static const char * default_config = "b0,c2,p1";
-
- /* Choose which string to parse */
- if(user_config && *user_config) active_cf = user_config;
- else active_cf = default_config;
-
- /* disregard the return value from sscanf(), but test the config for validity,
- and write it to the logger when the scheduler starts running */
- sscanf(active_cf, "b%d,c%d,p%d",
- & config.n_broker_threads, & config.n_commit_threads,
- & config.n_connections);
-
- /* Test validity of configuration */
- if(config.n_broker_threads < 0 || config.n_broker_threads > 2) {
- logger->log(LOG_WARNING, 0, "Invalid scheduler configuration.\n");
- assert(config.n_broker_threads >= 0 && config.n_broker_threads <= 2);
- }
- if(config.n_commit_threads < 1 || config.n_commit_threads > 16) {
- logger->log(LOG_WARNING, 0, "Invalid scheduler configuration.\n");
- assert(config.n_commit_threads >= 1 && config.n_commit_threads <= 16);
- }
- if(config.n_db_pending < 0 || config.n_db_pending > 16) {
- logger->log(LOG_WARNING, 0, "Invalid scheduler configuration.\n");
- assert(config.n_db_pending >= 0 && config.n_db_pending <= 16);
- }
- if(config.n_connections < 1 || config.n_connections > 4) {
- logger->log(LOG_WARNING, 0, "Invalid scheduler configuration.\n");
- assert(config.n_connections >= 1 && config.n_connections <= 4);
- }
-
- /* There is always a broker, even if there is no broker thread */
- nbrokers = config.n_broker_threads > 1 ? config.n_broker_threads : 1;
-
- /* Create a workqueue for the broker thread */
- broker_queue_size = 8192;
- if(config.n_broker_threads) {
- do_queue_sample = random() % STATS_INITIAL_INTERVAL;
- broker_queue = (struct workqueue *) malloc(sizeof(struct workqueue));
- workqueue_init(broker_queue, broker_queue_size, config.n_broker_threads);
- }
- else broker_queue = 0;
-
- /* Initialize the brokers.
- If there are no broker threads, then broker[0]'s methods run in the engine
- thread. Otherwise each broker will run mostly within a broker thread.
- */
- for(int i = 0 ; i < nbrokers ; i++)
- brokers[i] = new Broker(this, i);
-}
-
-
-Scheduler_flex::Broker::Broker(Scheduler_flex *s,
- int my_id) : sched(s),
- id(my_id),
- conf(get_Configuration()),
- queue(s->broker_queue) {
- clusters = (Cluster **) calloc(conf.nclusters, sizeof(Cluster *));
-
- for(int c = 0 ; c < conf.nclusters ; c++) {
- clusters[c] = new Cluster(sched, id, c);
- }
-}
-
-
-Scheduler_flex::Cluster::Cluster(Scheduler_flex *s,
- int broker,
- int cluster) : sched(s),
- broker_id(broker),
- id(cluster),
- conf(get_Configuration()) {
- DEBUG_PRINT("thread: %d, broker: %d, cluster: %d", sched->thread_id,
- broker, cluster);
- /* Get the connection pool for my cluster */
- ClusterConnectionPool *pool = conf.getConnectionById(id);
-
- /* Get a number of NDB instances.
- That number is determined here, using max_tps and usec_rtt.
- We allow a transaction to be in-flight for 5 * RTT,
- and we need enough NDB objects to meet one thread's share of max_tps.
- I go step-by-step through the math here (the compiler may optimize it).
- */
- double tx_time_in_usec = pool->usec_rtt * 5;
- double tx_per_ndb_per_sec = 1000000 / tx_time_in_usec;
- double total_ndb_objects = conf.max_tps / tx_per_ndb_per_sec;
- double ndbs_per_pipeline = total_ndb_objects / sched->config.n_engine_threads;
- nInst = (int) (ndbs_per_pipeline / sched->nbrokers);
-
- /* For now, the queue size is fixed */
- queue_size = 8192;
-
- /* Do we need more connections? */
- for(int n = pool->getPoolSize(); n < sched->config.n_connections; n++)
- (void) pool->addPooledConnection();
- /* Maybe not all of them were added; that's OK. */
-
- /* Now obtain the appropriate one for this thread. */
- Ndb_cluster_connection *conn = pool->getPooledConnection(sched->thread_id);
-
- DEBUG_PRINT("broker %d, cluster %d, node %d: "
- "%d TPS @ %d usec RTT ==> %d NDB instances.", broker_id, id,
- conn->node_id(), conf.max_tps, pool->usec_rtt, nInst);
-
- // Get the NDB instances.
- instances = (NdbInstance**) calloc(nInst, sizeof(NdbInstance *));
- for(int i = 0; i < nInst ; i++) {
- instances[i] = new NdbInstance(conn, conf.nprefixes, 1);
- }
-
- /* Hoard a transaction (an API connect record) for each Ndb object. This
- first call to startTransaction() will send TC_SEIZEREQ and wait for a
- reply, but later at runtime startTransaction() should return immediately.
- Also, for each NDB instance, ore-build the QueryPlan for the default key prefix.
- TODO? Start one tx *per data node*.
- */
- QueryPlan *plan;
- const KeyPrefix *default_prefix = conf.getDefaultPrefix();
- NdbTransaction ** txlist;
- txlist = ( NdbTransaction **) calloc(nInst, sizeof(NdbTransaction *));
-
- // Open them all.
- for(int i = 0 ; i < nInst ; i++) {
- NdbTransaction *tx;
- plan = instances[i]->getPlanForPrefix(default_prefix);
- tx = instances[i]->db->startTransaction();
- if(! tx) logger->log(LOG_WARNING, 0, instances[i]->db->getNdbError().message);
- txlist[i] = tx;
- }
-
- // Close them all.
- for(int i = 0 ; i < nInst ; i++) {
- txlist[i]->close();
- }
-
- // Free the list.
- free(txlist);
-
- /* Allocate thread ids */
- commit_thread_ids = (pthread_t *) calloc(sched->config.n_commit_threads,
- sizeof(pthread_t));
-
- /* Allocate and initialize a workqueue */
- queue = (struct workqueue *) malloc(sizeof(struct workqueue));
- workqueue_init(queue, queue_size, sched->config.n_commit_threads);
-
- /* Randomly take samples of the workqueue depth */
- do_queue_sample = random() % STATS_INITIAL_INTERVAL;
-}
-
-
-void Scheduler_flex::attach_thread(thread_identifier * parent) {
- const Configuration & conf = get_Configuration();
- pipeline = parent->pipeline;
-
- logger->log(LOG_WARNING, 0, "Pipeline %d attached to flex scheduler: "
- "config \"b%d,c%d,p%d\"; %d cluster%s.\n", pipeline->id,
- config.n_broker_threads, config.n_commit_threads,
- config.n_connections, conf.nclusters,
- conf.nclusters == 1 ? "" : "s");
-
- // Launch commit threads for each cluster
- for(int i = 0 ; i < nbrokers; i++)
- for(int j = 0 ; j < conf.nclusters ; j++)
- brokers[i]->clusters[j]->attach_thread(parent);
-
- // Adjust the thread stack size for the broker threads
- pthread_attr_t broker_thd_attr;
- size_t thd_stack_size;
- pthread_attr_init(& broker_thd_attr);
- pthread_attr_getstacksize(& broker_thd_attr, & thd_stack_size);
- pthread_attr_setstacksize(& broker_thd_attr, thd_stack_size / 2);
-
- // Launch the broker threads
- for(int i = 0; i < config.n_broker_threads ; i++) {
- thread_spec *spec = new thread_spec(this, parent, i, 0, 0);
- pthread_create(& brokers[i]->thread_id, & broker_thd_attr,
- run_flex_broker_thread, (void *) spec);
- }
-}
-
-
-void Scheduler_flex::Cluster::attach_thread(thread_identifier *parent) {
- /* Adjust the thread stack size */
- pthread_attr_t commit_thd_attr;
- size_t thd_stack_size;
- pthread_attr_init(& commit_thd_attr);
- pthread_attr_getstacksize(& commit_thd_attr, & thd_stack_size);
- pthread_attr_setstacksize(& commit_thd_attr, thd_stack_size / 4);
-
- /* Start the commit threads */
- for(int t = 0 ; t < sched->config.n_commit_threads ; t++) {
- thread_spec * spec = new thread_spec(sched, parent, broker_id, id, t);
- pthread_create(& commit_thread_ids[t], & commit_thd_attr,
- run_flex_commit_thread, (void *) spec);
- }
-}
-
-
-ENGINE_ERROR_CODE Scheduler_flex::schedule(workitem *newitem) {
- if(config.n_broker_threads) {
-
- if(do_queue_sample-- == 0) {
- /* See comments in Cluster::schedule() about queue depth sampling */
- int depth = broker_queue->depth + 1;
- if(depth > 0 && depth <= broker_queue_size) {
- stats.broker_queue_total_depth += depth ;
- stats.broker_queue_samples++;
- }
- do_queue_sample = random() % STATS_SAMPLE_INTERVAL;
- }
-
- workqueue_add(broker_queue, newitem);
-
- return ENGINE_EWOULDBLOCK;
- }
- else {
- return brokers[0]->schedule(newitem);
- }
-}
-
-
-ENGINE_ERROR_CODE Scheduler_flex::Broker::schedule(workitem *newitem) {
- const KeyPrefix *pfx = conf.getPrefixByInfo(newitem->prefix_info);
-
- if(newitem->prefix_info.prefix_id) {
- DEBUG_PRINT("prefix %d: \"%s\" Table: %s Value Cols: %d",
- newitem->prefix_info.prefix_id, pfx->prefix,
- pfx->table->table_name, pfx->table->nvaluecols);
- }
-
- /* Record the suffix length */
- newitem->base.nsuffix = newitem->base.nkey - pfx->prefix_len;
- if(newitem->base.nsuffix == 0) return ENGINE_EINVAL; // key too short
-
- clusters[newitem->prefix_info.cluster_id]->schedule(newitem, pfx);
-}
-
-
-ENGINE_ERROR_CODE Scheduler_flex::Cluster::schedule(workitem *newitem,
- const KeyPrefix *pfx) {
- bool did_cas;
- NdbInstance *inst;
- int i = 0;
-
- /* Get an NDB instance. But if the instance is in use, go forward and
- get the next one. If something goes wrong (cluster crash, disconnected...)
- and the commit thread can no longer release instances, it's possible that
- there will be no free instances and this will loop forever.
- FIXME: catch this condition somehow.
- */
- inst = instances[i];
- while(inst->in_use == 1) {
- i = ( ++i % nInst);
- inst = instances[i];
- };
- stats.cycles++;
- stats.ndb_depth += i;
-
- // Now we've got an NDB Instance.
- atomic_cmp_swap_int(& inst->in_use, false, true); // in_use = true
-
- // Fetch the query plan for this prefix.
- newitem->plan = inst->getPlanForPrefix(pfx);
- if(! newitem->plan) return ENGINE_FAILED;
-
- // Build the NDB transaction
- if(worker_prepare_operation(newitem)) {
-
- if(do_queue_sample-- == 0) { // Sample for statistics.
- /* To simulate sampling the depth immediately after the item is added,
- sample it now and add one. */
- int depth = queue->depth + 1;
- /* Queue depth can be erroneous because of thread races.
- Use it only if it looks valid. */
- if(depth > 0 && depth <= queue_size) {
- stats.queue_total_depth += depth;
- stats.queue_samples++;
- }
- do_queue_sample = random() % STATS_SAMPLE_INTERVAL;
- }
-
- // Put the NdbInstance on the queue for the commit thread.
- workqueue_add(queue, inst);
- return ENGINE_EWOULDBLOCK;
- }
- else return ENGINE_ENOTSUP;
-}
-
-
-void Scheduler_flex::add_stats(ADD_STAT add_stat, const void * cookie) {
- char key[128];
- char val[128];
- int klen, vlen, p;
-
- stats.total_avg_ndb_depth = 0;
- stats.total_avg_commit_queue_depth = 0;
- for(int i = 0 ; i < nbrokers ; i++)
- brokers[i]->contribute_stats();
-
- klen = sprintf(key, "t%d_avg_ndb_depth", pipeline->id);
- vlen = sprintf(val, "%.3f", stats.total_avg_ndb_depth / nbrokers);
- add_stat(key, klen, val, vlen, cookie);
-
- klen = sprintf(key, "t%d_avg_commit_queue_depth", pipeline->id);
- vlen = sprintf(val, "%.3f", stats.total_avg_commit_queue_depth / nbrokers);
- add_stat(key, klen, val, vlen, cookie);
-
- if(config.n_broker_threads && stats.broker_queue_samples) {
- klen = sprintf(key, "t%d_broker_queue_avg_depth", pipeline->id);
- vlen = sprintf(val, "%.3f",
- (double) stats.broker_queue_total_depth / stats.broker_queue_samples);
- add_stat(key, klen, val, vlen, cookie);
- }
-}
-
-
-void Scheduler_flex::Broker::contribute_stats() {
- for(int i = 0 ; i < conf.nclusters ; i++)
- clusters[i]->contribute_stats();
-}
-
-
-void Scheduler_flex::Cluster::contribute_stats() {
- if(stats.cycles)
- sched->stats.total_avg_ndb_depth += (double) stats.ndb_depth / stats.cycles;
- if(stats.queue_samples)
- sched->stats.total_avg_commit_queue_depth +=
- (double) stats.queue_total_depth / stats.queue_samples;
-}
-
-
-/*
- Broker thread: get a workitem off the workqueue, and build its operations.
-*/
-void * Scheduler_flex::Broker::run_broker_thread() {
- workitem *newitem;
-
- DEBUG_ENTER();
-
- while(1) {
- /* Wait for something to appear on the queue */
- newitem = (workitem *) workqueue_consumer_wait(queue);
-
- if(newitem == NULL) return 0; /* queue has been shut down and emptied */
-
- (void) schedule(newitem);
- }
-}
-
-
-/*
- Commit thread: Get an NdbInstance off the workqueue, and call pollNdb() on it.
-*/
-void * Scheduler_flex::Cluster::run_commit_thread() {
- DEBUG_ENTER();
-
- NdbInstance *inst;
-
- while(1) {
- /* Wait for something to appear on the queue */
- inst = (NdbInstance *) workqueue_consumer_wait(queue);
-
- if(inst == NULL) return 0; /* queue has been shut down and emptied */
-
- /* Poll */
- inst->db->pollNdb();
-
- /* Now we are done with the instance */
- atomic_cmp_swap_int(& inst->in_use, true, false); // in_use = false
- }
-}
-
-
-void * run_flex_broker_thread(void *s) {
- Scheduler_flex::thread_spec *spec = (Scheduler_flex::thread_spec *) s;
- set_child_thread_id(spec->parent, "broker%d", spec->broker);
-
- spec->run_broker_thread();
-
- delete spec;
- return 0;
-}
-
-
-void * run_flex_commit_thread(void *s) {
- Scheduler_flex::thread_spec *spec = (Scheduler_flex::thread_spec *) s;
- if(spec->has_broker_threads())
- set_child_thread_id(spec->parent, "br%d.cl%d.commit%d",
- spec->broker, spec->cluster_id, spec->number);
- else
- set_child_thread_id(spec->parent, "cl%d.commit%d",
- spec->cluster_id, spec->number);
-
- spec->run_commit_thread();
-
- delete spec;
- return 0;
-}
-
-
=== removed file 'storage/ndb/memcache/src/schedulers/flex.h'
--- a/storage/ndb/memcache/src/schedulers/flex.h 2011-03-30 06:54:53 +0000
+++ b/storage/ndb/memcache/src/schedulers/flex.h 1970-01-01 00:00:00 +0000
@@ -1,167 +0,0 @@
-/*
- Copyright (c) 2011, Oracle and/or its affiliates. All rights
- reserved.
-
- This program is free software; you can redistribute it and/or
- modify it under the terms of the GNU General Public License
- as published by the Free Software Foundation; version 2 of
- the License.
-
- This program is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with this program; if not, write to the Free Software
- Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- 02110-1301 USA
- */
-
-#ifndef NDBMEMCACHE_FLEX_SCHEDULER_H
-#define NDBMEMCACHE_FLEX_SCHEDULER_H
-
-#ifndef __cplusplus
-#error "This file is for C++ only"
-#endif
-
-#include <memcached/types.h>
-
-#include "config.h"
-#include "Scheduler.h"
-#include "KeyPrefix.h"
-
-
-/*
- * Flex Scheduler
- *
- * The scheduler is configurable:
- * - 0, 1, or 2 "middleman" broker threads per engine thread
- * - some number of commit threads per {broker, cluster id} tuple
- * - some max number of Ndb objects to poll concurrently in a commit thread
- *
- */
-
-
-class Scheduler_flex : public Scheduler {
-public:
- class Broker;
- class Cluster;
- class thread_spec;
-
- friend class Broker;
- friend class Cluster;
- friend class thread_spec;
-
- Scheduler_flex() {};
- ~Scheduler_flex();
- void init(int threadnum, int nthreads, const char *config_string);
- void attach_thread(thread_identifier *);
- ENGINE_ERROR_CODE schedule(workitem *);
- void add_stats(ADD_STAT, const void *);
-
-protected:
- int thread_id;
- int do_queue_sample;
- int nbrokers;
- int broker_queue_size;
- struct workqueue * broker_queue;
- struct flex_cf {
- int n_engine_threads; //< no. of libevent worker threads in memcached
- int n_broker_threads; //< no. of separate broker threads, 0 - 2
- int n_commit_threads; //< no. of commit threads per {broker,cluster} pair
- int n_db_pending; //< no. of pollable Ndbs inside a commit thread
- int n_connections; //< size of Ndb_cluster_connection pool
- } config;
- struct flex_stats {
- double total_avg_ndb_depth;
- double total_avg_commit_queue_depth;
- uint64_t broker_queue_total_depth;
- uint64_t broker_queue_samples;
- } stats;
-
- Broker * brokers[2];
-};
-
-
-class Scheduler_flex::Cluster {
-friend class Broker;
-friend class thread_spec;
-
-public:
- Cluster(Scheduler_flex *, int broker, int cluster_id);
- ~Cluster();
-
- void attach_thread(thread_identifier *); /*< start commit threads */
- ENGINE_ERROR_CODE schedule(workitem *, const KeyPrefix *);
-
-protected:
- void * run_commit_thread();
- void contribute_stats();
- struct cluster_stats {
- uint64_t cycles;
- uint64_t ndb_depth;
- uint64_t queue_total_depth;
- uint64_t queue_samples;
- } stats;
-
-private:
- const Configuration conf;
- Scheduler_flex *sched;
- int do_queue_sample;
- int broker_id;
- int id;
- int queue_size;
- struct workqueue *queue;
- pthread_t * commit_thread_ids;
- NdbInstance **instances;
- int nInst;
-};
-
-
-class Scheduler_flex::Broker {
-friend class Scheduler_flex;
-friend class thread_spec;
-
-public:
- Broker(Scheduler_flex *, int my_id);
- ~Broker();
- ENGINE_ERROR_CODE schedule(workitem *); /*< schedule item on its cluster */
-
- pthread_t thread_id;
- Cluster ** clusters;
-
-protected:
- void * run_broker_thread();
- void contribute_stats();
-
-private:
- int id;
- const Configuration conf;
- Scheduler_flex *sched;
- struct workqueue *queue;
-};
-
-
-class Scheduler_flex::thread_spec {
-public:
- thread_spec(Scheduler_flex *s, thread_identifier *p,
- int b, int c, int n) : sched(s), parent(p),
- broker(b), cluster_id(c), number(n) {};
-
- Broker * get_broker() { return sched->brokers[broker]; };
- bool has_broker_threads() { return (sched->config.n_broker_threads > 0); };
- void * run_broker_thread() { return get_broker()->run_broker_thread(); };
- Cluster * get_cluster() { return get_broker()->clusters[cluster_id]; };
- void * run_commit_thread() { return get_cluster()->run_commit_thread(); };
-
- Scheduler_flex *sched;
- thread_identifier *parent;
- int broker;
- int cluster_id;
- int number;
-};
-
-
-#endif
-
=== modified file 'storage/ndb/memcache/src/workitem.c'
--- a/storage/ndb/memcache/src/workitem.c 2011-03-30 06:54:53 +0000
+++ b/storage/ndb/memcache/src/workitem.c 2011-04-06 04:31:28 +0000
@@ -184,6 +184,13 @@ const char * workitem_get_operation(work
}
+void workitem_set_NdbInstance(workitem *item, C_OR_CPP_NDBINSTANCE * _Ndb_instance)
+{
+ assert(item->ndb_instance == NULL);
+ item->ndb_instance = _Ndb_instance;
+}
+
+
void workitem_free(workitem *item)
{
/* It's OK to free all of these; a free() with class_id 0 is a no-op */
@@ -192,6 +199,11 @@ void workitem_free(workitem *item)
pipeline_free(item->pipeline, item->ndb_key_buffer, item->keybuf1_cls);
pipeline_free(item->pipeline, item->key_buffer_2, item->keybuf2_cls);
+ if (item->ndb_instance != NULL)
+ {
+ assert(false); // TODO Not thread safe, investigate path
+ }
+
pipeline_free(item->pipeline, item, workitem_class_id);
}
=== modified file 'storage/ndb/memcache/unit/Makefile.am'
--- a/storage/ndb/memcache/unit/Makefile.am 2011-03-30 06:54:53 +0000
+++ b/storage/ndb/memcache/unit/Makefile.am 2011-04-06 04:31:28 +0000
@@ -38,7 +38,6 @@ run_unit_tests_LDADD = ../ndb_engine_la-
../ndb_engine_la-ClusterConnectionPool.lo \
../ndb_engine_la-Configuration.lo \
../ndb_engine_la-DataTypeHandler.lo \
- ../ndb_engine_la-Dispatch.lo \
../ndb_engine_la-KeyPrefix.lo \
../ndb_engine_la-NdbInstance.lo \
../ndb_engine_la-QueryPlan.lo \
@@ -49,7 +48,9 @@ run_unit_tests_LDADD = ../ndb_engine_la-
../ndb_engine_la-atomics.lo \
../ndb_engine_la-debug.lo \
../ndb_engine_la-embedded_default_engine.lo \
- ../ndb_engine_la-flex.lo \
+ ../ndb_engine_la-Flex.lo \
+ ../ndb_engine_la-Flex_broker.lo \
+ ../ndb_engine_la-Flex_cluster.lo \
../ndb_engine_la-hash_item_util.lo \
../ndb_engine_la-items.lo \
../ndb_engine_la-ndb_configuration.lo \
=== modified file 'storage/ndb/memcache/xcode/ndbmemcache.xcodeproj/project.pbxproj'
--- a/storage/ndb/memcache/xcode/ndbmemcache.xcodeproj/project.pbxproj 2011-03-30 06:54:53 +0000
+++ b/storage/ndb/memcache/xcode/ndbmemcache.xcodeproj/project.pbxproj 2011-04-06 04:31:28 +0000
@@ -9,12 +9,22 @@
/* Begin PBXFileReference section */
65278D6612BC452E00189195 /* Stockholm.cc */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; path = Stockholm.cc; sourceTree = "<group>"; };
65278D6C12BC456100189195 /* Scheduler.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = Scheduler.h; sourceTree = "<group>"; };
+ 652DC977134A8F55000C5787 /* dtrace.flush */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = text; path = dtrace.flush; sourceTree = "<group>"; };
+ 652DCB4E134BE87D000C5787 /* 3thread.cc */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; path = 3thread.cc; sourceTree = "<group>"; };
+ 652DCB4F134BE87D000C5787 /* 3thread.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = 3thread.h; sourceTree = "<group>"; };
+ 652DCB5D134BF512000C5787 /* Flex_broker.cc */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; path = Flex_broker.cc; sourceTree = "<group>"; };
+ 652DCB5E134BF512000C5787 /* Flex_broker.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = Flex_broker.h; sourceTree = "<group>"; };
+ 652DCB5F134BF512000C5787 /* Flex_cluster.cc */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; path = Flex_cluster.cc; sourceTree = "<group>"; };
+ 652DCB60134BF512000C5787 /* Flex_cluster.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = Flex_cluster.h; sourceTree = "<group>"; };
+ 652DCB61134BF637000C5787 /* Flex_thread_spec.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = Flex_thread_spec.h; sourceTree = "<group>"; };
+ 652E5A371306533A004F8795 /* binary-spec.txt */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = text; name = "binary-spec.txt"; path = "../binary-spec.txt"; sourceTree = SOURCE_ROOT; };
652E896F12EDF39300CA79EE /* dtrace.mget.default */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = text; path = dtrace.mget.default; sourceTree = "<group>"; };
652E897012EDF49500CA79EE /* ndb_engine.d */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.dtrace; path = ndb_engine.d; sourceTree = "<group>"; };
652E897112EDF49D00CA79EE /* dtrace.mget.ndb */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = text; path = dtrace.mget.ndb; sourceTree = "<group>"; };
652E8C4B12F1046700CA79EE /* trace.cas.coord */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = text; path = trace.cas.coord; sourceTree = "<group>"; };
652E8C5512F20CE700CA79EE /* trace.binary */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = text; path = trace.binary; sourceTree = "<group>"; };
652E8C7C12F2795B00CA79EE /* run-tests.sh */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = text.script.sh; name = "run-tests.sh"; path = "../run-tests.sh"; sourceTree = SOURCE_ROOT; };
+ 652E8C8512F284EF00CA79EE /* protocol.txt */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = text; path = protocol.txt; sourceTree = "<group>"; };
65570AAD1298A3060043D9B9 /* assoc.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = assoc.h; sourceTree = "<group>"; };
65570AC41298A7800043D9B9 /* visibility.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = visibility.h; sourceTree = "<group>"; };
65570AC51298A7800043D9B9 /* vbucket.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = vbucket.h; sourceTree = "<group>"; };
@@ -101,8 +111,8 @@
65BB2F01128B9F82003031C5 /* early.close */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = text; path = early.close; sourceTree = "<group>"; };
65BB2F02128B9F82003031C5 /* engine.d */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.dtrace; path = engine.d; sourceTree = "<group>"; };
65BB2F03128B9F82003031C5 /* userfunc.d */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.dtrace; path = userfunc.d; sourceTree = "<group>"; };
- 65BF5E221334286D0043FAB2 /* flex.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = flex.h; sourceTree = "<group>"; };
- 65BF5E231334286D0043FAB2 /* flex.cc */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; path = flex.cc; sourceTree = "<group>"; };
+ 65BF5E221334286D0043FAB2 /* Flex.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = Flex.h; sourceTree = "<group>"; };
+ 65BF5E231334286D0043FAB2 /* Flex.cc */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; path = Flex.cc; sourceTree = "<group>"; };
65BF5F1D13346C120043FAB2 /* thread_identifier.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = thread_identifier.h; sourceTree = "<group>"; };
65BF5F1E13346C640043FAB2 /* status_block.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = status_block.h; sourceTree = "<group>"; };
65BF5F2F13346E1A0043FAB2 /* thread_identifier.c */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.c; path = thread_identifier.c; sourceTree = "<group>"; };
@@ -111,11 +121,7 @@
65BF6524133F108F0043FAB2 /* pmpstack.awk */ = {isa = PBXFileReference; explicitFileType = sourcecode; fileEncoding = 4; indentWidth = 4; name = pmpstack.awk; path = ../scripts/pmpstack.awk; sourceTree = "<group>"; tabWidth = 4; };
65C1872B12D6AF00009E5AE1 /* config.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; name = config.h; path = ../config.h; sourceTree = SOURCE_ROOT; };
65C1875612D6EC7D009E5AE1 /* ndb_worker.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = ndb_worker.h; sourceTree = "<group>"; };
- 65C187B812D94D21009E5AE1 /* Dispatch.cc */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; path = Dispatch.cc; sourceTree = "<group>"; };
65C1886612DA682D009E5AE1 /* stats_settings.txt */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = text; path = stats_settings.txt; sourceTree = "<group>"; };
- 65C1886712DAA9BB009E5AE1 /* 3thread.cc */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; path = 3thread.cc; sourceTree = "<group>"; };
- 65C1887012DAAD56009E5AE1 /* 3thread.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = 3thread.h; sourceTree = "<group>"; };
- 65C1887112DAAD5D009E5AE1 /* Dispatch.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = Dispatch.h; sourceTree = "<group>"; };
65C1887212DAAD62009E5AE1 /* Stockholm.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = Stockholm.h; sourceTree = "<group>"; };
65E51D7412A0A1D60039115A /* timing.c */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.c; path = timing.c; sourceTree = "<group>"; };
65E51D7512A0A49C0039115A /* timing.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = timing.h; sourceTree = "<group>"; };
@@ -126,6 +132,8 @@
08FB7794FE84155DC02AAC07 /* ndbmemcache */ = {
isa = PBXGroup;
children = (
+ 652E8C8512F284EF00CA79EE /* protocol.txt */,
+ 652E5A371306533A004F8795 /* binary-spec.txt */,
65AA54F7127C8705003FE674 /* ToDoList.rtf */,
652E8C7C12F2795B00CA79EE /* run-tests.sh */,
65BB2EFA128B9F82003031C5 /* traces */,
@@ -278,8 +286,8 @@
65570AD21298A7800043D9B9 /* util.h */,
);
name = memcached;
- path = ../../../../../../../Desktop/ndbmem/builds/t5/include/memcached;
- sourceTree = SOURCE_ROOT;
+ path = ../../../builds/t5/include/memcached;
+ sourceTree = "<group>";
};
65AA5868127F3EFF003FE674 /* scripts */ = {
isa = PBXGroup;
@@ -310,6 +318,7 @@
652E8C5512F20CE700CA79EE /* trace.binary */,
65B977C112F3984E00FEBB0D /* trace.incr */,
6569FC85130A4A5E00DFAC09 /* dtrace.binary.set */,
+ 652DC977134A8F55000C5787 /* dtrace.flush */,
);
path = traces;
sourceTree = "<group>";
@@ -317,14 +326,17 @@
65C1871312D6A944009E5AE1 /* schedulers */ = {
isa = PBXGroup;
children = (
- 65BF5E221334286D0043FAB2 /* flex.h */,
- 65BF5E231334286D0043FAB2 /* flex.cc */,
- 65C1887012DAAD56009E5AE1 /* 3thread.h */,
- 65C1886712DAA9BB009E5AE1 /* 3thread.cc */,
- 65C1887112DAAD5D009E5AE1 /* Dispatch.h */,
- 65C187B812D94D21009E5AE1 /* Dispatch.cc */,
+ 65BF5E221334286D0043FAB2 /* Flex.h */,
+ 65BF5E231334286D0043FAB2 /* Flex.cc */,
+ 652DCB61134BF637000C5787 /* Flex_thread_spec.h */,
+ 652DCB5E134BF512000C5787 /* Flex_broker.h */,
+ 652DCB5D134BF512000C5787 /* Flex_broker.cc */,
+ 652DCB60134BF512000C5787 /* Flex_cluster.h */,
+ 652DCB5F134BF512000C5787 /* Flex_cluster.cc */,
65C1887212DAAD62009E5AE1 /* Stockholm.h */,
65278D6612BC452E00189195 /* Stockholm.cc */,
+ 652DCB4E134BE87D000C5787 /* 3thread.cc */,
+ 652DCB4F134BE87D000C5787 /* 3thread.h */,
);
path = schedulers;
sourceTree = "<group>";
Attachment: [text/bzr-bundle] bzr/john.duncan@oracle.com-20110406043128-97yzcbs68ejb7rft.bundle
| Thread |
|---|
| • bzr commit into mysql-5.1-telco-7.2 branch (john.duncan:4150) | John David Duncan | 6 Apr |