List:Commits« Previous MessageNext Message »
From:John David Duncan Date:March 7 2012 1:53am
Subject:bzr push into mysql-5.5-cluster-7.2 branch (john.duncan:3830 to 3831)
View as plain text  
 3831 John David Duncan	2012-03-06 [merge]
      Merge local 7.2.4-bugs tree into local master

    added:
      storage/ndb/memcache/include/ndb_engine_errors.h
      storage/ndb/memcache/src/ndb_engine_errors.cc
    modified:
      storage/ndb/memcache/CMakeLists.txt
      storage/ndb/memcache/include/LookupTable.h
      storage/ndb/memcache/include/NdbInstance.h
      storage/ndb/memcache/include/Scheduler.h
      storage/ndb/memcache/include/ndb_engine.h
      storage/ndb/memcache/include/ndb_error_logger.h
      storage/ndb/memcache/include/ndb_pipeline.h
      storage/ndb/memcache/include/workitem.h
      storage/ndb/memcache/src/Config_v1.cc
      storage/ndb/memcache/src/ndb_engine.c
      storage/ndb/memcache/src/ndb_engine_private.h
      storage/ndb/memcache/src/ndb_error_logger.cc
      storage/ndb/memcache/src/ndb_pipeline.cc
      storage/ndb/memcache/src/schedulers/S_sched.cc
      storage/ndb/memcache/src/schedulers/S_sched.h
      storage/ndb/memcache/src/schedulers/Stockholm.cc
      storage/ndb/memcache/src/schedulers/Stockholm.h
      storage/ndb/memcache/src/workitem.c
 3830 jonas oreland	2012-03-06 [merge]
      ndb - merge 71 to 72

    added:
      storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/spi/SmartValueHandler.java
      storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/spi/ValueHandlerFactory.java
      storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordSmartValueHandlerFactoryImpl.java
      storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordSmartValueHandlerImpl.java
    modified:
      mysql-test/suite/ndb/t/clusterj.test
      storage/ndb/clusterj/clusterj-api/src/main/java/com/mysql/clusterj/ClusterJHelper.java
      storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/SessionFactoryImpl.java
      storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/SessionImpl.java
      storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/metadata/AbstractDomainFieldHandlerImpl.java
      storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/metadata/AbstractDomainTypeHandlerImpl.java
      storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/metadata/DomainFieldHandlerImpl.java
      storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/metadata/DomainTypeHandlerFactoryImpl.java
      storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/metadata/DomainTypeHandlerImpl.java
      storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/metadata/InvocationHandlerImpl.java
      storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/metadata/KeyValueHandlerImpl.java
      storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/spi/DomainFieldHandler.java
      storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/spi/DomainTypeHandler.java
      storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/spi/DomainTypeHandlerFactory.java
      storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/spi/ValueHandler.java
      storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/store/ClusterConnection.java
      storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/store/Column.java
      storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/store/Operation.java
      storage/ndb/clusterj/clusterj-jdbc/src/main/java/com/mysql/clusterj/jdbc/ValueHandlerBatchingJDBCSetImpl.java
      storage/ndb/clusterj/clusterj-jdbc/src/main/java/com/mysql/clusterj/jdbc/ValueHandlerBindValuesImpl.java
      storage/ndb/clusterj/clusterj-jdbc/src/main/java/com/mysql/clusterj/jdbc/ValueHandlerImpl.java
      storage/ndb/clusterj/clusterj-jdbc/src/main/resources/com/mysql/clusterj/jdbc/Bundle.properties
      storage/ndb/clusterj/clusterj-openjpa/src/main/java/com/mysql/clusterj/openjpa/NdbOpenJPAConfigurationImpl.java
      storage/ndb/clusterj/clusterj-openjpa/src/main/java/com/mysql/clusterj/openjpa/NdbOpenJPADomainFieldHandlerImpl.java
      storage/ndb/clusterj/clusterj-openjpa/src/main/java/com/mysql/clusterj/openjpa/NdbOpenJPADomainTypeHandlerImpl.java
      storage/ndb/clusterj/clusterj-openjpa/src/main/java/com/mysql/clusterj/openjpa/NdbOpenJPAValueHandler.java
      storage/ndb/clusterj/clusterj-openjpa/src/main/resources/com/mysql/clusterj/openjpa/Bundle.properties
      storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/AbstractClusterJModelTest.java
      storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/AbstractClusterJTest.java
      storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/AutoCommitTest.java
      storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/BinaryPKTest.java
      storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/CharsetTest.java
      storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/LoadTest.java
      storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/MultithreadedTest.java
      storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/StressTest.java
      storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/VarcharStringLengthTest.java
      storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/domaintypehandler/CrazyDomainTypeHandlerFactoryImpl.java
      storage/ndb/clusterj/clusterj-test/src/main/resources/schema.sql
      storage/ndb/clusterj/clusterj-tie/pom.xml
      storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/BlobImpl.java
      storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ClusterConnectionImpl.java
      storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ClusterTransactionImpl.java
      storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ColumnImpl.java
      storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/DbImpl.java
      storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordBlobImpl.java
      storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordDeleteOperationImpl.java
      storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordImpl.java
      storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordInsertOperationImpl.java
      storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordKeyOperationImpl.java
      storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordOperationImpl.java
      storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordResultDataImpl.java
      storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/OperationImpl.java
      storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/Utility.java
      storage/ndb/clusterj/clusterj-tie/src/main/resources/com/mysql/clusterj/tie/Bundle.properties
      storage/ndb/clusterj/clusterj-tie/src/test/java/testsuite/clusterj/tie/ConnectionPoolTest.java
=== modified file 'storage/ndb/memcache/CMakeLists.txt'
--- a/storage/ndb/memcache/CMakeLists.txt	2011-12-18 23:21:21 +0000
+++ b/storage/ndb/memcache/CMakeLists.txt	2012-03-03 04:28:09 +0000
@@ -92,6 +92,7 @@ set(NDB_MEMCACHE_SOURCE_FILES
     src/debug.c
     src/hash_item_util.c
     src/ndb_configuration.cc
+    src/ndb_engine_errors.cc
     src/ndb_engine_private.h
     src/ndb_error_logger.cc
     src/ndb_flush.cc

=== modified file 'storage/ndb/memcache/include/LookupTable.h'
--- a/storage/ndb/memcache/include/LookupTable.h	2011-12-12 05:53:36 +0000
+++ b/storage/ndb/memcache/include/LookupTable.h	2012-03-07 00:38:08 +0000
@@ -28,9 +28,11 @@
 template<typename T> class LookupTable {
 public:
   int elements;
+  bool do_free_values;
 
   LookupTable(int sz = 128) : 
     elements(0),
+    do_free_values(false),
     size(sz), 
     symtab(new Entry *[sz]) 
   {
@@ -43,11 +45,13 @@ public:
     for(int i = 0 ; i < size ; i++) {
       Entry *sym = symtab[i];
       while(sym) {
+        if(do_free_values) free((void *) sym->value);
         Entry *next = sym->next;
         delete sym;
         sym = next;
       }
     }
+    delete[] symtab;
   }
 
 
@@ -58,7 +62,7 @@ public:
         return sym->value;
     return 0;
   }
-
+  
   void insert(const char *name, T * value) { 
     Uint32 h = do_hash(name) % size;
     Entry *sym = new Entry(name, value);    

=== modified file 'storage/ndb/memcache/include/NdbInstance.h'
--- a/storage/ndb/memcache/include/NdbInstance.h	2011-09-30 17:04:30 +0000
+++ b/storage/ndb/memcache/include/NdbInstance.h	2012-03-07 01:22:53 +0000
@@ -30,11 +30,10 @@
 #include "ndbmemcache_config.h"
 #include "KeyPrefix.h"
 #include "QueryPlan.h"
-
-struct workitem;
+#include "workitem.h"
 
 #define VPSZ sizeof(void *)
-#define TOTAL_SZ (3 * VPSZ)
+#define TOTAL_SZ (3 * VPSZ) + sizeof(int)
 #define PADDING (64 - TOTAL_SZ)
 
 
@@ -43,8 +42,11 @@ public:
   /* Public Methods */
   NdbInstance(Ndb_cluster_connection *, int);
   ~NdbInstance();
+  void link_workitem(workitem *);
+  void unlink_workitem(workitem *);
 
   /* Public Instance Variables */  
+  int id;
   Ndb *db;
   NdbInstance *next;
   workitem *wqitem;
@@ -54,4 +56,20 @@ private:
 };
 
 
+inline void NdbInstance::link_workitem(workitem *item) {
+  assert(item->ndb_instance == NULL);
+  assert(wqitem == NULL);
+  
+  item->ndb_instance = this;
+  wqitem = item;
+}
+
+
+inline void NdbInstance::unlink_workitem(workitem *item) {
+  assert(wqitem == item);
+  wqitem->ndb_instance = NULL;
+  wqitem = NULL;
+}
+
+
 #endif

=== modified file 'storage/ndb/memcache/include/Scheduler.h'
--- a/storage/ndb/memcache/include/Scheduler.h	2011-10-02 22:32:09 +0000
+++ b/storage/ndb/memcache/include/Scheduler.h	2012-03-07 01:22:53 +0000
@@ -67,9 +67,10 @@ public:
       requires the scheduler to send & poll an additional operation. */
   virtual void reschedule(workitem *) const = 0;
  
-  /** io_completed() is called from the NDB Engine thread when an IO
-      completion notification has been received */
-  virtual void io_completed(workitem *) = 0;
+  /** release() is called from the NDB Engine thread after an operation has
+      completed.  It allows the scheduler to release any resources (such as
+      the Ndb object) that were allocated in schedule(). */
+  virtual void release(workitem *) = 0;
   
   /** add_stats() allows the engine to delegate certain statistics
       to the scheduler. */

=== modified file 'storage/ndb/memcache/include/ndb_engine.h'
--- a/storage/ndb/memcache/include/ndb_engine.h	2011-12-18 23:21:21 +0000
+++ b/storage/ndb/memcache/include/ndb_engine.h	2012-03-07 01:22:53 +0000
@@ -76,7 +76,6 @@ struct ndb_engine {
   void **pipelines;
   void **schedulers;
    
-  bool initialized;
   bool connected;
 
   unsigned int cas_hi;

=== added file 'storage/ndb/memcache/include/ndb_engine_errors.h'
--- a/storage/ndb/memcache/include/ndb_engine_errors.h	1970-01-01 00:00:00 +0000
+++ b/storage/ndb/memcache/include/ndb_engine_errors.h	2012-03-03 04:28:09 +0000
@@ -0,0 +1,29 @@
+/*
+ Copyright (c) 2012, Oracle and/or its affiliates. All rights
+ reserved.
+ 
+ This program is free software; you can redistribute it and/or
+ modify it under the terms of the GNU General Public License
+ as published by the Free Software Foundation; version 2 of
+ the License.
+ 
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+ 
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ 02110-1301  USA
+ */
+#ifndef NDBMEMCACHE_ENGINE_ERRORS_H
+#define NDBMEMCACHE_ENGINE_ERRORS_H
+
+#include <ndberror.h>
+
+extern ndberror_struct AppError9001_ReconfLock;
+extern ndberror_struct AppError9002_NoNDBs;
+
+
+#endif

=== modified file 'storage/ndb/memcache/include/ndb_error_logger.h'
--- a/storage/ndb/memcache/include/ndb_error_logger.h	2011-12-18 23:21:21 +0000
+++ b/storage/ndb/memcache/include/ndb_error_logger.h	2012-03-03 04:28:09 +0000
@@ -39,12 +39,14 @@ enum {
 };
 
 int log_ndb_error(const NdbError &);
+int log_app_error(ndberror_struct const *);
 #endif
 
 
 DECLARE_FUNCTIONS_WITH_C_LINKAGE
 
 void ndb_error_logger_init(SERVER_CORE_API *, size_t log_level);
+void ndb_error_logger_stats(ADD_STAT add_stat, const void *cookie);
 
 END_FUNCTIONS_WITH_C_LINKAGE
 

=== modified file 'storage/ndb/memcache/include/ndb_pipeline.h'
--- a/storage/ndb/memcache/include/ndb_pipeline.h	2011-12-18 23:26:44 +0000
+++ b/storage/ndb/memcache/include/ndb_pipeline.h	2012-03-07 01:22:53 +0000
@@ -105,20 +105,24 @@ ndb_pipeline * get_request_pipeline(int 
 /** call into a pipeline for its own statistics */
 void pipeline_add_stats(ndb_pipeline *, const char *key, ADD_STAT, const void *);
 
+/** execute a "flush_all" operation */
+ENGINE_ERROR_CODE pipeline_flush_all(ndb_pipeline *);
+
+
+/***** SCHEDULER APIS *****/
+
 /** Global initialization of scheduler, at startup time */
-void * initialize_scheduler(const char *name, int nthreads, int threadnum);
+void * scheduler_initialize(const char *name, int nthreads, int threadnum);
 
 /** shutdown a scheduler */
-void shutdown_scheduler(void *);
+void scheduler_shutdown(ndb_pipeline *);
 
 /** pass a workitem to the configured scheduler, for execution */
-ENGINE_ERROR_CODE pipeline_schedule_operation(ndb_pipeline *, struct workitem *);
+ENGINE_ERROR_CODE scheduler_schedule(ndb_pipeline *, struct workitem *);
 
-/** schedule a "flush_all" operation */
-ENGINE_ERROR_CODE pipeline_flush_all(ndb_pipeline *);
+/** release the resources that were used by a completed operation */
+void scheduler_release(ndb_pipeline *, struct workitem *);
 
-/** notify scheduler of IO completion */
-void pipeline_io_completed(ndb_pipeline *, struct workitem *);
 
 /***** MEMORY MANAGEMENT APIS *****/
 

=== modified file 'storage/ndb/memcache/include/workitem.h'
--- a/storage/ndb/memcache/include/workitem.h	2011-12-11 07:31:26 +0000
+++ b/storage/ndb/memcache/include/workitem.h	2012-03-05 17:51:21 +0000
@@ -26,19 +26,9 @@
 #include "ndb_pipeline.h"
 #include "status_block.h"
 
-/* struct workitem is used in both C and C++ code, requiring a small hack: */
-#ifdef __cplusplus
-#include "QueryPlan.h"
-#include "NdbInstance.h" 
-#define CPP_QUERYPLAN QueryPlan
-#define CPP_NDBINSTANCE NdbInstance
-#define CPP_EXTERNALVALUE ExternalValue
-#else 
-#define CPP_QUERYPLAN void
-#define CPP_NDBINSTANCE void
-#define CPP_EXTERNALVALUE void
-#endif
-
+struct NdbInstance;
+struct QueryPlan;
+struct ExternalValue;
 
 typedef struct workitem {
   struct {
@@ -63,11 +53,11 @@ typedef struct workitem {
   uint64_t math_value;         /*! IN: incr initial value; OUT: incr result */
   hash_item * cache_item;      /*! used for write requests */
   ndb_pipeline *pipeline;      /*! pointer back to request pipeline */
-  CPP_NDBINSTANCE *ndb_instance;   
+  struct NdbInstance *ndb_instance;
                                /*! pointer to ndb instance, if applicable */
   const void *cookie;          /*! memcached's connection cookie */
-  CPP_QUERYPLAN *plan;         /*! QueryPlan for resolving this request */
-  CPP_EXTERNALVALUE *ext_val;  /*! ExternalValue */
+  struct QueryPlan *plan;      /*! QueryPlan for resolving this request */
+  struct ExternalValue *ext_val; /*! ExternalValue */
   const char *key;             /*! pointer to the key */
   void * next_step;            /*! a worker_step function in ndb_worker.cc */
   status_block *status;        /*! A static status_block in ndb_worker.cc */
@@ -132,7 +122,6 @@ bool workitem_allocate_rowbuffer_1(worki
 */
 bool workitem_allocate_rowbuffer_2(workitem *, size_t);
 
-
 /*! returns the name of the memcached operation stored in the workitem.
 */
 const char * workitem_get_operation(workitem *);
@@ -146,15 +135,10 @@ void workitem_free(workitem *);
 */
 const char * workitem_get_key_suffix(workitem *item);
 
-
 /*!  Return the size of key buffer needed for a key of length "nkey" 
 */
 size_t workitem_get_key_buf_size(int nkey);
 
-/*! Set the workitem's NdbInstance
-*/
-void workitem_set_NdbInstance(workitem*, CPP_NDBINSTANCE *);
-
 END_FUNCTIONS_WITH_C_LINKAGE
     
 #endif

=== modified file 'storage/ndb/memcache/src/Config_v1.cc'
--- a/storage/ndb/memcache/src/Config_v1.cc	2011-12-18 23:26:44 +0000
+++ b/storage/ndb/memcache/src/Config_v1.cc	2012-03-07 00:38:08 +0000
@@ -59,6 +59,7 @@ config_v1::~config_v1() {
     delete containers_map;    
   }
   if(policies_map) {
+    policies_map->do_free_values = true;
     delete policies_map;
   }
 }

=== modified file 'storage/ndb/memcache/src/ndb_engine.c'
--- a/storage/ndb/memcache/src/ndb_engine.c	2012-02-22 10:09:58 +0000
+++ b/storage/ndb/memcache/src/ndb_engine.c	2012-03-07 01:51:06 +0000
@@ -81,7 +81,7 @@ ENGINE_ERROR_CODE create_instance(uint64
                                   GET_SERVER_API get_server_api,
                                   ENGINE_HANDLE **handle ) {
   
-  ENGINE_ERROR_CODE e;
+  ENGINE_ERROR_CODE return_status;
   
   SERVER_HANDLE_V1 *api = get_server_api();
   if (interface != 1 || api == NULL) {
@@ -96,6 +96,7 @@ ENGINE_ERROR_CODE create_instance(uint64
   logger = get_stderr_logger();
     
   ndb_eng->npipelines = 0;
+  ndb_eng->connected  = false;
 
   ndb_eng->engine.interface.interface = 1;
   ndb_eng->engine.get_info         = ndb_get_info;
@@ -139,15 +140,13 @@ ENGINE_ERROR_CODE create_instance(uint64
   ndb_eng->info.info.features[2].description = NULL;
  
   /* Now call create_instace() for the default engine */
-  e = default_engine_create_instance(interface, get_server_api, 
-                                 & (ndb_eng->m_default_engine));
-  if(e != ENGINE_SUCCESS) return e;
-  
-  ndb_eng->initialized   = true;
-  ndb_eng->connected     = false;
+  return_status = default_engine_create_instance(interface, get_server_api, 
+                                                 & (ndb_eng->m_default_engine));
 
-  *handle = (ENGINE_HANDLE*) &ndb_eng->engine;
-  return ENGINE_SUCCESS;
+  if(return_status == ENGINE_SUCCESS)
+    *handle = (ENGINE_HANDLE*) &ndb_eng->engine;
+  
+  return return_status;
 }
 
 
@@ -159,8 +158,6 @@ static const engine_info* ndb_get_info(E
 
 
 /*** initialize ***/
-
-
 static ENGINE_ERROR_CODE ndb_initialize(ENGINE_HANDLE* handle,
                                         const char* config_str) 
 {   
@@ -230,7 +227,7 @@ static ENGINE_ERROR_CODE ndb_initialize(
   for(i = 0 ; i < nthreads ; i++) {
     ndb_eng->pipelines[i] = get_request_pipeline(i);
     ndb_eng->schedulers[i] = 
-      initialize_scheduler(ndb_eng->startup_options.scheduler, nthreads, i);
+      scheduler_initialize(ndb_eng->startup_options.scheduler, nthreads, i);
     if(ndb_eng->schedulers[i] == 0) {
       logger->log(LOG_WARNING, NULL, "Illegal scheduler: \"%s\"\n", 
                   ndb_eng->startup_options.scheduler); 
@@ -265,10 +262,10 @@ static void ndb_destroy(ENGINE_HANDLE* h
   struct ndb_engine* ndb_eng = ndb_handle(handle);
   struct default_engine *def_eng = default_handle(ndb_eng);
 
-  for(unsigned i = 0 ; i < ndb_eng->npipelines; i ++) {
-    void *p = ndb_eng->schedulers[i];
+  for(unsigned int i = 0 ; i < ndb_eng->npipelines; i ++) {
+    ndb_pipeline *p = ndb_eng->pipelines[i];
     if(p) {
-      shutdown_scheduler(p);
+      scheduler_shutdown(p);
     }
   }
 
@@ -277,6 +274,28 @@ static void ndb_destroy(ENGINE_HANDLE* h
 }
 
 
+/* CALL FLOWS
+   ----------
+   GET:       eng.get(), eng.get_item_info()*, eng.release()*
+   DELETE:    eng.remove()
+   SET (etc): eng.allocate(), eng.item_set_cas(), eng.get_item_info(), 
+                 eng.store(), eng.release()*
+   INCR:      eng.arithmetic()
+   FLUSH:     eng.flush()
+
+   * Called only on success (ENGINE_SUCCESS or ENGINE_EWOULDBLOCK)
+*/
+ 
+
+
+/*** Release scheduler resources and free workitem ****/
+void release_and_free(workitem *wqitem) {
+  DEBUG_PRINT("Releasing workitem %d.%d.", wqitem->pipeline->id, wqitem->id);
+  scheduler_release(wqitem->pipeline, wqitem);  
+  workitem_free(wqitem);
+}
+
+
 /*** allocate ***/
 
 /* Allocate gets a struct item from the slab allocator, and fills in 
@@ -321,11 +340,9 @@ static ENGINE_ERROR_CODE ndb_remove(ENGI
   wqitem = ndb_eng->server.cookie->get_engine_specific(cookie);
   if(wqitem) {
     DEBUG_PRINT("Got callback: %s", wqitem->status->comment);
-    ndb_eng->server.cookie->store_engine_specific(cookie, wqitem->previous);
-    return_status = wqitem->status->status;
-    pipeline_io_completed(pipeline, wqitem);
-    workitem_free(wqitem);
-    return return_status;
+    ndb_eng->server.cookie->store_engine_specific(cookie, wqitem->previous); //pop
+    release_and_free(wqitem);
+    return wqitem->status->status;
   }
 
   prefix = get_prefix_info_for_key(nkey, key);
@@ -361,7 +378,10 @@ static ENGINE_ERROR_CODE ndb_remove(ENGI
   if(prefix.do_db_delete) {                        /* Database Delete */
     wqitem = new_workitem_for_delete_op(pipeline, prefix, cookie, nkey, key, & cas);
     DEBUG_PRINT("creating workitem %d.%d", pipeline->id, wqitem->id);
-    return_status = pipeline_schedule_operation(pipeline, wqitem);
+    return_status = scheduler_schedule(pipeline, wqitem);
+    if(return_status != ENGINE_EWOULDBLOCK) {
+      release_and_free(wqitem);
+    }
   }
   
   return return_status;
@@ -375,17 +395,12 @@ static void ndb_release(ENGINE_HANDLE* h
   struct ndb_engine* ndb_eng = ndb_handle(handle);
   struct default_engine *def_eng = default_handle(ndb_eng);
 
-  /* There may be a stack of workitems associated with the connection cookie; 
-     pop this one and free it. 
-  */
   workitem *wqitem = ndb_eng->server.cookie->get_engine_specific(cookie);  
   if(wqitem) {
-    /* pipeline_io_completed()?  */
-    DEBUG_PRINT("Releasing workitem %d.%d.", wqitem->pipeline->id, wqitem->id);
     ndb_eng->server.cookie->store_engine_specific(cookie, wqitem->previous);
-    workitem_free(wqitem);
+    release_and_free(wqitem);
   }
-
+  
   if(item && (item != wqitem)) {
     DEBUG_PRINT("Releasing a hash item.");
     item_release(def_eng, (hash_item *) item);
@@ -404,6 +419,7 @@ static ENGINE_ERROR_CODE ndb_get(ENGINE_
   struct ndb_engine* ndb_eng = ndb_handle(handle);
   ndb_pipeline *pipeline = get_my_pipeline_config(ndb_eng);
   struct workitem *wqitem;
+  ENGINE_ERROR_CODE return_status = ENGINE_KEY_ENOENT;
 
   wqitem = ndb_eng->server.cookie->get_engine_specific(cookie);
 
@@ -413,19 +429,22 @@ static ENGINE_ERROR_CODE ndb_get(ENGINE_
                 pipeline->id, wqitem->id, wqitem->status->comment);
     *item = wqitem->cache_item;
     wqitem->base.complete = 1;
-    pipeline_io_completed(pipeline, wqitem);
-    if(wqitem->status->status != ENGINE_SUCCESS) {
-      DEBUG_PRINT("pop and free the workitem.");
-      ndb_eng->server.cookie->store_engine_specific(cookie, wqitem->previous);
-      workitem_free(wqitem);
+    return_status = wqitem->status->status;
+
+    /* On success the workitem will be read in ndb_get_item_info, then released.
+       Otherwise: */
+    if(return_status != ENGINE_SUCCESS) {
+      ndb_eng->server.cookie->store_engine_specific(cookie, wqitem->previous);//pop
+      release_and_free(wqitem);
     }
-    /* The workitem will be read in ndb_get_item_info, then released */
-    return wqitem->status->status;  
+    
+    return return_status;  
   }
 
   prefix_info_t prefix = get_prefix_info_for_key(nkey, key);
   
   /* Cache read */
+  /* FIXME: Use the public APIs */
   if(prefix.do_mc_read) {
     *item = item_get(default_handle(ndb_eng), key, nkey);
     if (*item != NULL) {
@@ -439,10 +458,15 @@ static ENGINE_ERROR_CODE ndb_get(ENGINE_
   if(prefix.do_db_read) {
     wqitem = new_workitem_for_get_op(wqitem, pipeline, prefix, cookie, nkey, key);
     DEBUG_PRINT("creating workitem %d.%d", pipeline->id, wqitem->id);
-    return pipeline_schedule_operation(pipeline, wqitem);
+    return_status = scheduler_schedule(pipeline, wqitem);
+    if(! ((return_status == ENGINE_EWOULDBLOCK) || (return_status == ENGINE_SUCCESS))) {
+      /* On error we must pop and free */
+      ndb_eng->server.cookie->store_engine_specific(cookie, wqitem->previous);
+      release_and_free(wqitem);
+    }
   }
   
-  return ENGINE_KEY_ENOENT;
+  return return_status;
 }
 
 
@@ -460,9 +484,11 @@ static ENGINE_ERROR_CODE ndb_get_stats(E
   DEBUG_ENTER(); 
   
   if(stat_key && 
-     ((strncasecmp(stat_key, "ndb", 3) == 0) || 
+     ((strncasecmp(stat_key, "ndb", 3) == 0)       || 
       (strncasecmp(stat_key, "scheduler", 9) == 0) ||
-      (strncasecmp(stat_key, "reconf", 6) == 0)))
+      (strncasecmp(stat_key, "reconf", 6) == 0)    ||
+      (strncasecmp(stat_key, "errors", 6) == 0)
+    ))
   {
     /* NDB Engine stats */
     pipeline_add_stats(pipeline, stat_key, add_stat, cookie);
@@ -497,17 +523,15 @@ static ENGINE_ERROR_CODE ndb_store(ENGIN
                                    ENGINE_STORE_OPERATION op,
                                    uint16_t vbucket  __attribute__((unused)))
 {
-  prefix_info_t prefix;
-
   struct ndb_engine* ndb_eng = ndb_handle(handle);
   ndb_pipeline *pipeline = get_my_pipeline_config(ndb_eng);
+  ENGINE_ERROR_CODE return_status = ENGINE_NOT_STORED;
+  prefix_info_t prefix;
 
   /* Is this a callback after completed I/O? */  
   workitem *wqitem = ndb_eng->server.cookie->get_engine_specific(cookie);  
   if(wqitem) {
-   // fixme: chaining
     DEBUG_PRINT("Got callback: %s", wqitem->status->comment);
-    pipeline_io_completed(pipeline, wqitem);
     return wqitem->status->status;
   }
 
@@ -525,17 +549,18 @@ static ENGINE_ERROR_CODE ndb_store(ENGIN
   if(prefix.do_db_write) {
     wqitem = new_workitem_for_store_op(pipeline, op, prefix, cookie, item, cas);
     DEBUG_PRINT("creating workitem %d.%d", pipeline->id, wqitem->id);
-    return pipeline_schedule_operation(pipeline, wqitem);    
+    return_status = scheduler_schedule(pipeline, wqitem);
+    if(! ((return_status == ENGINE_EWOULDBLOCK) || (return_status == ENGINE_SUCCESS))) {
+      ndb_eng->server.cookie->store_engine_specific(cookie, wqitem->previous);//pop
+      release_and_free(wqitem);      
+    }
   }
-  
-  /* write to cache */
-  if(prefix.do_mc_write) {
+  else if(prefix.do_mc_write) {
     DEBUG_PRINT(" cache-only store.");
-    return store_item(default_handle(ndb_eng), item, cas, op, cookie);
+    return_status = store_item(default_handle(ndb_eng), item, cas, op, cookie);
   }
   
-  /* NOP case: db_write and mc_write are both disabled. */
-  return ENGINE_SUCCESS;  
+  return return_status;  
 }
 
 
@@ -557,20 +582,19 @@ static ENGINE_ERROR_CODE ndb_arithmetic(
   struct default_engine *def_eng = default_handle(ndb_eng);
   ndb_pipeline *pipeline = get_my_pipeline_config(ndb_eng);
   struct workitem *wqitem;
+  ENGINE_ERROR_CODE return_status;  
   
   /* Is this a callback after completed I/O? */
   wqitem = ndb_eng->server.cookie->get_engine_specific(cookie);
   if(wqitem && ! wqitem->base.complete) {
     DEBUG_PRINT("Got arithmetic callback: %s", wqitem->status->comment);
-    ENGINE_ERROR_CODE status = wqitem->status->status;  
+    return_status = wqitem->status->status;  
     wqitem->base.complete = 1;
     *result = wqitem->math_value;
     /* There will be no call to release(), so pop and free now. */
-    pipeline_io_completed(pipeline, wqitem);
     ndb_eng->server.cookie->store_engine_specific(cookie, wqitem->previous);
-    workitem_free(wqitem);
-
-    return status;
+    release_and_free(wqitem);
+    return return_status;
   }
   
   prefix_info_t prefix = get_prefix_info_for_key(nkey, key);
@@ -596,9 +620,14 @@ static ENGINE_ERROR_CODE ndb_arithmetic(
   wqitem = new_workitem_for_arithmetic(pipeline, prefix, cookie, key, nkey,
                                        increment, create, delta, initial, cas);
   DEBUG_PRINT("creating workitem %d.%d", pipeline->id, wqitem->id);
-  return pipeline_schedule_operation(pipeline, wqitem);  
-}
 
+  return_status = scheduler_schedule(pipeline, wqitem);
+
+  if(! ((return_status == ENGINE_EWOULDBLOCK) || (return_status == ENGINE_SUCCESS)))
+    release_and_free(wqitem);
+
+  return return_status;
+}
 
 /*** flush ***/
 static ENGINE_ERROR_CODE ndb_flush(ENGINE_HANDLE* handle,

=== added file 'storage/ndb/memcache/src/ndb_engine_errors.cc'
--- a/storage/ndb/memcache/src/ndb_engine_errors.cc	1970-01-01 00:00:00 +0000
+++ b/storage/ndb/memcache/src/ndb_engine_errors.cc	2012-03-03 04:28:09 +0000
@@ -0,0 +1,32 @@
+/*
+ Copyright (c) 2012, Oracle and/or its affiliates. All rights
+ reserved.
+ 
+ This program is free software; you can redistribute it and/or
+ modify it under the terms of the GNU General Public License
+ as published by the Free Software Foundation; version 2 of
+ the License.
+ 
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+ 
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ 02110-1301  USA 
+*/
+
+#include <ndberror.h>
+
+ndberror_struct AppError9001_ReconfLock = 
+  { ndberror_st_temporary , ndberror_cl_application , 9001, -1,
+    "Could not obtain configuration read lock" 
+  };
+
+ndberror_struct AppError9002_NoNDBs =
+  { ndberror_st_temporary , ndberror_cl_application , 9002, -1,
+    "No Ndb Instances in freelist" 
+  };
+

=== modified file 'storage/ndb/memcache/src/ndb_engine_private.h'
--- a/storage/ndb/memcache/src/ndb_engine_private.h	2011-10-02 01:39:35 +0000
+++ b/storage/ndb/memcache/src/ndb_engine_private.h	2012-03-07 01:22:53 +0000
@@ -27,6 +27,9 @@ void read_cmdline_options(struct ndb_eng
 
 int fetch_core_settings(struct ndb_engine *, struct default_engine *);
 
+void release_and_free(struct workitem *);
+
+
 /*************** Declarations of functions that implement 
                  the engine interface 
  *********************************************************/

=== modified file 'storage/ndb/memcache/src/ndb_error_logger.cc'
--- a/storage/ndb/memcache/src/ndb_error_logger.cc	2011-12-19 01:09:00 +0000
+++ b/storage/ndb/memcache/src/ndb_error_logger.cc	2012-03-03 04:28:09 +0000
@@ -53,7 +53,8 @@ class ErrorEntry;
 ErrorEntry * error_hash_table[ERROR_HASH_TABLE_SIZE];
 
 /* Prototypes */
-void manage_error(const NdbError &, const char * mesg, rel_time_t interval);
+void manage_error(int err_code, const char * err_mesg,
+                  const char * extra_mesg, rel_time_t interval);
 
 
 
@@ -70,18 +71,28 @@ void ndb_error_logger_init(SERVER_CORE_A
 }
 
 
+int log_app_error(ndberror_struct const * error) {
+  if(error->code < 9100) 
+    manage_error(error->code, error->message, "Scheduler Error", 10);
+  else
+    manage_error(error->code, error->message, "Memcached Error", 10);
+    
+  return error->status;
+}
+
+
 int log_ndb_error(const NdbError &error) {
   switch(error.status) {
     case ndberror_st_success:
       break;
 
     case ndberror_st_temporary:
-      manage_error(error, "NDB Temporary Error", 10);
+      manage_error(error.code, error.message, "NDB Temporary Error", 10);
       break;
 
     case ndberror_st_permanent:
     case ndberror_st_unknown:
-      manage_error(error, "NDB Error", 10);
+      manage_error(error.code, error.message, "NDB Error", 10);
       break;
   }
   /* NDB classifies "Out Of Memory" (827) errors as permament errors, but we 
@@ -147,33 +158,32 @@ ErrorEntry * error_table_lookup(int code
 
 
 /* Record the error message, and possibly log it. */
-void manage_error(const NdbError & error, const char *type_mesg, rel_time_t interval) {
+void manage_error(int err_code, const char * err_mesg, 
+                  const char *type_mesg, rel_time_t interval) {
   char note[256];
   ErrorEntry *entry = 0;
   bool first_ever, interval_passed, flood = false;
   int current = 0, prior = 0;  // array indexes
 
-  if(verbose_logging == 0) { 
-    entry = error_table_lookup(error.code, core_api->get_current_time());
-
-    if((entry->count | 1) == entry->count)
-      current = 1;  // odd count
-    else
-      prior   = 1;  // even count
+  entry = error_table_lookup(err_code, core_api->get_current_time());
 
-    /* We have four pieces of information: the first timestamp, the two 
-       most recent timestamps, and the error count. When to write a log message?
-       (A) On the first occurrence of an error. 
-       (B) If a time > interval has passed since the previous message.
-       (C) At certain count numbers in error flood situations
-    */
-    first_ever = (entry->count == 1);
-    interval_passed = (entry->time[current] - entry->time[prior] > interval);
-    if(! interval_passed) 
-      for(Uint32 i = 10 ; i <= entry->count ; i *= 10) 
-        if(entry->count < (10 * i) && (entry->count % i == 0))
-          { flood = true; break; }
-  }
+  if((entry->count | 1) == entry->count)
+    current = 1;  // odd count
+  else
+    prior   = 1;  // even count
+
+  /* We have four pieces of information: the first timestamp, the two 
+     most recent timestamps, and the error count. When to write a log message?
+     (A) On the first occurrence of an error. 
+     (B) If a time > interval has passed since the previous message.
+     (C) At certain count numbers in error flood situations
+  */
+  first_ever = (entry->count == 1);
+  interval_passed = (entry->time[current] - entry->time[prior] > interval);
+  if(! interval_passed) 
+    for(Uint32 i = 10 ; i <= entry->count ; i *= 10) 
+      if(entry->count < (10 * i) && (entry->count % i == 0))
+        { flood = true; break; }
   
   if(verbose_logging || first_ever || interval_passed || flood) 
   {
@@ -181,8 +191,26 @@ void manage_error(const NdbError & error
       snprintf(note, 256, "[occurrence %d of this error]", entry->count);
     else
       note[0] = '\0';
+
     logger->log(LOG_WARNING, 0, "%s %d: %s %s\n", 
-                type_mesg, error.code, error.message, note);
+                type_mesg, err_code, err_mesg, note);
+  }
+}
+
+
+void ndb_error_logger_stats(ADD_STAT add_stat, const void *cookie) {
+  char key[128];
+  char val[128];
+  int klen, vlen;
+  unsigned int i;
+  Lock lock(& error_table_lock);
+  ErrorEntry *sym;
+
+  for(i = 0 ; i < ERROR_HASH_TABLE_SIZE; i++) {
+    for(sym = error_hash_table[i] ; sym != 0 ; sym = sym->next) { 
+      klen = sprintf(key, "NDB_Error_%d", sym->error_code);
+      vlen = sprintf(val, "%lu", sym->count);
+      add_stat(key, klen, val, vlen, cookie);
+    }
   }
 }
-  

=== modified file 'storage/ndb/memcache/src/ndb_pipeline.cc'
--- a/storage/ndb/memcache/src/ndb_pipeline.cc	2011-12-19 01:09:00 +0000
+++ b/storage/ndb/memcache/src/ndb_pipeline.cc	2012-03-07 01:22:53 +0000
@@ -37,6 +37,7 @@
 
 #include "schedulers/Stockholm.h"
 #include "schedulers/S_sched.h"
+#include "ndb_error_logger.h"
 
 #define DEFAULT_SCHEDULER S::SchedulerWorker
 
@@ -142,6 +143,9 @@ void pipeline_add_stats(ndb_pipeline *se
       conf.getConnectionPoolById(i)->add_stats(key, add_stat, cookie);
     }
   }
+  else if(strncasecmp(stat_key,"errors",6) == 0) {
+    ndb_error_logger_stats(add_stat, cookie);    
+  }
   else if((strncasecmp(stat_key,"scheduler",9) == 0)
           || (strncasecmp(stat_key,"reconf",6) == 0)) {
     self->scheduler->add_stats(stat_key, add_stat, cookie);
@@ -149,7 +153,14 @@ void pipeline_add_stats(ndb_pipeline *se
 }
   
 
-void * initialize_scheduler(const char *cf, int nthreads, int athread) {
+ENGINE_ERROR_CODE pipeline_flush_all(ndb_pipeline *self) {
+  return ndb_flush_all(self);
+}
+
+
+/* The scheduler API */
+
+void * scheduler_initialize(const char *cf, int nthreads, int athread) {
   Scheduler *s = 0;
   const char *sched_options = 0;
   
@@ -175,28 +186,21 @@ void * initialize_scheduler(const char *
 }
 
 
-void shutdown_scheduler(void *v) {
-  Scheduler *s = (Scheduler *) v;
-  
-  s->shutdown();
+void scheduler_shutdown(ndb_pipeline *self) {  
+  self->scheduler->shutdown();
 }
 
 
-ENGINE_ERROR_CODE pipeline_schedule_operation(ndb_pipeline *self, 
-                                              struct workitem *item) {
+ENGINE_ERROR_CODE scheduler_schedule(ndb_pipeline *self, struct workitem *item) {
   return self->scheduler->schedule(item);
 }
 
 
-ENGINE_ERROR_CODE pipeline_flush_all(ndb_pipeline *self) {
-  return ndb_flush_all(self);
+void scheduler_release(ndb_pipeline *self, struct workitem *item) {
+  self->scheduler->release(item);
 }
 
 
-void pipeline_io_completed(ndb_pipeline *self, struct workitem *item) {
-  self->scheduler->io_completed(item);
-}
-
 
 /* The slab allocator API */
 

=== modified file 'storage/ndb/memcache/src/schedulers/S_sched.cc'
--- a/storage/ndb/memcache/src/schedulers/S_sched.cc	2012-01-14 00:52:10 +0000
+++ b/storage/ndb/memcache/src/schedulers/S_sched.cc	2012-03-07 01:22:53 +0000
@@ -35,6 +35,8 @@
 #include "thread_identifier.h"
 #include "workitem.h"
 #include "ndb_worker.h"
+#include "ndb_engine_errors.h"
+#include "ndb_error_logger.h"
 
 #include "S_sched.h"
 
@@ -283,6 +285,7 @@ void S::SchedulerWorker::attach_thread(t
 
 ENGINE_ERROR_CODE S::SchedulerWorker::schedule(workitem *item) {
   int c = item->prefix_info.cluster_id;
+  ENGINE_ERROR_CODE response_code;
   NdbInstance *inst;
   S::WorkerConnection *wc;
   const KeyPrefix *pfx;
@@ -296,7 +299,7 @@ ENGINE_ERROR_CODE S::SchedulerWorker::sc
     pthread_rwlock_unlock(& reconf_lock);
   }
   else {
-    logger->log(LOG_INFO, 0, "S Scheduler could not acquire read lock");
+    log_app_error(& AppError9001_ReconfLock);
     return ENGINE_TMPFAIL;
   }
   /* READ LOCK RELEASED */
@@ -315,12 +318,11 @@ ENGINE_ERROR_CODE S::SchedulerWorker::sc
      all we can do is return an error. 
      (Or, alternately, the scheduler may be shutting down.)
      */
-    logger->log(LOG_INFO, 0, "No free NDB instances.");
+    log_app_error(& AppError9002_NoNDBs);
     return ENGINE_TMPFAIL;
   }
   
-  inst->wqitem = item;
-  workitem_set_NdbInstance(item, inst);
+  inst->link_workitem(item);
   
   // Fetch the query plan for this prefix.
   item->plan = wc->plan_set->getPlanForPrefix(pfx);
@@ -331,39 +333,44 @@ ENGINE_ERROR_CODE S::SchedulerWorker::sc
   
   // Build the NDB transaction
   op_status_t op_status = worker_prepare_operation(item);
-  ENGINE_ERROR_CODE response_code;
-  
-  switch(op_status) {
-    case op_async_prepared:
-      /* Put the prepared item onto a send queue */
-      wc->sendqueue->produce(inst);
-      DEBUG_PRINT("%d.%d placed on send queue.", id, inst->wqitem->id);
-      
-      /* This locking is explained in run_ndb_send_thread() */
-      if(pthread_mutex_trylock( & wc->conn->sem.lock) == 0) {  // try the lock
-        wc->conn->sem.counter++;                               // increment
-        pthread_cond_signal( & wc->conn->sem.not_zero);        // signal
-        pthread_mutex_unlock( & wc->conn->sem.lock);           // release
-      }
-      
-      response_code = ENGINE_EWOULDBLOCK;
-      break;
-   case op_not_supported:
-      DEBUG_PRINT("op_status is op_not_supported");
-      response_code = ENGINE_ENOTSUP;
-      break;
-    case op_overflow:
-      DEBUG_PRINT("op_status is op_overflow");
-      response_code = ENGINE_E2BIG;
-      break;
-    case op_async_sent:
-      DEBUG_PRINT("op_async_sent could be a bug");
-      response_code = ENGINE_FAILED;
-      break;      
-    case op_failed:
-      DEBUG_PRINT("op_status is op_failed");
-      response_code = ENGINE_FAILED;
-      break;
+
+  // Success; put the workitem on the send queue and return ENGINE_EWOULDBLOCK.
+  if(op_status == op_async_prepared) {
+    /* Put the prepared item onto a send queue */
+    wc->sendqueue->produce(inst);
+    DEBUG_PRINT("%d.%d placed on send queue.", id, inst->wqitem->id);
+    
+    /* This locking is explained in run_ndb_send_thread() */
+    if(pthread_mutex_trylock( & wc->conn->sem.lock) == 0) {  // try the lock
+      wc->conn->sem.counter++;                               // increment
+      pthread_cond_signal( & wc->conn->sem.not_zero);        // signal
+      pthread_mutex_unlock( & wc->conn->sem.lock);           // release
+    }
+        
+    response_code = ENGINE_EWOULDBLOCK;
+  }
+  else {  
+    switch(op_status) {
+     case op_not_supported:
+        DEBUG_PRINT("op_status is op_not_supported");
+        response_code = ENGINE_ENOTSUP;
+        break;
+      case op_overflow:
+        DEBUG_PRINT("op_status is op_overflow");
+        response_code = ENGINE_E2BIG;
+        break;
+      case op_async_sent:
+        DEBUG_PRINT("op_async_sent could be a bug");
+        response_code = ENGINE_FAILED;
+        break;      
+      case op_failed:
+        DEBUG_PRINT("op_status is op_failed");
+        response_code = ENGINE_FAILED;
+        break;
+      default:
+        DEBUG_PRINT("UNEXPECTED: op_status is %d", op_status);
+        response_code = ENGINE_FAILED;
+    }
   }
   
   return response_code;
@@ -376,18 +383,19 @@ void S::SchedulerWorker::reschedule(work
 }
 
 
-void S::SchedulerWorker::io_completed(workitem *item) {
+/* Release the resources used by an operation.  
+   Unlink the NdbInstance from the workitem, and return it to the free list 
+   (or free it, if the scheduler is shutting down).
+*/
+void S::SchedulerWorker::release(workitem *item) {
   DEBUG_ENTER();
   NdbInstance *inst = item->ndb_instance;
-  item->ndb_instance = NULL;
   
   if(inst) {
-    assert(inst->wqitem == item);
+    inst->unlink_workitem(item);
     int c = item->prefix_info.cluster_id;
     S::WorkerConnection * wc = * (s_global->getWorkerConnectionPtr(id, c));
     if(wc && ! wc->sendqueue->is_aborted()) {
-      // assert(inst->sched_gen_number == s_global->generation);
-      inst->wqitem = NULL;
       inst->next = wc->freelist;
       wc->freelist = inst;
       DEBUG_PRINT("Returned NdbInstance to freelist.");
@@ -533,6 +541,7 @@ S::WorkerConnection::WorkerConnection(Sc
   int my_ndb_inst = conn->nInst / global->options.n_worker_threads;
   for(int j = 0 ; j < my_ndb_inst ; j++ ) {
     NdbInstance *inst = new NdbInstance(conn->conn, 2);
+    inst->id = ((id.thd + 1) * 10000) + j + 1; 
     inst->next = freelist;
     freelist = inst;
   }

=== modified file 'storage/ndb/memcache/src/schedulers/S_sched.h'
--- a/storage/ndb/memcache/src/schedulers/S_sched.h	2011-10-02 02:58:36 +0000
+++ b/storage/ndb/memcache/src/schedulers/S_sched.h	2012-03-07 01:22:53 +0000
@@ -104,7 +104,7 @@ public:  
   ENGINE_ERROR_CODE schedule(workitem *);
   void yield(workitem *) const {};
   void reschedule(workitem *) const;
-  void io_completed(workitem *);
+  void release(workitem *);
   void add_stats(const char *, ADD_STAT, const void *);
   void shutdown();
   bool global_reconfigure(Configuration *);

=== modified file 'storage/ndb/memcache/src/schedulers/Stockholm.cc'
--- a/storage/ndb/memcache/src/schedulers/Stockholm.cc	2011-12-16 10:04:43 +0000
+++ b/storage/ndb/memcache/src/schedulers/Stockholm.cc	2012-03-07 01:22:53 +0000
@@ -184,8 +184,8 @@ ENGINE_ERROR_CODE Scheduler_stockholm::s
   {
     return ENGINE_TMPFAIL;
   }
-
-  workitem_set_NdbInstance(newitem, inst);
+  
+  inst->link_workitem(newitem);
   
   // Fetch the query plan for this prefix.
   newitem->plan = cluster[c].plan_set->getPlanForPrefix(pfx);
@@ -216,12 +216,12 @@ ENGINE_ERROR_CODE Scheduler_stockholm::s
 }
 
 
-void Scheduler_stockholm::io_completed(workitem *item) {
+void Scheduler_stockholm::release(workitem *item) {
   DEBUG_ENTER();
   NdbInstance* inst = item->ndb_instance;
-  item->ndb_instance  = NULL;
   
   if(inst) {    
+    inst->unlink_workitem(item);
     int c = item->prefix_info.cluster_id;
     inst->next = cluster[c].nextFree;
     cluster[c].nextFree = inst;

=== modified file 'storage/ndb/memcache/src/schedulers/Stockholm.h'
--- a/storage/ndb/memcache/src/schedulers/Stockholm.h	2011-10-02 02:58:36 +0000
+++ b/storage/ndb/memcache/src/schedulers/Stockholm.h	2012-03-07 01:22:53 +0000
@@ -49,7 +49,7 @@ public:
   ENGINE_ERROR_CODE schedule(workitem *);
   void yield(workitem *) const;                                       // inlined
   void reschedule(workitem *) const;                                  // inlined
-  void io_completed(workitem *);
+  void release(workitem *);
   void add_stats(const char *, ADD_STAT, const void *);
   void shutdown();
   void * run_ndb_commit_thread(int cluster_id);

=== modified file 'storage/ndb/memcache/src/workitem.c'
--- a/storage/ndb/memcache/src/workitem.c	2011-12-11 07:31:26 +0000
+++ b/storage/ndb/memcache/src/workitem.c	2012-03-07 01:22:53 +0000
@@ -182,26 +182,14 @@ const char * workitem_get_operation(work
 }
 
 
-void workitem_set_NdbInstance(workitem *item, CPP_NDBINSTANCE * _Ndb_instance)
-{
-  assert(item->ndb_instance == NULL);
-  item->ndb_instance = _Ndb_instance;
-}
-
-
 void workitem_free(workitem *item)
 {
-  /* It's OK to free all of these; a free() with class_id 0 is a no-op */
+  /* It's OK to free all of these; pipeline_free() with class_id 0 is a no-op */
   pipeline_free(item->pipeline, item->row_buffer_1, item->rowbuf1_cls);
   pipeline_free(item->pipeline, item->row_buffer_2, item->rowbuf2_cls);
   pipeline_free(item->pipeline, item->ndb_key_buffer, item->keybuf1_cls);
   pipeline_free(item->pipeline, item->key_buffer_2, item->keybuf2_cls);
 
-  if (item->ndb_instance != NULL)
-  {
-    assert(false); // TODO Not thread safe, investigate path
-  }
-
   pipeline_free(item->pipeline, item, workitem_class_id);
 }
 

No bundle (reason: useless for push emails).
Thread
bzr push into mysql-5.5-cluster-7.2 branch (john.duncan:3830 to 3831) John David Duncan8 Mar