List: Commits
From: John David Duncan
Date: April 6, 2011 4:31am
Subject: bzr commit into mysql-5.1-telco-7.2 branch (john.duncan:4150)
#At file:///Users/jdd/bzr-repo/working/5.1-telco-7.2-memcache/ based on revid:john.duncan@stripped

 4150 John David Duncan	2011-04-05
      Remove Dispatch scheduler; fix deadlock in Stockholm & Flex schedulers; implement flush_all

    removed:
      storage/ndb/memcache/src/schedulers/Dispatch.cc
      storage/ndb/memcache/src/schedulers/Dispatch.h
      storage/ndb/memcache/src/schedulers/flex.cc
      storage/ndb/memcache/src/schedulers/flex.h
    added:
      storage/ndb/memcache/src/schedulers/Flex_broker.cc
      storage/ndb/memcache/src/schedulers/Flex_broker.h
      storage/ndb/memcache/src/schedulers/Flex_cluster.cc
      storage/ndb/memcache/src/schedulers/Flex_cluster.h
      storage/ndb/memcache/src/schedulers/Flex_thread_spec.h
    modified:
      storage/ndb/memcache/Makefile.am
      storage/ndb/memcache/README
      storage/ndb/memcache/include/ClusterConnectionPool.h
      storage/ndb/memcache/include/NdbInstance.h
      storage/ndb/memcache/include/Operation.h
      storage/ndb/memcache/include/QueryPlan.h
      storage/ndb/memcache/include/Scheduler.h
      storage/ndb/memcache/include/ndb_pipeline.h
      storage/ndb/memcache/include/ndb_worker.h
      storage/ndb/memcache/include/ndbmemcache_global.h
      storage/ndb/memcache/include/workitem.h
      storage/ndb/memcache/scripts/metadata.sql
      storage/ndb/memcache/src/Configuration.cc
      storage/ndb/memcache/src/NdbInstance.cc
      storage/ndb/memcache/src/ndb_engine.c
      storage/ndb/memcache/src/ndb_pipeline.cc
      storage/ndb/memcache/src/ndb_worker.cc
      storage/ndb/memcache/src/schedulers/3thread.cc
      storage/ndb/memcache/src/schedulers/3thread.h
      storage/ndb/memcache/src/schedulers/Stockholm.cc
      storage/ndb/memcache/src/schedulers/Stockholm.h
      storage/ndb/memcache/src/workitem.c
      storage/ndb/memcache/unit/Makefile.am
      storage/ndb/memcache/xcode/ndbmemcache.xcodeproj/project.pbxproj
=== modified file 'storage/ndb/memcache/Makefile.am'
--- a/storage/ndb/memcache/Makefile.am	2011-03-30 06:54:53 +0000
+++ b/storage/ndb/memcache/Makefile.am	2011-04-06 04:31:28 +0000
@@ -34,12 +34,14 @@ ndb_engine_la_SOURCES= \
                     src/workqueue.c \
                     src/schedulers/Stockholm.h \
                     src/schedulers/Stockholm.cc \
-                    src/schedulers/Dispatch.h \
-                    src/schedulers/Dispatch.cc \
                     src/schedulers/3thread.h \
                     src/schedulers/3thread.cc \
-                    src/schedulers/flex.h \
-                    src/schedulers/flex.cc \
+                    src/schedulers/Flex.h \
+                    src/schedulers/Flex.cc \
+                    src/schedulers/Flex_broker.h \
+                    src/schedulers/Flex_broker.cc \
+                    src/schedulers/Flex_cluster.h \
+                    src/schedulers/Flex_cluster.cc \
                     src/atomics.c \
                     src/debug.c \
                     src/hash_item_util.c \

=== modified file 'storage/ndb/memcache/README'
--- a/storage/ndb/memcache/README	2011-03-30 06:54:53 +0000
+++ b/storage/ndb/memcache/README	2011-04-06 04:31:28 +0000
@@ -19,11 +19,18 @@ QUICK START
 
 OBTAIN MEMCACHED
 --------------------------
-Obtain memcached 1.6 source from github:
-git clone http://github.com/memcached/memcached
-git checkout engine-pu 
+# Obtain memcached 1.6 source from github, and then checkout the 
+# "engine-pu" branch which contains the "storage engine API" code
+git clone git://github.com/memcached/memcached
+cd memcached
+git checkout -t origin/engine-pu 
 
 
+DECIDE WHERE MEMCACHED WILL BE INSTALLED
+--------------------------
+# You will have to configure memcached and the NDB engine separately, but
+# it is important to decide on the installation location before building either one.
+
 BUILD AND INSTALL MEMCACHED
 --------------------------
 Note that memcached requires libevent 1.3 or newer.

=== modified file 'storage/ndb/memcache/include/ClusterConnectionPool.h'
--- a/storage/ndb/memcache/include/ClusterConnectionPool.h	2011-03-30 06:54:53 +0000
+++ b/storage/ndb/memcache/include/ClusterConnectionPool.h	2011-04-06 04:31:28 +0000
@@ -24,6 +24,8 @@
 
 #include <NdbApi.hpp>
 
+#include "NdbInstance.h"
+
 class ClusterConnectionPool {
 
 /* public instance variables */
@@ -35,7 +37,8 @@ public:
 private:
   Ndb_cluster_connection *conn;                                      
   unsigned int pool_size;                  
-  Ndb_cluster_connection *pool_connections[MAX_CONNECT_POOL];
+  Ndb_cluster_connection * pool_connections[MAX_CONNECT_POOL];
+  NdbInstance * master_instances[MAX_CONNECT_POOL];
 
 /* public class methods */
 public:
@@ -59,6 +62,12 @@ public:
 
   /** Get the connection numbered "my_id modulo pool_size" */
   Ndb_cluster_connection *getPooledConnection(int my_id) const;      // inlined
+
+  /** Set an NdbInstance to be the master instance for a connection */
+  void setNdbInstance(int my_id, NdbInstance *);                    // inlined
+   
+  /** Get the master NdbInstance for this connection */ 
+  NdbInstance * getNdbInstance(int my_id) const;                    // inlined
 };
 
 /* Inline functions */
@@ -73,4 +82,12 @@ inline Ndb_cluster_connection * ClusterC
   return pool_connections[i % pool_size];
 }
 
+inline void ClusterConnectionPool::setNdbInstance(int i, NdbInstance *inst) {
+  master_instances[i % pool_size] = inst;
+}
+
+inline NdbInstance * ClusterConnectionPool::getNdbInstance(int i) const {
+  return master_instances[i % pool_size];
+}
+
 #endif

=== modified file 'storage/ndb/memcache/include/NdbInstance.h'
--- a/storage/ndb/memcache/include/NdbInstance.h	2011-03-30 06:54:53 +0000
+++ b/storage/ndb/memcache/include/NdbInstance.h	2011-04-06 04:31:28 +0000
@@ -42,6 +42,7 @@ public:
   /* Public Instance Variables */  
   Ndb *db;
   ndbmc_atomic32_t in_use;
+  NdbInstance *next;
  
 protected:
   int nplans;

=== modified file 'storage/ndb/memcache/include/Operation.h'
--- a/storage/ndb/memcache/include/Operation.h	2011-03-30 06:54:53 +0000
+++ b/storage/ndb/memcache/include/Operation.h	2011-04-06 04:31:28 +0000
@@ -120,6 +120,7 @@ inline Operation::Operation(QueryPlan *p
                                                                key_buffer(kbuf)
 {
   if(op == OP_READ) record = plan->val_record;
+  else if(op == OP_FLUSH) record = plan->key_record;  // scanning delete 
   else record = plan->row_record;
 }
 

=== modified file 'storage/ndb/memcache/include/QueryPlan.h'
--- a/storage/ndb/memcache/include/QueryPlan.h	2011-03-30 06:54:53 +0000
+++ b/storage/ndb/memcache/include/QueryPlan.h	2011-04-06 04:31:28 +0000
@@ -46,6 +46,7 @@ class QueryPlan {
   public: 
   QueryPlan() : initialized(0)  {};  
   QueryPlan(Ndb *, const TableSpec *, PlanOpts opts = NoOptions); 
+  bool keyIsPrimaryKey();
   
   /* public instance variables */
   bool initialized;
@@ -71,7 +72,6 @@ class QueryPlan {
 
   private:
   /* Private methods */
-  bool keyIsPrimaryKey();
   const NdbDictionary::Index * chooseIndex();
 
   /* Private instance variables */

=== modified file 'storage/ndb/memcache/include/Scheduler.h'
--- a/storage/ndb/memcache/include/Scheduler.h	2011-03-30 06:54:53 +0000
+++ b/storage/ndb/memcache/include/Scheduler.h	2011-04-06 04:31:28 +0000
@@ -52,6 +52,10 @@ public:
   /** schedule() is called from the NDB Engine thread when an operation
       is ready to be queued for further async processing */
   virtual ENGINE_ERROR_CODE schedule(workitem *) = 0;
+
+  /** io_completed() is called from the NDB Engine thread when an IO
+      completion notification has been received */
+  virtual void io_completed(workitem *) = 0;
   
   /** add_stats() allows the engine to delegate certain statistics
      to the scheduler. */

=== modified file 'storage/ndb/memcache/include/ndb_pipeline.h'
--- a/storage/ndb/memcache/include/ndb_pipeline.h	2011-03-30 06:54:53 +0000
+++ b/storage/ndb/memcache/include/ndb_pipeline.h	2011-04-06 04:31:28 +0000
@@ -118,6 +118,11 @@ void * initialize_scheduler(const char *
 /** pass a workitem to the configured scheduler, for execution */
 ENGINE_ERROR_CODE pipeline_schedule_operation(ndb_pipeline *, struct workitem *);
 
+/** schedule a "flush_all" operation */
+ENGINE_ERROR_CODE pipeline_flush_all(ndb_pipeline *);
+
+/** notify scheduler of IO completion */
+void pipeline_io_completed(ndb_pipeline *, struct workitem *);
 
 /***** MEMORY MANAGEMENT APIS *****/
 

=== modified file 'storage/ndb/memcache/include/ndb_worker.h'
--- a/storage/ndb/memcache/include/ndb_worker.h	2011-03-30 06:54:53 +0000
+++ b/storage/ndb/memcache/include/ndb_worker.h	2011-04-06 04:31:28 +0000
@@ -29,4 +29,6 @@ bool worker_prepare_operation(workitem *
 
 bool build_hash_item(workitem *, Operation &);
 
+ENGINE_ERROR_CODE ndb_flush_all(ndb_pipeline *);
+
 #endif

=== modified file 'storage/ndb/memcache/include/ndbmemcache_global.h'
--- a/storage/ndb/memcache/include/ndbmemcache_global.h	2011-03-30 06:54:53 +0000
+++ b/storage/ndb/memcache/include/ndbmemcache_global.h	2011-04-06 04:31:28 +0000
@@ -48,7 +48,8 @@ enum {  
   OP_READ = 8,    
   OP_DELETE,
   OP_ARITHMETIC,
-  OP_SCAN
+  OP_SCAN,
+  OP_FLUSH
 };
 
 #endif

=== modified file 'storage/ndb/memcache/include/workitem.h'
--- a/storage/ndb/memcache/include/workitem.h	2011-03-30 06:54:53 +0000
+++ b/storage/ndb/memcache/include/workitem.h	2011-04-06 04:31:28 +0000
@@ -29,9 +29,12 @@
 /* struct workitem is used in both C and C++ code, requiring a small hack: */
 #ifdef __cplusplus
 #include "QueryPlan.h"
+#include "NdbInstance.h" 
 #define C_OR_CPP_QUERYPLAN QueryPlan
+#define C_OR_CPP_NDBINSTANCE NdbInstance
 #else 
 #define C_OR_CPP_QUERYPLAN void
+#define C_OR_CPP_NDBINSTANCE void
 #endif
 
 
@@ -46,7 +49,8 @@ typedef struct workitem {
     unsigned has_value   : 1;  /*! are we able to use a no-copy value? */
     unsigned retries     : 3;  /*! how many times this job has been retried */
     unsigned complete    : 1;  /*! is this operation finished? */
-    unsigned _unused     : 4;  /*! (32 bits total) */
+    unsigned broker      : 2;  /*! for use by the flex scheduler */
+    unsigned _unused     : 2;  /*! (32 bits total) */
   } base;
   unsigned int id;
   struct workitem *previous;   /*! used to chain workitems in multi-key get */
@@ -56,6 +60,8 @@ typedef struct workitem {
   uint64_t math_value;         /*! IN: incr initial value; OUT: incr result */
   hash_item * cache_item;      /*! used for write requests */
   ndb_pipeline *pipeline;      /*! pointer back to request pipeline */
+  C_OR_CPP_NDBINSTANCE *ndb_instance;   
+                               /*! pointer to ndb instance, if applicable */
   const void *cookie;          /*! memcached's connection cookie */
   C_OR_CPP_QUERYPLAN *plan;    /*! QueryPlan for resolving this request */ 
   const char *key;             /*! pointer to the key */
@@ -137,6 +143,9 @@ const char * workitem_get_key_suffix(wor
 */
 size_t workitem_get_key_buf_size(int nkey);
 
+/*! Set the workitem's NdbInstance
+*/
+void workitem_set_NdbInstance(workitem*, C_OR_CPP_NDBINSTANCE *);
 
 END_FUNCTIONS_WITH_C_LINKAGE
     

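The C_OR_CPP_* macros exist because workitem.h is included from both C files (ndb_engine.c, workitem.c) and C++ files (the schedulers): C++ code sees the real class types, while C code sees an opaque void pointer of the same size, so the struct layout stays identical in both languages. The same idiom in miniature, with a made-up Payload type standing in for QueryPlan/NdbInstance:

  #include <stdio.h>

  #ifdef __cplusplus
  class Payload;                        /* real type, visible to C++ only */
  #define C_OR_CPP_PAYLOAD Payload
  #else
  #define C_OR_CPP_PAYLOAD void         /* opaque pointer for plain C */
  #endif

  struct shared_item {
    int id;
    C_OR_CPP_PAYLOAD *payload;          /* same size and offset in C and C++ */
  };

  int main() {
    struct shared_item item = { 1, NULL };
    printf("id=%d payload=%p\n", item.id, (void *) item.payload);
    return 0;
  }
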
=== modified file 'storage/ndb/memcache/scripts/metadata.sql'
--- a/storage/ndb/memcache/scripts/metadata.sql	2011-03-30 06:54:53 +0000
+++ b/storage/ndb/memcache/scripts/metadata.sql	2011-04-06 04:31:28 +0000
@@ -209,6 +209,7 @@ INSERT INTO memcache_server_roles (role_
 INSERT INTO memcache_server_roles (role_name, role_id) VALUES ("db-only", 1);
 INSERT INTO memcache_server_roles (role_name, role_id) VALUES ("mc-only", 2);
 INSERT INTO memcache_server_roles (role_name, role_id) VALUES ("ndb-caching", 3);
+INSERT INTO memcache_server_roles (role_name, role_id) VALUES ("test", 4);
 
 -- ndb_clusters table 
 -- Create an entry for the primary cluster.
@@ -216,9 +217,10 @@ INSERT INTO ndb_clusters values (0, @@nd
 
 -- cache_policies table
 -- Create some sample policies.
-INSERT INTO cache_policies(policy_name, get_policy, set_policy, delete_policy)
+INSERT INTO cache_policies
   VALUES("memcache-only", "cache_only", "cache_only", "cache_only"),
         ("ndb-only", "ndb_only", "ndb_only", "ndb_only"),
+        ("ndb-test", "ndb_only", "ndb_only", "ndb_only", "true"),
         ("caching", "caching", "caching", "caching"),
         ("caching-with-local-deletes", "caching", "caching", "cache_only"),
         ("ndb-read-only", "ndb_only", "disabled", "disabled");
@@ -241,7 +243,8 @@ INSERT INTO key_prefixes (server_role_id
          (1, "",    0, "ndb-only", "demo_table"),
          (1, "t:",  0, "ndb-only", "demo_tabs"),
          (2, "",    0, "memcache-only", NULL),
-         (3, "",    0, "caching", "demo_table") 
+         (3, "",    0, "caching", "demo_table"),
+         (4, "",    0, "ndb-test", "demo_table")
 ;
 
 

=== modified file 'storage/ndb/memcache/src/Configuration.cc'
--- a/storage/ndb/memcache/src/Configuration.cc	2011-03-30 06:54:53 +0000
+++ b/storage/ndb/memcache/src/Configuration.cc	2011-04-06 04:31:28 +0000
@@ -513,7 +513,9 @@ bool config_v1::get_policies() {
     success = false;
   }
   DEBUG_PRINT("map size: %d", policies.size());
-    
+  
+  tx->close();
+      
   return success;
 }
 

=== modified file 'storage/ndb/memcache/src/NdbInstance.cc'
--- a/storage/ndb/memcache/src/NdbInstance.cc	2011-03-30 06:54:53 +0000
+++ b/storage/ndb/memcache/src/NdbInstance.cc	2011-04-06 04:31:28 +0000
@@ -33,6 +33,7 @@ NdbInstance::NdbInstance(Ndb_cluster_con
   nplans = nprefixes;
   db = new Ndb(c);
   in_use = false;
+  next = 0;
   plans = new QueryPlan *[nplans];
   memset(plans, 0, (nplans * sizeof(QueryPlan *)));
   db->init(ntransactions);

=== modified file 'storage/ndb/memcache/src/ndb_engine.c'
--- a/storage/ndb/memcache/src/ndb_engine.c	2011-03-30 06:54:53 +0000
+++ b/storage/ndb/memcache/src/ndb_engine.c	2011-04-06 04:31:28 +0000
@@ -100,22 +100,27 @@ ENGINE_ERROR_CODE create_instance(uint64
   ndb_eng->npipelines = 0;
 
   ndb_eng->engine.interface.interface = 1;
-  ndb_eng->engine.get_info        = ndb_get_info;
-  ndb_eng->engine.initialize      = ndb_initialize;
-  ndb_eng->engine.destroy         = ndb_destroy;
-  ndb_eng->engine.allocate        = ndb_allocate;
-  ndb_eng->engine.remove          = ndb_remove;
-  ndb_eng->engine.release         = ndb_release;
-  ndb_eng->engine.get             = ndb_get;
-  ndb_eng->engine.get_stats       = ndb_get_stats;
-  ndb_eng->engine.reset_stats     = ndb_reset_stats;
-  ndb_eng->engine.store           = ndb_store;
-  ndb_eng->engine.arithmetic      = ndb_arithmetic;
-  ndb_eng->engine.flush           = ndb_flush;
-  ndb_eng->engine.unknown_command = ndb_unknown_command;
-  ndb_eng->engine.item_set_cas    = item_set_cas;           /* reused */
-  ndb_eng->engine.get_item_info   = ndb_get_item_info; 
-
+  ndb_eng->engine.get_info         = ndb_get_info;
+  ndb_eng->engine.initialize       = ndb_initialize;
+  ndb_eng->engine.destroy          = ndb_destroy;
+  ndb_eng->engine.allocate         = ndb_allocate;
+  ndb_eng->engine.remove           = ndb_remove;
+  ndb_eng->engine.release          = ndb_release;
+  ndb_eng->engine.get              = ndb_get;
+  ndb_eng->engine.get_stats        = ndb_get_stats;
+  ndb_eng->engine.reset_stats      = ndb_reset_stats;
+  ndb_eng->engine.store            = ndb_store;
+  ndb_eng->engine.arithmetic       = ndb_arithmetic;
+  ndb_eng->engine.flush            = ndb_flush;
+  ndb_eng->engine.unknown_command  = ndb_unknown_command;
+  ndb_eng->engine.item_set_cas     = item_set_cas;           /* reused */
+  ndb_eng->engine.get_item_info    = ndb_get_item_info; 
+  ndb_eng->engine.get_stats_struct = NULL;
+  ndb_eng->engine.aggregate_stats  = NULL;
+  ndb_eng->engine.tap_notify       = NULL;
+  ndb_eng->engine.get_tap_iterator = NULL;
+  ndb_eng->engine.errinfo          = NULL;
+  
   ndb_eng->server = *api;
   ndb_eng->get_server_api = get_server_api;
   
@@ -127,8 +132,11 @@ ENGINE_ERROR_CODE create_instance(uint64
   ndb_eng->info.info.description = "NDB Memcache " VERSION;
   ndb_eng->info.info.num_features = 3;
   ndb_eng->info.info.features[0].feature = ENGINE_FEATURE_CAS;
+  ndb_eng->info.info.features[0].description = NULL;
   ndb_eng->info.info.features[1].feature = ENGINE_FEATURE_PERSISTENT_STORAGE;
+  ndb_eng->info.info.features[1].description = NULL;
   ndb_eng->info.info.features[2].feature = ENGINE_FEATURE_LRU;
+  ndb_eng->info.info.features[2].description = NULL;
  
   /* Now call create_instance() for the default engine */
   e = create_my_default_instance(interface, get_server_api, 
@@ -259,11 +267,10 @@ static ENGINE_ERROR_CODE ndb_allocate(EN
                                            const int flags,
                                            const rel_time_t exptime)
 {
+  DEBUG_ENTER();
   struct ndb_engine* ndb_eng = ndb_handle(handle);
   struct default_engine *def_eng = default_handle(ndb_eng);
 
-  /*  ndb_pipeline *pipeline = get_my_pipeline_config(ndb_eng); */
-  /* DEBUG_ENTER();  */
   
   return def_eng->engine.allocate(ndb_eng->m_default_engine, cookie, item, 
                                   key, nkey, nbytes, flags, exptime);
@@ -284,7 +291,6 @@ static ENGINE_ERROR_CODE ndb_remove(ENGI
   ENGINE_ERROR_CODE return_status = ENGINE_KEY_ENOENT;
   prefix_info_t prefix;
   workitem *wqitem;
-  DEBUG_ENTER();
 
   /* Is this a callback after completed I/O? */
   wqitem = ndb_eng->server.cookie->get_engine_specific(cookie);
@@ -292,6 +298,7 @@ static ENGINE_ERROR_CODE ndb_remove(ENGI
     DEBUG_PRINT("Got callback: %s", wqitem->status->comment);
     ndb_eng->server.cookie->store_engine_specific(cookie, wqitem->previous);
     return_status = wqitem->status->status;
+    pipeline_io_completed(pipeline, wqitem);
     workitem_free(wqitem);
     return return_status;
   }
@@ -380,6 +387,7 @@ static ENGINE_ERROR_CODE ndb_get(ENGINE_
                        pipeline->id, wqitem->id, wqitem->status->comment);
     *item = wqitem->cache_item;
     wqitem->base.complete = 1;
+    pipeline_io_completed(pipeline, wqitem);
     if(wqitem->status->status != ENGINE_SUCCESS) {
       DEBUG_PRINT("pop and free the workitem.");
       ndb_eng->server.cookie->store_engine_specific(cookie, wqitem->previous);
@@ -458,6 +466,7 @@ static ENGINE_ERROR_CODE ndb_store(ENGIN
   if(wqitem) {
    // fixme: chaining
     DEBUG_PRINT("Got callback: %s", wqitem->status->comment);
+    pipeline_io_completed(pipeline, wqitem);
     return wqitem->status->status;
   }
 
@@ -513,6 +522,7 @@ static ENGINE_ERROR_CODE ndb_arithmetic(
     DEBUG_PRINT("Got arithmetic callback: %s", wqitem->status->comment);
     wqitem->base.complete = 1;
     *result = wqitem->math_value;
+    pipeline_io_completed(pipeline, wqitem);
     if(wqitem->status->status != ENGINE_SUCCESS) {
       DEBUG_PRINT("pop and free the workitem.");
       ndb_eng->server.cookie->store_engine_specific(cookie, wqitem->previous);
@@ -552,11 +562,22 @@ static ENGINE_ERROR_CODE ndb_arithmetic(
 static ENGINE_ERROR_CODE ndb_flush(ENGINE_HANDLE* handle,
                                    const void* cookie, time_t when)
 {                                   
+/* 
+   Notes on flush:
+   The server will call *only* into ndb_flush (not to allocate or release).
+   The NDB engine ignores the "when" parameter.    
+   Flush operations have special handling, outside of the scheduler.
+   They are performed synchronously. 
+   And we always send the flush command to the cache engine.   
+*/
+
+  DEBUG_ENTER();
   struct ndb_engine* ndb_eng = ndb_handle(handle);
   struct default_engine *def_eng = default_handle(ndb_eng);
-  /*  ndb_pipeline *pipeline = get_my_pipeline_config(ndb_eng); */
-  /*  DEBUG_ENTER(); */
-  return def_eng->engine.flush(ndb_eng->m_default_engine, cookie, when);
+  ndb_pipeline *pipeline = get_my_pipeline_config(ndb_eng);
+
+  (void) def_eng->engine.flush(ndb_eng->m_default_engine, cookie, when);
+  return pipeline_flush_all(pipeline);
 }
 
 
@@ -566,9 +587,9 @@ static ENGINE_ERROR_CODE ndb_unknown_com
                                              protocol_binary_request_header *request,
                                              ADD_RESPONSE response)
 {
+  DEBUG_ENTER();
   struct ndb_engine* ndb_eng = ndb_handle(handle);
   struct default_engine *def_eng = default_handle(ndb_eng);
-  /*  ndb_pipeline *pipeline = get_my_pipeline_config(ndb_eng); */
 
   return def_eng->engine.unknown_command(ndb_eng->m_default_engine, cookie, 
                                          request, response);
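
The hunks above all follow the same two-phase pattern: the first call into an engine function builds a workitem, stores it in the connection's engine-specific slot, hands it to the scheduler, and returns ENGINE_EWOULDBLOCK; when memcached calls the same function again after the I/O has completed, the stored workitem is found, the new pipeline_io_completed() call lets the scheduler reclaim the NdbInstance attached to it, and the saved status is returned. A minimal sketch of that control flow, using stand-in types and helper names rather than the real engine API:

  #include <cstdio>

  /* Stand-ins, illustrative only -- not the memcached engine types. */
  enum Status { SUCCESS, WOULD_BLOCK, KEY_ENOENT };

  struct Workitem   { Status status; };
  struct Connection { Workitem *pending = nullptr; };  /* "engine specific" slot */

  static void scheduler_reclaim(Workitem *) { /* pipeline_io_completed() stand-in */ }

  /* Models ndb_get()/ndb_store()/ndb_remove(): each request calls this twice. */
  static Status engine_call(Connection &conn) {
    if (Workitem *done = conn.pending) {   /* phase 2: callback after completed I/O */
      conn.pending = nullptr;
      scheduler_reclaim(done);             /* scheduler takes its NdbInstance back */
      Status s = done->status;
      delete done;
      return s;                            /* report the result saved by the worker */
    }
    /* phase 1: create a workitem and schedule the async NDB work */
    conn.pending = new Workitem{SUCCESS};
    return WOULD_BLOCK;
  }

  int main() {
    Connection c;
    printf("first call:  %d\n", engine_call(c));  /* WOULD_BLOCK */
    c.pending->status = KEY_ENOENT;               /* simulate the async completion */
    printf("second call: %d\n", engine_call(c));  /* the stored status */
    return 0;
  }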

=== modified file 'storage/ndb/memcache/src/ndb_pipeline.cc'
--- a/storage/ndb/memcache/src/ndb_pipeline.cc	2011-03-30 06:54:53 +0000
+++ b/storage/ndb/memcache/src/ndb_pipeline.cc	2011-04-06 04:31:28 +0000
@@ -33,11 +33,11 @@
 #include "ndb_engine.h"
 #include "debug.h"
 #include "thread_identifier.h"
+#include "ndb_worker.h"
 
 #include "schedulers/3thread.h"
 #include "schedulers/Stockholm.h"
-#include "schedulers/Dispatch.h"
-#include "schedulers/flex.h"
+#include "schedulers/Flex.h"
 
 #define DEFAULT_SCHEDULER Scheduler_flex
 
@@ -96,7 +96,7 @@ ndb_pipeline * ndb_pipeline_initialize(s
   self->engine_thread_id = pthread_self(); 
     
   /* Get the configuration */
-  Configuration Conf = get_Configuration();
+  const Configuration & Conf = get_Configuration();
   
   /* Create and set a thread identity */
   tid = (thread_identifier *) memory_pool_alloc(self->pool, sizeof(thread_identifier));
@@ -181,8 +181,6 @@ void * initialize_scheduler(const char *
       s = new Scheduler_flex;
     if(!strncasecmp(cf, "stockholm", 10))
       s = new Scheduler_stockholm;
-    else if(!strncasecmp(cf, "dispatch", 9))
-      s = new Scheduler_dispatch;
     else if(!strncasecmp(cf, "3-thread", 9))
       s = new Scheduler_3thread;
     else {
@@ -205,6 +203,16 @@ ENGINE_ERROR_CODE pipeline_schedule_oper
 }
 
 
+ENGINE_ERROR_CODE pipeline_flush_all(ndb_pipeline *self) {
+  return ndb_flush_all(self);
+}
+
+
+void pipeline_io_completed(ndb_pipeline *self, struct workitem *item) {
+  self->scheduler->io_completed(item);
+}
+
+
 /* The slab allocator API */
 
 int pipeline_get_size_class_id(size_t object_size) {

=== modified file 'storage/ndb/memcache/src/ndb_worker.cc'
--- a/storage/ndb/memcache/src/ndb_worker.cc	2011-03-30 06:54:53 +0000
+++ b/storage/ndb/memcache/src/ndb_worker.cc	2011-04-06 04:31:28 +0000
@@ -46,11 +46,11 @@
 #include "status_block.h"
 #include "Operation.h"
 #include "ndb_engine.h"
-#include "debug.h"
 #include "hash_item_util.h"
 #include "ndb_worker.h"
 
 void incrCallback(int, NdbTransaction *, void *); // callback for incr/decr
+void flush_scan(int result, NdbTransaction *tx, void *itemptr);  // callback for flush
 void DBcallback(int, NdbTransaction *, void *);   // callback for all others
 
 bool worker_do_read(workitem *, bool with_cas); 
@@ -62,6 +62,7 @@ void worker_set_cas(ndb_pipeline *, uint
 bool finalize_read(workitem *);
 bool finalize_write(workitem *, bool);
 int build_cas_routine(NdbInterpretedCode *r, int cas_col, uint64_t cas_val);
+bool scan_delete(NdbInstance *, QueryPlan *);
 
 
 extern EXTENSION_LOGGER_DESCRIPTOR *logger;
@@ -96,7 +97,7 @@ status_block status_block_bad_add = 
 
 status_block status_block_bad_replace =
   { ENGINE_NOT_STORED, "Tuple not found"              };
-
+  
 
 void worker_set_cas(ndb_pipeline *p, uint64_t *cas) {  
   /* Be careful here --  ndbmc_atomic32_t might be a signed type.
@@ -143,6 +144,7 @@ bool worker_prepare_operation(workitem *
     default:
       return false;   /* not supported */
   }
+
   return r;  /* fixme: distinguish "not supported" from "failed" */
 }
 
@@ -651,7 +653,7 @@ bool finalize_read(workitem *wqitem) {
      && op.getStringValueNoCopy(COL_STORE_VALUE, & wqitem->value_ptr, & wqitem->value_size)
      && op.appendCRLF(COL_STORE_VALUE, wqitem->value_size))
   {
-    /* The workiten's value_ptr and value_size were set above. */
+    /* The workitem's value_ptr and value_size were set above. */
     DEBUG_PRINT("using no-copy buffer.");
     wqitem->base.has_value = true;
     need_hash_item = false;
@@ -766,7 +768,70 @@ int build_cas_routine(NdbInterpretedCode
   return r->finalise();                      // resolve the label/branch
 }
 
+
+/* Flush all is a fully synchronous operation -- 
+ the memcache server is waiting for a response, and the thread is blocked.
+*/
+ENGINE_ERROR_CODE ndb_flush_all(ndb_pipeline *pipeline) {
+  DEBUG_ENTER();
+  const Configuration &conf = get_Configuration();
+  
+  DEBUG_PRINT(" %d prefixes", conf.nprefixes);
+  for(int i = 0 ; i < conf.nprefixes ; i++) {
+    const KeyPrefix *p = conf.getPrefix(i);
+    if(p->info.use_ndb && p->info.do_db_flush) {
+      NdbInstance *inst = conf.getConnectionById(p->info.cluster_id)->
+      getNdbInstance(pipeline->id);
+      QueryPlan *plan = inst->getPlanForPrefix(p);
+      if(plan->keyIsPrimaryKey()) {
+        /* To flush, scan the table and delete every row */
+        /* ToDo: ensure against duplicates (don't scan the same table twice) */
+        DEBUG_PRINT("deleting from %s", p->table->table_name);
+        scan_delete(inst, plan);      
+      }
+      else DEBUG_PRINT(" not scanning table %s -- access path is not primary key",
+                       p->table->table_name);
+    }
+    else DEBUG_PRINT(" Not scanning table %s -- use_ndb:%d flush:%d",
+                     p->table->table_name, p->info.use_ndb, p->info.do_db_flush);    
+  }
+  
+  return ENGINE_SUCCESS;
+}
+
+
+bool scan_delete(NdbInstance *inst, QueryPlan *plan) {
+  DEBUG_ENTER();
+  int res = 0;
+  int check;
   
+  NdbTransaction *tx = inst->db->startTransaction();
+  NdbScanOperation *scan = tx->getNdbScanOperation(plan->table);
+  scan->readTuplesExclusive();
+  
+  /* execute NoCommit */
+  if((res = tx->execute(NdbTransaction::NoCommit)) != 0) 
+    logger->log(LOG_WARNING, 0, "execute(NoCommit): %s\n", tx->getNdbError().message);
+  
+  /* scan and delete */
+  while(scan->nextResult(true) == 0) {
+    do {
+      if((res = scan->deleteCurrentTuple()) != 0) {
+        logger->log(LOG_WARNING, 0, "deleteCurrentTuple(): %s\n", 
+                    tx->getNdbError().message);
+      }
+    } while((check = scan->nextResult(false)) == 0);
+    
+    /* execute a batch */
+    if(check != -1)
+      if((res = tx->execute(NdbTransaction::NoCommit)) != 0)
+        logger->log(LOG_WARNING, 0, "execute(NoCommit): %s\n", 
+                    tx->getNdbError().message);
+  }
   
+  tx->execute(NdbTransaction::Commit);   // execute & commit 
+  tx->close();
   
+  return (res == 0);
+}
 

=== modified file 'storage/ndb/memcache/src/schedulers/3thread.cc'
--- a/storage/ndb/memcache/src/schedulers/3thread.cc	2011-03-30 06:54:53 +0000
+++ b/storage/ndb/memcache/src/schedulers/3thread.cc	2011-04-06 04:31:28 +0000
@@ -22,6 +22,7 @@
 #define __STDC_FORMAT_MACROS 
 #include <inttypes.h>
 #include <stdio.h>
+#include <unistd.h>
 
 /* Memcache headers */
 #include "memcached/types.h"
@@ -43,7 +44,7 @@ extern "C" {
 status_block status_block_unsupported = { ENGINE_ENOTSUP, "Not supported." };
 
 void Scheduler_3thread::init(int my_thread, int, const char *) {
-  const Configuration conf = get_Configuration();
+  const Configuration & conf = get_Configuration();
   ClusterConnectionPool *pool;
   Ndb_cluster_connection *conn;
   LockableNdbInstance *inst;
@@ -175,7 +176,7 @@ void * run_3thd_worker_thread(void *s) {
 void * Scheduler_3thread::run_ndb_worker_thread() {
   DEBUG_ENTER();
   workitem *newitem;
-  const Configuration conf = get_Configuration();     // see note
+  const Configuration & conf = get_Configuration();     // see note
   int lockerr;
   unsigned int ncycles = 0;
   

=== modified file 'storage/ndb/memcache/src/schedulers/3thread.h'
--- a/storage/ndb/memcache/src/schedulers/3thread.h	2011-03-30 06:54:53 +0000
+++ b/storage/ndb/memcache/src/schedulers/3thread.h	2011-04-06 04:31:28 +0000
@@ -42,6 +42,7 @@ public:
   void init(int threadnum, int nthreads, const char *config_string);
   void attach_thread(thread_identifier *);
   ENGINE_ERROR_CODE schedule(workitem *);
+  void io_completed(workitem *) {};
   void add_stats(ADD_STAT, const void *);
   void * run_ndb_worker_thread();
   void * run_ndb_commit_thread();

=== removed file 'storage/ndb/memcache/src/schedulers/Dispatch.cc'
--- a/storage/ndb/memcache/src/schedulers/Dispatch.cc	2011-03-30 06:54:53 +0000
+++ b/storage/ndb/memcache/src/schedulers/Dispatch.cc	1970-01-01 00:00:00 +0000
@@ -1,187 +0,0 @@
-/*
- Copyright (c) 2011, Oracle and/or its affiliates. All rights
- reserved.
- 
- This program is free software; you can redistribute it and/or
- modify it under the terms of the GNU General Public License
- as published by the Free Software Foundation; version 2 of
- the License.
- 
- This program is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
- 
- You should have received a copy of the GNU General Public License
- along with this program; if not, write to the Free Software
- Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- 02110-1301  USA
- */
-/* System headers */
-/* C++ files must define __STDC_FORMAT_MACROS in order to get PRIu64 */
-#include "config.h"
-#ifdef HAVE_DISPATCH_DISPATCH_H
-
-#define __STDC_FORMAT_MACROS 
-#include <inttypes.h>
-#include <dispatch/dispatch.h>
-#include <stdio.h>
-
-/* Memcache headers */
-#include "memcached/types.h"
-#include <memcached/extension_loggers.h>
-
-/* NDB Memcache headers */
-#include "Dispatch.h"
-#include "workitem.h"
-#include "ndb_worker.h"
-
-extern EXTENSION_LOGGER_DESCRIPTOR *logger;
-
-extern "C" {
-  void dispatch_pollNdb(void *arg);
-}
-
-
-void Scheduler_dispatch::init(int, int nthreads, const char *) {
-  const Configuration conf = get_Configuration();
-
-  /* Get the NDB instances. 
-   
-   Each pipeline has 75 NDB instances per cluster.  
-   Each instance can handle 1 transaction.  
-   
-   With 75 instances, a thread can handle 25,000 ops/sec (25 ops per ms) 
-   and all operations can be in-flight for 3 milliseconds.
-   
-   We maintain the instances in an array of lists, so  
-   instances[n] is the head list of instances for cluster n.
-   
-   TODO: This needs to work with all defined clusters
-   */  
-  
-  for(int j = 0 ; j < 75 ; j++) {
-    NdbInstance *inst = new NdbInstance(conf.getConnectionById(0), 
-                                        conf.nprefixes, 
-                                        1);
-    inst->db->init(1);
-    instances[j] = inst; 
-  }
-  
-  /* Now hoarde some transactions (API connect records).  startTransaction() 
-   will send TC_SEIZEREQ and wait for a reply.  This really needs to be done
-   at initialization time when nobody is waiting for it.
-   TODO: this should be one array per cluster
-   */
-  NdbTransaction * txlist[75];
-  for(int j = 0 ; j < 75 ; j++) {
-    txlist[j] = instances[j]->db->startTransaction();
-  }
-  
-  for(int j = 0 ; j < 75 ; j++) {
-    txlist[j]->close();
-  }
-
-  /* Get the dispatch queue */
-  asyncqueue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_HIGH, 0);
-}
-
-
-void Scheduler_dispatch::attach_thread(thread_identifier * p) {
-
-  pipeline = parent->pipeline;
-  logger->log(LOG_WARNING, 0, "Pipeline %d attached to libdispatch scheduler.\n",
-              pipeline->id);
-
-  /* Initialize counters */
-  instance_counter = 0;
-}                                     
-
-
-ENGINE_ERROR_CODE Scheduler_dispatch::schedule(workitem *newitem) {
-  NdbInstance *inst;
-  const Configuration conf = get_Configuration();     // see note
-  
-  /* Fetch the config for its key prefix */
-  const KeyPrefix *pfx = conf.getPrefixByInfo(newitem->prefix_info);
-  
-  /* From here on we will work mainly with the suffix part of the key. */
-  newitem->base.nsuffix = newitem->base.nkey - pfx->prefix_len;
-  // FIXME: runtime error back to client? 
-  DEBUG_ASSERT(newitem->base.nsuffix > 0);
-  
-  /* Get an NDB instance.  But if the instance is in use, go forward and
-   get the next one.  If something goes wrong (cluster crash, disconnected...)
-   and the commit thread can no longer release instances, it's possible that
-   there will be no free instances and this will loop forever.
-   FIXME: catch this condition somehow.
-   */  
-  do {
-    inst = instances[instance_counter];
-    instance_counter = (++ instance_counter % 75);   
-  } while(inst->in_use == true);
-  
-  // Now we've got an NDB Instance. 
-  inst->in_use = true;
-  
-  // Fetch the query plan for this prefix.
-  newitem->plan = inst->getPlanForPrefix(pfx);
-  
-  // Build the NDB transaction
-  if(worker_prepare_operation(newitem)) {
-    // Queue the operation for a dispatch thread.
-    dispatch_async_f(asyncqueue, inst, dispatch_pollNdb);
-    return ENGINE_EWOULDBLOCK;
-  }
-  else return ENGINE_ENOTSUP;
-}
-  
-
-void Scheduler_dispatch::add_stats(ADD_STAT add_stat, 
-                                    const void * cookie) {
-  return;
-}
-
-
-/* 
-  libdispatch version of the commit thread:  
-  Take an NdbInstance, call pollNdb() on it, then mark it as free:
-*/
-void dispatch_pollNdb(void *arg) {
-  NdbInstance *inst = (NdbInstance *) arg;
-  
-  /* Poll */
-  inst->db->pollNdb();
-  
-  /* Now we are done with the instance */
-  inst->in_use = false;
-}
-
-
-#else 
-
-/* HAVE_DISPATCH_DISPATCH_H */
-/* Stub version of the dispatch scheduler. */
-/* This allows the C++ compiler to create a vtable, so that the engine can be
-   linked into memcached, but the stub constructor in dispatch.h will produce
-   an error message and exit.
-*/
-
-/* System headers */
-#include <stdio.h>
-
-/* Memcache headers */
-#include "memcached/types.h"
-#include <memcached/extension_loggers.h>
-
-/* NDB Memcache headers */
-#include "Dispatch.h"
-
-
-void Scheduler_dispatch::init(int, int, const char *) { return; }
-void Scheduler_dispatch::attach_thread(thread_identifier *) { return; } 
-void Scheduler_dispatch::add_stats(ADD_STAT,const void *) { return; }
-ENGINE_ERROR_CODE Scheduler_dispatch::schedule(workitem *) { return ENGINE_FAILED; }
-
-
-#endif

=== removed file 'storage/ndb/memcache/src/schedulers/Dispatch.h'
--- a/storage/ndb/memcache/src/schedulers/Dispatch.h	2011-03-30 06:54:53 +0000
+++ b/storage/ndb/memcache/src/schedulers/Dispatch.h	1970-01-01 00:00:00 +0000
@@ -1,83 +0,0 @@
-/*
- Copyright (c) 2011, Oracle and/or its affiliates. All rights
- reserved.
- 
- This program is free software; you can redistribute it and/or
- modify it under the terms of the GNU General Public License
- as published by the Free Software Foundation; version 2 of
- the License.
- 
- This program is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
- 
- You should have received a copy of the GNU General Public License
- along with this program; if not, write to the Free Software
- Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- 02110-1301  USA
- */
-#ifndef NDBMEMCACHE_DISPATCH_SCHEDULER_H
-#define NDBMEMCACHE_DISPATCH_SCHEDULER_H
-
-#ifndef __cplusplus
-#error "This file is for C++ only"
-#endif
-
-#include <memcached/types.h>
-
-#include "config.h"
-#include "Scheduler.h"
-
-#ifdef HAVE_DISPATCH_DISPATCH_H
-#include <dispatch/dispatch.h>
-
-/* 
- *              Dispatch Scheduler 
- * 
- * The dispatch scheduler runs only on platforms that support libdispatch.
- * It uses a large number of Ndb objects.
- */
-
-
-class Scheduler_dispatch : public Scheduler {
-public:
-  Scheduler_dispatch() {};
-  ~Scheduler_dispatch();
-  void init(int threadnum, int nthreads, const char *config_string);
-  void attach_thread(thread_identifier *);
-  ENGINE_ERROR_CODE schedule(workitem *);
-  void add_stats(ADD_STAT, const void *);
-  
-private:
-  dispatch_queue_t asyncqueue;
-  int instance_counter;
-  NdbInstance *instances[75];  
-};
-
-
-#else 
- /* Crippled version of the libdispatch scheduler. */
- 
-class Scheduler_dispatch : public Scheduler {
-public:
-  Scheduler_dispatch();
-  void init(int threadnum, int nthreads, const char *config_string);
-  void attach_thread(thread_identifier *);
-  ENGINE_ERROR_CODE schedule(workitem *);
-  void add_stats(ADD_STAT, const void *);
-};
-
-
-inline Scheduler_dispatch::Scheduler_dispatch() {
-  fprintf(stderr, "ndbmemcache was compiled without libdispatch. "
-                  "The dispatch scheduler is not available.\n "
-                  "memcached will now exit.\n");
-  exit(99);
-}
-
-#endif
-
-
-#endif
-

=== added file 'storage/ndb/memcache/src/schedulers/Flex_broker.cc'
--- a/storage/ndb/memcache/src/schedulers/Flex_broker.cc	1970-01-01 00:00:00 +0000
+++ b/storage/ndb/memcache/src/schedulers/Flex_broker.cc	2011-04-06 04:31:28 +0000
@@ -0,0 +1,92 @@
+/*
+ Copyright (c) 2011, Oracle and/or its affiliates. All rights
+ reserved.
+ 
+ This program is free software; you can redistribute it and/or
+ modify it under the terms of the GNU General Public License
+ as published by the Free Software Foundation; version 2 of
+ the License.
+ 
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+ 
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ 02110-1301  USA
+ */
+ 
+#include <stdlib.h>
+#include <unistd.h>
+
+#include "workitem.h"
+#include "KeyPrefix.h"
+#include "Configuration.h"
+#include "debug.h"
+#include "Flex.h"
+#include "Flex_thread_spec.h"
+#include "Flex_broker.h"
+#include "Flex_cluster.h"
+
+
+Scheduler_flex::Broker::Broker(Scheduler_flex *s, int my_id) : 
+                               sched(s),
+                               id(my_id),
+                               conf(get_Configuration()), 
+                               queue(s->broker_queue)
+{
+  clusters = (Cluster **) calloc(conf.nclusters, sizeof(Cluster *));
+  
+  for(int c = 0 ; c < conf.nclusters ; c++) {
+    clusters[c] = new Cluster(sched, id, c);
+  }
+}
+
+
+ENGINE_ERROR_CODE Scheduler_flex::Broker::schedule(workitem *item) {  
+  const KeyPrefix *pfx = conf.getPrefixByInfo(item->prefix_info);
+  
+  if(item->prefix_info.prefix_id) {
+    DEBUG_PRINT("prefix %d: \"%s\" Table: %s  Value Cols: %d", 
+                item->prefix_info.prefix_id, pfx->prefix, 
+                pfx->table->table_name, pfx->table->nvaluecols);
+  }
+  
+  /* Record the suffix length */
+  item->base.nsuffix = item->base.nkey - pfx->prefix_len;  
+  if(item->base.nsuffix == 0) return ENGINE_EINVAL; // key too short
+
+  /* Set my broker id in the workitem */
+  item->base.broker = id;
+  
+  clusters[item->prefix_info.cluster_id]->schedule(item, pfx);
+}
+
+
+void Scheduler_flex::Broker::contribute_stats() {
+  for(int i = 0 ; i < conf.nclusters ; i++) 
+    clusters[i]->contribute_stats();
+}
+
+
+/* 
+   Broker thread: get a workitem off the workqueue, and build its operations.
+*/
+void * Scheduler_flex::Broker::run_broker_thread() {
+  workitem *newitem;
+  
+  DEBUG_ENTER();
+  
+  while(1) {
+    /* Wait for something to appear on the queue */
+    newitem = (workitem *) workqueue_consumer_wait(queue);
+    
+    if(newitem == NULL) return 0;  /* queue has been shut down and emptied */
+    
+    (void) schedule(newitem);
+  }
+}
+
+

=== added file 'storage/ndb/memcache/src/schedulers/Flex_broker.h'
--- a/storage/ndb/memcache/src/schedulers/Flex_broker.h	1970-01-01 00:00:00 +0000
+++ b/storage/ndb/memcache/src/schedulers/Flex_broker.h	2011-04-06 04:31:28 +0000
@@ -0,0 +1,54 @@
+/*
+ Copyright (c) 2011, Oracle and/or its affiliates. All rights
+ reserved.
+ 
+ This program is free software; you can redistribute it and/or
+ modify it under the terms of the GNU General Public License
+ as published by the Free Software Foundation; version 2 of
+ the License.
+ 
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+ 
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ 02110-1301  USA
+ */
+
+#ifndef NDBMEMCACHE_FLEX_BROKER_H
+#define NDBMEMCACHE_FLEX_BROKER_H
+
+#ifndef __cplusplus
+#error "This file is for C++ only"
+#endif
+
+
+class Scheduler_flex::Broker {
+  friend class Scheduler_flex;
+  friend class thread_spec;
+  
+public:
+  Broker(Scheduler_flex *, int my_id);
+  ~Broker();             
+  ENGINE_ERROR_CODE schedule(workitem *);      /*< schedule item on its cluster */
+  
+  pthread_t thread_id;
+  Cluster ** clusters;
+  
+protected:
+  void * run_broker_thread();
+  void contribute_stats();
+  
+private:
+  int id;
+  const Configuration conf;
+  Scheduler_flex *sched;
+  struct workqueue *queue;
+};
+
+
+#endif
+

=== added file 'storage/ndb/memcache/src/schedulers/Flex_cluster.cc'
--- a/storage/ndb/memcache/src/schedulers/Flex_cluster.cc	1970-01-01 00:00:00 +0000
+++ b/storage/ndb/memcache/src/schedulers/Flex_cluster.cc	2011-04-06 04:31:28 +0000
@@ -0,0 +1,248 @@
+/*
+ Copyright (c) 2011, Oracle and/or its affiliates. All rights
+ reserved.
+ 
+ This program is free software; you can redistribute it and/or
+ modify it under the terms of the GNU General Public License
+ as published by the Free Software Foundation; version 2 of
+ the License.
+ 
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+ 
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ 02110-1301  USA
+ */
+
+
+#include "workitem.h"
+#include "KeyPrefix.h"
+#include "Configuration.h"
+#include "debug.h"
+#include "Flex.h"
+#include "Flex_thread_spec.h"
+#include "Flex_broker.h"
+#include "Flex_cluster.h"
+#include "ndb_worker.h"
+
+extern EXTENSION_LOGGER_DESCRIPTOR *logger;
+
+extern "C" {
+  void * run_flex_commit_thread(void *);
+}
+
+
+Scheduler_flex::Cluster::Cluster(Scheduler_flex *s,
+                                 int broker,
+                                 int cluster) : sched(s),
+                                                broker_id(broker),
+                                                id(cluster),
+                                                conf(get_Configuration())      {
+  DEBUG_PRINT("thread: %d, broker: %d, cluster: %d", sched->thread_id, 
+              broker, cluster);
+  /* Get the connection pool for my cluster */
+  ClusterConnectionPool *pool = conf.getConnectionById(id);
+  
+  /* Get a number of NDB instances.
+   That number is determined here, using max_tps and usec_rtt.
+   We allow a transaction to be in-flight for 5 * RTT, 
+   and we need enough NDB objects to meet one thread's share of max_tps.
+   I go step-by-step through the math here (the compiler may optimize it).
+   */
+  double tx_time_in_usec = pool->usec_rtt * 5;
+  double tx_per_ndb_per_sec = 1000000 / tx_time_in_usec;
+  double total_ndb_objects = conf.max_tps / tx_per_ndb_per_sec;
+  double ndbs_per_pipeline = total_ndb_objects / sched->config.n_engine_threads;
+  nInst = (int) (ndbs_per_pipeline / sched->nbrokers);
+  
+  /* For now, the queue size is fixed */
+  queue_size = 8192;
+  
+  /* Do we need more connections? */  
+  for(int n = pool->getPoolSize(); n < sched->config.n_connections; n++)
+    (void) pool->addPooledConnection(); // Maybe not all were added; that's OK.
+  
+  /* Now obtain the appropriate one for this thread. */
+  Ndb_cluster_connection *conn = pool->getPooledConnection(sched->thread_id);  
+  DEBUG_PRINT("broker %d, cluster %d, node %d: "
+              "%d TPS @ %d usec RTT ==> %d NDB instances.", broker_id, id, 
+              conn->node_id(), conf.max_tps, pool->usec_rtt, nInst);
+  
+  // Get (and store) the master NDB instance for the connection
+  pool->setNdbInstance(sched->thread_id, 
+                       new NdbInstance(conn, conf.nprefixes, 128));
+  
+  // Get the NDB instances
+  instances = (NdbInstance**) calloc(nInst, sizeof(NdbInstance *));    
+  for(int i = 0; i < nInst ; i++) {
+    NdbInstance *inst = new NdbInstance(conn, conf.nprefixes, 1);
+    instances[i] = inst;
+    inst->next = nextFree;
+    nextFree = inst;
+  }
+  
+  /* Hoard a transaction (an API connect record) for each Ndb object.  This
+   first call to startTransaction() will send TC_SEIZEREQ and wait for a 
+   reply, but later at runtime startTransaction() should return immediately.
+   Also, for each NDB instance, pre-build the QueryPlan for the default key prefix.
+   TODO? Start one tx *per data node*.
+   */
+  QueryPlan *plan;
+  const KeyPrefix *default_prefix = conf.getDefaultPrefix();
+  NdbTransaction ** txlist;
+  txlist = ( NdbTransaction **) calloc(nInst, sizeof(NdbTransaction *));
+  
+  // Open them all.
+  for(int i = 0 ; i < nInst ; i++) {
+    NdbTransaction *tx;
+    plan = instances[i]->getPlanForPrefix(default_prefix);
+    tx = instances[i]->db->startTransaction();
+    if(! tx) logger->log(LOG_WARNING, 0, instances[i]->db->getNdbError().message);
+    txlist[i] = tx;
+  }
+  
+  // Close them all.
+  for(int i = 0 ; i < nInst ; i++) {
+    txlist[i]->close();
+  }    
+  
+  // Free the list.
+  free(txlist);
+  
+  /* Allocate thread ids */
+  commit_thread_ids = (pthread_t *) calloc(sched->config.n_commit_threads, 
+                                           sizeof(pthread_t));
+  
+  /* Allocate and initialize a workqueue */
+  queue = (struct workqueue *) malloc(sizeof(struct workqueue));
+  workqueue_init(queue, queue_size, sched->config.n_commit_threads);
+  
+  /* Randomly take samples of the workqueue depth */
+  do_queue_sample = random() % FLEX_STATS_INITIAL_INTERVAL;
+}
+
+
+void Scheduler_flex::Cluster::attach_thread(thread_identifier *parent) {
+  /* Adjust the thread stack size */
+  pthread_attr_t commit_thd_attr;
+  size_t thd_stack_size;
+  pthread_attr_init(& commit_thd_attr);
+  pthread_attr_getstacksize(& commit_thd_attr, & thd_stack_size);
+  pthread_attr_setstacksize(& commit_thd_attr, thd_stack_size / 4);
+  
+  /* Start the commit threads */
+  for(int t = 0 ; t < sched->config.n_commit_threads ; t++) {
+    thread_spec * spec = new thread_spec(sched, parent, broker_id, id, t);
+    pthread_create(& commit_thread_ids[t], & commit_thd_attr,
+                   run_flex_commit_thread, (void *) spec);
+  }
+}
+
+
+ENGINE_ERROR_CODE Scheduler_flex::Cluster::schedule(workitem *newitem,
+                                                    const KeyPrefix *pfx) {
+  NdbInstance *inst;
+  
+  if (nextFree)
+  {
+    inst = nextFree;
+    nextFree = inst->next;
+  }
+  else
+  {
+    return ENGINE_TMPFAIL;
+  }  
+  
+  workitem_set_NdbInstance(newitem, inst);
+
+  // Fetch the query plan for this prefix.
+  newitem->plan = inst->getPlanForPrefix(pfx);
+  if(! newitem->plan) return ENGINE_FAILED;
+  
+  // Build the NDB transaction
+  if(worker_prepare_operation(newitem)) {
+    
+    if(do_queue_sample-- == 0) {   // Sample for statistics.
+      /* To simulate sampling the depth immediately after the item is added,
+       sample it now and add one.  */
+      int depth = queue->depth + 1;
+      /* Queue depth can be erroneous because of thread races.
+       Use it only if it looks valid. */
+      if(depth > 0 && depth <= queue_size) {
+        stats.queue_total_depth += depth;
+        stats.queue_samples++;
+      }
+      do_queue_sample = random() % FLEX_STATS_SAMPLE_INTERVAL;
+    }
+    
+    // Put the NdbInstance on the queue for the commit thread.     
+    workqueue_add(queue, inst);
+    return ENGINE_EWOULDBLOCK;
+  }
+  else return ENGINE_ENOTSUP;
+}
+
+
+void Scheduler_flex::Cluster::io_completed(workitem *item) {
+  DEBUG_ENTER();
+  NdbInstance* inst = item->ndb_instance;
+  item->ndb_instance  = NULL;
+  if(inst) {    
+    inst->next = nextFree;
+    nextFree = inst;
+  }
+}
+
+
+void Scheduler_flex::Cluster::contribute_stats() {
+  if(stats.cycles) 
+    sched->stats.total_avg_ndb_depth += (double) stats.ndb_depth / stats.cycles;
+  if(stats.queue_samples)
+    sched->stats.total_avg_commit_queue_depth += 
+    (double) stats.queue_total_depth / stats.queue_samples;
+}
+
+
+/* 
+  Commit thread: Get an NdbInstance off the workqueue, and call pollNdb() on it.
+*/
+void * Scheduler_flex::Cluster::run_commit_thread() {
+  DEBUG_ENTER();
+  
+  NdbInstance *inst;
+  
+  while(1) {
+    /* Wait for something to appear on the queue */
+    inst = (NdbInstance *) workqueue_consumer_wait(queue);
+    
+    if(inst == NULL) return 0;  /* queue has been shut down and emptied */
+    
+    /* Poll */
+    inst->db->pollNdb();
+    
+    /* Now we are done with the instance */
+    atomic_cmp_swap_int(& inst->in_use, true, false);  // in_use = false
+  } 
+}
+
+
+void * run_flex_commit_thread(void *s) {
+  Scheduler_flex::thread_spec *spec = (Scheduler_flex::thread_spec *) s;
+  if(spec->has_broker_threads()) 
+    set_child_thread_id(spec->parent, "br%d.cl%d.commit%d",
+                        spec->broker, spec->cluster_id, spec->number);
+  else
+    set_child_thread_id(spec->parent, "cl%d.commit%d", 
+                        spec->cluster_id, spec->number);
+  
+  spec->run_commit_thread();
+  
+  delete spec;
+  return 0;
+}
+
+
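
To make the sizing math above concrete, here is a worked instance as a small standalone program. The input values (250 usec RTT, 100,000 max TPS, 4 engine threads, 1 broker) are assumptions for illustration only; they do not come from this patch:

  #include <cstdio>

  int main() {
    /* Assumed configuration -- illustrative values, not from the tree. */
    const int usec_rtt         = 250;      /* round trip to the data nodes */
    const int max_tps          = 100000;   /* configured throughput target */
    const int n_engine_threads = 4;        /* memcached worker threads */
    const int nbrokers         = 1;        /* flex brokers per engine thread */

    double tx_time_in_usec    = usec_rtt * 5;                          /* 1250 usec in flight */
    double tx_per_ndb_per_sec = 1000000 / tx_time_in_usec;             /* 800 tx/sec per Ndb */
    double total_ndb_objects  = (double) max_tps / tx_per_ndb_per_sec; /* 125 Ndb objects */
    double ndbs_per_pipeline  = total_ndb_objects / n_engine_threads;  /* 31.25 per pipeline */
    int nInst = (int) (ndbs_per_pipeline / nbrokers);                  /* 31 instances per broker */

    printf("NDB instances per broker: %d\n", nInst);
    return 0;
  }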

=== added file 'storage/ndb/memcache/src/schedulers/Flex_cluster.h'
--- a/storage/ndb/memcache/src/schedulers/Flex_cluster.h	1970-01-01 00:00:00 +0000
+++ b/storage/ndb/memcache/src/schedulers/Flex_cluster.h	2011-04-06 04:31:28 +0000
@@ -0,0 +1,69 @@
+/*
+ Copyright (c) 2011, Oracle and/or its affiliates. All rights
+ reserved.
+ 
+ This program is free software; you can redistribute it and/or
+ modify it under the terms of the GNU General Public License
+ as published by the Free Software Foundation; version 2 of
+ the License.
+ 
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+ 
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ 02110-1301  USA
+ */
+
+#ifndef NDBMEMCACHE_FLEX_CLUSTER_H
+#define NDBMEMCACHE_FLEX_CLUSTER_H
+
+#ifndef __cplusplus
+#error "This file is for C++ only"
+#endif
+
+#include "src/schedulers/Flex.h"
+#include "src/schedulers/Flex_broker.h"
+
+class Scheduler_flex::Cluster {
+  friend class Broker;
+  friend class thread_spec;
+  
+public:
+  Cluster(Scheduler_flex *, int broker, int cluster_id);
+  ~Cluster();
+  
+  void attach_thread(thread_identifier *);     /*< start commit threads */
+  ENGINE_ERROR_CODE schedule(workitem *, const KeyPrefix *); 
+  void io_completed(workitem *);
+  
+protected:
+  void * run_commit_thread();
+  void contribute_stats();
+  struct cluster_stats {
+    uint64_t cycles;
+    uint64_t ndb_depth;
+    uint64_t queue_total_depth;
+    uint64_t queue_samples;
+  } stats;
+  
+private:
+  const Configuration conf;
+  Scheduler_flex *sched;             
+  int do_queue_sample;
+  int broker_id;
+  int id;
+  int queue_size;
+  struct workqueue *queue;
+  pthread_t * commit_thread_ids;
+  NdbInstance **instances;
+  NdbInstance *nextFree;           
+  int nInst;
+};
+
+
+
+#endif

=== added file 'storage/ndb/memcache/src/schedulers/Flex_thread_spec.h'
--- a/storage/ndb/memcache/src/schedulers/Flex_thread_spec.h	1970-01-01 00:00:00 +0000
+++ b/storage/ndb/memcache/src/schedulers/Flex_thread_spec.h	2011-04-06 04:31:28 +0000
@@ -0,0 +1,51 @@
+/*
+ Copyright (c) 2011, Oracle and/or its affiliates. All rights
+ reserved.
+ 
+ This program is free software; you can redistribute it and/or
+ modify it under the terms of the GNU General Public License
+ as published by the Free Software Foundation; version 2 of
+ the License.
+ 
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+ 
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ 02110-1301  USA
+ */
+
+#ifndef NDBMEMCACHE_FLEX_THREAD_SPEC_H
+#define NDBMEMCACHE_FLEX_THREAD_SPEC_H
+
+#ifndef __cplusplus
+#error "This file is for C++ only"
+#endif
+
+#include "Flex.h"
+#include "Flex_broker.h"
+#include "Flex_cluster.h"
+
+class Scheduler_flex::thread_spec {
+public:
+  thread_spec(Scheduler_flex *s, thread_identifier *p, 
+              int b, int c, int n) :    sched(s), parent(p), 
+  broker(b), cluster_id(c), number(n) {};
+  
+  Broker *  get_broker()      { return sched->brokers[broker];               };
+  bool has_broker_threads()   { return (sched->config.n_broker_threads > 0); };
+  void * run_broker_thread()  { return get_broker()->run_broker_thread();    };
+  Cluster * get_cluster()     { return get_broker()->clusters[cluster_id];   };
+  void * run_commit_thread()  { return get_cluster()->run_commit_thread();   };  
+  
+  Scheduler_flex *sched;
+  thread_identifier *parent;
+  int broker;
+  int cluster_id;
+  int number;
+};
+
+#endif

=== modified file 'storage/ndb/memcache/src/schedulers/Stockholm.cc'
--- a/storage/ndb/memcache/src/schedulers/Stockholm.cc	2011-03-30 06:54:53 +0000
+++ b/storage/ndb/memcache/src/schedulers/Stockholm.cc	2011-04-06 04:31:28 +0000
@@ -47,7 +47,7 @@ extern "C" {
 
 
 void Scheduler_stockholm::init(int my_thread, int nthreads, const char *config_string) {
-  const Configuration conf = get_Configuration();
+  const Configuration & conf = get_Configuration();
 
   /* For each cluster, we need a number of NDB instances; 
      that number is determined here, using max_tps and usec_rtt.
@@ -64,6 +64,7 @@ void Scheduler_stockholm::init(int my_th
     DEBUG_PRINT("cluster %d: %d TPS @ %d usec RTT ==> %d NDB instances.",
                 c, conf.max_tps, pool->usec_rtt, cluster[c].nInst);
   }
+  
     
   // Get the NDB instances. 
   for(int c = 0 ; c < conf.nclusters ; c++) {
@@ -72,11 +73,21 @@ void Scheduler_stockholm::init(int my_th
     
     ClusterConnectionPool *pool = conf.getConnectionById(c);
     Ndb_cluster_connection *conn = pool->getPooledConnection(my_thread);
-    
+
+    /* Set the master NdbInstance for the connection */
+    pool->setNdbInstance(my_thread, 
+                         new NdbInstance(conn, conf.nprefixes, 128));
+
+    cluster[c].nextFree = NULL;    
     for(int i = 0; i < cluster[c].nInst ; i++) {
       NdbInstance *inst = new NdbInstance(conn, conf.nprefixes, 1);
       cluster[c].instances[i] = inst;
+      inst->next = cluster[c].nextFree;
+      cluster[c].nextFree = inst;
     }
+
+    logger->log(LOG_WARNING, 0, "Pipeline %d using %u Ndb instances for Cluster %u.\n",
+                my_thread, cluster[c].nInst, c);
   }
   
   /* Hoard a transaction (an API connect record) for each Ndb object.  This
@@ -116,7 +127,7 @@ void Scheduler_stockholm::init(int my_th
 
 void Scheduler_stockholm::attach_thread(thread_identifier * parent) {
   pipeline = parent->pipeline;
-  const Configuration conf = get_Configuration();
+  const Configuration & conf = get_Configuration();
 
   logger->log(LOG_WARNING, 0, "Pipeline %d attached to Stockholm scheduler; "
               "launching %d commit thread%s.\n", pipeline->id, conf.nclusters,
@@ -137,7 +148,7 @@ void Scheduler_stockholm::attach_thread(
 ENGINE_ERROR_CODE Scheduler_stockholm::schedule(workitem *newitem) {
   NdbInstance *inst;
   int c;
-  const Configuration conf = get_Configuration();
+  const Configuration & conf = get_Configuration();
   
   /* Fetch the config for its key prefix */
   const KeyPrefix *pfx = conf.getPrefixByInfo(newitem->prefix_info);
@@ -154,21 +165,18 @@ ENGINE_ERROR_CODE Scheduler_stockholm::s
   
   c = newitem->prefix_info.cluster_id;
   
-  /* Get an NDB instance.  But if the instance is in use, go forward and
-   get the next one.  If something goes wrong (cluster crash, disconnected...)
-   and the commit thread can no longer release instances, it's possible that
-   there will be no free instances and this will loop forever.
-   FIXME: catch this condition somehow.
-   */  
-  int i = 0;
-  inst = cluster[c].instances[i];
-  while(inst->in_use == 1) {
-    i = ( ++i % cluster[c].nInst);
-    inst = cluster[c].instances[i];
-  };
-  
-  // Now we've got an NDB Instance. 
-  atomic_cmp_swap_int(& inst->in_use, false, true);  // in_use = true
+  if (cluster[c].nextFree)
+  {
+    inst = cluster[c].nextFree;
+    cluster[c].nextFree = inst->next;
+  }
+  else
+  {
+    return ENGINE_TMPFAIL;
+  }
+
+
+  workitem_set_NdbInstance(newitem, inst);
   
   // Fetch the query plan for this prefix.
   newitem->plan = inst->getPlanForPrefix(pfx);
@@ -176,7 +184,8 @@ ENGINE_ERROR_CODE Scheduler_stockholm::s
   
   // Build the NDB transaction
   if(worker_prepare_operation(newitem)) {
-    // Put the NdbInstance on the queue for the commit thread.     
+    // Put the NdbInstance on the queue for the commit thread.
+    // Should probably be the workitem?     
     workqueue_add(cluster[c].queue, inst);
     return ENGINE_EWOULDBLOCK;
   }
@@ -184,12 +193,25 @@ ENGINE_ERROR_CODE Scheduler_stockholm::s
 }
 
 
+void Scheduler_stockholm::io_completed(workitem *item) {
+  DEBUG_ENTER();
+  NdbInstance* inst = item->ndb_instance;
+  item->ndb_instance  = NULL;
+  
+  if(inst) {    
+    int c = item->prefix_info.cluster_id;
+    inst->next = cluster[c].nextFree;
+    cluster[c].nextFree = inst;
+  }
+}
+
+
 void Scheduler_stockholm::add_stats(ADD_STAT add_stat, 
                                     const void * cookie) {
   char key[128];
   char val[128];
   int klen, vlen, p;
-  const Configuration conf = get_Configuration();
+  const Configuration & conf = get_Configuration();
   
   for(int c = 0 ; c < conf.nclusters; c++) {  
     klen = sprintf(key, "pipeline_%d_cluster_%d_commit_cycles", pipeline->id, c);
@@ -228,9 +250,6 @@ void * Scheduler_stockholm::run_ndb_comm
     /* Poll */
     inst->db->pollNdb();
     
-    /* Now we are done with the instance */
-    atomic_cmp_swap_int(& inst->in_use, true, false);  // in_use = false
-    
     cluster[c].stats.cycles++;
     if(! (cluster[c].stats.cycles % STAT_INTERVAL)) 
       cluster[c].stats.commit_thread_vtime = get_thread_vtime();
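
The Stockholm changes above replace the old "spin until an instance is free" search with a singly linked free list: schedule() pops an NdbInstance from cluster[c].nextFree and returns ENGINE_TMPFAIL when the list is empty (instead of looping forever after a cluster failure), records the instance on the workitem, and the new io_completed() pushes the instance back once the operation finishes; the commit thread no longer clears an atomic in_use flag. Stripped of the engine types, the list handling amounts to the following (a minimal sketch; it assumes, as the removal of the atomic flag suggests, that push and pop happen on the same pipeline thread):

  #include <cstddef>

  struct Instance { Instance *next; };

  struct FreeList {
    Instance *head;
    FreeList() : head(NULL) {}

    Instance * pop() {           // schedule(): take an instance or fail fast
      Instance *i = head;
      if (i) head = i->next;
      return i;                  // NULL here maps to ENGINE_TMPFAIL
    }

    void push(Instance *i) {     // io_completed(): hand the instance back
      i->next = head;
      head = i;
    }
  };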

=== modified file 'storage/ndb/memcache/src/schedulers/Stockholm.h'
--- a/storage/ndb/memcache/src/schedulers/Stockholm.h	2011-03-30 06:54:53 +0000
+++ b/storage/ndb/memcache/src/schedulers/Stockholm.h	2011-04-06 04:31:28 +0000
@@ -45,6 +45,7 @@ public:
   void init(int threadnum, int nthreads, const char *config_string);
   void attach_thread(thread_identifier *);
   ENGINE_ERROR_CODE schedule(workitem *);
+  void io_completed(workitem *);
   void add_stats(ADD_STAT, const void *);
   void * run_ndb_commit_thread(int cluster_id);
   
@@ -58,6 +59,7 @@ private:  
     pthread_t commit_thread_id;
     NdbInstance **instances;
     int nInst;
+    NdbInstance *nextFree;
   } cluster[MAX_CLUSTERS];
 };
 

=== removed file 'storage/ndb/memcache/src/schedulers/flex.cc'
--- a/storage/ndb/memcache/src/schedulers/flex.cc	2011-03-30 06:54:53 +0000
+++ b/storage/ndb/memcache/src/schedulers/flex.cc	1970-01-01 00:00:00 +0000
@@ -1,464 +0,0 @@
-/*
- Copyright (c) 2011, Oracle and/or its affiliates. All rights
- reserved.
- 
- This program is free software; you can redistribute it and/or
- modify it under the terms of the GNU General Public License
- as published by the Free Software Foundation; version 2 of
- the License.
- 
- This program is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
- 
- You should have received a copy of the GNU General Public License
- along with this program; if not, write to the Free Software
- Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- 02110-1301  USA
- */
-/* System headers */
-/* C++ files must define __STDC_FORMAT_MACROS in order to get PRIu64 */
-#define __STDC_FORMAT_MACROS 
-#include <inttypes.h>
-#include <stdlib.h>
-#include <stdio.h>
-
-/* Memcache headers */
-#include "memcached/types.h"
-#include <memcached/extension_loggers.h>
-
-/* NDB Memcache headers */
-#include "flex.h"
-#include "workitem.h"
-#include "ndb_worker.h"
-#include "thread_identifier.h"
-#include "ClusterConnectionPool.h"
-
-/* Random stat samples will on average be taken twice as often as 
-   STATS_SAMPLE_INTERVAL (based on uniform random distribution). 
-   STATS_INITIAL_INTERVAL is lower so as to get stats quickly on startup.
-*/
-#define STATS_INITIAL_INTERVAL 20
-#define STATS_SAMPLE_INTERVAL 400
-
-extern EXTENSION_LOGGER_DESCRIPTOR *logger;
-
-
-extern "C" {
-  void * run_flex_commit_thread(void *);
-  void * run_flex_broker_thread(void *);
-}
-
-
-void Scheduler_flex::init(int t, int nthreads, const char *user_config) {
-  thread_id = t;
-  const char * active_cf;
-  
-  /* Set some baseline default values for the configuration */
-  config.n_engine_threads = nthreads;
-  config.n_broker_threads = 0;
-  config.n_commit_threads = 2;
-  config.n_db_pending = 0;
-  config.n_connections = 1;
-  
-  /* The default config parameters */
-  static const char * default_config = "b0,c2,p1";
- 
-  /* Choose which string to parse */
-  if(user_config && *user_config) active_cf = user_config;
-  else active_cf = default_config;
-
-  /* disregard the return value from sscanf(), but test the config for validity,
-     and write it to the logger when the scheduler starts running */
-  sscanf(active_cf, "b%d,c%d,p%d", 
-         & config.n_broker_threads, & config.n_commit_threads, 
-         & config.n_connections);
-
-  /* Test validity of configuration */
-  if(config.n_broker_threads < 0 || config.n_broker_threads > 2) {
-    logger->log(LOG_WARNING, 0, "Invalid scheduler configuration.\n");
-    assert(config.n_broker_threads >= 0 && config.n_broker_threads <= 2);
-  }
-  if(config.n_commit_threads < 1 || config.n_commit_threads > 16) {
-    logger->log(LOG_WARNING, 0, "Invalid scheduler configuration.\n");
-    assert(config.n_commit_threads >= 1 && config.n_commit_threads <= 16);
-  }
-  if(config.n_db_pending < 0 || config.n_db_pending > 16) {
-    logger->log(LOG_WARNING, 0, "Invalid scheduler configuration.\n");
-    assert(config.n_db_pending >= 0 && config.n_db_pending <= 16);
-  }
-  if(config.n_connections < 1 || config.n_connections > 4) {
-    logger->log(LOG_WARNING, 0, "Invalid scheduler configuration.\n");
-    assert(config.n_connections >= 1 && config.n_connections <= 4);
-  }
-  
-  /* There is always a broker, even if there is no broker thread */
-  nbrokers = config.n_broker_threads > 1 ? config.n_broker_threads : 1;
-   
-  /* Create a workqueue for the broker thread */
-  broker_queue_size = 8192;
-  if(config.n_broker_threads) {
-    do_queue_sample = random() % STATS_INITIAL_INTERVAL;
-    broker_queue = (struct workqueue *) malloc(sizeof(struct workqueue));
-    workqueue_init(broker_queue, broker_queue_size, config.n_broker_threads); 
-  }
-  else broker_queue = 0;
-  
-  /* Initialize the brokers.
-     If there are no broker threads, then broker[0]'s methods run in the engine 
-     thread.  Otherwise each broker will run mostly within a broker thread.
-   */
-  for(int i = 0 ; i < nbrokers ; i++) 
-    brokers[i] = new Broker(this, i);
-}
-
-
-Scheduler_flex::Broker::Broker(Scheduler_flex *s,
-                               int my_id) : sched(s),
-                                            id(my_id),
-                                            conf(get_Configuration()), 
-                                            queue(s->broker_queue)            {
-  clusters = (Cluster **) calloc(conf.nclusters, sizeof(Cluster *));
-  
-  for(int c = 0 ; c < conf.nclusters ; c++) {
-    clusters[c] = new Cluster(sched, id, c);
-  }
-}
-
-
-Scheduler_flex::Cluster::Cluster(Scheduler_flex *s,
-                                 int broker,
-                                 int cluster) : sched(s),
-                                                broker_id(broker),
-                                                id(cluster),
-                                                conf(get_Configuration())      {
-  DEBUG_PRINT("thread: %d, broker: %d, cluster: %d", sched->thread_id, 
-              broker, cluster);
-  /* Get the connection pool for my cluster */
-  ClusterConnectionPool *pool = conf.getConnectionById(id);
-  
-  /* Get a number of NDB instances.
-     That number is determined here, using max_tps and usec_rtt.
-     We allow a transaction to be in-flight for 5 * RTT, 
-     and we need enough NDB objects to meet one thread's share of max_tps.
-     I go step-by-step through the math here (the compiler may optimize it).
-  */
-  double tx_time_in_usec = pool->usec_rtt * 5;
-  double tx_per_ndb_per_sec = 1000000 / tx_time_in_usec;
-  double total_ndb_objects = conf.max_tps / tx_per_ndb_per_sec;
-  double ndbs_per_pipeline = total_ndb_objects / sched->config.n_engine_threads;
-  nInst = (int) (ndbs_per_pipeline / sched->nbrokers);
-
-  /* For now, the queue size is fixed */
-  queue_size = 8192;
-
-  /* Do we need more connections? */  
-  for(int n = pool->getPoolSize(); n < sched->config.n_connections; n++)
-    (void) pool->addPooledConnection();
-  /* Maybe not all of them were added; that's OK. */
-
-  /* Now obtain the appropriate one for this thread. */
-  Ndb_cluster_connection *conn = pool->getPooledConnection(sched->thread_id);
-  
-  DEBUG_PRINT("broker %d, cluster %d, node %d: "
-              "%d TPS @ %d usec RTT ==> %d NDB instances.", broker_id, id, 
-              conn->node_id(), conf.max_tps, pool->usec_rtt, nInst);
-  
-  // Get the NDB instances. 
-  instances = (NdbInstance**) calloc(nInst, sizeof(NdbInstance *));    
-  for(int i = 0; i < nInst ; i++) {
-    instances[i] = new NdbInstance(conn, conf.nprefixes, 1);
-  }
-  
-  /* Hoard a transaction (an API connect record) for each Ndb object.  This
-   first call to startTransaction() will send TC_SEIZEREQ and wait for a 
-   reply, but later at runtime startTransaction() should return immediately.
-   Also, for each NDB instance, pre-build the QueryPlan for the default key prefix.
-   TODO? Start one tx *per data node*.
-   */
-  QueryPlan *plan;
-  const KeyPrefix *default_prefix = conf.getDefaultPrefix();
-  NdbTransaction ** txlist;
-  txlist = ( NdbTransaction **) calloc(nInst, sizeof(NdbTransaction *));
-  
-  // Open them all.
-  for(int i = 0 ; i < nInst ; i++) {
-    NdbTransaction *tx;
-    plan = instances[i]->getPlanForPrefix(default_prefix);
-    tx = instances[i]->db->startTransaction();
-    if(! tx) logger->log(LOG_WARNING, 0, instances[i]->db->getNdbError().message);
-    txlist[i] = tx;
-  }
-  
-  // Close them all.
-  for(int i = 0 ; i < nInst ; i++) {
-    txlist[i]->close();
-  }    
-  
-  // Free the list.
-  free(txlist);
-  
-  /* Allocate thread ids */
-  commit_thread_ids = (pthread_t *) calloc(sched->config.n_commit_threads, 
-                                           sizeof(pthread_t));
-  
-  /* Allocate and initialize a workqueue */
-  queue = (struct workqueue *) malloc(sizeof(struct workqueue));
-  workqueue_init(queue, queue_size, sched->config.n_commit_threads);
-  
-  /* Randomly take samples of the workqueue depth */
-  do_queue_sample = random() % STATS_INITIAL_INTERVAL;
-}
-
-
-void Scheduler_flex::attach_thread(thread_identifier * parent) {
-  const Configuration & conf = get_Configuration();
-  pipeline = parent->pipeline;
-  
-  logger->log(LOG_WARNING, 0, "Pipeline %d attached to flex scheduler: "
-              "config \"b%d,c%d,p%d\"; %d cluster%s.\n", pipeline->id,
-              config.n_broker_threads, config.n_commit_threads,
-              config.n_connections, conf.nclusters,
-              conf.nclusters == 1 ? "" : "s");
-
-  // Launch commit threads for each cluster
-  for(int i = 0 ; i < nbrokers; i++) 
-    for(int j = 0 ; j < conf.nclusters ; j++) 
-      brokers[i]->clusters[j]->attach_thread(parent);
-
-  // Adjust the thread stack size for the broker threads
-  pthread_attr_t broker_thd_attr;
-  size_t thd_stack_size;
-  pthread_attr_init(& broker_thd_attr);
-  pthread_attr_getstacksize(& broker_thd_attr, & thd_stack_size);
-  pthread_attr_setstacksize(& broker_thd_attr, thd_stack_size / 2);
-  
-  // Launch the broker threads
-  for(int i = 0; i < config.n_broker_threads ; i++) {
-    thread_spec *spec = new thread_spec(this, parent, i, 0, 0);
-    pthread_create(& brokers[i]->thread_id, & broker_thd_attr, 
-                   run_flex_broker_thread, (void *) spec);
-  }
-}
-
-
-void Scheduler_flex::Cluster::attach_thread(thread_identifier *parent) {
-  /* Adjust the thread stack size */
-  pthread_attr_t commit_thd_attr;
-  size_t thd_stack_size;
-  pthread_attr_init(& commit_thd_attr);
-  pthread_attr_getstacksize(& commit_thd_attr, & thd_stack_size);
-  pthread_attr_setstacksize(& commit_thd_attr, thd_stack_size / 4);
-
-  /* Start the commit threads */
-  for(int t = 0 ; t < sched->config.n_commit_threads ; t++) {
-    thread_spec * spec = new thread_spec(sched, parent, broker_id, id, t);
-    pthread_create(& commit_thread_ids[t], & commit_thd_attr,
-                   run_flex_commit_thread, (void *) spec);
-  }
-}                                     
-
-
-ENGINE_ERROR_CODE Scheduler_flex::schedule(workitem *newitem) {
-  if(config.n_broker_threads) {
-
-    if(do_queue_sample-- == 0) {
-      /* See comments in Cluster::schedule() about queue depth sampling */
-      int depth = broker_queue->depth + 1;
-      if(depth > 0 && depth <= broker_queue_size) {
-        stats.broker_queue_total_depth += depth ;
-        stats.broker_queue_samples++;
-      }
-      do_queue_sample = random() % STATS_SAMPLE_INTERVAL;
-    }
-
-    workqueue_add(broker_queue, newitem);
-
-    return ENGINE_EWOULDBLOCK;
-  }
-  else {
-    return brokers[0]->schedule(newitem);
-  }
-}
-
-
-ENGINE_ERROR_CODE Scheduler_flex::Broker::schedule(workitem *newitem) {  
-  const KeyPrefix *pfx = conf.getPrefixByInfo(newitem->prefix_info);
-  
-  if(newitem->prefix_info.prefix_id) {
-    DEBUG_PRINT("prefix %d: \"%s\" Table: %s  Value Cols: %d", 
-                newitem->prefix_info.prefix_id, pfx->prefix, 
-                pfx->table->table_name, pfx->table->nvaluecols);
-  }
-  
-  /* Record the suffix length */
-  newitem->base.nsuffix = newitem->base.nkey - pfx->prefix_len;  
-  if(newitem->base.nsuffix == 0) return ENGINE_EINVAL; // key too short
-  
-  clusters[newitem->prefix_info.cluster_id]->schedule(newitem, pfx);
-}
-
-
-ENGINE_ERROR_CODE Scheduler_flex::Cluster::schedule(workitem *newitem,
-                                                    const KeyPrefix *pfx) {
-  bool did_cas;
-  NdbInstance *inst;
-  int i = 0;
-  
-  /* Get an NDB instance.  But if the instance is in use, go forward and
-   get the next one.  If something goes wrong (cluster crash, disconnected...)
-   and the commit thread can no longer release instances, it's possible that
-   there will be no free instances and this will loop forever.
-   FIXME: catch this condition somehow.
-   */  
-  inst = instances[i];
-  while(inst->in_use == 1) {
-    i = ( ++i % nInst);
-    inst = instances[i];
-  };
-  stats.cycles++;
-  stats.ndb_depth += i;
-  
-  // Now we've got an NDB Instance. 
-  atomic_cmp_swap_int(& inst->in_use, false, true);  // in_use = true
-  
-  // Fetch the query plan for this prefix.
-  newitem->plan = inst->getPlanForPrefix(pfx);
-  if(! newitem->plan) return ENGINE_FAILED;
-  
-  // Build the NDB transaction
-  if(worker_prepare_operation(newitem)) {
-    
-    if(do_queue_sample-- == 0) {   // Sample for statistics.
-      /* To simulate sampling the depth immediately after the item is added,
-         sample it now and add one.  */
-      int depth = queue->depth + 1;
-      /* Queue depth can be erroneous because of thread races.
-         Use it only if it looks valid. */
-      if(depth > 0 && depth <= queue_size) {
-        stats.queue_total_depth += depth;
-        stats.queue_samples++;
-      }
-      do_queue_sample = random() % STATS_SAMPLE_INTERVAL;
-    }
-    
-    // Put the NdbInstance on the queue for the commit thread.     
-    workqueue_add(queue, inst);
-    return ENGINE_EWOULDBLOCK;
-  }
-  else return ENGINE_ENOTSUP;
-}
-
-
-void Scheduler_flex::add_stats(ADD_STAT add_stat, const void * cookie) {
-  char key[128];
-  char val[128];
-  int klen, vlen, p;
-  
-  stats.total_avg_ndb_depth = 0;
-  stats.total_avg_commit_queue_depth = 0;
-  for(int i = 0 ; i < nbrokers ; i++) 
-    brokers[i]->contribute_stats();
-  
-  klen = sprintf(key, "t%d_avg_ndb_depth", pipeline->id);
-  vlen = sprintf(val, "%.3f", stats.total_avg_ndb_depth / nbrokers);
-  add_stat(key, klen, val, vlen, cookie);
-  
-  klen = sprintf(key, "t%d_avg_commit_queue_depth", pipeline->id);
-  vlen = sprintf(val, "%.3f", stats.total_avg_commit_queue_depth / nbrokers);
-  add_stat(key, klen, val, vlen, cookie);
-  
-  if(config.n_broker_threads && stats.broker_queue_samples) {
-    klen = sprintf(key, "t%d_broker_queue_avg_depth", pipeline->id);
-    vlen = sprintf(val, "%.3f", 
-           (double) stats.broker_queue_total_depth / stats.broker_queue_samples);
-    add_stat(key, klen, val, vlen, cookie);
-  }
-}
-
-
-void Scheduler_flex::Broker::contribute_stats() {
-  for(int i = 0 ; i < conf.nclusters ; i++) 
-    clusters[i]->contribute_stats();
-}
-
-
-void Scheduler_flex::Cluster::contribute_stats() {
-  if(stats.cycles) 
-    sched->stats.total_avg_ndb_depth += (double) stats.ndb_depth / stats.cycles;
-  if(stats.queue_samples)
-    sched->stats.total_avg_commit_queue_depth += 
-      (double) stats.queue_total_depth / stats.queue_samples;
-}
-
-
-/* 
-   Broker thread: get a workitem off the workqueue, and build its operations.
-*/
-void * Scheduler_flex::Broker::run_broker_thread() {
-  workitem *newitem;
-
-  DEBUG_ENTER();
-
-  while(1) {
-    /* Wait for something to appear on the queue */
-    newitem = (workitem *) workqueue_consumer_wait(queue);
-    
-    if(newitem == NULL) return 0;  /* queue has been shut down and emptied */
-    
-    (void) schedule(newitem);
-  }
-}
-
-
-/* 
-   Commit thread: Get an NdbInstance off the workqueue, and call pollNdb() on it.
-*/
-void * Scheduler_flex::Cluster::run_commit_thread() {
-  DEBUG_ENTER();
-
-  NdbInstance *inst;
-    
-  while(1) {
-    /* Wait for something to appear on the queue */
-    inst = (NdbInstance *) workqueue_consumer_wait(queue);
-
-    if(inst == NULL) return 0;  /* queue has been shut down and emptied */
-    
-    /* Poll */
-    inst->db->pollNdb();
-    
-    /* Now we are done with the instance */
-    atomic_cmp_swap_int(& inst->in_use, true, false);  // in_use = false
-  } 
-}
-
-
-void * run_flex_broker_thread(void *s) {
-  Scheduler_flex::thread_spec *spec = (Scheduler_flex::thread_spec *) s;
-  set_child_thread_id(spec->parent, "broker%d", spec->broker);
-  
-  spec->run_broker_thread();
-  
-  delete spec;
-  return 0;
-}
-
-
-void * run_flex_commit_thread(void *s) {
-  Scheduler_flex::thread_spec *spec = (Scheduler_flex::thread_spec *) s;
-  if(spec->has_broker_threads()) 
-    set_child_thread_id(spec->parent, "br%d.cl%d.commit%d",
-                        spec->broker, spec->cluster_id, spec->number);
-  else
-    set_child_thread_id(spec->parent, "cl%d.commit%d", 
-                        spec->cluster_id, spec->number);
-  
-  spec->run_commit_thread();
-  
-  delete spec;
-  return 0;
-}
-
-
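
The Cluster constructor in the removed flex.cc (like the comment in Scheduler_stockholm::init) sizes its Ndb pool from max_tps and the connection's usec_rtt: a transaction is allowed to stay in flight for 5 * RTT, so each Ndb object can carry 1,000,000 / (5 * usec_rtt) transactions per second, and the pool must cover one pipeline's (and one broker's) share of max_tps. A worked example with illustrative inputs -- none of these numbers come from the tree:

  #include <cstdio>

  int main() {
    double usec_rtt        = 250;      /* assumed round-trip time      */
    double max_tps         = 100000;   /* assumed throughput target    */
    int n_engine_threads   = 4;
    int nbrokers           = 1;

    double tx_time_in_usec    = usec_rtt * 5;                         /* 1250 usec in flight   */
    double tx_per_ndb_per_sec = 1000000 / tx_time_in_usec;            /* 800 tx/sec per Ndb    */
    double total_ndb_objects  = max_tps / tx_per_ndb_per_sec;         /* 125 Ndb objects total */
    double ndbs_per_pipeline  = total_ndb_objects / n_engine_threads; /* 31.25 per pipeline    */
    int nInst = (int) (ndbs_per_pipeline / nbrokers);                 /* 31 instances          */

    printf("nInst = %d\n", nInst);
    return 0;
  }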

=== removed file 'storage/ndb/memcache/src/schedulers/flex.h'
--- a/storage/ndb/memcache/src/schedulers/flex.h	2011-03-30 06:54:53 +0000
+++ b/storage/ndb/memcache/src/schedulers/flex.h	1970-01-01 00:00:00 +0000
@@ -1,167 +0,0 @@
-/*
- Copyright (c) 2011, Oracle and/or its affiliates. All rights
- reserved.
- 
- This program is free software; you can redistribute it and/or
- modify it under the terms of the GNU General Public License
- as published by the Free Software Foundation; version 2 of
- the License.
- 
- This program is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
- 
- You should have received a copy of the GNU General Public License
- along with this program; if not, write to the Free Software
- Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- 02110-1301  USA
- */
-
-#ifndef NDBMEMCACHE_FLEX_SCHEDULER_H
-#define NDBMEMCACHE_FLEX_SCHEDULER_H
-
-#ifndef __cplusplus
-#error "This file is for C++ only"
-#endif
-
-#include <memcached/types.h>
-
-#include "config.h"
-#include "Scheduler.h"
-#include "KeyPrefix.h"
-
-
-/* 
- *              Flex Scheduler 
- *
- *  The scheduler is configurable:
- *     - 0, 1, or 2 "middleman" broker threads per engine thread
- *     - some number of commit threads per {broker, cluster id} tuple
- *     - some max number of Ndb objects to poll concurrently in a commit thread
- * 
- */
-
-
-class Scheduler_flex : public Scheduler {  
-public:
-  class Broker;
-  class Cluster;
-  class thread_spec;
-
-  friend class Broker;
-  friend class Cluster;
-  friend class thread_spec;
-    
-  Scheduler_flex() {};
-  ~Scheduler_flex();
-  void init(int threadnum, int nthreads, const char *config_string);
-  void attach_thread(thread_identifier *);
-  ENGINE_ERROR_CODE schedule(workitem *);
-  void add_stats(ADD_STAT, const void *);  
-
-protected:
-  int thread_id;
-  int do_queue_sample;
-  int nbrokers;
-  int broker_queue_size;
-  struct workqueue * broker_queue;  
-  struct flex_cf {
-    int n_engine_threads;    //< no. of libevent worker threads in memcached
-    int n_broker_threads;    //< no. of separate broker threads, 0 - 2
-    int n_commit_threads;    //< no. of commit threads per {broker,cluster} pair
-    int n_db_pending;        //< no. of pollable Ndbs inside a commit thread
-    int n_connections;       //< size of Ndb_cluster_connection pool
-  } config;  
-  struct flex_stats {
-    double total_avg_ndb_depth;
-    double total_avg_commit_queue_depth;
-    uint64_t broker_queue_total_depth;
-    uint64_t broker_queue_samples;
-  } stats;
-
-  Broker * brokers[2];
-};
-
-
-class Scheduler_flex::Cluster {
-friend class Broker;
-friend class thread_spec;
-
-public:
-  Cluster(Scheduler_flex *, int broker, int cluster_id);
-  ~Cluster();
-  
-  void attach_thread(thread_identifier *);     /*< start commit threads */
-  ENGINE_ERROR_CODE schedule(workitem *, const KeyPrefix *); 
-  
-protected:
-  void * run_commit_thread();
-  void contribute_stats();
-  struct cluster_stats {
-    uint64_t cycles;
-    uint64_t ndb_depth;
-    uint64_t queue_total_depth;
-    uint64_t queue_samples;
-  } stats;
-  
-private:
-  const Configuration conf;
-  Scheduler_flex *sched;             
-  int do_queue_sample;
-  int broker_id;
-  int id;
-  int queue_size;
-  struct workqueue *queue;
-  pthread_t * commit_thread_ids;
-  NdbInstance **instances;           
-  int nInst;
-};
-
-
-class Scheduler_flex::Broker {
-friend class Scheduler_flex;
-friend class thread_spec;
-
-public:
-  Broker(Scheduler_flex *, int my_id);
-  ~Broker();             
-  ENGINE_ERROR_CODE schedule(workitem *);      /*< schedule item on its cluster */
-
-  pthread_t thread_id;
-  Cluster ** clusters;
-
-protected:
-  void * run_broker_thread();
-  void contribute_stats();
-
-private:
-  int id;
-  const Configuration conf;
-  Scheduler_flex *sched;
-  struct workqueue *queue;
-};
-
-
-class Scheduler_flex::thread_spec {
-public:
-  thread_spec(Scheduler_flex *s, thread_identifier *p, 
-              int b, int c, int n) :    sched(s), parent(p), 
-                                        broker(b), cluster_id(c), number(n) {};
-
-  Broker *  get_broker()      { return sched->brokers[broker];               };
-  bool has_broker_threads()   { return (sched->config.n_broker_threads > 0); };
-  void * run_broker_thread()  { return get_broker()->run_broker_thread();    };
-  Cluster * get_cluster()     { return get_broker()->clusters[cluster_id];   };
-  void * run_commit_thread()  { return get_cluster()->run_commit_thread();   };  
-
-  Scheduler_flex *sched;
-  thread_identifier *parent;
-  int broker;
-  int cluster_id;
-  int number;
-};
-
-
-#endif
-
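
Both the removed flex scheduler and the Stockholm scheduler move work between threads through the project's workqueue: producers call workqueue_add(), consumers block in workqueue_consumer_wait(), and a NULL return tells the consumer the queue has been shut down and emptied. For readers who do not have the project's workqueue implementation at hand, the same shape in standard C++11 primitives (a generic sketch, not the actual implementation):

  #include <condition_variable>
  #include <cstddef>
  #include <deque>
  #include <mutex>

  template <class T>
  class SimpleQueue {                  // stand-in for struct workqueue
    std::mutex m;
    std::condition_variable cv;
    std::deque<T*> q;
    bool closed;
  public:
    SimpleQueue() : closed(false) {}

    void add(T *item) {                // cf. workqueue_add()
      { std::lock_guard<std::mutex> lk(m); q.push_back(item); }
      cv.notify_one();
    }

    T * consumer_wait() {              // cf. workqueue_consumer_wait()
      std::unique_lock<std::mutex> lk(m);
      while (!closed && q.empty()) cv.wait(lk);
      if (q.empty()) return NULL;      // shut down and emptied
      T *item = q.front(); q.pop_front();
      return item;
    }

    void shutdown() {                  // wake consumers so they can exit
      { std::lock_guard<std::mutex> lk(m); closed = true; }
      cv.notify_all();
    }
  };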

=== modified file 'storage/ndb/memcache/src/workitem.c'
--- a/storage/ndb/memcache/src/workitem.c	2011-03-30 06:54:53 +0000
+++ b/storage/ndb/memcache/src/workitem.c	2011-04-06 04:31:28 +0000
@@ -184,6 +184,13 @@ const char * workitem_get_operation(work
 }
 
 
+void workitem_set_NdbInstance(workitem *item, C_OR_CPP_NDBINSTANCE * _Ndb_instance)
+{
+  assert(item->ndb_instance == NULL);
+  item->ndb_instance = _Ndb_instance;
+}
+
+
 void workitem_free(workitem *item)
 {
   /* It's OK to free all of these; a free() with class_id 0 is a no-op */
@@ -192,6 +199,11 @@ void workitem_free(workitem *item)
   pipeline_free(item->pipeline, item->ndb_key_buffer, item->keybuf1_cls);
   pipeline_free(item->pipeline, item->key_buffer_2, item->keybuf2_cls);
 
+  if (item->ndb_instance != NULL)
+  {
+    assert(false); // TODO Not thread safe, investigate path
+  }
+
   pipeline_free(item->pipeline, item, workitem_class_id);
 }
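
workitem.c is plain C, but the new workitem_set_NdbInstance() stores a pointer to the C++ NdbInstance class, hence the C_OR_CPP_NDBINSTANCE type in its signature. That macro is defined elsewhere in the tree and is not shown in this diff; presumably it names the real class under C++ and an opaque type under C, along the lines of this hedged sketch (the actual definition may differ):

  /* shared header included from both .c and .cc files */
  #ifdef __cplusplus
  class NdbInstance;                                      /* the real C++ class */
  #define C_OR_CPP_NDBINSTANCE NdbInstance
  #else
  typedef struct NdbInstance_opaque NdbInstance_opaque;   /* opaque handle in C */
  #define C_OR_CPP_NDBINSTANCE NdbInstance_opaque
  #endif

  /* illustrative use: the workitem just carries the pointer */
  struct workitem_sketch {
    C_OR_CPP_NDBINSTANCE * ndb_instance;
  };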
 

=== modified file 'storage/ndb/memcache/unit/Makefile.am'
--- a/storage/ndb/memcache/unit/Makefile.am	2011-03-30 06:54:53 +0000
+++ b/storage/ndb/memcache/unit/Makefile.am	2011-04-06 04:31:28 +0000
@@ -38,7 +38,6 @@ run_unit_tests_LDADD = ../ndb_engine_la-
                        ../ndb_engine_la-ClusterConnectionPool.lo  \
                        ../ndb_engine_la-Configuration.lo  \
                        ../ndb_engine_la-DataTypeHandler.lo  \
-                       ../ndb_engine_la-Dispatch.lo  \
                        ../ndb_engine_la-KeyPrefix.lo  \
                        ../ndb_engine_la-NdbInstance.lo  \
                        ../ndb_engine_la-QueryPlan.lo  \
@@ -49,7 +48,9 @@ run_unit_tests_LDADD = ../ndb_engine_la-
                        ../ndb_engine_la-atomics.lo  \
                        ../ndb_engine_la-debug.lo  \
                        ../ndb_engine_la-embedded_default_engine.lo  \
-                       ../ndb_engine_la-flex.lo \
+                       ../ndb_engine_la-Flex.lo \
+                       ../ndb_engine_la-Flex_broker.lo \
+                       ../ndb_engine_la-Flex_cluster.lo \
                        ../ndb_engine_la-hash_item_util.lo  \
                        ../ndb_engine_la-items.lo  \
                        ../ndb_engine_la-ndb_configuration.lo  \

=== modified file 'storage/ndb/memcache/xcode/ndbmemcache.xcodeproj/project.pbxproj'
--- a/storage/ndb/memcache/xcode/ndbmemcache.xcodeproj/project.pbxproj	2011-03-30 06:54:53 +0000
+++ b/storage/ndb/memcache/xcode/ndbmemcache.xcodeproj/project.pbxproj	2011-04-06 04:31:28 +0000
@@ -9,12 +9,22 @@
 /* Begin PBXFileReference section */
 		65278D6612BC452E00189195 /* Stockholm.cc */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; path = Stockholm.cc; sourceTree = "<group>"; };
 		65278D6C12BC456100189195 /* Scheduler.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = Scheduler.h; sourceTree = "<group>"; };
+		652DC977134A8F55000C5787 /* dtrace.flush */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = text; path = dtrace.flush; sourceTree = "<group>"; };
+		652DCB4E134BE87D000C5787 /* 3thread.cc */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; path = 3thread.cc; sourceTree = "<group>"; };
+		652DCB4F134BE87D000C5787 /* 3thread.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = 3thread.h; sourceTree = "<group>"; };
+		652DCB5D134BF512000C5787 /* Flex_broker.cc */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; path = Flex_broker.cc; sourceTree = "<group>"; };
+		652DCB5E134BF512000C5787 /* Flex_broker.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = Flex_broker.h; sourceTree = "<group>"; };
+		652DCB5F134BF512000C5787 /* Flex_cluster.cc */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; path = Flex_cluster.cc; sourceTree = "<group>"; };
+		652DCB60134BF512000C5787 /* Flex_cluster.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = Flex_cluster.h; sourceTree = "<group>"; };
+		652DCB61134BF637000C5787 /* Flex_thread_spec.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = Flex_thread_spec.h; sourceTree = "<group>"; };
+		652E5A371306533A004F8795 /* binary-spec.txt */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = text; name = "binary-spec.txt"; path = "../binary-spec.txt"; sourceTree = SOURCE_ROOT; };
 		652E896F12EDF39300CA79EE /* dtrace.mget.default */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = text; path = dtrace.mget.default; sourceTree = "<group>"; };
 		652E897012EDF49500CA79EE /* ndb_engine.d */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.dtrace; path = ndb_engine.d; sourceTree = "<group>"; };
 		652E897112EDF49D00CA79EE /* dtrace.mget.ndb */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = text; path = dtrace.mget.ndb; sourceTree = "<group>"; };
 		652E8C4B12F1046700CA79EE /* trace.cas.coord */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = text; path = trace.cas.coord; sourceTree = "<group>"; };
 		652E8C5512F20CE700CA79EE /* trace.binary */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = text; path = trace.binary; sourceTree = "<group>"; };
 		652E8C7C12F2795B00CA79EE /* run-tests.sh */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = text.script.sh; name = "run-tests.sh"; path = "../run-tests.sh"; sourceTree = SOURCE_ROOT; };
+		652E8C8512F284EF00CA79EE /* protocol.txt */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = text; path = protocol.txt; sourceTree = "<group>"; };
 		65570AAD1298A3060043D9B9 /* assoc.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = assoc.h; sourceTree = "<group>"; };
 		65570AC41298A7800043D9B9 /* visibility.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = visibility.h; sourceTree = "<group>"; };
 		65570AC51298A7800043D9B9 /* vbucket.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = vbucket.h; sourceTree = "<group>"; };
@@ -101,8 +111,8 @@
 		65BB2F01128B9F82003031C5 /* early.close */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = text; path = early.close; sourceTree = "<group>"; };
 		65BB2F02128B9F82003031C5 /* engine.d */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.dtrace; path = engine.d; sourceTree = "<group>"; };
 		65BB2F03128B9F82003031C5 /* userfunc.d */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.dtrace; path = userfunc.d; sourceTree = "<group>"; };
-		65BF5E221334286D0043FAB2 /* flex.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = flex.h; sourceTree = "<group>"; };
-		65BF5E231334286D0043FAB2 /* flex.cc */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; path = flex.cc; sourceTree = "<group>"; };
+		65BF5E221334286D0043FAB2 /* Flex.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = Flex.h; sourceTree = "<group>"; };
+		65BF5E231334286D0043FAB2 /* Flex.cc */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; path = Flex.cc; sourceTree = "<group>"; };
 		65BF5F1D13346C120043FAB2 /* thread_identifier.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = thread_identifier.h; sourceTree = "<group>"; };
 		65BF5F1E13346C640043FAB2 /* status_block.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = status_block.h; sourceTree = "<group>"; };
 		65BF5F2F13346E1A0043FAB2 /* thread_identifier.c */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.c; path = thread_identifier.c; sourceTree = "<group>"; };
@@ -111,11 +121,7 @@
 		65BF6524133F108F0043FAB2 /* pmpstack.awk */ = {isa = PBXFileReference; explicitFileType = sourcecode; fileEncoding = 4; indentWidth = 4; name = pmpstack.awk; path = ../scripts/pmpstack.awk; sourceTree = "<group>"; tabWidth = 4; };
 		65C1872B12D6AF00009E5AE1 /* config.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; name = config.h; path = ../config.h; sourceTree = SOURCE_ROOT; };
 		65C1875612D6EC7D009E5AE1 /* ndb_worker.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = ndb_worker.h; sourceTree = "<group>"; };
-		65C187B812D94D21009E5AE1 /* Dispatch.cc */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; path = Dispatch.cc; sourceTree = "<group>"; };
 		65C1886612DA682D009E5AE1 /* stats_settings.txt */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = text; path = stats_settings.txt; sourceTree = "<group>"; };
-		65C1886712DAA9BB009E5AE1 /* 3thread.cc */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; path = 3thread.cc; sourceTree = "<group>"; };
-		65C1887012DAAD56009E5AE1 /* 3thread.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = 3thread.h; sourceTree = "<group>"; };
-		65C1887112DAAD5D009E5AE1 /* Dispatch.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = Dispatch.h; sourceTree = "<group>"; };
 		65C1887212DAAD62009E5AE1 /* Stockholm.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = Stockholm.h; sourceTree = "<group>"; };
 		65E51D7412A0A1D60039115A /* timing.c */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.c; path = timing.c; sourceTree = "<group>"; };
 		65E51D7512A0A49C0039115A /* timing.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = timing.h; sourceTree = "<group>"; };
@@ -126,6 +132,8 @@
 		08FB7794FE84155DC02AAC07 /* ndbmemcache */ = {
 			isa = PBXGroup;
 			children = (
+				652E8C8512F284EF00CA79EE /* protocol.txt */,
+				652E5A371306533A004F8795 /* binary-spec.txt */,
 				65AA54F7127C8705003FE674 /* ToDoList.rtf */,
 				652E8C7C12F2795B00CA79EE /* run-tests.sh */,
 				65BB2EFA128B9F82003031C5 /* traces */,
@@ -278,8 +286,8 @@
 				65570AD21298A7800043D9B9 /* util.h */,
 			);
 			name = memcached;
-			path = ../../../../../../../Desktop/ndbmem/builds/t5/include/memcached;
-			sourceTree = SOURCE_ROOT;
+			path = ../../../builds/t5/include/memcached;
+			sourceTree = "<group>";
 		};
 		65AA5868127F3EFF003FE674 /* scripts */ = {
 			isa = PBXGroup;
@@ -310,6 +318,7 @@
 				652E8C5512F20CE700CA79EE /* trace.binary */,
 				65B977C112F3984E00FEBB0D /* trace.incr */,
 				6569FC85130A4A5E00DFAC09 /* dtrace.binary.set */,
+				652DC977134A8F55000C5787 /* dtrace.flush */,
 			);
 			path = traces;
 			sourceTree = "<group>";
@@ -317,14 +326,17 @@
 		65C1871312D6A944009E5AE1 /* schedulers */ = {
 			isa = PBXGroup;
 			children = (
-				65BF5E221334286D0043FAB2 /* flex.h */,
-				65BF5E231334286D0043FAB2 /* flex.cc */,
-				65C1887012DAAD56009E5AE1 /* 3thread.h */,
-				65C1886712DAA9BB009E5AE1 /* 3thread.cc */,
-				65C1887112DAAD5D009E5AE1 /* Dispatch.h */,
-				65C187B812D94D21009E5AE1 /* Dispatch.cc */,
+				65BF5E221334286D0043FAB2 /* Flex.h */,
+				65BF5E231334286D0043FAB2 /* Flex.cc */,
+				652DCB61134BF637000C5787 /* Flex_thread_spec.h */,
+				652DCB5E134BF512000C5787 /* Flex_broker.h */,
+				652DCB5D134BF512000C5787 /* Flex_broker.cc */,
+				652DCB60134BF512000C5787 /* Flex_cluster.h */,
+				652DCB5F134BF512000C5787 /* Flex_cluster.cc */,
 				65C1887212DAAD62009E5AE1 /* Stockholm.h */,
 				65278D6612BC452E00189195 /* Stockholm.cc */,
+				652DCB4E134BE87D000C5787 /* 3thread.cc */,
+				652DCB4F134BE87D000C5787 /* 3thread.h */,
 			);
 			path = schedulers;
 			sourceTree = "<group>";


Attachment: [text/bzr-bundle] bzr/john.duncan@oracle.com-20110406043128-97yzcbs68ejb7rft.bundle