4181 John David Duncan 2011-05-16
Support writing multiple columns from tab-separated values
modified:
storage/ndb/memcache/src/ndb_worker.cc
4180 John David Duncan 2011-05-16
New Scheduler::yield() method.
Fix a possible race condition in the Flex and Stockholm schedulers where notify_io_complete()
is called, and the worker thread invalidates an Ndb, before sendPollNdb() on that Ndb has
actually returned in the commit thread. So, notify_io_complete() must be called from the
commit thread loop in Stockholm and Flex, but from the callback function in Bulk.
modified:
storage/ndb/memcache/include/Scheduler.h
storage/ndb/memcache/src/ndb_worker.cc
storage/ndb/memcache/src/schedulers/Bulk.cc
storage/ndb/memcache/src/schedulers/Bulk.h
storage/ndb/memcache/src/schedulers/Flex.h
storage/ndb/memcache/src/schedulers/Flex_cluster.cc
storage/ndb/memcache/src/schedulers/Stockholm.cc
storage/ndb/memcache/src/schedulers/Stockholm.h
4179 John David Duncan 2011-05-16
Minor edits to unify stockholm & flex commit thread loops.
modified:
storage/ndb/memcache/src/schedulers/Flex_cluster.cc
storage/ndb/memcache/src/schedulers/Stockholm.cc
4178 John David Duncan 2011-05-16
worker_prepare_operation() now returns op_status_t rather than bool.
modified:
storage/ndb/memcache/include/ndb_worker.h
storage/ndb/memcache/include/ndbmemcache_global.h
storage/ndb/memcache/src/ndb_worker.cc
storage/ndb/memcache/src/schedulers/Bulk.cc
storage/ndb/memcache/src/schedulers/Flex_cluster.cc
storage/ndb/memcache/src/schedulers/Stockholm.cc
4177 John David Duncan 2011-05-16
Send NdbApi requests from the commit thread.
modified:
storage/ndb/memcache/include/Scheduler.h
storage/ndb/memcache/include/ndb_worker.h
storage/ndb/memcache/include/workitem.h
storage/ndb/memcache/src/ClusterConnectionPool.cc
storage/ndb/memcache/src/ndb_worker.cc
storage/ndb/memcache/src/schedulers/Bulk.h
storage/ndb/memcache/src/schedulers/Flex.h
storage/ndb/memcache/src/schedulers/Flex_cluster.cc
storage/ndb/memcache/src/schedulers/Stockholm.cc
storage/ndb/memcache/src/schedulers/Stockholm.h
4176 John David Duncan 2011-05-15
New TabSeparatedValues class
added:
storage/ndb/memcache/include/TabSeparatedValues.h
storage/ndb/memcache/src/TabSeparatedValues.cc
storage/ndb/memcache/unit/tsv.cc
modified:
storage/ndb/memcache/Makefile.am
storage/ndb/memcache/unit/Makefile.am
storage/ndb/memcache/unit/all_tests.h
storage/ndb/memcache/unit/harness.cc
=== modified file 'storage/ndb/memcache/include/Scheduler.h'
--- a/storage/ndb/memcache/include/Scheduler.h 2011-04-08 08:14:11 +0000
+++ b/storage/ndb/memcache/include/Scheduler.h 2011-05-17 05:22:56 +0000
@@ -30,6 +30,7 @@
#include "Configuration.h"
#include "thread_identifier.h"
+
/* Scheduler is an interface */
class Scheduler {
@@ -54,6 +55,15 @@ public:
an Ndb object for the operation and send the workitem to be executed. */
virtual ENGINE_ERROR_CODE schedule(workitem *) = 0;
+ /** Before an NDB callback function completes, it must call either
+ reschedule() or yield(). yield() indicates that work is comlpete. */
+ virtual void yield(workitem *) const = 0;
+
+ /** Before an NDB callback function completes, it must call either
+ reschedule() or yield(). reschedule() indicates to that the workitem
+ requires the scheduler to send & poll an additional operation. */
+ virtual void reschedule(workitem *) const = 0;
+
/** io_completed() is called from the NDB Engine thread when an IO
completion notification has been received */
virtual void io_completed(workitem *) = 0;
=== modified file 'storage/ndb/memcache/include/ndb_worker.h'
--- a/storage/ndb/memcache/include/ndb_worker.h 2011-04-06 04:31:28 +0000
+++ b/storage/ndb/memcache/include/ndb_worker.h 2011-05-17 05:07:20 +0000
@@ -21,11 +21,7 @@
#define NDBMEMCACHE_NDB_WORKER_H
-/* worker_prepare_operation().
- Returns TRUE if operation is prepared.
- FALSE if it is not supported.
-*/
-bool worker_prepare_operation(workitem *);
+op_status_t worker_prepare_operation(workitem *);
bool build_hash_item(workitem *, Operation &);
=== modified file 'storage/ndb/memcache/include/ndbmemcache_global.h'
--- a/storage/ndb/memcache/include/ndbmemcache_global.h 2011-04-06 04:31:28 +0000
+++ b/storage/ndb/memcache/include/ndbmemcache_global.h 2011-05-17 05:07:20 +0000
@@ -52,4 +52,13 @@ enum {
OP_FLUSH
};
+/* Operation Status enums */
+typedef enum {
+ op_not_supported,
+ op_failed,
+ op_async_prepared,
+ op_async_sent
+} op_status_t;
+
+
#endif
=== modified file 'storage/ndb/memcache/include/workitem.h'
--- a/storage/ndb/memcache/include/workitem.h 2011-04-06 04:31:28 +0000
+++ b/storage/ndb/memcache/include/workitem.h 2011-05-17 04:55:39 +0000
@@ -50,7 +50,8 @@ typedef struct workitem {
unsigned retries : 3; /*! how many times this job has been retried */
unsigned complete : 1; /*! is this operation finished? */
unsigned broker : 2; /*! for use by the flex scheduler */
- unsigned _unused : 2; /*! (32 bits total) */
+ unsigned reschedule : 1; /*! inform scheduler to send and poll again */
+ unsigned cas_owner : 1; /*! set if this engine owns the CAS ID */
} base;
unsigned int id;
struct workitem *previous; /*! used to chain workitems in multi-key get */
=== modified file 'storage/ndb/memcache/src/ClusterConnectionPool.cc'
--- a/storage/ndb/memcache/src/ClusterConnectionPool.cc 2011-04-07 11:20:56 +0000
+++ b/storage/ndb/memcache/src/ClusterConnectionPool.cc 2011-05-17 04:55:39 +0000
@@ -27,6 +27,7 @@
extern EXTENSION_LOGGER_DESCRIPTOR *logger;
+// TODO: Why doesn't this use member variable connect_string ??
Ndb_cluster_connection * ClusterConnectionPool::connect(const char *connectstring) {
DEBUG_ENTER();
=== modified file 'storage/ndb/memcache/src/ndb_worker.cc'
--- a/storage/ndb/memcache/src/ndb_worker.cc 2011-04-19 01:16:36 +0000
+++ b/storage/ndb/memcache/src/ndb_worker.cc 2011-05-17 05:25:51 +0000
@@ -41,25 +41,27 @@
#include "workitem.h"
#include "Configuration.h"
#include "DataTypeHandler.h"
+#include "TabSeparatedValues.h"
#include "debug.h"
#include "Operation.h"
#include "NdbInstance.h"
#include "status_block.h"
#include "Operation.h"
+#include "Scheduler.h"
#include "ndb_engine.h"
#include "hash_item_util.h"
#include "ndb_worker.h"
typedef void ndb_async_callback(int, NdbTransaction *, void *);
-ndb_async_callback incrCallback; // callback for incr/decr
-ndb_async_callback rewriteCallback; // callback for append/prepend
-ndb_async_callback DBcallback; // callback for all others
+ndb_async_callback incr_callback; // callback for incr/decr
+ndb_async_callback rewrite_callback; // callback for append/prepend
+ndb_async_callback DB_callback; // callback for all others
-bool worker_do_read(workitem *, bool with_cas);
-bool worker_do_write(workitem *, bool with_cas);
-bool worker_do_delete(workitem *, bool with_cas);
-bool worker_do_math(workitem *wqitem, bool with_cas);
+op_status_t worker_do_read(workitem *, bool with_cas);
+op_status_t worker_do_write(workitem *, bool with_cas);
+op_status_t worker_do_delete(workitem *, bool with_cas);
+op_status_t worker_do_math(workitem *wqitem, bool with_cas);
void worker_set_cas(ndb_pipeline *, uint64_t *);
bool finalize_read(workitem *);
@@ -116,17 +118,29 @@ void worker_set_cas(ndb_pipeline *p, uin
DEBUG_PRINT("hi:%lx lo:%lx cas:%llx (%llu)", cas_hi, cas_lo, *cas, *cas);
}
-
-bool worker_prepare_operation(workitem *newitem) {
+/* worker_prepare_operation():
+ Called from the scheduler.
+ Returns true if executeAsynchPrepare() has been called on the item.
+*/
+op_status_t worker_prepare_operation(workitem *newitem) {
bool server_cas = (newitem->prefix_info.has_cas_col && newitem->cas);
- bool r;
+ op_status_t r;
/* Jump table */
switch(newitem->base.verb) {
case OP_READ:
+ r = worker_do_read(newitem, server_cas);
+ break;
+
case OPERATION_APPEND:
case OPERATION_PREPEND:
- r = worker_do_read(newitem, server_cas);
+ if(newitem->plan->spec->nvaluecols > 1) {
+ /* APPEND/PREPEND is currently not supported for tsv */
+ r = op_not_supported;
+ }
+ else {
+ r = worker_do_read(newitem, server_cas);
+ }
break;
case OP_DELETE:
@@ -145,14 +159,14 @@ bool worker_prepare_operation(workitem *
break;
default:
- return false; /* not supported */
+ r= op_not_supported;
}
- return r; /* fixme: distinguish "not supported" from "failed" */
+ return r;
}
-bool worker_do_delete(workitem *wqitem, bool server_cas) {
+op_status_t worker_do_delete(workitem *wqitem, bool server_cas) {
DEBUG_ENTER();
QueryPlan *plan = wqitem->plan;
@@ -179,18 +193,17 @@ bool worker_do_delete(workitem *wqitem,
if(err.status != NdbError::Success) {
logger->log(LOG_WARNING, 0, "deleteTuple(): %s\n", err.message);
tx->close();
- return false;
+ return op_failed;
}
}
- /* NdbTransaction::executeAsynch() */
- tx->executeAsynch(NdbTransaction::Commit, DBcallback, (void *) wqitem,
- NdbOperation::DefaultAbortOption, 1);
- return true;
+ /* Prepare for execution */
+ tx->executeAsynchPrepare(NdbTransaction::Commit, DB_callback, (void *) wqitem);
+ return op_async_prepared;
}
-bool worker_do_write(workitem *wqitem, bool server_cas) {
+op_status_t worker_do_write(workitem *wqitem, bool server_cas) {
DEBUG_PRINT("%s", workitem_get_operation(wqitem));
uint64_t cas_in = *wqitem->cas; // read old value
@@ -212,12 +225,33 @@ bool worker_do_write(workitem *wqitem, b
/* Set the row */
op.clearNullBits();
- op.setColumn(COL_STORE_KEY, dbkey, wqitem->base.nsuffix);
- op.setColumn(COL_STORE_VALUE, hash_item_get_data(wqitem->cache_item),
- wqitem->cache_item->nbytes); // the value
+ op.setColumn(COL_STORE_KEY, dbkey, wqitem->base.nsuffix);
+
+ if(plan->spec->nvaluecols > 1) {
+ /* Multiple Value Columns */
+ TabSeparatedValues tsv(hash_item_get_data(wqitem->cache_item),
+ plan->spec->nvaluecols, wqitem->cache_item->nbytes);
+ int idx = 0;
+ do {
+ if(tsv.getLength()) {
+ op.setColumn(COL_STORE_VALUE+idx, tsv.getPointer(), tsv.getLength());
+ }
+ else {
+ op.setColumnNull(COL_STORE_VALUE+idx);
+ }
+ idx++;
+ } while (tsv.advance());
+ }
+ else {
+ /* Just one value column */
+ op.setColumn(COL_STORE_VALUE, hash_item_get_data(wqitem->cache_item),
+ wqitem->cache_item->nbytes);
+ }
+
if(server_cas) {
op.setColumnBigUnsigned(COL_STORE_CAS, * wqitem->cas); // the cas
}
+
if(wqitem->plan->dup_numbers) {
if(isdigit(* hash_item_get_data(wqitem->cache_item)) &&
wqitem->cache_item->nbytes < 32) { // Copy string representation
@@ -282,17 +316,15 @@ bool worker_do_write(workitem *wqitem, b
DEBUG_PRINT("NDB operation failed. workitem %d.%d", wqitem->pipeline->id,
wqitem->id);
tx->close();
- return false;
+ return op_failed;
}
- /* NdbTransaction::executeAsynch() */
- tx->executeAsynch(NdbTransaction::Commit, DBcallback, (void *) wqitem,
- NdbOperation::DefaultAbortOption, 1);
- return true;
+ tx->executeAsynchPrepare(NdbTransaction::Commit, DB_callback, (void *) wqitem);
+ return op_async_prepared;
}
-bool worker_do_read(workitem *wqitem, bool server_cas) {
+op_status_t worker_do_read(workitem *wqitem, bool server_cas) {
DEBUG_ENTER();
QueryPlan *plan = wqitem->plan;
@@ -315,28 +347,26 @@ bool worker_do_read(workitem *wqitem, bo
if(! op.readTuple(tx)) {
logger->log(LOG_WARNING, 0, "readTuple(): %s\n", tx->getNdbError().message);
tx->close();
- return false;
+ return op_failed;
}
/* Save the workitem in the transaction and prepare for async execution */
if(wqitem->base.verb == OPERATION_APPEND || wqitem->base.verb == OPERATION_PREPEND)
{
- DEBUG_PRINT("In read() portion of APPEND. Value = %s",
+ DEBUG_PRINT("In read() portion of APPEND. Value = %s",
hash_item_get_data(wqitem->cache_item));
- tx->executeAsynch(NdbTransaction::NoCommit, rewriteCallback, (void *) wqitem,
- NdbOperation::DefaultAbortOption, 1);
+ tx->executeAsynchPrepare(NdbTransaction::NoCommit, rewrite_callback, (void *) wqitem);
}
else
{
- tx->executeAsynch(NdbTransaction::Commit, DBcallback, (void *) wqitem,
- NdbOperation::DefaultAbortOption, 1);
+ tx->executeAsynchPrepare(NdbTransaction::Commit, DB_callback, (void *) wqitem);
}
- return true;
+ return op_async_prepared;
}
-bool worker_do_math(workitem *wqitem, bool server_cas) {
+op_status_t worker_do_math(workitem *wqitem, bool server_cas) {
DEBUG_PRINT("create: %d retries: %d",
wqitem->base.math_create, wqitem->base.retries);
worker_set_cas(wqitem->pipeline, wqitem->cas);
@@ -409,7 +439,7 @@ bool worker_do_math(workitem *wqitem, bo
if(! ndbop1) {
logger->log(LOG_WARNING, 0, "readMasked(): %s\n", tx->getNdbError().message);
tx->close();
- return false;
+ return op_failed;
}
}
@@ -432,7 +462,7 @@ bool worker_do_math(workitem *wqitem, bo
if(! ndbop2) {
logger->log(LOG_WARNING, 0, "insertMasked(): %s\n", tx->getNdbError().message);
tx->close();
- return false;
+ return op_failed;
}
}
@@ -472,17 +502,16 @@ bool worker_do_math(workitem *wqitem, bo
if(! ndbop3) {
logger->log(LOG_WARNING, 0, "updateInterpreted(): %s\n", tx->getNdbError().message);
tx->close();
- return false;
+ return op_failed;
}
}
- tx->executeAsynch(NdbTransaction::Commit, incrCallback, (void *) wqitem,
- NdbOperation::DefaultAbortOption, 1);
- return true;
+ tx->executeAsynchPrepare(NdbTransaction::Commit, incr_callback, (void *) wqitem);
+ return op_async_prepared;
}
-void DBcallback(int result, NdbTransaction *tx, void *itemptr) {
+void DB_callback(int result, NdbTransaction *tx, void *itemptr) {
workitem *wqitem = (workitem *) itemptr;
ndb_pipeline * & pipeline = wqitem->pipeline;
status_block * return_status;
@@ -550,21 +579,21 @@ void DBcallback(int result, NdbTransacti
if(wqitem->base.is_sync) {
wqitem->status = return_status;
pipeline->engine->server.cookie->store_engine_specific(wqitem->cookie, wqitem);
- pipeline->engine->server.cookie->notify_io_complete(wqitem->cookie, io_status);
+ pipeline->scheduler->yield(wqitem);
}
else {
/* The workitem was allocated back in the engine thread; if used in a
callback, it would be freed there, too. But we must free it here.
*/
pipeline->engine->server.cookie->store_engine_specific(wqitem->cookie, wqitem->previous);
- pipeline_io_completed(pipeline, wqitem);
+ pipeline->scheduler->io_completed(wqitem);
workitem_free(wqitem);
}
}
/* Middle-step callback for APPEND and PREPEND */
-void rewriteCallback(int result, NdbTransaction *tx, void *itemptr) {
+void rewrite_callback(int result, NdbTransaction *tx, void *itemptr) {
workitem *item = (workitem *) itemptr;
DEBUG_PRINT("%d.%d", item->pipeline->id, item->id);
@@ -573,11 +602,11 @@ void rewriteCallback(int result, NdbTran
item->status = & status_block_bad_replace;
tx->close();
item->pipeline->engine->server.cookie->store_engine_specific(item->cookie, item);
- item->pipeline->engine->server.cookie->notify_io_complete(item->cookie, ENGINE_SUCCESS);
+ item->pipeline->scheduler->yield(item);
return;
}
else if(tx->getNdbError().classification != NdbError::NoError) {
- return DBcallback(result, tx, itemptr);
+ return DB_callback(result, tx, itemptr);
}
/* Strings and lengths: */
@@ -630,14 +659,13 @@ void rewriteCallback(int result, NdbTran
op.setColumnBigUnsigned(COL_STORE_CAS, * item->cas);
ndb_op = op.updateTuple(tx);
- /* Error case; operation has not been built */
if(ndb_op) {
- tx->executeAsynch(NdbTransaction::Commit, DBcallback, (void *) item,
- NdbOperation::DefaultAbortOption, 1);
- // fixme: this should call back into the scheduler!
- item->ndb_instance->db->pollNdb();
+ // Inform the scheduler that this item must be re-polled
+ item->pipeline->scheduler->reschedule(item);
+ tx->executeAsynchPrepare(NdbTransaction::Commit, DB_callback, (void *) item);
}
else {
+ /* Error case; operation has not been built */
DEBUG_PRINT("NDB operation failed. workitem %d.%d", item->pipeline->id,
item->id);
tx->close();
@@ -649,7 +677,7 @@ void rewriteCallback(int result, NdbTran
/* Dedicated callback function for INCR and DECR operations
*/
-void incrCallback(int result, NdbTransaction *tx, void *itemptr) {
+void incr_callback(int result, NdbTransaction *tx, void *itemptr) {
workitem *wqitem = (workitem *) itemptr;
ndb_pipeline * & pipeline = wqitem->pipeline;
status_block * return_status;
@@ -737,13 +765,13 @@ void incrCallback(int result, NdbTransac
if(wqitem->base.is_sync) {
wqitem->status = return_status;
pipeline->engine->server.cookie->store_engine_specific(wqitem->cookie, wqitem);
- pipeline->engine->server.cookie->notify_io_complete(wqitem->cookie, io_status);
+ pipeline->scheduler->yield(wqitem);
}
else {
/* The workitem was allocated back in the engine thread; if used in a
callback, it would be freed there, too. But we must free it here. */
pipeline->engine->server.cookie->store_engine_specific(wqitem->cookie, wqitem->previous);
- pipeline_io_completed(pipeline, wqitem);
+ pipeline->scheduler->io_completed(wqitem);
workitem_free(wqitem);
}
}
=== modified file 'storage/ndb/memcache/src/schedulers/Bulk.cc'
--- a/storage/ndb/memcache/src/schedulers/Bulk.cc 2011-04-21 08:05:13 +0000
+++ b/storage/ndb/memcache/src/schedulers/Bulk.cc 2011-05-17 05:22:56 +0000
@@ -129,7 +129,6 @@ ENGINE_ERROR_CODE Scheduler_bulk::schedu
DEBUG_ENTER();
const Configuration & conf = get_Configuration(); // see note
int lockerr;
- ENGINE_ERROR_CODE response_code = ENGINE_EWOULDBLOCK;
/* Fetch the config for its key prefix */
const KeyPrefix *pfx = conf.getPrefixByInfo(newitem->prefix_info);
@@ -164,18 +163,22 @@ ENGINE_ERROR_CODE Scheduler_bulk::schedu
workitem_set_NdbInstance(newitem, inst);
// Build the NDB transaction
- if(worker_prepare_operation(newitem)) {
- inst->npending++;
+ ENGINE_ERROR_CODE response_code;
+ op_status_t op_status = worker_prepare_operation(newitem);
+ switch(op_status) {
+ case op_async_prepared:
+ case op_async_sent:
+ inst->npending++;
+ response_code = ENGINE_EWOULDBLOCK;
+ break;
+ case op_not_supported:
+ response_code = ENGINE_ENOTSUP;
+ break;
+ case op_failed:
+ response_code = ENGINE_FAILED;
+ break;
}
- else {
- /******* Error handling. The operation is not supported. */
- if(newitem->base.is_sync) {
- newitem->status = & status_block_unsupported;
- pipeline->engine->server.cookie->store_engine_specific(newitem->cookie, newitem);
- pipeline->engine->server.cookie->notify_io_complete(newitem->cookie, ENGINE_ENOTSUP);
- }
- }
-
+
inst->unlock();
return response_code;
@@ -280,3 +283,17 @@ void * Scheduler_bulk::run_ndb_commit_th
cluster[c].stats.vtime = get_thread_vtime();
}
}
+
+
+void Scheduler_bulk::reschedule(workitem *item) const {
+ LockableNdbInstance * inst = (LockableNdbInstance *) item->ndb_instance ;
+ DEBUG_ASSERT(inst->is_locked);
+ inst->npending++;
+}
+
+
+void Scheduler_bulk::yield(workitem *item) const {
+ /* In this scheduler, a small number of Ndb objects are shared,
+ so we call notify_io_complete() while the callback is still running. */
+ pipeline->engine->server.cookie->notify_io_complete(item->cookie, ENGINE_SUCCESS);
+}
=== modified file 'storage/ndb/memcache/src/schedulers/Bulk.h'
--- a/storage/ndb/memcache/src/schedulers/Bulk.h 2011-04-19 01:16:36 +0000
+++ b/storage/ndb/memcache/src/schedulers/Bulk.h 2011-05-17 05:22:56 +0000
@@ -27,6 +27,7 @@
#include <memcached/types.h>
#include "config.h"
+#include "workitem.h"
#include "Scheduler.h"
#include "NdbInstance.h"
@@ -42,6 +43,8 @@ public:
void init(int threadnum, int nthreads, const char *config_string);
void attach_thread(thread_identifier *);
ENGINE_ERROR_CODE schedule(workitem *);
+ void yield(workitem *) const;
+ void reschedule(workitem *) const;
void io_completed(workitem *);
void add_stats(ADD_STAT, const void *);
void * run_ndb_commit_thread(int thread_id);
=== modified file 'storage/ndb/memcache/src/schedulers/Flex.h'
--- a/storage/ndb/memcache/src/schedulers/Flex.h 2011-04-07 11:20:56 +0000
+++ b/storage/ndb/memcache/src/schedulers/Flex.h 2011-05-17 05:22:56 +0000
@@ -56,6 +56,8 @@ public:
void init(int threadnum, int nthreads, const char *config_string);
void attach_thread(thread_identifier *);
ENGINE_ERROR_CODE schedule(workitem *);
+ void yield(workitem *) const; // inlined
+ void reschedule(workitem *) const; // inlined
void io_completed(workitem *);
void add_stats(ADD_STAT, const void *);
@@ -83,6 +85,13 @@ protected:
};
+inline void Scheduler_flex::reschedule(workitem *item) const {
+ item->base.reschedule = 1;
+}
+
+inline void Scheduler_flex::yield(workitem *item) const { }
+
+
/* Random stat samples will on average be taken twice as often as
FLEX_STATS_SAMPLE_INTERVAL (based on uniform random distribution).
FLEX_STATS_INITIAL_INTERVAL is lower so as to get stats quickly on startup.
=== modified file 'storage/ndb/memcache/src/schedulers/Flex_cluster.cc'
--- a/storage/ndb/memcache/src/schedulers/Flex_cluster.cc 2011-04-09 07:36:24 +0000
+++ b/storage/ndb/memcache/src/schedulers/Flex_cluster.cc 2011-05-17 05:22:56 +0000
@@ -157,31 +157,40 @@ ENGINE_ERROR_CODE Scheduler_flex::Cluste
workitem_set_NdbInstance(newitem, inst);
- // Fetch the query plan for this prefix.
+ /* Fetch the query plan for this prefix. */
newitem->plan = inst->getPlanForPrefix(pfx);
if(! newitem->plan) return ENGINE_FAILED;
- // Build the NDB transaction
- if(worker_prepare_operation(newitem)) {
-
- if(do_queue_sample-- == 0) { // Sample for statistics.
- /* To simulate sampling the depth immediately after the item is added,
- sample it now and add one. */
- int depth = queue->depth + 1;
- /* Queue depth can be erroneous because of thread races.
- Use it only if it looks valid. */
- if(depth > 0 && depth <= queue_size) {
- stats.queue_total_depth += depth;
- stats.queue_samples++;
- }
- do_queue_sample = random() % FLEX_STATS_SAMPLE_INTERVAL;
+ /* Sample for statistics */
+ if(do_queue_sample-- == 0) {
+ int depth = queue->depth + 1;
+ // Depth can be erroneous due to thread races; use it only if it looks valid
+ if(depth > 0 && depth <= queue_size) {
+ stats.queue_total_depth += depth;
+ stats.queue_samples++;
}
-
- // Put the NdbInstance on the queue for the commit thread.
- workqueue_add(queue, inst);
- return ENGINE_EWOULDBLOCK;
+ do_queue_sample = (random() % FLEX_STATS_SAMPLE_INTERVAL) + 1;
+ }
+
+ /* Build the NDB transaction */
+ ENGINE_ERROR_CODE response_code;
+ op_status_t op_status = worker_prepare_operation(newitem);
+
+ switch(op_status) {
+ case op_async_prepared:
+ case op_async_sent:
+ workqueue_add(queue, newitem); // place item on queue
+ response_code = ENGINE_EWOULDBLOCK;
+ break;
+ case op_not_supported:
+ response_code = ENGINE_ENOTSUP;
+ break;
+ case op_failed:
+ response_code = ENGINE_FAILED;
+ break;
}
- else return ENGINE_ENOTSUP;
+
+ return response_code;
}
@@ -209,19 +218,29 @@ void Scheduler_flex::Cluster::contribute
Commit thread: Get an NdbInstance off the workqueue, and call pollNdb() on it.
*/
void * Scheduler_flex::Cluster::run_commit_thread() {
+ workitem *item;
+ int polled;
+
DEBUG_ENTER();
- NdbInstance *inst;
-
while(1) {
/* Wait for something to appear on the queue */
- inst = (NdbInstance *) workqueue_consumer_wait(queue);
+ item = (workitem *) workqueue_consumer_wait(queue);
- if(inst == NULL) return 0; /* queue has been shut down and emptied */
+ if(item == NULL) break; /* queue has been shut down and emptied */
- /* Poll */
- inst->db->pollNdb();
+ /* Send & poll for response; reschedule if needed */
+ do {
+ item->base.reschedule = 0;
+ polled = item->ndb_instance->db->sendPollNdb(10, 1, 1);
+ } while(item->base.reschedule || ! polled);
+
+ /* Now that sendPollNdb() has returned, it is OK to notify_io_complete(),
+ which will trigger the worker thread to release the Ndb instance. */
+ item->pipeline->engine->server.cookie->notify_io_complete(item->cookie, ENGINE_SUCCESS);
}
+
+ return NULL;
}
=== modified file 'storage/ndb/memcache/src/schedulers/Stockholm.cc'
--- a/storage/ndb/memcache/src/schedulers/Stockholm.cc 2011-04-19 01:16:36 +0000
+++ b/storage/ndb/memcache/src/schedulers/Stockholm.cc 2011-05-17 05:22:56 +0000
@@ -185,13 +185,24 @@ ENGINE_ERROR_CODE Scheduler_stockholm::s
if(! newitem->plan) return ENGINE_FAILED;
// Build the NDB transaction
- if(worker_prepare_operation(newitem)) {
- // Put the NdbInstance on the queue for the commit thread.
- // Should probably be the workitem?
- workqueue_add(cluster[c].queue, inst);
- return ENGINE_EWOULDBLOCK;
+ op_status_t op_status = worker_prepare_operation(newitem);
+ ENGINE_ERROR_CODE response_code;
+
+ switch(op_status) {
+ case op_async_prepared:
+ case op_async_sent:
+ workqueue_add(cluster[c].queue, newitem); // place item on queue
+ response_code = ENGINE_EWOULDBLOCK;
+ break;
+ case op_not_supported:
+ response_code = ENGINE_ENOTSUP;
+ break;
+ case op_failed:
+ response_code = ENGINE_FAILED;
+ break;
}
- else return ENGINE_ENOTSUP;
+
+ return response_code;
}
@@ -241,21 +252,34 @@ void * run_stockholm_commit_thread(void
Get an item off the workqueue, and call pollNdb() on that item.
*/
void * Scheduler_stockholm::run_ndb_commit_thread(int c) {
- NdbInstance *inst;
+ workitem *item;
+ int polled;
DEBUG_ENTER();
while(1) {
/* Wait for something to appear on the queue */
- inst = (NdbInstance *) workqueue_consumer_wait(cluster[c].queue);
+ item = (workitem *) workqueue_consumer_wait(cluster[c].queue);
+
+ if(item == NULL) break; /* queue has been shut down and emptied */
- /* Poll */
- inst->db->pollNdb();
+ /* Send & poll for response; reschedule if needed */
+ do {
+ item->base.reschedule = 0;
+ polled = item->ndb_instance->db->sendPollNdb(10, 1, 1);
+ } while(item->base.reschedule || ! polled);
+
+ DEBUG_ASSERT(polled == 1); // i.e. not > 1
+
+ /* Now that sendPollNdb() has returned, it is OK to notify_io_complete(),
+ which will trigger the worker thread to release the Ndb instance. */
+ pipeline->engine->server.cookie->notify_io_complete(item->cookie, ENGINE_SUCCESS);
- cluster[c].stats.cycles++;
- if(! (cluster[c].stats.cycles % STAT_INTERVAL))
+ if(! (cluster[c].stats.cycles++ % STAT_INTERVAL))
cluster[c].stats.commit_thread_vtime = get_thread_vtime();
}
+
+ return NULL;
}
=== modified file 'storage/ndb/memcache/src/schedulers/Stockholm.h'
--- a/storage/ndb/memcache/src/schedulers/Stockholm.h 2011-04-07 11:20:56 +0000
+++ b/storage/ndb/memcache/src/schedulers/Stockholm.h 2011-05-17 05:22:56 +0000
@@ -28,6 +28,7 @@
#include <memcached/types.h>
#include "ndbmemcache_config.h"
+#include "workitem.h"
#include "Scheduler.h"
#include "KeyPrefix.h"
@@ -45,6 +46,8 @@ public:
void init(int threadnum, int nthreads, const char *config_string);
void attach_thread(thread_identifier *);
ENGINE_ERROR_CODE schedule(workitem *);
+ void yield(workitem *) const; // inlined
+ void reschedule(workitem *) const; // inlined
void io_completed(workitem *);
void add_stats(ADD_STAT, const void *);
void * run_ndb_commit_thread(int cluster_id);
@@ -64,5 +67,13 @@ private:
};
+inline void Scheduler_stockholm::reschedule(workitem *item) const {
+ item->base.reschedule = 1;
+}
+
+
+inline void Scheduler_stockholm::yield(workitem *item) const { }
+
+
#endif
No bundle (reason: useless for push emails).
| Thread |
|---|
| • bzr push into mysql-5.1-telco-7.2 branch (john.duncan:4176 to 4181) | John David Duncan | 19 May |