List:Commits« Previous MessageNext Message »
From:John David Duncan Date:May 17 2011 5:07am
Subject:bzr commit into mysql-5.1-telco-7.2 branch (john.duncan:4178)
View as plain text  
#At file:///Users/jdd/bzr-repo/working/cluster-7.2-labs-memcached/ based on revid:john.duncan@stripped

 4178 John David Duncan	2011-05-16
      worker_prepare_operation() now returns op_status_t rather than bool.

    modified:
      storage/ndb/memcache/include/ndb_worker.h
      storage/ndb/memcache/include/ndbmemcache_global.h
      storage/ndb/memcache/src/ndb_worker.cc
      storage/ndb/memcache/src/schedulers/Bulk.cc
      storage/ndb/memcache/src/schedulers/Flex_cluster.cc
      storage/ndb/memcache/src/schedulers/Stockholm.cc
=== modified file 'storage/ndb/memcache/include/ndb_worker.h'
--- a/storage/ndb/memcache/include/ndb_worker.h	2011-05-17 04:55:39 +0000
+++ b/storage/ndb/memcache/include/ndb_worker.h	2011-05-17 05:07:20 +0000
@@ -21,10 +21,7 @@
 #define NDBMEMCACHE_NDB_WORKER_H
 
 
-/* worker_prepare_operation():
-   Returns TRUE if an operation has been prepared with executeAsynchPrepare().
-*/   
-bool worker_prepare_operation(workitem *);
+op_status_t worker_prepare_operation(workitem *);
 
 bool build_hash_item(workitem *, Operation &);
 

=== modified file 'storage/ndb/memcache/include/ndbmemcache_global.h'
--- a/storage/ndb/memcache/include/ndbmemcache_global.h	2011-04-06 04:31:28 +0000
+++ b/storage/ndb/memcache/include/ndbmemcache_global.h	2011-05-17 05:07:20 +0000
@@ -52,4 +52,13 @@ enum {  
   OP_FLUSH
 };
 
+/* Operation Status enums */
+typedef enum {
+  op_not_supported,
+  op_failed,
+  op_async_prepared,
+  op_async_sent
+} op_status_t;
+
+
 #endif

=== modified file 'storage/ndb/memcache/src/ndb_worker.cc'
--- a/storage/ndb/memcache/src/ndb_worker.cc	2011-05-17 04:55:39 +0000
+++ b/storage/ndb/memcache/src/ndb_worker.cc	2011-05-17 05:07:20 +0000
@@ -53,14 +53,14 @@
 
 typedef void ndb_async_callback(int, NdbTransaction *, void *);
 
-ndb_async_callback incrCallback;     // callback for incr/decr
-ndb_async_callback rewriteCallback;  // callback for append/prepend
-ndb_async_callback DBcallback;       // callback for all others
+ndb_async_callback incr_callback;     // callback for incr/decr
+ndb_async_callback rewrite_callback;  // callback for append/prepend
+ndb_async_callback DB_callback;       // callback for all others
  
-bool worker_do_read(workitem *, bool with_cas); 
-bool worker_do_write(workitem *, bool with_cas); 
-bool worker_do_delete(workitem *, bool with_cas); 
-bool worker_do_math(workitem *wqitem, bool with_cas);
+op_status_t worker_do_read(workitem *, bool with_cas); 
+op_status_t worker_do_write(workitem *, bool with_cas); 
+op_status_t worker_do_delete(workitem *, bool with_cas); 
+op_status_t worker_do_math(workitem *wqitem, bool with_cas);
 
 void worker_set_cas(ndb_pipeline *, uint64_t *);
 bool finalize_read(workitem *);
@@ -121,9 +121,9 @@ void worker_set_cas(ndb_pipeline *p, uin
    Called from the scheduler. 
    Returns true if executeAsynchPrepare() has been called on the item.
 */
-bool worker_prepare_operation(workitem *newitem) {
+op_status_t worker_prepare_operation(workitem *newitem) {
   bool server_cas = (newitem->prefix_info.has_cas_col && newitem->cas);
-  bool r;
+  op_status_t r;
 
   /* Jump table */
   switch(newitem->base.verb) {
@@ -149,14 +149,14 @@ bool worker_prepare_operation(workitem *
       break;
       
     default:
-      return false;   /* not supported */
+      r= op_not_supported;
   }
 
   return r;
 }
 
 
-bool worker_do_delete(workitem *wqitem, bool server_cas) {  
+op_status_t worker_do_delete(workitem *wqitem, bool server_cas) {  
   DEBUG_ENTER();
   
   QueryPlan *plan = wqitem->plan;
@@ -183,17 +183,17 @@ bool worker_do_delete(workitem *wqitem, 
     if(err.status != NdbError::Success) {
       logger->log(LOG_WARNING, 0, "deleteTuple(): %s\n", err.message);
       tx->close();
-      return false;
+      return op_failed;
     }
   }
 
   /* Prepare for execution */   
-  tx->executeAsynchPrepare(NdbTransaction::Commit, DBcallback, (void *) wqitem);
-  return true;
+  tx->executeAsynchPrepare(NdbTransaction::Commit, DB_callback, (void *) wqitem);
+  return op_async_prepared;
 }
 
 
-bool worker_do_write(workitem *wqitem, bool server_cas) {
+op_status_t worker_do_write(workitem *wqitem, bool server_cas) {
   DEBUG_PRINT("%s", workitem_get_operation(wqitem));
 
   uint64_t cas_in = *wqitem->cas;                       // read old value
@@ -285,15 +285,15 @@ bool worker_do_write(workitem *wqitem, b
     DEBUG_PRINT("NDB operation failed.  workitem %d.%d", wqitem->pipeline->id,
                 wqitem->id);
     tx->close();
-    return false;
+    return op_failed;
   }
 
-  tx->executeAsynchPrepare(NdbTransaction::Commit, DBcallback, (void *) wqitem);
-  return true;
+  tx->executeAsynchPrepare(NdbTransaction::Commit, DB_callback, (void *) wqitem);
+  return op_async_prepared;
 }
 
 
-bool worker_do_read(workitem *wqitem, bool server_cas) {
+op_status_t worker_do_read(workitem *wqitem, bool server_cas) {
   DEBUG_ENTER();
 
   QueryPlan *plan = wqitem->plan;
@@ -316,7 +316,7 @@ bool worker_do_read(workitem *wqitem, bo
   if(! op.readTuple(tx)) {
     logger->log(LOG_WARNING, 0, "readTuple(): %s\n", tx->getNdbError().message);
     tx->close();
-    return false;
+    return op_failed;
   }
 
   /* Save the workitem in the transaction and prepare for async execution */   
@@ -324,18 +324,18 @@ bool worker_do_read(workitem *wqitem, bo
   {
     DEBUG_PRINT("In read() portion of APPEND.  Value = %s", 
                 hash_item_get_data(wqitem->cache_item));
-    tx->executeAsynchPrepare(NdbTransaction::NoCommit, rewriteCallback, (void *) wqitem);
+    tx->executeAsynchPrepare(NdbTransaction::NoCommit, rewrite_callback, (void *) wqitem);
   }
   else 
   {
-    tx->executeAsynchPrepare(NdbTransaction::Commit, DBcallback, (void *) wqitem);
+    tx->executeAsynchPrepare(NdbTransaction::Commit, DB_callback, (void *) wqitem);
   }
 
-  return true;
+  return op_async_prepared;
 }
 
 
-bool worker_do_math(workitem *wqitem, bool server_cas) {
+op_status_t worker_do_math(workitem *wqitem, bool server_cas) {
   DEBUG_PRINT("create: %d   retries: %d", 
                      wqitem->base.math_create, wqitem->base.retries);
   worker_set_cas(wqitem->pipeline, wqitem->cas);
@@ -408,7 +408,7 @@ bool worker_do_math(workitem *wqitem, bo
     if(! ndbop1) {
       logger->log(LOG_WARNING, 0, "readMasked(): %s\n", tx->getNdbError().message);
       tx->close();
-      return false; 
+      return op_failed; 
     }
   }
 
@@ -431,7 +431,7 @@ bool worker_do_math(workitem *wqitem, bo
     if(! ndbop2) {
       logger->log(LOG_WARNING, 0, "insertMasked(): %s\n", tx->getNdbError().message);
       tx->close();
-      return false;
+      return op_failed;
     }
   }
 
@@ -471,16 +471,16 @@ bool worker_do_math(workitem *wqitem, bo
     if(! ndbop3) {
       logger->log(LOG_WARNING, 0, "updateInterpreted(): %s\n", tx->getNdbError().message);
       tx->close();
-      return false;
+      return op_failed;
     }
   }
 
-  tx->executeAsynchPrepare(NdbTransaction::Commit, incrCallback, (void *) wqitem);
-  return true;
+  tx->executeAsynchPrepare(NdbTransaction::Commit, incr_callback, (void *) wqitem);
+  return op_async_prepared;
 }
 
 
-void DBcallback(int result, NdbTransaction *tx, void *itemptr) {
+void DB_callback(int result, NdbTransaction *tx, void *itemptr) {
   workitem *wqitem = (workitem *) itemptr;
   ndb_pipeline * & pipeline = wqitem->pipeline;
   status_block * return_status;
@@ -562,7 +562,7 @@ void DBcallback(int result, NdbTransacti
 
 
 /* Middle-step callback for APPEND and PREPEND */
-void rewriteCallback(int result, NdbTransaction *tx, void *itemptr) {
+void rewrite_callback(int result, NdbTransaction *tx, void *itemptr) {
   workitem *item = (workitem *) itemptr;
   DEBUG_PRINT("%d.%d", item->pipeline->id, item->id);
  
@@ -575,7 +575,7 @@ void rewriteCallback(int result, NdbTran
     return;
   }
   else if(tx->getNdbError().classification != NdbError::NoError) {
-    return DBcallback(result, tx, itemptr);
+    return DB_callback(result, tx, itemptr);
   }  
 
   /* Strings and lengths: */
@@ -631,7 +631,7 @@ void rewriteCallback(int result, NdbTran
   if(ndb_op) {
     // Inform the scheduler that this item must be re-polled
     item->pipeline->scheduler->reschedule(item);
-    tx->executeAsynchPrepare(NdbTransaction::Commit, DBcallback, (void *) item);
+    tx->executeAsynchPrepare(NdbTransaction::Commit, DB_callback, (void *) item);
   }
   else {
     /* Error case; operation has not been built */
@@ -646,7 +646,7 @@ void rewriteCallback(int result, NdbTran
 
 /* Dedicated callback function for INCR and DECR operations
 */
-void incrCallback(int result, NdbTransaction *tx, void *itemptr) {
+void incr_callback(int result, NdbTransaction *tx, void *itemptr) {
   workitem *wqitem = (workitem *) itemptr;
   ndb_pipeline * & pipeline = wqitem->pipeline;
   status_block * return_status;

=== modified file 'storage/ndb/memcache/src/schedulers/Bulk.cc'
--- a/storage/ndb/memcache/src/schedulers/Bulk.cc	2011-04-21 08:05:13 +0000
+++ b/storage/ndb/memcache/src/schedulers/Bulk.cc	2011-05-17 05:07:20 +0000
@@ -129,7 +129,6 @@ ENGINE_ERROR_CODE Scheduler_bulk::schedu
   DEBUG_ENTER();
   const Configuration & conf = get_Configuration();     // see note
   int lockerr;
-  ENGINE_ERROR_CODE response_code = ENGINE_EWOULDBLOCK;
   /* Fetch the config for its key prefix */
   const KeyPrefix *pfx = conf.getPrefixByInfo(newitem->prefix_info);
 
@@ -164,18 +163,22 @@ ENGINE_ERROR_CODE Scheduler_bulk::schedu
   workitem_set_NdbInstance(newitem, inst);
 
   // Build the NDB transaction
-  if(worker_prepare_operation(newitem)) {
-    inst->npending++;
+  ENGINE_ERROR_CODE response_code;
+  op_status_t op_status = worker_prepare_operation(newitem);
+  switch(op_status) {
+    case op_async_prepared:
+    case op_async_sent:
+      inst->npending++;
+      response_code = ENGINE_EWOULDBLOCK;
+      break;
+    case op_not_supported:
+      response_code = ENGINE_ENOTSUP;
+      break;
+    case op_failed:
+      response_code = ENGINE_FAILED;
+      break;
   }
-  else {
-    /******* Error handling.  The operation is not supported. */
-    if(newitem->base.is_sync) {
-      newitem->status = & status_block_unsupported;
-      pipeline->engine->server.cookie->store_engine_specific(newitem->cookie, newitem); 
-      pipeline->engine->server.cookie->notify_io_complete(newitem->cookie, ENGINE_ENOTSUP);
-    }
-  }
-  
+   
   inst->unlock();
 
   return response_code;

=== modified file 'storage/ndb/memcache/src/schedulers/Flex_cluster.cc'
--- a/storage/ndb/memcache/src/schedulers/Flex_cluster.cc	2011-05-17 04:55:39 +0000
+++ b/storage/ndb/memcache/src/schedulers/Flex_cluster.cc	2011-05-17 05:07:20 +0000
@@ -157,31 +157,40 @@ ENGINE_ERROR_CODE Scheduler_flex::Cluste
   
   workitem_set_NdbInstance(newitem, inst);
 
-  // Fetch the query plan for this prefix.
+  /* Fetch the query plan for this prefix. */
   newitem->plan = inst->getPlanForPrefix(pfx);
   if(! newitem->plan) return ENGINE_FAILED;
   
-  // Build the NDB transaction
-  if(worker_prepare_operation(newitem)) { 
-     
-    if(do_queue_sample-- == 0) {   // Sample for statistics.
-      /* To simulate sampling the depth immediately after the item is added,
-       sample it now and add one.  */
-      int depth = queue->depth + 1;
-      /* Queue depth can be erroneous because of thread races.
-       Use it only if it looks valid. */
-      if(depth > 0 && depth <= queue_size) {
-        stats.queue_total_depth += depth;
-        stats.queue_samples++;
-      }
-      do_queue_sample = (random() % FLEX_STATS_SAMPLE_INTERVAL) + 1;
+  /* Sample for statistics */
+  if(do_queue_sample-- == 0) {
+    int depth = queue->depth + 1;
+    // Depth can be erroneous due to thread races; use it only if it looks valid
+    if(depth > 0 && depth <= queue_size) {
+      stats.queue_total_depth += depth;
+      stats.queue_samples++;
     }
-    
-    // Put the workitem on the queue for the commit thread.
-    workqueue_add(queue, newitem);
-    return ENGINE_EWOULDBLOCK;
+    do_queue_sample = (random() % FLEX_STATS_SAMPLE_INTERVAL) + 1;
+  }  
+  
+  /* Build the NDB transaction */
+  ENGINE_ERROR_CODE response_code;
+  op_status_t op_status = worker_prepare_operation(newitem);
+
+  switch(op_status) {
+    case op_async_prepared:
+    case op_async_sent:
+      workqueue_add(queue, newitem); // place item on queue
+      response_code = ENGINE_EWOULDBLOCK;
+      break;
+    case op_not_supported:
+      response_code = ENGINE_ENOTSUP;
+      break;
+    case op_failed:
+      response_code = ENGINE_FAILED;
+      break;
   }
-  else return ENGINE_ENOTSUP;
+  
+  return response_code;
 }
 
 

=== modified file 'storage/ndb/memcache/src/schedulers/Stockholm.cc'
--- a/storage/ndb/memcache/src/schedulers/Stockholm.cc	2011-05-17 04:55:39 +0000
+++ b/storage/ndb/memcache/src/schedulers/Stockholm.cc	2011-05-17 05:07:20 +0000
@@ -185,11 +185,24 @@ ENGINE_ERROR_CODE Scheduler_stockholm::s
   if(! newitem->plan) return ENGINE_FAILED;
   
   // Build the NDB transaction
-  if(worker_prepare_operation(newitem)) {
-    workqueue_add(cluster[c].queue, newitem); // place item on queue
-    return ENGINE_EWOULDBLOCK;
+  op_status_t op_status = worker_prepare_operation(newitem);
+  ENGINE_ERROR_CODE response_code;
+  
+  switch(op_status) {
+    case op_async_prepared:
+    case op_async_sent:
+      workqueue_add(cluster[c].queue, newitem); // place item on queue
+      response_code = ENGINE_EWOULDBLOCK;
+      break;
+    case op_not_supported:
+      response_code = ENGINE_ENOTSUP;
+      break;
+    case op_failed:
+      response_code = ENGINE_FAILED;
+      break;
   }
-  else return ENGINE_ENOTSUP;
+
+  return response_code;
 }
 
 
@@ -240,6 +253,7 @@ void * run_stockholm_commit_thread(void 
  */
 void * Scheduler_stockholm::run_ndb_commit_thread(int c) {
   workitem *item;
+  int polled;
   
   DEBUG_ENTER();
   
@@ -247,12 +261,12 @@ void * Scheduler_stockholm::run_ndb_comm
     /* Wait for something to appear on the queue */
     item = (workitem *) workqueue_consumer_wait(cluster[c].queue);
     
-    /* Sen & poll for response */
-    item->ndb_instance->db->sendPollNdb(WAITFOR_RESPONSE_TIMEOUT, 1, 1);
-    while(item->base.reschedule == 1) {
+    /* Send & poll for response; reschedule if needed */
+    do {
       item->base.reschedule = 0;
-      item->ndb_instance->db->sendPollNdb(WAITFOR_RESPONSE_TIMEOUT, 1, 1);
-    }
+      polled = item->ndb_instance->db->sendPollNdb(10, 1, 1);
+    } while(item->base.reschedule || ! polled);
+
     
     cluster[c].stats.cycles++;
     if(! (cluster[c].stats.cycles % STAT_INTERVAL)) 


Attachment: [text/bzr-bundle] bzr/john.duncan@oracle.com-20110517050720-e8lzjvy4csn31g81.bundle
Thread
bzr commit into mysql-5.1-telco-7.2 branch (john.duncan:4178) John David Duncan17 May