List:Commits« Previous MessageNext Message »
From:John David Duncan Date:May 17 2011 5:23am
Subject:[Resend] bzr commit into mysql-5.1-telco-7.2 branch (john.duncan:4180)
View as plain text  
[This commit e-mail is a repeat.]

#At file:///Users/jdd/bzr-repo/working/cluster-7.2-labs-memcached/ based on
revid:john.duncan@stripped

 4180 John David Duncan	2011-05-16
      New Scheduler::yield() method.
      Fix a possible race condition in the Flex and Stockholm schedulers where notify_io_complete()
      is called, and the worker thread invalidates an Ndb, before sendPollNdb() on that Ndb has
      actually returned in the commit thread.  So, notify_io_complete() must be called from the
      commit thread loop in Stockholm and Flex, but from the callback function in Bulk.

    modified:
      storage/ndb/memcache/include/Scheduler.h
      storage/ndb/memcache/src/ndb_worker.cc
      storage/ndb/memcache/src/schedulers/Bulk.cc
      storage/ndb/memcache/src/schedulers/Bulk.h
      storage/ndb/memcache/src/schedulers/Flex.h
      storage/ndb/memcache/src/schedulers/Flex_cluster.cc
      storage/ndb/memcache/src/schedulers/Stockholm.cc
      storage/ndb/memcache/src/schedulers/Stockholm.h
=== modified file 'storage/ndb/memcache/include/Scheduler.h'
--- a/storage/ndb/memcache/include/Scheduler.h	2011-05-17 04:55:39 +0000
+++ b/storage/ndb/memcache/include/Scheduler.h	2011-05-17 05:22:56 +0000
@@ -30,6 +30,7 @@
 #include "Configuration.h"
 #include "thread_identifier.h"
 
+
 /* Scheduler is an interface */
 
 class Scheduler {
@@ -54,10 +55,15 @@ public:
       an Ndb object for the operation and send the workitem to be executed. */ 
   virtual ENGINE_ERROR_CODE schedule(workitem *) = 0;
 
-  /** when a workitem requires multiple NDB operations, reschedule() is used
-      to schedule the second and subsequent ones. */
+  /** Before an NDB callback function completes, it must call either 
+      reschedule() or yield().  yield() indicates that work is complete. */
+  virtual void yield(workitem *) const = 0;
+
+  /** Before an NDB callback function completes, it must call either 
+      reschedule() or yield(). reschedule() indicates that the workitem 
+      requires the scheduler to send & poll an additional operation. */
   virtual void reschedule(workitem *) const = 0;
-
+ 
   /** io_completed() is called from the NDB Engine thread when an IO
       completion notification has been received */
   virtual void io_completed(workitem *) = 0;

=== modified file 'storage/ndb/memcache/src/ndb_worker.cc'
--- a/storage/ndb/memcache/src/ndb_worker.cc	2011-05-17 05:07:20 +0000
+++ b/storage/ndb/memcache/src/ndb_worker.cc	2011-05-17 05:22:56 +0000
@@ -548,14 +548,14 @@ void DB_callback(int result, NdbTransact
   if(wqitem->base.is_sync) {
     wqitem->status = return_status;
     pipeline->engine->server.cookie->store_engine_specific(wqitem->cookie, wqitem); 
-    pipeline->engine->server.cookie->notify_io_complete(wqitem->cookie, io_status);    
+    pipeline->scheduler->yield(wqitem);
   }
   else {
     /* The workitem was allocated back in the engine thread; if used in a
        callback, it would be freed there, too.  But we must free it here.
     */
     pipeline->engine->server.cookie->store_engine_specific(wqitem->cookie, wqitem->previous);
-    pipeline_io_completed(pipeline, wqitem);
+    pipeline->scheduler->io_completed(wqitem);
     workitem_free(wqitem);
   }
 }
@@ -571,7 +571,7 @@ void rewrite_callback(int result, NdbTra
     item->status = & status_block_bad_replace;
     tx->close();
     item->pipeline->engine->server.cookie->store_engine_specific(item->cookie, item); 
-    item->pipeline->engine->server.cookie->notify_io_complete(item->cookie, ENGINE_SUCCESS); 
+    item->pipeline->scheduler->yield(item);
     return;
   }
   else if(tx->getNdbError().classification != NdbError::NoError) {
@@ -734,13 +734,13 @@ void incr_callback(int result, NdbTransa
   if(wqitem->base.is_sync) {
     wqitem->status = return_status;
     pipeline->engine->server.cookie->store_engine_specific(wqitem->cookie, wqitem); 
-    pipeline->engine->server.cookie->notify_io_complete(wqitem->cookie, io_status);    
+    pipeline->scheduler->yield(wqitem);    
   }
   else {
     /* The workitem was allocated back in the engine thread; if used in a
        callback, it would be freed there, too.  But we must free it here.  */
     pipeline->engine->server.cookie->store_engine_specific(wqitem->cookie, wqitem->previous);
-    pipeline_io_completed(pipeline, wqitem);
+    pipeline->scheduler->io_completed(wqitem);
     workitem_free(wqitem);
   }
 }

=== modified file 'storage/ndb/memcache/src/schedulers/Bulk.cc'
--- a/storage/ndb/memcache/src/schedulers/Bulk.cc	2011-05-17 05:07:20 +0000
+++ b/storage/ndb/memcache/src/schedulers/Bulk.cc	2011-05-17 05:22:56 +0000
@@ -283,3 +283,17 @@ void * Scheduler_bulk::run_ndb_commit_th
       cluster[c].stats.vtime = get_thread_vtime(); 
   } 
 }
+
+
+void Scheduler_bulk::reschedule(workitem *item) const {
+  LockableNdbInstance * inst =  (LockableNdbInstance *) item->ndb_instance ;
+  DEBUG_ASSERT(inst->is_locked);
+  inst->npending++;
+}
+
+
+void Scheduler_bulk::yield(workitem *item) const {
+  /* In this scheduler, a small number of Ndb objects are shared, 
+     so we call notify_io_complete() while the callback is still running. */
+  pipeline->engine->server.cookie->notify_io_complete(item->cookie, ENGINE_SUCCESS);
+}

=== modified file 'storage/ndb/memcache/src/schedulers/Bulk.h'
--- a/storage/ndb/memcache/src/schedulers/Bulk.h	2011-05-17 04:55:39 +0000
+++ b/storage/ndb/memcache/src/schedulers/Bulk.h	2011-05-17 05:22:56 +0000
@@ -43,7 +43,8 @@ public:
   void init(int threadnum, int nthreads, const char *config_string);
   void attach_thread(thread_identifier *);
   ENGINE_ERROR_CODE schedule(workitem *);
-  void reschedule(workitem *) const;                                 // inlined
+  void yield(workitem *) const;
+  void reschedule(workitem *) const;
   void io_completed(workitem *);
   void add_stats(ADD_STAT, const void *);
   void * run_ndb_commit_thread(int thread_id);
@@ -63,11 +64,5 @@ private:  
 };
 
 
-inline void Scheduler_bulk::reschedule(workitem *item) const {
-  LockableNdbInstance * inst =  (LockableNdbInstance *) item->ndb_instance ;
-  assert(inst->is_locked);
-  inst->npending++;
-}
-
 #endif
 

=== modified file 'storage/ndb/memcache/src/schedulers/Flex.h'
--- a/storage/ndb/memcache/src/schedulers/Flex.h	2011-05-17 04:55:39 +0000
+++ b/storage/ndb/memcache/src/schedulers/Flex.h	2011-05-17 05:22:56 +0000
@@ -56,7 +56,8 @@ public:
   void init(int threadnum, int nthreads, const char *config_string);
   void attach_thread(thread_identifier *);
   ENGINE_ERROR_CODE schedule(workitem *);
-  void reschedule(workitem *) const;                                 // inlined
+  void yield(workitem *) const;                                       // inlined
+  void reschedule(workitem *) const;                                  // inlined
   void io_completed(workitem *);
   void add_stats(ADD_STAT, const void *);  
 
@@ -88,6 +89,8 @@ inline void Scheduler_flex::reschedule(w
   item->base.reschedule = 1;
 }
 
+inline void Scheduler_flex::yield(workitem *item) const { }
+
 
 /* Random stat samples will on average be taken twice as often as 
    FLEX_STATS_SAMPLE_INTERVAL (based on uniform random distribution). 

=== modified file 'storage/ndb/memcache/src/schedulers/Flex_cluster.cc'
--- a/storage/ndb/memcache/src/schedulers/Flex_cluster.cc	2011-05-17 05:15:50 +0000
+++ b/storage/ndb/memcache/src/schedulers/Flex_cluster.cc	2011-05-17 05:22:56 +0000
@@ -234,6 +234,10 @@ void * Scheduler_flex::Cluster::run_comm
       item->base.reschedule = 0;
       polled = item->ndb_instance->db->sendPollNdb(10, 1, 1);
     } while(item->base.reschedule || ! polled);
+
+    /* Now that sendPollNdb() has returned, it is OK to notify_io_complete(),
+       which will trigger the worker thread to release the Ndb instance. */ 
+    item->pipeline->engine->server.cookie->notify_io_complete(item->cookie, ENGINE_SUCCESS);
   } 
 
   return NULL;

=== modified file 'storage/ndb/memcache/src/schedulers/Stockholm.cc'
--- a/storage/ndb/memcache/src/schedulers/Stockholm.cc	2011-05-17 05:15:50 +0000
+++ b/storage/ndb/memcache/src/schedulers/Stockholm.cc	2011-05-17 05:22:56 +0000
@@ -269,9 +269,13 @@ void * Scheduler_stockholm::run_ndb_comm
       polled = item->ndb_instance->db->sendPollNdb(10, 1, 1);
     } while(item->base.reschedule || ! polled);
 
+    DEBUG_ASSERT(polled == 1);  // i.e. not > 1
     
-    cluster[c].stats.cycles++;
-    if(! (cluster[c].stats.cycles % STAT_INTERVAL)) 
+    /* Now that sendPollNdb() has returned, it is OK to notify_io_complete(),
+       which will trigger the worker thread to release the Ndb instance. */ 
+    pipeline->engine->server.cookie->notify_io_complete(item->cookie, ENGINE_SUCCESS);
+    
+    if(! (cluster[c].stats.cycles++ % STAT_INTERVAL)) 
       cluster[c].stats.commit_thread_vtime = get_thread_vtime();
   } 
 

=== modified file 'storage/ndb/memcache/src/schedulers/Stockholm.h'
--- a/storage/ndb/memcache/src/schedulers/Stockholm.h	2011-05-17 04:55:39 +0000
+++ b/storage/ndb/memcache/src/schedulers/Stockholm.h	2011-05-17 05:22:56 +0000
@@ -46,7 +46,8 @@ public:
   void init(int threadnum, int nthreads, const char *config_string);
   void attach_thread(thread_identifier *);
   ENGINE_ERROR_CODE schedule(workitem *);
-  void reschedule(workitem *) const;                                 // inlined
+  void yield(workitem *) const;                                       // inlined
+  void reschedule(workitem *) const;                                  // inlined
   void io_completed(workitem *);
   void add_stats(ADD_STAT, const void *);
   void * run_ndb_commit_thread(int cluster_id);
@@ -71,5 +72,8 @@ inline void Scheduler_stockholm::resched
 }
 
 
+inline void Scheduler_stockholm::yield(workitem *item) const { } 
+
+
 #endif
 


Attachment: [text/bzr-bundle] bzr/john.duncan@oracle.com-20110517052256-1jkk9mga5228b3xb.bundle
Thread
[Resend] bzr commit into mysql-5.1-telco-7.2 branch (john.duncan:4180) John David Duncan17 May