List:Commits« Previous MessageNext Message »
From:John David Duncan Date:May 17 2011 4:55am
Subject:bzr commit into mysql-5.1-telco-7.2 branch (john.duncan:4177)
View as plain text  
#At file:///Users/jdd/bzr-repo/working/cluster-7.2-labs-memcached/ based on revid:john.duncan@stripped

 4177 John David Duncan	2011-05-16
      Send NdbApi requests from the commit thread.

    modified:
      storage/ndb/memcache/include/Scheduler.h
      storage/ndb/memcache/include/ndb_worker.h
      storage/ndb/memcache/include/workitem.h
      storage/ndb/memcache/src/ClusterConnectionPool.cc
      storage/ndb/memcache/src/ndb_worker.cc
      storage/ndb/memcache/src/schedulers/Bulk.h
      storage/ndb/memcache/src/schedulers/Flex.h
      storage/ndb/memcache/src/schedulers/Flex_cluster.cc
      storage/ndb/memcache/src/schedulers/Stockholm.cc
      storage/ndb/memcache/src/schedulers/Stockholm.h
=== modified file 'storage/ndb/memcache/include/Scheduler.h'
--- a/storage/ndb/memcache/include/Scheduler.h	2011-04-08 08:14:11 +0000
+++ b/storage/ndb/memcache/include/Scheduler.h	2011-05-17 04:55:39 +0000
@@ -54,6 +54,10 @@ public:
       an Ndb object for the operation and send the workitem to be executed. */ 
   virtual ENGINE_ERROR_CODE schedule(workitem *) = 0;
 
+  /** when a workitem requires multiple NDB operations, reschedule() is used
+      to schedule the second and subsequent ones. */
+  virtual void reschedule(workitem *) const = 0;
+
   /** io_completed() is called from the NDB Engine thread when an IO
       completion notification has been received */
   virtual void io_completed(workitem *) = 0;

=== modified file 'storage/ndb/memcache/include/ndb_worker.h'
--- a/storage/ndb/memcache/include/ndb_worker.h	2011-04-06 04:31:28 +0000
+++ b/storage/ndb/memcache/include/ndb_worker.h	2011-05-17 04:55:39 +0000
@@ -21,9 +21,8 @@
 #define NDBMEMCACHE_NDB_WORKER_H
 
 
-/* worker_prepare_operation().
-   Returns TRUE if operation is prepared.
-   FALSE if it is not supported. 
+/* worker_prepare_operation():
+   Returns TRUE if an operation has been prepared with executeAsynchPrepare().
 */   
 bool worker_prepare_operation(workitem *);
 

=== modified file 'storage/ndb/memcache/include/workitem.h'
--- a/storage/ndb/memcache/include/workitem.h	2011-04-06 04:31:28 +0000
+++ b/storage/ndb/memcache/include/workitem.h	2011-05-17 04:55:39 +0000
@@ -50,7 +50,8 @@ typedef struct workitem {
     unsigned retries     : 3;  /*! how many times this job has been retried */
     unsigned complete    : 1;  /*! is this operation finished? */
     unsigned broker      : 2;  /*! for use by the flex scheduler */
-    unsigned _unused     : 2;  /*! (32 bits total) */
+    unsigned reschedule  : 1;  /*! inform scheduler to send and poll again */
+    unsigned cas_owner   : 1;  /*! set if this engine owns the CAS ID */
   } base;
   unsigned int id;
   struct workitem *previous;   /*! used to chain workitems in multi-key get */

=== modified file 'storage/ndb/memcache/src/ClusterConnectionPool.cc'
--- a/storage/ndb/memcache/src/ClusterConnectionPool.cc	2011-04-07 11:20:56 +0000
+++ b/storage/ndb/memcache/src/ClusterConnectionPool.cc	2011-05-17 04:55:39 +0000
@@ -27,6 +27,7 @@
 
 extern EXTENSION_LOGGER_DESCRIPTOR *logger;
 
+// TODO: Why doesn't this use member variable connect_string ??
 
 Ndb_cluster_connection * ClusterConnectionPool::connect(const char *connectstring) {
   DEBUG_ENTER();

=== modified file 'storage/ndb/memcache/src/ndb_worker.cc'
--- a/storage/ndb/memcache/src/ndb_worker.cc	2011-04-19 01:16:36 +0000
+++ b/storage/ndb/memcache/src/ndb_worker.cc	2011-05-17 04:55:39 +0000
@@ -46,6 +46,7 @@
 #include "NdbInstance.h"
 #include "status_block.h"
 #include "Operation.h"
+#include "Scheduler.h"
 #include "ndb_engine.h"
 #include "hash_item_util.h"
 #include "ndb_worker.h"
@@ -116,7 +117,10 @@ void worker_set_cas(ndb_pipeline *p, uin
   DEBUG_PRINT("hi:%lx lo:%lx cas:%llx (%llu)", cas_hi, cas_lo, *cas, *cas);
 }
   
-
+/* worker_prepare_operation(): 
+   Called from the scheduler. 
+   Returns true if executeAsynchPrepare() has been called on the item.
+*/
 bool worker_prepare_operation(workitem *newitem) {
   bool server_cas = (newitem->prefix_info.has_cas_col && newitem->cas);
   bool r;
@@ -148,7 +152,7 @@ bool worker_prepare_operation(workitem *
       return false;   /* not supported */
   }
 
-  return r;  /* fixme: distinguish "not supported" from "failed" */
+  return r;
 }
 
 
@@ -183,9 +187,8 @@ bool worker_do_delete(workitem *wqitem, 
     }
   }
 
-  /* NdbTransaction::executeAsynch() */   
-  tx->executeAsynch(NdbTransaction::Commit, DBcallback, (void *) wqitem,
-                    NdbOperation::DefaultAbortOption, 1);
+  /* Prepare for execution */   
+  tx->executeAsynchPrepare(NdbTransaction::Commit, DBcallback, (void *) wqitem);
   return true;
 }
 
@@ -285,9 +288,7 @@ bool worker_do_write(workitem *wqitem, b
     return false;
   }
 
-  /* NdbTransaction::executeAsynch() */   
-  tx->executeAsynch(NdbTransaction::Commit, DBcallback, (void *) wqitem,
-                    NdbOperation::DefaultAbortOption, 1);
+  tx->executeAsynchPrepare(NdbTransaction::Commit, DBcallback, (void *) wqitem);
   return true;
 }
 
@@ -321,15 +322,13 @@ bool worker_do_read(workitem *wqitem, bo
   /* Save the workitem in the transaction and prepare for async execution */   
   if(wqitem->base.verb == OPERATION_APPEND || wqitem->base.verb == OPERATION_PREPEND) 
   {
-    DEBUG_PRINT("In read() portion of APPEND.  Value = %s",  
+    DEBUG_PRINT("In read() portion of APPEND.  Value = %s", 
                 hash_item_get_data(wqitem->cache_item));
-    tx->executeAsynch(NdbTransaction::NoCommit, rewriteCallback, (void *) wqitem,
-                      NdbOperation::DefaultAbortOption, 1);
+    tx->executeAsynchPrepare(NdbTransaction::NoCommit, rewriteCallback, (void *) wqitem);
   }
   else 
   {
-    tx->executeAsynch(NdbTransaction::Commit, DBcallback, (void *) wqitem,
-                      NdbOperation::DefaultAbortOption, 1);
+    tx->executeAsynchPrepare(NdbTransaction::Commit, DBcallback, (void *) wqitem);
   }
 
   return true;
@@ -476,8 +475,7 @@ bool worker_do_math(workitem *wqitem, bo
     }
   }
 
-  tx->executeAsynch(NdbTransaction::Commit, incrCallback, (void *) wqitem,
-                    NdbOperation::DefaultAbortOption, 1);
+  tx->executeAsynchPrepare(NdbTransaction::Commit, incrCallback, (void *) wqitem);
   return true;
 }
 
@@ -630,14 +628,13 @@ void rewriteCallback(int result, NdbTran
     op.setColumnBigUnsigned(COL_STORE_CAS, * item->cas);
   ndb_op = op.updateTuple(tx);
 
-  /* Error case; operation has not been built */
   if(ndb_op) {
-    tx->executeAsynch(NdbTransaction::Commit, DBcallback, (void *) item,
-                      NdbOperation::DefaultAbortOption, 1);  
-    // fixme: this should call back into the scheduler!
-    item->ndb_instance->db->pollNdb();
+    // Inform the scheduler that this item must be re-polled
+    item->pipeline->scheduler->reschedule(item);
+    tx->executeAsynchPrepare(NdbTransaction::Commit, DBcallback, (void *) item);
   }
   else {
+    /* Error case; operation has not been built */
     DEBUG_PRINT("NDB operation failed.  workitem %d.%d", item->pipeline->id,
                 item->id);
     tx->close();

=== modified file 'storage/ndb/memcache/src/schedulers/Bulk.h'
--- a/storage/ndb/memcache/src/schedulers/Bulk.h	2011-04-19 01:16:36 +0000
+++ b/storage/ndb/memcache/src/schedulers/Bulk.h	2011-05-17 04:55:39 +0000
@@ -27,6 +27,7 @@
 #include <memcached/types.h>
 
 #include "config.h"
+#include "workitem.h"
 #include "Scheduler.h"
 #include "NdbInstance.h"
 
@@ -42,6 +43,7 @@ public:
   void init(int threadnum, int nthreads, const char *config_string);
   void attach_thread(thread_identifier *);
   ENGINE_ERROR_CODE schedule(workitem *);
+  void reschedule(workitem *) const;                                 // inlined
   void io_completed(workitem *);
   void add_stats(ADD_STAT, const void *);
   void * run_ndb_commit_thread(int thread_id);
@@ -61,5 +63,11 @@ private:  
 };
 
 
+inline void Scheduler_bulk::reschedule(workitem *item) const {
+  LockableNdbInstance * inst =  (LockableNdbInstance *) item->ndb_instance ;
+  assert(inst->is_locked);
+  inst->npending++;
+}
+
 #endif
 

=== modified file 'storage/ndb/memcache/src/schedulers/Flex.h'
--- a/storage/ndb/memcache/src/schedulers/Flex.h	2011-04-07 11:20:56 +0000
+++ b/storage/ndb/memcache/src/schedulers/Flex.h	2011-05-17 04:55:39 +0000
@@ -56,6 +56,7 @@ public:
   void init(int threadnum, int nthreads, const char *config_string);
   void attach_thread(thread_identifier *);
   ENGINE_ERROR_CODE schedule(workitem *);
+  void reschedule(workitem *) const;                                 // inlined
   void io_completed(workitem *);
   void add_stats(ADD_STAT, const void *);  
 
@@ -83,6 +84,11 @@ protected:
 };
 
 
+inline void Scheduler_flex::reschedule(workitem *item) const {
+  item->base.reschedule = 1;
+}
+
+
 /* Random stat samples will on average be taken twice as often as 
    FLEX_STATS_SAMPLE_INTERVAL (based on uniform random distribution). 
    FLEX_STATS_INITIAL_INTERVAL is lower so as to get stats quickly on startup.

=== modified file 'storage/ndb/memcache/src/schedulers/Flex_cluster.cc'
--- a/storage/ndb/memcache/src/schedulers/Flex_cluster.cc	2011-04-09 07:36:24 +0000
+++ b/storage/ndb/memcache/src/schedulers/Flex_cluster.cc	2011-05-17 04:55:39 +0000
@@ -162,8 +162,8 @@ ENGINE_ERROR_CODE Scheduler_flex::Cluste
   if(! newitem->plan) return ENGINE_FAILED;
   
   // Build the NDB transaction
-  if(worker_prepare_operation(newitem)) {
-    
+  if(worker_prepare_operation(newitem)) { 
+     
     if(do_queue_sample-- == 0) {   // Sample for statistics.
       /* To simulate sampling the depth immediately after the item is added,
        sample it now and add one.  */
@@ -174,11 +174,11 @@ ENGINE_ERROR_CODE Scheduler_flex::Cluste
         stats.queue_total_depth += depth;
         stats.queue_samples++;
       }
-      do_queue_sample = random() % FLEX_STATS_SAMPLE_INTERVAL;
+      do_queue_sample = (random() % FLEX_STATS_SAMPLE_INTERVAL) + 1;
     }
     
-    // Put the NdbInstance on the queue for the commit thread.     
-    workqueue_add(queue, inst);
+    // Put the workitem on the queue for the commit thread.
+    workqueue_add(queue, newitem);
     return ENGINE_EWOULDBLOCK;
   }
   else return ENGINE_ENOTSUP;
@@ -211,16 +211,20 @@ void Scheduler_flex::Cluster::contribute
 void * Scheduler_flex::Cluster::run_commit_thread() {
   DEBUG_ENTER();
   
-  NdbInstance *inst;
+  workitem *item;
   
   while(1) {
     /* Wait for something to appear on the queue */
-    inst = (NdbInstance *) workqueue_consumer_wait(queue);
+    item = (workitem *) workqueue_consumer_wait(queue);
     
-    if(inst == NULL) return 0;  /* queue has been shut down and emptied */
+    if(item == NULL) return 0;  /* queue has been shut down and emptied */
     
-    /* Poll */
-    inst->db->pollNdb();
+    /* Send & Poll */
+    item->ndb_instance->db->sendPollNdb(WAITFOR_RESPONSE_TIMEOUT, 1, 1);
+    while(item->base.reschedule) {
+      item->base.reschedule = 0;
+      item->ndb_instance->db->sendPollNdb(WAITFOR_RESPONSE_TIMEOUT, 1, 1);
+    }
   } 
 }
 

=== modified file 'storage/ndb/memcache/src/schedulers/Stockholm.cc'
--- a/storage/ndb/memcache/src/schedulers/Stockholm.cc	2011-04-19 01:16:36 +0000
+++ b/storage/ndb/memcache/src/schedulers/Stockholm.cc	2011-05-17 04:55:39 +0000
@@ -186,9 +186,7 @@ ENGINE_ERROR_CODE Scheduler_stockholm::s
   
   // Build the NDB transaction
   if(worker_prepare_operation(newitem)) {
-    // Put the NdbInstance on the queue for the commit thread.
-    // Should probably be the workitem?     
-    workqueue_add(cluster[c].queue, inst);
+    workqueue_add(cluster[c].queue, newitem); // place item on queue
     return ENGINE_EWOULDBLOCK;
   }
   else return ENGINE_ENOTSUP;
@@ -241,16 +239,20 @@ void * run_stockholm_commit_thread(void 
   Get an item off the workqueue, and call pollNdb() on that item.
  */
 void * Scheduler_stockholm::run_ndb_commit_thread(int c) {
-  NdbInstance *inst;
+  workitem *item;
   
   DEBUG_ENTER();
   
   while(1) {
     /* Wait for something to appear on the queue */
-    inst = (NdbInstance *) workqueue_consumer_wait(cluster[c].queue);
+    item = (workitem *) workqueue_consumer_wait(cluster[c].queue);
     
-    /* Poll */
-    inst->db->pollNdb();
+    /* Sen & poll for response */
+    item->ndb_instance->db->sendPollNdb(WAITFOR_RESPONSE_TIMEOUT, 1, 1);
+    while(item->base.reschedule == 1) {
+      item->base.reschedule = 0;
+      item->ndb_instance->db->sendPollNdb(WAITFOR_RESPONSE_TIMEOUT, 1, 1);
+    }
     
     cluster[c].stats.cycles++;
     if(! (cluster[c].stats.cycles % STAT_INTERVAL)) 

=== modified file 'storage/ndb/memcache/src/schedulers/Stockholm.h'
--- a/storage/ndb/memcache/src/schedulers/Stockholm.h	2011-04-07 11:20:56 +0000
+++ b/storage/ndb/memcache/src/schedulers/Stockholm.h	2011-05-17 04:55:39 +0000
@@ -28,6 +28,7 @@
 #include <memcached/types.h>
 
 #include "ndbmemcache_config.h"
+#include "workitem.h"
 #include "Scheduler.h"
 #include "KeyPrefix.h"
 
@@ -45,6 +46,7 @@ public:
   void init(int threadnum, int nthreads, const char *config_string);
   void attach_thread(thread_identifier *);
   ENGINE_ERROR_CODE schedule(workitem *);
+  void reschedule(workitem *) const;                                 // inlined
   void io_completed(workitem *);
   void add_stats(ADD_STAT, const void *);
   void * run_ndb_commit_thread(int cluster_id);
@@ -64,5 +66,10 @@ private:  
 };
 
 
+inline void Scheduler_stockholm::reschedule(workitem *item) const {
+  item->base.reschedule = 1;
+}
+
+
 #endif
 


Attachment: [text/bzr-bundle] bzr/john.duncan@oracle.com-20110517045539-bspq3zk083l4swtm.bundle
Thread
bzr commit into mysql-5.1-telco-7.2 branch (john.duncan:4177) John David Duncan17 May