#At file:///Users/jdd/bzr-repo/working/cluster-7.2-labs-memcached/ based on revid:john.duncan@stripped
4178 John David Duncan 2011-05-16
worker_prepare_operation() now returns op_status_t rather than bool.
modified:
storage/ndb/memcache/include/ndb_worker.h
storage/ndb/memcache/include/ndbmemcache_global.h
storage/ndb/memcache/src/ndb_worker.cc
storage/ndb/memcache/src/schedulers/Bulk.cc
storage/ndb/memcache/src/schedulers/Flex_cluster.cc
storage/ndb/memcache/src/schedulers/Stockholm.cc
=== modified file 'storage/ndb/memcache/include/ndb_worker.h'
--- a/storage/ndb/memcache/include/ndb_worker.h 2011-05-17 04:55:39 +0000
+++ b/storage/ndb/memcache/include/ndb_worker.h 2011-05-17 05:07:20 +0000
@@ -21,10 +21,7 @@
#define NDBMEMCACHE_NDB_WORKER_H
-/* worker_prepare_operation():
- Returns TRUE if an operation has been prepared with executeAsynchPrepare().
-*/
-bool worker_prepare_operation(workitem *);
+op_status_t worker_prepare_operation(workitem *);
bool build_hash_item(workitem *, Operation &);
=== modified file 'storage/ndb/memcache/include/ndbmemcache_global.h'
--- a/storage/ndb/memcache/include/ndbmemcache_global.h 2011-04-06 04:31:28 +0000
+++ b/storage/ndb/memcache/include/ndbmemcache_global.h 2011-05-17 05:07:20 +0000
@@ -52,4 +52,13 @@ enum {
OP_FLUSH
};
+/* Operation Status enums */
+typedef enum {
+ op_not_supported,
+ op_failed,
+ op_async_prepared,
+ op_async_sent
+} op_status_t;
+
+
#endif
=== modified file 'storage/ndb/memcache/src/ndb_worker.cc'
--- a/storage/ndb/memcache/src/ndb_worker.cc 2011-05-17 04:55:39 +0000
+++ b/storage/ndb/memcache/src/ndb_worker.cc 2011-05-17 05:07:20 +0000
@@ -53,14 +53,14 @@
typedef void ndb_async_callback(int, NdbTransaction *, void *);
-ndb_async_callback incrCallback; // callback for incr/decr
-ndb_async_callback rewriteCallback; // callback for append/prepend
-ndb_async_callback DBcallback; // callback for all others
+ndb_async_callback incr_callback; // callback for incr/decr
+ndb_async_callback rewrite_callback; // callback for append/prepend
+ndb_async_callback DB_callback; // callback for all others
-bool worker_do_read(workitem *, bool with_cas);
-bool worker_do_write(workitem *, bool with_cas);
-bool worker_do_delete(workitem *, bool with_cas);
-bool worker_do_math(workitem *wqitem, bool with_cas);
+op_status_t worker_do_read(workitem *, bool with_cas);
+op_status_t worker_do_write(workitem *, bool with_cas);
+op_status_t worker_do_delete(workitem *, bool with_cas);
+op_status_t worker_do_math(workitem *wqitem, bool with_cas);
void worker_set_cas(ndb_pipeline *, uint64_t *);
bool finalize_read(workitem *);
@@ -121,9 +121,9 @@ void worker_set_cas(ndb_pipeline *p, uin
Called from the scheduler.
Returns true if executeAsynchPrepare() has been called on the item.
*/
-bool worker_prepare_operation(workitem *newitem) {
+op_status_t worker_prepare_operation(workitem *newitem) {
bool server_cas = (newitem->prefix_info.has_cas_col && newitem->cas);
- bool r;
+ op_status_t r;
/* Jump table */
switch(newitem->base.verb) {
@@ -149,14 +149,14 @@ bool worker_prepare_operation(workitem *
break;
default:
- return false; /* not supported */
+ r= op_not_supported;
}
return r;
}
-bool worker_do_delete(workitem *wqitem, bool server_cas) {
+op_status_t worker_do_delete(workitem *wqitem, bool server_cas) {
DEBUG_ENTER();
QueryPlan *plan = wqitem->plan;
@@ -183,17 +183,17 @@ bool worker_do_delete(workitem *wqitem,
if(err.status != NdbError::Success) {
logger->log(LOG_WARNING, 0, "deleteTuple(): %s\n", err.message);
tx->close();
- return false;
+ return op_failed;
}
}
/* Prepare for execution */
- tx->executeAsynchPrepare(NdbTransaction::Commit, DBcallback, (void *) wqitem);
- return true;
+ tx->executeAsynchPrepare(NdbTransaction::Commit, DB_callback, (void *) wqitem);
+ return op_async_prepared;
}
-bool worker_do_write(workitem *wqitem, bool server_cas) {
+op_status_t worker_do_write(workitem *wqitem, bool server_cas) {
DEBUG_PRINT("%s", workitem_get_operation(wqitem));
uint64_t cas_in = *wqitem->cas; // read old value
@@ -285,15 +285,15 @@ bool worker_do_write(workitem *wqitem, b
DEBUG_PRINT("NDB operation failed. workitem %d.%d", wqitem->pipeline->id,
wqitem->id);
tx->close();
- return false;
+ return op_failed;
}
- tx->executeAsynchPrepare(NdbTransaction::Commit, DBcallback, (void *) wqitem);
- return true;
+ tx->executeAsynchPrepare(NdbTransaction::Commit, DB_callback, (void *) wqitem);
+ return op_async_prepared;
}
-bool worker_do_read(workitem *wqitem, bool server_cas) {
+op_status_t worker_do_read(workitem *wqitem, bool server_cas) {
DEBUG_ENTER();
QueryPlan *plan = wqitem->plan;
@@ -316,7 +316,7 @@ bool worker_do_read(workitem *wqitem, bo
if(! op.readTuple(tx)) {
logger->log(LOG_WARNING, 0, "readTuple(): %s\n", tx->getNdbError().message);
tx->close();
- return false;
+ return op_failed;
}
/* Save the workitem in the transaction and prepare for async execution */
@@ -324,18 +324,18 @@ bool worker_do_read(workitem *wqitem, bo
{
DEBUG_PRINT("In read() portion of APPEND. Value = %s",
hash_item_get_data(wqitem->cache_item));
- tx->executeAsynchPrepare(NdbTransaction::NoCommit, rewriteCallback, (void *) wqitem);
+ tx->executeAsynchPrepare(NdbTransaction::NoCommit, rewrite_callback, (void *) wqitem);
}
else
{
- tx->executeAsynchPrepare(NdbTransaction::Commit, DBcallback, (void *) wqitem);
+ tx->executeAsynchPrepare(NdbTransaction::Commit, DB_callback, (void *) wqitem);
}
- return true;
+ return op_async_prepared;
}
-bool worker_do_math(workitem *wqitem, bool server_cas) {
+op_status_t worker_do_math(workitem *wqitem, bool server_cas) {
DEBUG_PRINT("create: %d retries: %d",
wqitem->base.math_create, wqitem->base.retries);
worker_set_cas(wqitem->pipeline, wqitem->cas);
@@ -408,7 +408,7 @@ bool worker_do_math(workitem *wqitem, bo
if(! ndbop1) {
logger->log(LOG_WARNING, 0, "readMasked(): %s\n", tx->getNdbError().message);
tx->close();
- return false;
+ return op_failed;
}
}
@@ -431,7 +431,7 @@ bool worker_do_math(workitem *wqitem, bo
if(! ndbop2) {
logger->log(LOG_WARNING, 0, "insertMasked(): %s\n", tx->getNdbError().message);
tx->close();
- return false;
+ return op_failed;
}
}
@@ -471,16 +471,16 @@ bool worker_do_math(workitem *wqitem, bo
if(! ndbop3) {
logger->log(LOG_WARNING, 0, "updateInterpreted(): %s\n", tx->getNdbError().message);
tx->close();
- return false;
+ return op_failed;
}
}
- tx->executeAsynchPrepare(NdbTransaction::Commit, incrCallback, (void *) wqitem);
- return true;
+ tx->executeAsynchPrepare(NdbTransaction::Commit, incr_callback, (void *) wqitem);
+ return op_async_prepared;
}
-void DBcallback(int result, NdbTransaction *tx, void *itemptr) {
+void DB_callback(int result, NdbTransaction *tx, void *itemptr) {
workitem *wqitem = (workitem *) itemptr;
ndb_pipeline * & pipeline = wqitem->pipeline;
status_block * return_status;
@@ -562,7 +562,7 @@ void DBcallback(int result, NdbTransacti
/* Middle-step callback for APPEND and PREPEND */
-void rewriteCallback(int result, NdbTransaction *tx, void *itemptr) {
+void rewrite_callback(int result, NdbTransaction *tx, void *itemptr) {
workitem *item = (workitem *) itemptr;
DEBUG_PRINT("%d.%d", item->pipeline->id, item->id);
@@ -575,7 +575,7 @@ void rewriteCallback(int result, NdbTran
return;
}
else if(tx->getNdbError().classification != NdbError::NoError) {
- return DBcallback(result, tx, itemptr);
+ return DB_callback(result, tx, itemptr);
}
/* Strings and lengths: */
@@ -631,7 +631,7 @@ void rewriteCallback(int result, NdbTran
if(ndb_op) {
// Inform the scheduler that this item must be re-polled
item->pipeline->scheduler->reschedule(item);
- tx->executeAsynchPrepare(NdbTransaction::Commit, DBcallback, (void *) item);
+ tx->executeAsynchPrepare(NdbTransaction::Commit, DB_callback, (void *) item);
}
else {
/* Error case; operation has not been built */
@@ -646,7 +646,7 @@ void rewriteCallback(int result, NdbTran
/* Dedicated callback function for INCR and DECR operations
*/
-void incrCallback(int result, NdbTransaction *tx, void *itemptr) {
+void incr_callback(int result, NdbTransaction *tx, void *itemptr) {
workitem *wqitem = (workitem *) itemptr;
ndb_pipeline * & pipeline = wqitem->pipeline;
status_block * return_status;
=== modified file 'storage/ndb/memcache/src/schedulers/Bulk.cc'
--- a/storage/ndb/memcache/src/schedulers/Bulk.cc 2011-04-21 08:05:13 +0000
+++ b/storage/ndb/memcache/src/schedulers/Bulk.cc 2011-05-17 05:07:20 +0000
@@ -129,7 +129,6 @@ ENGINE_ERROR_CODE Scheduler_bulk::schedu
DEBUG_ENTER();
const Configuration & conf = get_Configuration(); // see note
int lockerr;
- ENGINE_ERROR_CODE response_code = ENGINE_EWOULDBLOCK;
/* Fetch the config for its key prefix */
const KeyPrefix *pfx = conf.getPrefixByInfo(newitem->prefix_info);
@@ -164,18 +163,22 @@ ENGINE_ERROR_CODE Scheduler_bulk::schedu
workitem_set_NdbInstance(newitem, inst);
// Build the NDB transaction
- if(worker_prepare_operation(newitem)) {
- inst->npending++;
+ ENGINE_ERROR_CODE response_code;
+ op_status_t op_status = worker_prepare_operation(newitem);
+ switch(op_status) {
+ case op_async_prepared:
+ case op_async_sent:
+ inst->npending++;
+ response_code = ENGINE_EWOULDBLOCK;
+ break;
+ case op_not_supported:
+ response_code = ENGINE_ENOTSUP;
+ break;
+ case op_failed:
+ response_code = ENGINE_FAILED;
+ break;
}
- else {
- /******* Error handling. The operation is not supported. */
- if(newitem->base.is_sync) {
- newitem->status = & status_block_unsupported;
- pipeline->engine->server.cookie->store_engine_specific(newitem->cookie, newitem);
- pipeline->engine->server.cookie->notify_io_complete(newitem->cookie, ENGINE_ENOTSUP);
- }
- }
-
+
inst->unlock();
return response_code;
=== modified file 'storage/ndb/memcache/src/schedulers/Flex_cluster.cc'
--- a/storage/ndb/memcache/src/schedulers/Flex_cluster.cc 2011-05-17 04:55:39 +0000
+++ b/storage/ndb/memcache/src/schedulers/Flex_cluster.cc 2011-05-17 05:07:20 +0000
@@ -157,31 +157,40 @@ ENGINE_ERROR_CODE Scheduler_flex::Cluste
workitem_set_NdbInstance(newitem, inst);
- // Fetch the query plan for this prefix.
+ /* Fetch the query plan for this prefix. */
newitem->plan = inst->getPlanForPrefix(pfx);
if(! newitem->plan) return ENGINE_FAILED;
- // Build the NDB transaction
- if(worker_prepare_operation(newitem)) {
-
- if(do_queue_sample-- == 0) { // Sample for statistics.
- /* To simulate sampling the depth immediately after the item is added,
- sample it now and add one. */
- int depth = queue->depth + 1;
- /* Queue depth can be erroneous because of thread races.
- Use it only if it looks valid. */
- if(depth > 0 && depth <= queue_size) {
- stats.queue_total_depth += depth;
- stats.queue_samples++;
- }
- do_queue_sample = (random() % FLEX_STATS_SAMPLE_INTERVAL) + 1;
+ /* Sample for statistics */
+ if(do_queue_sample-- == 0) {
+ int depth = queue->depth + 1;
+ // Depth can be erroneous due to thread races; use it only if it looks valid
+ if(depth > 0 && depth <= queue_size) {
+ stats.queue_total_depth += depth;
+ stats.queue_samples++;
}
-
- // Put the workitem on the queue for the commit thread.
- workqueue_add(queue, newitem);
- return ENGINE_EWOULDBLOCK;
+ do_queue_sample = (random() % FLEX_STATS_SAMPLE_INTERVAL) + 1;
+ }
+
+ /* Build the NDB transaction */
+ ENGINE_ERROR_CODE response_code;
+ op_status_t op_status = worker_prepare_operation(newitem);
+
+ switch(op_status) {
+ case op_async_prepared:
+ case op_async_sent:
+ workqueue_add(queue, newitem); // place item on queue
+ response_code = ENGINE_EWOULDBLOCK;
+ break;
+ case op_not_supported:
+ response_code = ENGINE_ENOTSUP;
+ break;
+ case op_failed:
+ response_code = ENGINE_FAILED;
+ break;
}
- else return ENGINE_ENOTSUP;
+
+ return response_code;
}
=== modified file 'storage/ndb/memcache/src/schedulers/Stockholm.cc'
--- a/storage/ndb/memcache/src/schedulers/Stockholm.cc 2011-05-17 04:55:39 +0000
+++ b/storage/ndb/memcache/src/schedulers/Stockholm.cc 2011-05-17 05:07:20 +0000
@@ -185,11 +185,24 @@ ENGINE_ERROR_CODE Scheduler_stockholm::s
if(! newitem->plan) return ENGINE_FAILED;
// Build the NDB transaction
- if(worker_prepare_operation(newitem)) {
- workqueue_add(cluster[c].queue, newitem); // place item on queue
- return ENGINE_EWOULDBLOCK;
+ op_status_t op_status = worker_prepare_operation(newitem);
+ ENGINE_ERROR_CODE response_code;
+
+ switch(op_status) {
+ case op_async_prepared:
+ case op_async_sent:
+ workqueue_add(cluster[c].queue, newitem); // place item on queue
+ response_code = ENGINE_EWOULDBLOCK;
+ break;
+ case op_not_supported:
+ response_code = ENGINE_ENOTSUP;
+ break;
+ case op_failed:
+ response_code = ENGINE_FAILED;
+ break;
}
- else return ENGINE_ENOTSUP;
+
+ return response_code;
}
@@ -240,6 +253,7 @@ void * run_stockholm_commit_thread(void
*/
void * Scheduler_stockholm::run_ndb_commit_thread(int c) {
workitem *item;
+ int polled;
DEBUG_ENTER();
@@ -247,12 +261,12 @@ void * Scheduler_stockholm::run_ndb_comm
/* Wait for something to appear on the queue */
item = (workitem *) workqueue_consumer_wait(cluster[c].queue);
- /* Sen & poll for response */
- item->ndb_instance->db->sendPollNdb(WAITFOR_RESPONSE_TIMEOUT, 1, 1);
- while(item->base.reschedule == 1) {
+ /* Send & poll for response; reschedule if needed */
+ do {
item->base.reschedule = 0;
- item->ndb_instance->db->sendPollNdb(WAITFOR_RESPONSE_TIMEOUT, 1, 1);
- }
+ polled = item->ndb_instance->db->sendPollNdb(10, 1, 1);
+ } while(item->base.reschedule || ! polled);
+
cluster[c].stats.cycles++;
if(! (cluster[c].stats.cycles % STAT_INTERVAL))
Attachment: [text/bzr-bundle] bzr/john.duncan@oracle.com-20110517050720-e8lzjvy4csn31g81.bundle
| Thread |
|---|
| • bzr commit into mysql-5.1-telco-7.2 branch (john.duncan:4178) | John David Duncan | 17 May |