List: Commits « Previous Message | Next Message »
From: John David Duncan  Date: May 17 2011 6:12pm
Subject: bzr push into mysql-5.1-telco-7.2 branch (john.duncan:4176 to 4181)
View as plain text  
 4181 John David Duncan	2011-05-16
      Support writing multiple columns from tab-separated values

    modified:
      storage/ndb/memcache/src/ndb_worker.cc
 4180 John David Duncan	2011-05-16
      New Scheduler::yield() method.
      Fix a possible race condition in the Flex and Stockholm schedulers where notify_io_complete()
      is called, and the worker thread invalidates an Ndb, before sendPollNdb() on that Ndb has
      actually returned in the commit thread.  So, notify_io_complete() must be called from the
      commit thread loop in Stockholm and Flex, but from the callback function in Bulk.

    modified:
      storage/ndb/memcache/include/Scheduler.h
      storage/ndb/memcache/src/ndb_worker.cc
      storage/ndb/memcache/src/schedulers/Bulk.cc
      storage/ndb/memcache/src/schedulers/Bulk.h
      storage/ndb/memcache/src/schedulers/Flex.h
      storage/ndb/memcache/src/schedulers/Flex_cluster.cc
      storage/ndb/memcache/src/schedulers/Stockholm.cc
      storage/ndb/memcache/src/schedulers/Stockholm.h
 4179 John David Duncan	2011-05-16
      Minor edits to unify stockholm & flex commit thread loops.

    modified:
      storage/ndb/memcache/src/schedulers/Flex_cluster.cc
      storage/ndb/memcache/src/schedulers/Stockholm.cc
 4178 John David Duncan	2011-05-16
      worker_prepare_operation() now returns op_status_t rather than bool.

    modified:
      storage/ndb/memcache/include/ndb_worker.h
      storage/ndb/memcache/include/ndbmemcache_global.h
      storage/ndb/memcache/src/ndb_worker.cc
      storage/ndb/memcache/src/schedulers/Bulk.cc
      storage/ndb/memcache/src/schedulers/Flex_cluster.cc
      storage/ndb/memcache/src/schedulers/Stockholm.cc
 4177 John David Duncan	2011-05-16
      Send NdbApi requests from the commit thread.

    modified:
      storage/ndb/memcache/include/Scheduler.h
      storage/ndb/memcache/include/ndb_worker.h
      storage/ndb/memcache/include/workitem.h
      storage/ndb/memcache/src/ClusterConnectionPool.cc
      storage/ndb/memcache/src/ndb_worker.cc
      storage/ndb/memcache/src/schedulers/Bulk.h
      storage/ndb/memcache/src/schedulers/Flex.h
      storage/ndb/memcache/src/schedulers/Flex_cluster.cc
      storage/ndb/memcache/src/schedulers/Stockholm.cc
      storage/ndb/memcache/src/schedulers/Stockholm.h
 4176 John David Duncan	2011-05-15
      New TabSeparatedValues class

    added:
      storage/ndb/memcache/include/TabSeparatedValues.h
      storage/ndb/memcache/src/TabSeparatedValues.cc
      storage/ndb/memcache/unit/tsv.cc
    modified:
      storage/ndb/memcache/Makefile.am
      storage/ndb/memcache/unit/Makefile.am
      storage/ndb/memcache/unit/all_tests.h
      storage/ndb/memcache/unit/harness.cc
=== modified file 'storage/ndb/memcache/include/Scheduler.h'
--- a/storage/ndb/memcache/include/Scheduler.h	2011-04-08 08:14:11 +0000
+++ b/storage/ndb/memcache/include/Scheduler.h	2011-05-17 05:22:56 +0000
@@ -30,6 +30,7 @@
 #include "Configuration.h"
 #include "thread_identifier.h"
 
+
 /* Scheduler is an interface */
 
 class Scheduler {
@@ -54,6 +55,15 @@ public:
       an Ndb object for the operation and send the workitem to be executed. */ 
   virtual ENGINE_ERROR_CODE schedule(workitem *) = 0;
 
+  /** Before an NDB callback function completes, it must call either 
+      reschedule() or yield().  yield() indicates that work is complete. */
+  virtual void yield(workitem *) const = 0;
+
+  /** Before an NDB callback function completes, it must call either 
+      reschedule() or yield(). reschedule() indicates that the workitem 
+      requires the scheduler to send & poll an additional operation. */
+  virtual void reschedule(workitem *) const = 0;
+ 
   /** io_completed() is called from the NDB Engine thread when an IO
       completion notification has been received */
   virtual void io_completed(workitem *) = 0;

=== modified file 'storage/ndb/memcache/include/ndb_worker.h'
--- a/storage/ndb/memcache/include/ndb_worker.h	2011-04-06 04:31:28 +0000
+++ b/storage/ndb/memcache/include/ndb_worker.h	2011-05-17 05:07:20 +0000
@@ -21,11 +21,7 @@
 #define NDBMEMCACHE_NDB_WORKER_H
 
 
-/* worker_prepare_operation().
-   Returns TRUE if operation is prepared.
-   FALSE if it is not supported. 
-*/   
-bool worker_prepare_operation(workitem *);
+op_status_t worker_prepare_operation(workitem *);
 
 bool build_hash_item(workitem *, Operation &);
 

=== modified file 'storage/ndb/memcache/include/ndbmemcache_global.h'
--- a/storage/ndb/memcache/include/ndbmemcache_global.h	2011-04-06 04:31:28 +0000
+++ b/storage/ndb/memcache/include/ndbmemcache_global.h	2011-05-17 05:07:20 +0000
@@ -52,4 +52,13 @@ enum {  
   OP_FLUSH
 };
 
+/* Operation Status enums */
+typedef enum {
+  op_not_supported,
+  op_failed,
+  op_async_prepared,
+  op_async_sent
+} op_status_t;
+
+
 #endif

=== modified file 'storage/ndb/memcache/include/workitem.h'
--- a/storage/ndb/memcache/include/workitem.h	2011-04-06 04:31:28 +0000
+++ b/storage/ndb/memcache/include/workitem.h	2011-05-17 04:55:39 +0000
@@ -50,7 +50,8 @@ typedef struct workitem {
     unsigned retries     : 3;  /*! how many times this job has been retried */
     unsigned complete    : 1;  /*! is this operation finished? */
     unsigned broker      : 2;  /*! for use by the flex scheduler */
-    unsigned _unused     : 2;  /*! (32 bits total) */
+    unsigned reschedule  : 1;  /*! inform scheduler to send and poll again */
+    unsigned cas_owner   : 1;  /*! set if this engine owns the CAS ID */
   } base;
   unsigned int id;
   struct workitem *previous;   /*! used to chain workitems in multi-key get */

=== modified file 'storage/ndb/memcache/src/ClusterConnectionPool.cc'
--- a/storage/ndb/memcache/src/ClusterConnectionPool.cc	2011-04-07 11:20:56 +0000
+++ b/storage/ndb/memcache/src/ClusterConnectionPool.cc	2011-05-17 04:55:39 +0000
@@ -27,6 +27,7 @@
 
 extern EXTENSION_LOGGER_DESCRIPTOR *logger;
 
+// TODO: Why doesn't this use member variable connect_string ??
 
 Ndb_cluster_connection * ClusterConnectionPool::connect(const char *connectstring) {
   DEBUG_ENTER();

=== modified file 'storage/ndb/memcache/src/ndb_worker.cc'
--- a/storage/ndb/memcache/src/ndb_worker.cc	2011-04-19 01:16:36 +0000
+++ b/storage/ndb/memcache/src/ndb_worker.cc	2011-05-17 05:25:51 +0000
@@ -41,25 +41,27 @@
 #include "workitem.h"
 #include "Configuration.h"
 #include "DataTypeHandler.h"
+#include "TabSeparatedValues.h"
 #include "debug.h"
 #include "Operation.h"
 #include "NdbInstance.h"
 #include "status_block.h"
 #include "Operation.h"
+#include "Scheduler.h"
 #include "ndb_engine.h"
 #include "hash_item_util.h"
 #include "ndb_worker.h"
 
 typedef void ndb_async_callback(int, NdbTransaction *, void *);
 
-ndb_async_callback incrCallback;     // callback for incr/decr
-ndb_async_callback rewriteCallback;  // callback for append/prepend
-ndb_async_callback DBcallback;       // callback for all others
+ndb_async_callback incr_callback;     // callback for incr/decr
+ndb_async_callback rewrite_callback;  // callback for append/prepend
+ndb_async_callback DB_callback;       // callback for all others
  
-bool worker_do_read(workitem *, bool with_cas); 
-bool worker_do_write(workitem *, bool with_cas); 
-bool worker_do_delete(workitem *, bool with_cas); 
-bool worker_do_math(workitem *wqitem, bool with_cas);
+op_status_t worker_do_read(workitem *, bool with_cas); 
+op_status_t worker_do_write(workitem *, bool with_cas); 
+op_status_t worker_do_delete(workitem *, bool with_cas); 
+op_status_t worker_do_math(workitem *wqitem, bool with_cas);
 
 void worker_set_cas(ndb_pipeline *, uint64_t *);
 bool finalize_read(workitem *);
@@ -116,17 +118,29 @@ void worker_set_cas(ndb_pipeline *p, uin
   DEBUG_PRINT("hi:%lx lo:%lx cas:%llx (%llu)", cas_hi, cas_lo, *cas, *cas);
 }
   
-
-bool worker_prepare_operation(workitem *newitem) {
+/* worker_prepare_operation(): 
+   Called from the scheduler. 
+   Returns op_async_prepared if executeAsynchPrepare() has been called on the item, op_not_supported for an unsupported operation, or op_failed on error.
+*/
+op_status_t worker_prepare_operation(workitem *newitem) {
   bool server_cas = (newitem->prefix_info.has_cas_col && newitem->cas);
-  bool r;
+  op_status_t r;
 
   /* Jump table */
   switch(newitem->base.verb) {
     case OP_READ:
+      r = worker_do_read(newitem, server_cas);
+      break;
+      
     case OPERATION_APPEND:
     case OPERATION_PREPEND:
-      r = worker_do_read(newitem, server_cas);
+      if(newitem->plan->spec->nvaluecols > 1) {
+        /* APPEND/PREPEND is currently not supported for tsv */
+        r = op_not_supported;
+      }
+      else {
+        r = worker_do_read(newitem, server_cas);
+      }
       break;
 
     case OP_DELETE: 
@@ -145,14 +159,14 @@ bool worker_prepare_operation(workitem *
       break;
       
     default:
-      return false;   /* not supported */
+      r= op_not_supported;
   }
 
-  return r;  /* fixme: distinguish "not supported" from "failed" */
+  return r;
 }
 
 
-bool worker_do_delete(workitem *wqitem, bool server_cas) {  
+op_status_t worker_do_delete(workitem *wqitem, bool server_cas) {  
   DEBUG_ENTER();
   
   QueryPlan *plan = wqitem->plan;
@@ -179,18 +193,17 @@ bool worker_do_delete(workitem *wqitem, 
     if(err.status != NdbError::Success) {
       logger->log(LOG_WARNING, 0, "deleteTuple(): %s\n", err.message);
       tx->close();
-      return false;
+      return op_failed;
     }
   }
 
-  /* NdbTransaction::executeAsynch() */   
-  tx->executeAsynch(NdbTransaction::Commit, DBcallback, (void *) wqitem,
-                    NdbOperation::DefaultAbortOption, 1);
-  return true;
+  /* Prepare for execution */   
+  tx->executeAsynchPrepare(NdbTransaction::Commit, DB_callback, (void *) wqitem);
+  return op_async_prepared;
 }
 
 
-bool worker_do_write(workitem *wqitem, bool server_cas) {
+op_status_t worker_do_write(workitem *wqitem, bool server_cas) {
   DEBUG_PRINT("%s", workitem_get_operation(wqitem));
 
   uint64_t cas_in = *wqitem->cas;                       // read old value
@@ -212,12 +225,33 @@ bool worker_do_write(workitem *wqitem, b
 
   /* Set the row */
   op.clearNullBits();
-  op.setColumn(COL_STORE_KEY, dbkey, wqitem->base.nsuffix);    
-  op.setColumn(COL_STORE_VALUE, hash_item_get_data(wqitem->cache_item),
-               wqitem->cache_item->nbytes);                        // the value
+  op.setColumn(COL_STORE_KEY, dbkey, wqitem->base.nsuffix);
+  
+  if(plan->spec->nvaluecols > 1) {
+    /* Multiple Value Columns */
+    TabSeparatedValues tsv(hash_item_get_data(wqitem->cache_item), 
+                           plan->spec->nvaluecols, wqitem->cache_item->nbytes); 
+    int idx = 0;
+    do {
+      if(tsv.getLength()) {
+        op.setColumn(COL_STORE_VALUE+idx, tsv.getPointer(), tsv.getLength());
+      }
+      else {
+        op.setColumnNull(COL_STORE_VALUE+idx);
+      }
+      idx++;
+    } while (tsv.advance());
+  }
+  else {
+    /* Just one value column */
+    op.setColumn(COL_STORE_VALUE, hash_item_get_data(wqitem->cache_item),
+                 wqitem->cache_item->nbytes);
+  }
+
   if(server_cas) {
     op.setColumnBigUnsigned(COL_STORE_CAS, * wqitem->cas);   // the cas
   }
+
   if(wqitem->plan->dup_numbers) {
     if(isdigit(* hash_item_get_data(wqitem->cache_item)) && 
        wqitem->cache_item->nbytes < 32) {      // Copy string representation 
@@ -282,17 +316,15 @@ bool worker_do_write(workitem *wqitem, b
     DEBUG_PRINT("NDB operation failed.  workitem %d.%d", wqitem->pipeline->id,
                 wqitem->id);
     tx->close();
-    return false;
+    return op_failed;
   }
 
-  /* NdbTransaction::executeAsynch() */   
-  tx->executeAsynch(NdbTransaction::Commit, DBcallback, (void *) wqitem,
-                    NdbOperation::DefaultAbortOption, 1);
-  return true;
+  tx->executeAsynchPrepare(NdbTransaction::Commit, DB_callback, (void *) wqitem);
+  return op_async_prepared;
 }
 
 
-bool worker_do_read(workitem *wqitem, bool server_cas) {
+op_status_t worker_do_read(workitem *wqitem, bool server_cas) {
   DEBUG_ENTER();
 
   QueryPlan *plan = wqitem->plan;
@@ -315,28 +347,26 @@ bool worker_do_read(workitem *wqitem, bo
   if(! op.readTuple(tx)) {
     logger->log(LOG_WARNING, 0, "readTuple(): %s\n", tx->getNdbError().message);
     tx->close();
-    return false;
+    return op_failed;
   }
 
   /* Save the workitem in the transaction and prepare for async execution */   
   if(wqitem->base.verb == OPERATION_APPEND || wqitem->base.verb == OPERATION_PREPEND) 
   {
-    DEBUG_PRINT("In read() portion of APPEND.  Value = %s",  
+    DEBUG_PRINT("In read() portion of APPEND.  Value = %s", 
                 hash_item_get_data(wqitem->cache_item));
-    tx->executeAsynch(NdbTransaction::NoCommit, rewriteCallback, (void *) wqitem,
-                      NdbOperation::DefaultAbortOption, 1);
+    tx->executeAsynchPrepare(NdbTransaction::NoCommit, rewrite_callback, (void *) wqitem);
   }
   else 
   {
-    tx->executeAsynch(NdbTransaction::Commit, DBcallback, (void *) wqitem,
-                      NdbOperation::DefaultAbortOption, 1);
+    tx->executeAsynchPrepare(NdbTransaction::Commit, DB_callback, (void *) wqitem);
   }
 
-  return true;
+  return op_async_prepared;
 }
 
 
-bool worker_do_math(workitem *wqitem, bool server_cas) {
+op_status_t worker_do_math(workitem *wqitem, bool server_cas) {
   DEBUG_PRINT("create: %d   retries: %d", 
                      wqitem->base.math_create, wqitem->base.retries);
   worker_set_cas(wqitem->pipeline, wqitem->cas);
@@ -409,7 +439,7 @@ bool worker_do_math(workitem *wqitem, bo
     if(! ndbop1) {
       logger->log(LOG_WARNING, 0, "readMasked(): %s\n", tx->getNdbError().message);
       tx->close();
-      return false; 
+      return op_failed; 
     }
   }
 
@@ -432,7 +462,7 @@ bool worker_do_math(workitem *wqitem, bo
     if(! ndbop2) {
       logger->log(LOG_WARNING, 0, "insertMasked(): %s\n", tx->getNdbError().message);
       tx->close();
-      return false;
+      return op_failed;
     }
   }
 
@@ -472,17 +502,16 @@ bool worker_do_math(workitem *wqitem, bo
     if(! ndbop3) {
       logger->log(LOG_WARNING, 0, "updateInterpreted(): %s\n", tx->getNdbError().message);
       tx->close();
-      return false;
+      return op_failed;
     }
   }
 
-  tx->executeAsynch(NdbTransaction::Commit, incrCallback, (void *) wqitem,
-                    NdbOperation::DefaultAbortOption, 1);
-  return true;
+  tx->executeAsynchPrepare(NdbTransaction::Commit, incr_callback, (void *) wqitem);
+  return op_async_prepared;
 }
 
 
-void DBcallback(int result, NdbTransaction *tx, void *itemptr) {
+void DB_callback(int result, NdbTransaction *tx, void *itemptr) {
   workitem *wqitem = (workitem *) itemptr;
   ndb_pipeline * & pipeline = wqitem->pipeline;
   status_block * return_status;
@@ -550,21 +579,21 @@ void DBcallback(int result, NdbTransacti
   if(wqitem->base.is_sync) {
     wqitem->status = return_status;
     pipeline->engine->server.cookie->store_engine_specific(wqitem->cookie, wqitem); 
-    pipeline->engine->server.cookie->notify_io_complete(wqitem->cookie, io_status);    
+    pipeline->scheduler->yield(wqitem);
   }
   else {
     /* The workitem was allocated back in the engine thread; if used in a
        callback, it would be freed there, too.  But we must free it here.
     */
     pipeline->engine->server.cookie->store_engine_specific(wqitem->cookie, wqitem->previous);
-    pipeline_io_completed(pipeline, wqitem);
+    pipeline->scheduler->io_completed(wqitem);
     workitem_free(wqitem);
   }
 }
 
 
 /* Middle-step callback for APPEND and PREPEND */
-void rewriteCallback(int result, NdbTransaction *tx, void *itemptr) {
+void rewrite_callback(int result, NdbTransaction *tx, void *itemptr) {
   workitem *item = (workitem *) itemptr;
   DEBUG_PRINT("%d.%d", item->pipeline->id, item->id);
  
@@ -573,11 +602,11 @@ void rewriteCallback(int result, NdbTran
     item->status = & status_block_bad_replace;
     tx->close();
     item->pipeline->engine->server.cookie->store_engine_specific(item->cookie, item); 
-    item->pipeline->engine->server.cookie->notify_io_complete(item->cookie, ENGINE_SUCCESS); 
+    item->pipeline->scheduler->yield(item);
     return;
   }
   else if(tx->getNdbError().classification != NdbError::NoError) {
-    return DBcallback(result, tx, itemptr);
+    return DB_callback(result, tx, itemptr);
   }  
 
   /* Strings and lengths: */
@@ -630,14 +659,13 @@ void rewriteCallback(int result, NdbTran
     op.setColumnBigUnsigned(COL_STORE_CAS, * item->cas);
   ndb_op = op.updateTuple(tx);
 
-  /* Error case; operation has not been built */
   if(ndb_op) {
-    tx->executeAsynch(NdbTransaction::Commit, DBcallback, (void *) item,
-                      NdbOperation::DefaultAbortOption, 1);  
-    // fixme: this should call back into the scheduler!
-    item->ndb_instance->db->pollNdb();
+    // Inform the scheduler that this item must be re-polled
+    item->pipeline->scheduler->reschedule(item);
+    tx->executeAsynchPrepare(NdbTransaction::Commit, DB_callback, (void *) item);
   }
   else {
+    /* Error case; operation has not been built */
     DEBUG_PRINT("NDB operation failed.  workitem %d.%d", item->pipeline->id,
                 item->id);
     tx->close();
@@ -649,7 +677,7 @@ void rewriteCallback(int result, NdbTran
 
 /* Dedicated callback function for INCR and DECR operations
 */
-void incrCallback(int result, NdbTransaction *tx, void *itemptr) {
+void incr_callback(int result, NdbTransaction *tx, void *itemptr) {
   workitem *wqitem = (workitem *) itemptr;
   ndb_pipeline * & pipeline = wqitem->pipeline;
   status_block * return_status;
@@ -737,13 +765,13 @@ void incrCallback(int result, NdbTransac
   if(wqitem->base.is_sync) {
     wqitem->status = return_status;
     pipeline->engine->server.cookie->store_engine_specific(wqitem->cookie, wqitem); 
-    pipeline->engine->server.cookie->notify_io_complete(wqitem->cookie, io_status);    
+    pipeline->scheduler->yield(wqitem);    
   }
   else {
     /* The workitem was allocated back in the engine thread; if used in a
        callback, it would be freed there, too.  But we must free it here.  */
     pipeline->engine->server.cookie->store_engine_specific(wqitem->cookie, wqitem->previous);
-    pipeline_io_completed(pipeline, wqitem);
+    pipeline->scheduler->io_completed(wqitem);
     workitem_free(wqitem);
   }
 }

=== modified file 'storage/ndb/memcache/src/schedulers/Bulk.cc'
--- a/storage/ndb/memcache/src/schedulers/Bulk.cc	2011-04-21 08:05:13 +0000
+++ b/storage/ndb/memcache/src/schedulers/Bulk.cc	2011-05-17 05:22:56 +0000
@@ -129,7 +129,6 @@ ENGINE_ERROR_CODE Scheduler_bulk::schedu
   DEBUG_ENTER();
   const Configuration & conf = get_Configuration();     // see note
   int lockerr;
-  ENGINE_ERROR_CODE response_code = ENGINE_EWOULDBLOCK;
   /* Fetch the config for its key prefix */
   const KeyPrefix *pfx = conf.getPrefixByInfo(newitem->prefix_info);
 
@@ -164,18 +163,22 @@ ENGINE_ERROR_CODE Scheduler_bulk::schedu
   workitem_set_NdbInstance(newitem, inst);
 
   // Build the NDB transaction
-  if(worker_prepare_operation(newitem)) {
-    inst->npending++;
+  ENGINE_ERROR_CODE response_code;
+  op_status_t op_status = worker_prepare_operation(newitem);
+  switch(op_status) {
+    case op_async_prepared:
+    case op_async_sent:
+      inst->npending++;
+      response_code = ENGINE_EWOULDBLOCK;
+      break;
+    case op_not_supported:
+      response_code = ENGINE_ENOTSUP;
+      break;
+    case op_failed:
+      response_code = ENGINE_FAILED;
+      break;
   }
-  else {
-    /******* Error handling.  The operation is not supported. */
-    if(newitem->base.is_sync) {
-      newitem->status = & status_block_unsupported;
-      pipeline->engine->server.cookie->store_engine_specific(newitem->cookie, newitem); 
-      pipeline->engine->server.cookie->notify_io_complete(newitem->cookie, ENGINE_ENOTSUP);
-    }
-  }
-  
+   
   inst->unlock();
 
   return response_code;
@@ -280,3 +283,17 @@ void * Scheduler_bulk::run_ndb_commit_th
       cluster[c].stats.vtime = get_thread_vtime(); 
   } 
 }
+
+
+void Scheduler_bulk::reschedule(workitem *item) const {
+  LockableNdbInstance * inst =  (LockableNdbInstance *) item->ndb_instance ;
+  DEBUG_ASSERT(inst->is_locked);
+  inst->npending++;
+}
+
+
+void Scheduler_bulk::yield(workitem *item) const {
+  /* In this scheduler, a small number of Ndb objects are shared, 
+     so we call notify_io_complete() while the callback is still running. */
+  pipeline->engine->server.cookie->notify_io_complete(item->cookie, ENGINE_SUCCESS);
+}

=== modified file 'storage/ndb/memcache/src/schedulers/Bulk.h'
--- a/storage/ndb/memcache/src/schedulers/Bulk.h	2011-04-19 01:16:36 +0000
+++ b/storage/ndb/memcache/src/schedulers/Bulk.h	2011-05-17 05:22:56 +0000
@@ -27,6 +27,7 @@
 #include <memcached/types.h>
 
 #include "config.h"
+#include "workitem.h"
 #include "Scheduler.h"
 #include "NdbInstance.h"
 
@@ -42,6 +43,8 @@ public:
   void init(int threadnum, int nthreads, const char *config_string);
   void attach_thread(thread_identifier *);
   ENGINE_ERROR_CODE schedule(workitem *);
+  void yield(workitem *) const;
+  void reschedule(workitem *) const;
   void io_completed(workitem *);
   void add_stats(ADD_STAT, const void *);
   void * run_ndb_commit_thread(int thread_id);

=== modified file 'storage/ndb/memcache/src/schedulers/Flex.h'
--- a/storage/ndb/memcache/src/schedulers/Flex.h	2011-04-07 11:20:56 +0000
+++ b/storage/ndb/memcache/src/schedulers/Flex.h	2011-05-17 05:22:56 +0000
@@ -56,6 +56,8 @@ public:
   void init(int threadnum, int nthreads, const char *config_string);
   void attach_thread(thread_identifier *);
   ENGINE_ERROR_CODE schedule(workitem *);
+  void yield(workitem *) const;                                       // inlined
+  void reschedule(workitem *) const;                                  // inlined
   void io_completed(workitem *);
   void add_stats(ADD_STAT, const void *);  
 
@@ -83,6 +85,13 @@ protected:
 };
 
 
+inline void Scheduler_flex::reschedule(workitem *item) const {
+  item->base.reschedule = 1;
+}
+
+inline void Scheduler_flex::yield(workitem *item) const { }
+
+
 /* Random stat samples will on average be taken twice as often as 
    FLEX_STATS_SAMPLE_INTERVAL (based on uniform random distribution). 
    FLEX_STATS_INITIAL_INTERVAL is lower so as to get stats quickly on startup.

=== modified file 'storage/ndb/memcache/src/schedulers/Flex_cluster.cc'
--- a/storage/ndb/memcache/src/schedulers/Flex_cluster.cc	2011-04-09 07:36:24 +0000
+++ b/storage/ndb/memcache/src/schedulers/Flex_cluster.cc	2011-05-17 05:22:56 +0000
@@ -157,31 +157,40 @@ ENGINE_ERROR_CODE Scheduler_flex::Cluste
   
   workitem_set_NdbInstance(newitem, inst);
 
-  // Fetch the query plan for this prefix.
+  /* Fetch the query plan for this prefix. */
   newitem->plan = inst->getPlanForPrefix(pfx);
   if(! newitem->plan) return ENGINE_FAILED;
   
-  // Build the NDB transaction
-  if(worker_prepare_operation(newitem)) {
-    
-    if(do_queue_sample-- == 0) {   // Sample for statistics.
-      /* To simulate sampling the depth immediately after the item is added,
-       sample it now and add one.  */
-      int depth = queue->depth + 1;
-      /* Queue depth can be erroneous because of thread races.
-       Use it only if it looks valid. */
-      if(depth > 0 && depth <= queue_size) {
-        stats.queue_total_depth += depth;
-        stats.queue_samples++;
-      }
-      do_queue_sample = random() % FLEX_STATS_SAMPLE_INTERVAL;
+  /* Sample for statistics */
+  if(do_queue_sample-- == 0) {
+    int depth = queue->depth + 1;
+    // Depth can be erroneous due to thread races; use it only if it looks valid
+    if(depth > 0 && depth <= queue_size) {
+      stats.queue_total_depth += depth;
+      stats.queue_samples++;
     }
-    
-    // Put the NdbInstance on the queue for the commit thread.     
-    workqueue_add(queue, inst);
-    return ENGINE_EWOULDBLOCK;
+    do_queue_sample = (random() % FLEX_STATS_SAMPLE_INTERVAL) + 1;
+  }  
+  
+  /* Build the NDB transaction */
+  ENGINE_ERROR_CODE response_code;
+  op_status_t op_status = worker_prepare_operation(newitem);
+
+  switch(op_status) {
+    case op_async_prepared:
+    case op_async_sent:
+      workqueue_add(queue, newitem); // place item on queue
+      response_code = ENGINE_EWOULDBLOCK;
+      break;
+    case op_not_supported:
+      response_code = ENGINE_ENOTSUP;
+      break;
+    case op_failed:
+      response_code = ENGINE_FAILED;
+      break;
   }
-  else return ENGINE_ENOTSUP;
+  
+  return response_code;
 }
 
 
@@ -209,19 +218,29 @@ void Scheduler_flex::Cluster::contribute
   Commit thread: Get an NdbInstance off the workqueue, and call pollNdb() on it.
 */
 void * Scheduler_flex::Cluster::run_commit_thread() {
+  workitem *item;
+  int polled;
+
   DEBUG_ENTER();
   
-  NdbInstance *inst;
-  
   while(1) {
     /* Wait for something to appear on the queue */
-    inst = (NdbInstance *) workqueue_consumer_wait(queue);
+    item = (workitem *) workqueue_consumer_wait(queue);
     
-    if(inst == NULL) return 0;  /* queue has been shut down and emptied */
+    if(item == NULL) break;  /* queue has been shut down and emptied */
     
-    /* Poll */
-    inst->db->pollNdb();
+    /* Send & poll for response; reschedule if needed */
+    do {
+      item->base.reschedule = 0;
+      polled = item->ndb_instance->db->sendPollNdb(10, 1, 1);
+    } while(item->base.reschedule || ! polled);
+
+    /* Now that sendPollNdb() has returned, it is OK to notify_io_complete(),
+       which will trigger the worker thread to release the Ndb instance. */ 
+    item->pipeline->engine->server.cookie->notify_io_complete(item->cookie, ENGINE_SUCCESS);
   } 
+
+  return NULL;
 }
 
 

=== modified file 'storage/ndb/memcache/src/schedulers/Stockholm.cc'
--- a/storage/ndb/memcache/src/schedulers/Stockholm.cc	2011-04-19 01:16:36 +0000
+++ b/storage/ndb/memcache/src/schedulers/Stockholm.cc	2011-05-17 05:22:56 +0000
@@ -185,13 +185,24 @@ ENGINE_ERROR_CODE Scheduler_stockholm::s
   if(! newitem->plan) return ENGINE_FAILED;
   
   // Build the NDB transaction
-  if(worker_prepare_operation(newitem)) {
-    // Put the NdbInstance on the queue for the commit thread.
-    // Should probably be the workitem?     
-    workqueue_add(cluster[c].queue, inst);
-    return ENGINE_EWOULDBLOCK;
+  op_status_t op_status = worker_prepare_operation(newitem);
+  ENGINE_ERROR_CODE response_code;
+  
+  switch(op_status) {
+    case op_async_prepared:
+    case op_async_sent:
+      workqueue_add(cluster[c].queue, newitem); // place item on queue
+      response_code = ENGINE_EWOULDBLOCK;
+      break;
+    case op_not_supported:
+      response_code = ENGINE_ENOTSUP;
+      break;
+    case op_failed:
+      response_code = ENGINE_FAILED;
+      break;
   }
-  else return ENGINE_ENOTSUP;
+
+  return response_code;
 }
 
 
@@ -241,21 +252,34 @@ void * run_stockholm_commit_thread(void 
   Get an item off the workqueue, and call pollNdb() on that item.
  */
 void * Scheduler_stockholm::run_ndb_commit_thread(int c) {
-  NdbInstance *inst;
+  workitem *item;
+  int polled;
   
   DEBUG_ENTER();
   
   while(1) {
     /* Wait for something to appear on the queue */
-    inst = (NdbInstance *) workqueue_consumer_wait(cluster[c].queue);
+    item = (workitem *) workqueue_consumer_wait(cluster[c].queue);
+
+    if(item == NULL) break;  /* queue has been shut down and emptied */
     
-    /* Poll */
-    inst->db->pollNdb();
+    /* Send & poll for response; reschedule if needed */
+    do {
+      item->base.reschedule = 0;
+      polled = item->ndb_instance->db->sendPollNdb(10, 1, 1);
+    } while(item->base.reschedule || ! polled);
+
+    DEBUG_ASSERT(polled == 1);  // i.e. not > 1
+    
+    /* Now that sendPollNdb() has returned, it is OK to notify_io_complete(),
+       which will trigger the worker thread to release the Ndb instance. */ 
+    pipeline->engine->server.cookie->notify_io_complete(item->cookie, ENGINE_SUCCESS);
     
-    cluster[c].stats.cycles++;
-    if(! (cluster[c].stats.cycles % STAT_INTERVAL)) 
+    if(! (cluster[c].stats.cycles++ % STAT_INTERVAL)) 
       cluster[c].stats.commit_thread_vtime = get_thread_vtime();
   } 
+
+  return NULL;
 }
 
 

=== modified file 'storage/ndb/memcache/src/schedulers/Stockholm.h'
--- a/storage/ndb/memcache/src/schedulers/Stockholm.h	2011-04-07 11:20:56 +0000
+++ b/storage/ndb/memcache/src/schedulers/Stockholm.h	2011-05-17 05:22:56 +0000
@@ -28,6 +28,7 @@
 #include <memcached/types.h>
 
 #include "ndbmemcache_config.h"
+#include "workitem.h"
 #include "Scheduler.h"
 #include "KeyPrefix.h"
 
@@ -45,6 +46,8 @@ public:
   void init(int threadnum, int nthreads, const char *config_string);
   void attach_thread(thread_identifier *);
   ENGINE_ERROR_CODE schedule(workitem *);
+  void yield(workitem *) const;                                       // inlined
+  void reschedule(workitem *) const;                                  // inlined
   void io_completed(workitem *);
   void add_stats(ADD_STAT, const void *);
   void * run_ndb_commit_thread(int cluster_id);
@@ -64,5 +67,13 @@ private:  
 };
 
 
+inline void Scheduler_stockholm::reschedule(workitem *item) const {
+  item->base.reschedule = 1;
+}
+
+
+inline void Scheduler_stockholm::yield(workitem *item) const { } 
+
+
 #endif
 

No bundle (reason: useless for push emails).
Thread
bzr push into mysql-5.1-telco-7.2 branch (john.duncan:4176 to 4181) John David Duncan 19 May