From: John David Duncan Date: May 17 2011 6:12pm Subject: bzr push into mysql-5.1-telco-7.2 branch (john.duncan:4176 to 4181) List-Archive: http://lists.mysql.com/commits/137558 Message-Id: <201105171812.p4HICJJp025922@acsmt358.oracle.com> MIME-Version: 1.0 Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit 4181 John David Duncan 2011-05-16 Support writing multiple columns from tab-separated values modified: storage/ndb/memcache/src/ndb_worker.cc 4180 John David Duncan 2011-05-16 New Scheduler::yield() method. Fix a possible race condition in the Flex and Stockholm schedulers where notify_io_complete() is called, and the worker thread invalidates an Ndb, before sendPollNdb() on that Ndb has actually returned in the commit thread. So, notify_io_complete() must be called from the commit thread loop in Stockholm and Flex, but from the callback function in Bulk. modified: storage/ndb/memcache/include/Scheduler.h storage/ndb/memcache/src/ndb_worker.cc storage/ndb/memcache/src/schedulers/Bulk.cc storage/ndb/memcache/src/schedulers/Bulk.h storage/ndb/memcache/src/schedulers/Flex.h storage/ndb/memcache/src/schedulers/Flex_cluster.cc storage/ndb/memcache/src/schedulers/Stockholm.cc storage/ndb/memcache/src/schedulers/Stockholm.h 4179 John David Duncan 2011-05-16 Minor edits to unify stockholm & flex commit thread loops. modified: storage/ndb/memcache/src/schedulers/Flex_cluster.cc storage/ndb/memcache/src/schedulers/Stockholm.cc 4178 John David Duncan 2011-05-16 worker_prepare_operation() now returns op_status_t rather than bool. modified: storage/ndb/memcache/include/ndb_worker.h storage/ndb/memcache/include/ndbmemcache_global.h storage/ndb/memcache/src/ndb_worker.cc storage/ndb/memcache/src/schedulers/Bulk.cc storage/ndb/memcache/src/schedulers/Flex_cluster.cc storage/ndb/memcache/src/schedulers/Stockholm.cc 4177 John David Duncan 2011-05-16 Send NdbApi requests from the commit thread. 
modified: storage/ndb/memcache/include/Scheduler.h storage/ndb/memcache/include/ndb_worker.h storage/ndb/memcache/include/workitem.h storage/ndb/memcache/src/ClusterConnectionPool.cc storage/ndb/memcache/src/ndb_worker.cc storage/ndb/memcache/src/schedulers/Bulk.h storage/ndb/memcache/src/schedulers/Flex.h storage/ndb/memcache/src/schedulers/Flex_cluster.cc storage/ndb/memcache/src/schedulers/Stockholm.cc storage/ndb/memcache/src/schedulers/Stockholm.h 4176 John David Duncan 2011-05-15 New TabSeparatedValues class added: storage/ndb/memcache/include/TabSeparatedValues.h storage/ndb/memcache/src/TabSeparatedValues.cc storage/ndb/memcache/unit/tsv.cc modified: storage/ndb/memcache/Makefile.am storage/ndb/memcache/unit/Makefile.am storage/ndb/memcache/unit/all_tests.h storage/ndb/memcache/unit/harness.cc === modified file 'storage/ndb/memcache/include/Scheduler.h' --- a/storage/ndb/memcache/include/Scheduler.h 2011-04-08 08:14:11 +0000 +++ b/storage/ndb/memcache/include/Scheduler.h 2011-05-17 05:22:56 +0000 @@ -30,6 +30,7 @@ #include "Configuration.h" #include "thread_identifier.h" + /* Scheduler is an interface */ class Scheduler { @@ -54,6 +55,15 @@ public: an Ndb object for the operation and send the workitem to be executed. */ virtual ENGINE_ERROR_CODE schedule(workitem *) = 0; + /** Before an NDB callback function completes, it must call either + reschedule() or yield(). yield() indicates that work is complete. */ + virtual void yield(workitem *) const = 0; + + /** Before an NDB callback function completes, it must call either + reschedule() or yield(). reschedule() indicates that the workitem + requires the scheduler to send & poll an additional operation. 
*/ + virtual void reschedule(workitem *) const = 0; + /** io_completed() is called from the NDB Engine thread when an IO completion notification has been received */ virtual void io_completed(workitem *) = 0; === modified file 'storage/ndb/memcache/include/ndb_worker.h' --- a/storage/ndb/memcache/include/ndb_worker.h 2011-04-06 04:31:28 +0000 +++ b/storage/ndb/memcache/include/ndb_worker.h 2011-05-17 05:07:20 +0000 @@ -21,11 +21,7 @@ #define NDBMEMCACHE_NDB_WORKER_H -/* worker_prepare_operation(). - Returns TRUE if operation is prepared. - FALSE if it is not supported. -*/ -bool worker_prepare_operation(workitem *); +op_status_t worker_prepare_operation(workitem *); bool build_hash_item(workitem *, Operation &); === modified file 'storage/ndb/memcache/include/ndbmemcache_global.h' --- a/storage/ndb/memcache/include/ndbmemcache_global.h 2011-04-06 04:31:28 +0000 +++ b/storage/ndb/memcache/include/ndbmemcache_global.h 2011-05-17 05:07:20 +0000 @@ -52,4 +52,13 @@ enum { OP_FLUSH }; +/* Operation Status enums */ +typedef enum { + op_not_supported, + op_failed, + op_async_prepared, + op_async_sent +} op_status_t; + + #endif === modified file 'storage/ndb/memcache/include/workitem.h' --- a/storage/ndb/memcache/include/workitem.h 2011-04-06 04:31:28 +0000 +++ b/storage/ndb/memcache/include/workitem.h 2011-05-17 04:55:39 +0000 @@ -50,7 +50,8 @@ typedef struct workitem { unsigned retries : 3; /*! how many times this job has been retried */ unsigned complete : 1; /*! is this operation finished? */ unsigned broker : 2; /*! for use by the flex scheduler */ - unsigned _unused : 2; /*! (32 bits total) */ + unsigned reschedule : 1; /*! inform scheduler to send and poll again */ + unsigned cas_owner : 1; /*! set if this engine owns the CAS ID */ } base; unsigned int id; struct workitem *previous; /*! 
used to chain workitems in multi-key get */ === modified file 'storage/ndb/memcache/src/ClusterConnectionPool.cc' --- a/storage/ndb/memcache/src/ClusterConnectionPool.cc 2011-04-07 11:20:56 +0000 +++ b/storage/ndb/memcache/src/ClusterConnectionPool.cc 2011-05-17 04:55:39 +0000 @@ -27,6 +27,7 @@ extern EXTENSION_LOGGER_DESCRIPTOR *logger; +// TODO: Why doesn't this use member variable connect_string ?? Ndb_cluster_connection * ClusterConnectionPool::connect(const char *connectstring) { DEBUG_ENTER(); === modified file 'storage/ndb/memcache/src/ndb_worker.cc' --- a/storage/ndb/memcache/src/ndb_worker.cc 2011-04-19 01:16:36 +0000 +++ b/storage/ndb/memcache/src/ndb_worker.cc 2011-05-17 05:25:51 +0000 @@ -41,25 +41,27 @@ #include "workitem.h" #include "Configuration.h" #include "DataTypeHandler.h" +#include "TabSeparatedValues.h" #include "debug.h" #include "Operation.h" #include "NdbInstance.h" #include "status_block.h" #include "Operation.h" +#include "Scheduler.h" #include "ndb_engine.h" #include "hash_item_util.h" #include "ndb_worker.h" typedef void ndb_async_callback(int, NdbTransaction *, void *); -ndb_async_callback incrCallback; // callback for incr/decr -ndb_async_callback rewriteCallback; // callback for append/prepend -ndb_async_callback DBcallback; // callback for all others +ndb_async_callback incr_callback; // callback for incr/decr +ndb_async_callback rewrite_callback; // callback for append/prepend +ndb_async_callback DB_callback; // callback for all others -bool worker_do_read(workitem *, bool with_cas); -bool worker_do_write(workitem *, bool with_cas); -bool worker_do_delete(workitem *, bool with_cas); -bool worker_do_math(workitem *wqitem, bool with_cas); +op_status_t worker_do_read(workitem *, bool with_cas); +op_status_t worker_do_write(workitem *, bool with_cas); +op_status_t worker_do_delete(workitem *, bool with_cas); +op_status_t worker_do_math(workitem *wqitem, bool with_cas); void worker_set_cas(ndb_pipeline *, uint64_t *); bool 
finalize_read(workitem *); @@ -116,17 +118,29 @@ void worker_set_cas(ndb_pipeline *p, uin DEBUG_PRINT("hi:%lx lo:%lx cas:%llx (%llu)", cas_hi, cas_lo, *cas, *cas); } - -bool worker_prepare_operation(workitem *newitem) { +/* worker_prepare_operation(): + Called from the scheduler. + Returns true if executeAsynchPrepare() has been called on the item. +*/ +op_status_t worker_prepare_operation(workitem *newitem) { bool server_cas = (newitem->prefix_info.has_cas_col && newitem->cas); - bool r; + op_status_t r; /* Jump table */ switch(newitem->base.verb) { case OP_READ: + r = worker_do_read(newitem, server_cas); + break; + case OPERATION_APPEND: case OPERATION_PREPEND: - r = worker_do_read(newitem, server_cas); + if(newitem->plan->spec->nvaluecols > 1) { + /* APPEND/PREPEND is currently not supported for tsv */ + r = op_not_supported; + } + else { + r = worker_do_read(newitem, server_cas); + } break; case OP_DELETE: @@ -145,14 +159,14 @@ bool worker_prepare_operation(workitem * break; default: - return false; /* not supported */ + r= op_not_supported; } - return r; /* fixme: distinguish "not supported" from "failed" */ + return r; } -bool worker_do_delete(workitem *wqitem, bool server_cas) { +op_status_t worker_do_delete(workitem *wqitem, bool server_cas) { DEBUG_ENTER(); QueryPlan *plan = wqitem->plan; @@ -179,18 +193,17 @@ bool worker_do_delete(workitem *wqitem, if(err.status != NdbError::Success) { logger->log(LOG_WARNING, 0, "deleteTuple(): %s\n", err.message); tx->close(); - return false; + return op_failed; } } - /* NdbTransaction::executeAsynch() */ - tx->executeAsynch(NdbTransaction::Commit, DBcallback, (void *) wqitem, - NdbOperation::DefaultAbortOption, 1); - return true; + /* Prepare for execution */ + tx->executeAsynchPrepare(NdbTransaction::Commit, DB_callback, (void *) wqitem); + return op_async_prepared; } -bool worker_do_write(workitem *wqitem, bool server_cas) { +op_status_t worker_do_write(workitem *wqitem, bool server_cas) { DEBUG_PRINT("%s", 
workitem_get_operation(wqitem)); uint64_t cas_in = *wqitem->cas; // read old value @@ -212,12 +225,33 @@ bool worker_do_write(workitem *wqitem, b /* Set the row */ op.clearNullBits(); - op.setColumn(COL_STORE_KEY, dbkey, wqitem->base.nsuffix); - op.setColumn(COL_STORE_VALUE, hash_item_get_data(wqitem->cache_item), - wqitem->cache_item->nbytes); // the value + op.setColumn(COL_STORE_KEY, dbkey, wqitem->base.nsuffix); + + if(plan->spec->nvaluecols > 1) { + /* Multiple Value Columns */ + TabSeparatedValues tsv(hash_item_get_data(wqitem->cache_item), + plan->spec->nvaluecols, wqitem->cache_item->nbytes); + int idx = 0; + do { + if(tsv.getLength()) { + op.setColumn(COL_STORE_VALUE+idx, tsv.getPointer(), tsv.getLength()); + } + else { + op.setColumnNull(COL_STORE_VALUE+idx); + } + idx++; + } while (tsv.advance()); + } + else { + /* Just one value column */ + op.setColumn(COL_STORE_VALUE, hash_item_get_data(wqitem->cache_item), + wqitem->cache_item->nbytes); + } + if(server_cas) { op.setColumnBigUnsigned(COL_STORE_CAS, * wqitem->cas); // the cas } + if(wqitem->plan->dup_numbers) { if(isdigit(* hash_item_get_data(wqitem->cache_item)) && wqitem->cache_item->nbytes < 32) { // Copy string representation @@ -282,17 +316,15 @@ bool worker_do_write(workitem *wqitem, b DEBUG_PRINT("NDB operation failed. workitem %d.%d", wqitem->pipeline->id, wqitem->id); tx->close(); - return false; + return op_failed; } - /* NdbTransaction::executeAsynch() */ - tx->executeAsynch(NdbTransaction::Commit, DBcallback, (void *) wqitem, - NdbOperation::DefaultAbortOption, 1); - return true; + tx->executeAsynchPrepare(NdbTransaction::Commit, DB_callback, (void *) wqitem); + return op_async_prepared; } -bool worker_do_read(workitem *wqitem, bool server_cas) { +op_status_t worker_do_read(workitem *wqitem, bool server_cas) { DEBUG_ENTER(); QueryPlan *plan = wqitem->plan; @@ -315,28 +347,26 @@ bool worker_do_read(workitem *wqitem, bo if(! 
op.readTuple(tx)) { logger->log(LOG_WARNING, 0, "readTuple(): %s\n", tx->getNdbError().message); tx->close(); - return false; + return op_failed; } /* Save the workitem in the transaction and prepare for async execution */ if(wqitem->base.verb == OPERATION_APPEND || wqitem->base.verb == OPERATION_PREPEND) { - DEBUG_PRINT("In read() portion of APPEND. Value = %s", + DEBUG_PRINT("In read() portion of APPEND. Value = %s", hash_item_get_data(wqitem->cache_item)); - tx->executeAsynch(NdbTransaction::NoCommit, rewriteCallback, (void *) wqitem, - NdbOperation::DefaultAbortOption, 1); + tx->executeAsynchPrepare(NdbTransaction::NoCommit, rewrite_callback, (void *) wqitem); } else { - tx->executeAsynch(NdbTransaction::Commit, DBcallback, (void *) wqitem, - NdbOperation::DefaultAbortOption, 1); + tx->executeAsynchPrepare(NdbTransaction::Commit, DB_callback, (void *) wqitem); } - return true; + return op_async_prepared; } -bool worker_do_math(workitem *wqitem, bool server_cas) { +op_status_t worker_do_math(workitem *wqitem, bool server_cas) { DEBUG_PRINT("create: %d retries: %d", wqitem->base.math_create, wqitem->base.retries); worker_set_cas(wqitem->pipeline, wqitem->cas); @@ -409,7 +439,7 @@ bool worker_do_math(workitem *wqitem, bo if(! ndbop1) { logger->log(LOG_WARNING, 0, "readMasked(): %s\n", tx->getNdbError().message); tx->close(); - return false; + return op_failed; } } @@ -432,7 +462,7 @@ bool worker_do_math(workitem *wqitem, bo if(! ndbop2) { logger->log(LOG_WARNING, 0, "insertMasked(): %s\n", tx->getNdbError().message); tx->close(); - return false; + return op_failed; } } @@ -472,17 +502,16 @@ bool worker_do_math(workitem *wqitem, bo if(! 
ndbop3) { logger->log(LOG_WARNING, 0, "updateInterpreted(): %s\n", tx->getNdbError().message); tx->close(); - return false; + return op_failed; } } - tx->executeAsynch(NdbTransaction::Commit, incrCallback, (void *) wqitem, - NdbOperation::DefaultAbortOption, 1); - return true; + tx->executeAsynchPrepare(NdbTransaction::Commit, incr_callback, (void *) wqitem); + return op_async_prepared; } -void DBcallback(int result, NdbTransaction *tx, void *itemptr) { +void DB_callback(int result, NdbTransaction *tx, void *itemptr) { workitem *wqitem = (workitem *) itemptr; ndb_pipeline * & pipeline = wqitem->pipeline; status_block * return_status; @@ -550,21 +579,21 @@ void DBcallback(int result, NdbTransacti if(wqitem->base.is_sync) { wqitem->status = return_status; pipeline->engine->server.cookie->store_engine_specific(wqitem->cookie, wqitem); - pipeline->engine->server.cookie->notify_io_complete(wqitem->cookie, io_status); + pipeline->scheduler->yield(wqitem); } else { /* The workitem was allocated back in the engine thread; if used in a callback, it would be freed there, too. But we must free it here. 
*/ pipeline->engine->server.cookie->store_engine_specific(wqitem->cookie, wqitem->previous); - pipeline_io_completed(pipeline, wqitem); + pipeline->scheduler->io_completed(wqitem); workitem_free(wqitem); } } /* Middle-step callback for APPEND and PREPEND */ -void rewriteCallback(int result, NdbTransaction *tx, void *itemptr) { +void rewrite_callback(int result, NdbTransaction *tx, void *itemptr) { workitem *item = (workitem *) itemptr; DEBUG_PRINT("%d.%d", item->pipeline->id, item->id); @@ -573,11 +602,11 @@ void rewriteCallback(int result, NdbTran item->status = & status_block_bad_replace; tx->close(); item->pipeline->engine->server.cookie->store_engine_specific(item->cookie, item); - item->pipeline->engine->server.cookie->notify_io_complete(item->cookie, ENGINE_SUCCESS); + item->pipeline->scheduler->yield(item); return; } else if(tx->getNdbError().classification != NdbError::NoError) { - return DBcallback(result, tx, itemptr); + return DB_callback(result, tx, itemptr); } /* Strings and lengths: */ @@ -630,14 +659,13 @@ void rewriteCallback(int result, NdbTran op.setColumnBigUnsigned(COL_STORE_CAS, * item->cas); ndb_op = op.updateTuple(tx); - /* Error case; operation has not been built */ if(ndb_op) { - tx->executeAsynch(NdbTransaction::Commit, DBcallback, (void *) item, - NdbOperation::DefaultAbortOption, 1); - // fixme: this should call back into the scheduler! - item->ndb_instance->db->pollNdb(); + // Inform the scheduler that this item must be re-polled + item->pipeline->scheduler->reschedule(item); + tx->executeAsynchPrepare(NdbTransaction::Commit, DB_callback, (void *) item); } else { + /* Error case; operation has not been built */ DEBUG_PRINT("NDB operation failed. 
workitem %d.%d", item->pipeline->id, item->id); tx->close(); @@ -649,7 +677,7 @@ void rewriteCallback(int result, NdbTran /* Dedicated callback function for INCR and DECR operations */ -void incrCallback(int result, NdbTransaction *tx, void *itemptr) { +void incr_callback(int result, NdbTransaction *tx, void *itemptr) { workitem *wqitem = (workitem *) itemptr; ndb_pipeline * & pipeline = wqitem->pipeline; status_block * return_status; @@ -737,13 +765,13 @@ void incrCallback(int result, NdbTransac if(wqitem->base.is_sync) { wqitem->status = return_status; pipeline->engine->server.cookie->store_engine_specific(wqitem->cookie, wqitem); - pipeline->engine->server.cookie->notify_io_complete(wqitem->cookie, io_status); + pipeline->scheduler->yield(wqitem); } else { /* The workitem was allocated back in the engine thread; if used in a callback, it would be freed there, too. But we must free it here. */ pipeline->engine->server.cookie->store_engine_specific(wqitem->cookie, wqitem->previous); - pipeline_io_completed(pipeline, wqitem); + pipeline->scheduler->io_completed(wqitem); workitem_free(wqitem); } } === modified file 'storage/ndb/memcache/src/schedulers/Bulk.cc' --- a/storage/ndb/memcache/src/schedulers/Bulk.cc 2011-04-21 08:05:13 +0000 +++ b/storage/ndb/memcache/src/schedulers/Bulk.cc 2011-05-17 05:22:56 +0000 @@ -129,7 +129,6 @@ ENGINE_ERROR_CODE Scheduler_bulk::schedu DEBUG_ENTER(); const Configuration & conf = get_Configuration(); // see note int lockerr; - ENGINE_ERROR_CODE response_code = ENGINE_EWOULDBLOCK; /* Fetch the config for its key prefix */ const KeyPrefix *pfx = conf.getPrefixByInfo(newitem->prefix_info); @@ -164,18 +163,22 @@ ENGINE_ERROR_CODE Scheduler_bulk::schedu workitem_set_NdbInstance(newitem, inst); // Build the NDB transaction - if(worker_prepare_operation(newitem)) { - inst->npending++; + ENGINE_ERROR_CODE response_code; + op_status_t op_status = worker_prepare_operation(newitem); + switch(op_status) { + case op_async_prepared: + case 
op_async_sent: + inst->npending++; + response_code = ENGINE_EWOULDBLOCK; + break; + case op_not_supported: + response_code = ENGINE_ENOTSUP; + break; + case op_failed: + response_code = ENGINE_FAILED; + break; } - else { - /******* Error handling. The operation is not supported. */ - if(newitem->base.is_sync) { - newitem->status = & status_block_unsupported; - pipeline->engine->server.cookie->store_engine_specific(newitem->cookie, newitem); - pipeline->engine->server.cookie->notify_io_complete(newitem->cookie, ENGINE_ENOTSUP); - } - } - + inst->unlock(); return response_code; @@ -280,3 +283,17 @@ void * Scheduler_bulk::run_ndb_commit_th cluster[c].stats.vtime = get_thread_vtime(); } } + + +void Scheduler_bulk::reschedule(workitem *item) const { + LockableNdbInstance * inst = (LockableNdbInstance *) item->ndb_instance ; + DEBUG_ASSERT(inst->is_locked); + inst->npending++; +} + + +void Scheduler_bulk::yield(workitem *item) const { + /* In this scheduler, a small number of Ndb objects are shared, + so we call notify_io_complete() while the callback is still running. 
*/ + pipeline->engine->server.cookie->notify_io_complete(item->cookie, ENGINE_SUCCESS); +} === modified file 'storage/ndb/memcache/src/schedulers/Bulk.h' --- a/storage/ndb/memcache/src/schedulers/Bulk.h 2011-04-19 01:16:36 +0000 +++ b/storage/ndb/memcache/src/schedulers/Bulk.h 2011-05-17 05:22:56 +0000 @@ -27,6 +27,7 @@ #include #include "config.h" +#include "workitem.h" #include "Scheduler.h" #include "NdbInstance.h" @@ -42,6 +43,8 @@ public: void init(int threadnum, int nthreads, const char *config_string); void attach_thread(thread_identifier *); ENGINE_ERROR_CODE schedule(workitem *); + void yield(workitem *) const; + void reschedule(workitem *) const; void io_completed(workitem *); void add_stats(ADD_STAT, const void *); void * run_ndb_commit_thread(int thread_id); === modified file 'storage/ndb/memcache/src/schedulers/Flex.h' --- a/storage/ndb/memcache/src/schedulers/Flex.h 2011-04-07 11:20:56 +0000 +++ b/storage/ndb/memcache/src/schedulers/Flex.h 2011-05-17 05:22:56 +0000 @@ -56,6 +56,8 @@ public: void init(int threadnum, int nthreads, const char *config_string); void attach_thread(thread_identifier *); ENGINE_ERROR_CODE schedule(workitem *); + void yield(workitem *) const; // inlined + void reschedule(workitem *) const; // inlined void io_completed(workitem *); void add_stats(ADD_STAT, const void *); @@ -83,6 +85,13 @@ protected: }; +inline void Scheduler_flex::reschedule(workitem *item) const { + item->base.reschedule = 1; +} + +inline void Scheduler_flex::yield(workitem *item) const { } + + /* Random stat samples will on average be taken twice as often as FLEX_STATS_SAMPLE_INTERVAL (based on uniform random distribution). FLEX_STATS_INITIAL_INTERVAL is lower so as to get stats quickly on startup. 
=== modified file 'storage/ndb/memcache/src/schedulers/Flex_cluster.cc' --- a/storage/ndb/memcache/src/schedulers/Flex_cluster.cc 2011-04-09 07:36:24 +0000 +++ b/storage/ndb/memcache/src/schedulers/Flex_cluster.cc 2011-05-17 05:22:56 +0000 @@ -157,31 +157,40 @@ ENGINE_ERROR_CODE Scheduler_flex::Cluste workitem_set_NdbInstance(newitem, inst); - // Fetch the query plan for this prefix. + /* Fetch the query plan for this prefix. */ newitem->plan = inst->getPlanForPrefix(pfx); if(! newitem->plan) return ENGINE_FAILED; - // Build the NDB transaction - if(worker_prepare_operation(newitem)) { - - if(do_queue_sample-- == 0) { // Sample for statistics. - /* To simulate sampling the depth immediately after the item is added, - sample it now and add one. */ - int depth = queue->depth + 1; - /* Queue depth can be erroneous because of thread races. - Use it only if it looks valid. */ - if(depth > 0 && depth <= queue_size) { - stats.queue_total_depth += depth; - stats.queue_samples++; - } - do_queue_sample = random() % FLEX_STATS_SAMPLE_INTERVAL; + /* Sample for statistics */ + if(do_queue_sample-- == 0) { + int depth = queue->depth + 1; + // Depth can be erroneous due to thread races; use it only if it looks valid + if(depth > 0 && depth <= queue_size) { + stats.queue_total_depth += depth; + stats.queue_samples++; } - - // Put the NdbInstance on the queue for the commit thread. 
- workqueue_add(queue, inst); - return ENGINE_EWOULDBLOCK; + do_queue_sample = (random() % FLEX_STATS_SAMPLE_INTERVAL) + 1; + } + + /* Build the NDB transaction */ + ENGINE_ERROR_CODE response_code; + op_status_t op_status = worker_prepare_operation(newitem); + + switch(op_status) { + case op_async_prepared: + case op_async_sent: + workqueue_add(queue, newitem); // place item on queue + response_code = ENGINE_EWOULDBLOCK; + break; + case op_not_supported: + response_code = ENGINE_ENOTSUP; + break; + case op_failed: + response_code = ENGINE_FAILED; + break; } - else return ENGINE_ENOTSUP; + + return response_code; } @@ -209,19 +218,29 @@ void Scheduler_flex::Cluster::contribute Commit thread: Get an NdbInstance off the workqueue, and call pollNdb() on it. */ void * Scheduler_flex::Cluster::run_commit_thread() { + workitem *item; + int polled; + DEBUG_ENTER(); - NdbInstance *inst; - while(1) { /* Wait for something to appear on the queue */ - inst = (NdbInstance *) workqueue_consumer_wait(queue); + item = (workitem *) workqueue_consumer_wait(queue); - if(inst == NULL) return 0; /* queue has been shut down and emptied */ + if(item == NULL) break; /* queue has been shut down and emptied */ - /* Poll */ - inst->db->pollNdb(); + /* Send & poll for response; reschedule if needed */ + do { + item->base.reschedule = 0; + polled = item->ndb_instance->db->sendPollNdb(10, 1, 1); + } while(item->base.reschedule || ! polled); + + /* Now that sendPollNdb() has returned, it is OK to notify_io_complete(), + which will trigger the worker thread to release the Ndb instance. */ + item->pipeline->engine->server.cookie->notify_io_complete(item->cookie, ENGINE_SUCCESS); } + + return NULL; } === modified file 'storage/ndb/memcache/src/schedulers/Stockholm.cc' --- a/storage/ndb/memcache/src/schedulers/Stockholm.cc 2011-04-19 01:16:36 +0000 +++ b/storage/ndb/memcache/src/schedulers/Stockholm.cc 2011-05-17 05:22:56 +0000 @@ -185,13 +185,24 @@ ENGINE_ERROR_CODE Scheduler_stockholm::s if(! 
newitem->plan) return ENGINE_FAILED; // Build the NDB transaction - if(worker_prepare_operation(newitem)) { - // Put the NdbInstance on the queue for the commit thread. - // Should probably be the workitem? - workqueue_add(cluster[c].queue, inst); - return ENGINE_EWOULDBLOCK; + op_status_t op_status = worker_prepare_operation(newitem); + ENGINE_ERROR_CODE response_code; + + switch(op_status) { + case op_async_prepared: + case op_async_sent: + workqueue_add(cluster[c].queue, newitem); // place item on queue + response_code = ENGINE_EWOULDBLOCK; + break; + case op_not_supported: + response_code = ENGINE_ENOTSUP; + break; + case op_failed: + response_code = ENGINE_FAILED; + break; } - else return ENGINE_ENOTSUP; + + return response_code; } @@ -241,21 +252,34 @@ void * run_stockholm_commit_thread(void Get an item off the workqueue, and call pollNdb() on that item. */ void * Scheduler_stockholm::run_ndb_commit_thread(int c) { - NdbInstance *inst; + workitem *item; + int polled; DEBUG_ENTER(); while(1) { /* Wait for something to appear on the queue */ - inst = (NdbInstance *) workqueue_consumer_wait(cluster[c].queue); + item = (workitem *) workqueue_consumer_wait(cluster[c].queue); + + if(item == NULL) break; /* queue has been shut down and emptied */ - /* Poll */ - inst->db->pollNdb(); + /* Send & poll for response; reschedule if needed */ + do { + item->base.reschedule = 0; + polled = item->ndb_instance->db->sendPollNdb(10, 1, 1); + } while(item->base.reschedule || ! polled); + + DEBUG_ASSERT(polled == 1); // i.e. not > 1 + + /* Now that sendPollNdb() has returned, it is OK to notify_io_complete(), + which will trigger the worker thread to release the Ndb instance. */ + pipeline->engine->server.cookie->notify_io_complete(item->cookie, ENGINE_SUCCESS); - cluster[c].stats.cycles++; - if(! (cluster[c].stats.cycles % STAT_INTERVAL)) + if(! 
(cluster[c].stats.cycles++ % STAT_INTERVAL)) cluster[c].stats.commit_thread_vtime = get_thread_vtime(); } + + return NULL; } === modified file 'storage/ndb/memcache/src/schedulers/Stockholm.h' --- a/storage/ndb/memcache/src/schedulers/Stockholm.h 2011-04-07 11:20:56 +0000 +++ b/storage/ndb/memcache/src/schedulers/Stockholm.h 2011-05-17 05:22:56 +0000 @@ -28,6 +28,7 @@ #include #include "ndbmemcache_config.h" +#include "workitem.h" #include "Scheduler.h" #include "KeyPrefix.h" @@ -45,6 +46,8 @@ public: void init(int threadnum, int nthreads, const char *config_string); void attach_thread(thread_identifier *); ENGINE_ERROR_CODE schedule(workitem *); + void yield(workitem *) const; // inlined + void reschedule(workitem *) const; // inlined void io_completed(workitem *); void add_stats(ADD_STAT, const void *); void * run_ndb_commit_thread(int cluster_id); @@ -64,5 +67,13 @@ private: }; +inline void Scheduler_stockholm::reschedule(workitem *item) const { + item->base.reschedule = 1; +} + + +inline void Scheduler_stockholm::yield(workitem *item) const { } + + #endif No bundle (reason: useless for push emails).