4157 John David Duncan 2011-04-08
Implement append & prepend; run-tests.sh now passes all tests.
modified:
storage/ndb/memcache/include/NdbInstance.h
storage/ndb/memcache/include/Scheduler.h
storage/ndb/memcache/run-tests.sh
storage/ndb/memcache/src/NdbInstance.cc
storage/ndb/memcache/src/ndb_engine.c
storage/ndb/memcache/src/ndb_worker.cc
storage/ndb/memcache/src/schedulers/Flex_cluster.cc
4156 Bernhard Ocklin 2011-04-08
fix memcached ndb_engine symbol clash
modified:
storage/ndb/memcache/unit/Makefile.am
storage/ndb/memcache/unit/harness.cc
=== modified file 'storage/ndb/memcache/include/NdbInstance.h'
--- a/storage/ndb/memcache/include/NdbInstance.h 2011-04-07 11:20:56 +0000
+++ b/storage/ndb/memcache/include/NdbInstance.h 2011-04-08 08:14:11 +0000
@@ -41,7 +41,6 @@ public:
/* Public Instance Variables */
Ndb *db;
- ndbmc_atomic32_t in_use;
NdbInstance *next;
protected:
=== modified file 'storage/ndb/memcache/include/Scheduler.h'
--- a/storage/ndb/memcache/include/Scheduler.h 2011-04-06 04:31:28 +0000
+++ b/storage/ndb/memcache/include/Scheduler.h 2011-04-08 08:14:11 +0000
@@ -49,8 +49,9 @@ public:
at pipeline initialization time. */
virtual void attach_thread(thread_identifier *) = 0;
- /** schedule() is called from the NDB Engine thread when an operation
- is ready to be queued for further async processing */
+ /** schedule() is called from the NDB Engine thread when a workitem
+ is ready to be queued for further async processing. It will obtain
+ an Ndb object for the operation and send the workitem to be executed. */
virtual ENGINE_ERROR_CODE schedule(workitem *) = 0;
/** io_completed() is called from the NDB Engine thread when an IO
=== modified file 'storage/ndb/memcache/run-tests.sh'
--- a/storage/ndb/memcache/run-tests.sh 2011-03-30 06:54:53 +0000
+++ b/storage/ndb/memcache/run-tests.sh 2011-04-08 08:14:11 +0000
@@ -8,12 +8,14 @@ do_test() {
if
memcapable -T "$1" >/dev/null 2>&1
then
- echo "[pass] $1"
+ r="pass"
let npass+=1
else
- echo "*** [FAIL] $1 ***"
+ r="FAIL"
let nfail+=1
fi
+
+ printf "%-40s[%s]\n" "$1" "$r"
}
skip_test() {
@@ -28,58 +30,59 @@ do_test "ascii set"
do_test "ascii set noreply"
do_test "ascii get"
do_test "ascii gets"
+do_test "ascii mget"
+do_test "ascii flush"
+do_test "ascii flush noreply"
do_test "ascii add"
do_test "ascii add noreply"
do_test "ascii replace"
do_test "ascii replace noreply"
+do_test "ascii cas"
+do_test "ascii cas noreply"
do_test "ascii delete"
do_test "ascii delete noreply"
+do_test "ascii incr"
+do_test "ascii incr noreply"
+do_test "ascii decr"
+do_test "ascii decr noreply"
+do_test "ascii append"
+do_test "ascii append noreply"
+do_test "ascii prepend"
+do_test "ascii prepend noreply"
do_test "ascii stat"
-do_test "ascii mget"
-do_test "ascii cas"
# Passing Tests -- binary protocol
do_test "binary noop"
do_test "binary quit"
do_test "binary quitq"
-do_test "binary stat"
-do_test "binary version"
-do_test "binary get"
-do_test "binary getq"
-do_test "binary getk"
-do_test "binary getkq"
-do_test "binary delete"
-do_test "binary deleteq"
do_test "binary set"
-do_test "binary setq"
+do_test "binary setq"
+do_test "binary flush"
+do_test "binary flushq"
do_test "binary add"
do_test "binary addq"
do_test "binary replace"
do_test "binary replaceq"
+do_test "binary delete"
+do_test "binary deleteq"
+do_test "binary get"
+do_test "binary getq"
+do_test "binary getk"
+do_test "binary getkq"
do_test "binary incr"
do_test "binary incrq"
do_test "binary decr"
do_test "binary decrq"
+do_test "binary version"
+do_test "binary append"
+do_test "binary appendq"
+do_test "binary prepend"
+do_test "binary prependq"
+do_test "binary stat"
-skip_test "ascii flush"
-skip_test "ascii flush noreply"
-skip_test "ascii incr"
-skip_test "ascii incr noreply"
-skip_test "ascii decr"
-skip_test "ascii decr noreply"
-skip_test "ascii append"
-skip_test "ascii append noreply"
-skip_test "ascii prepend"
-skip_test "ascii prepend noreply"
-skip_test "binary flush"
-skip_test "binary flushq"
-skip_test "binary append"
-skip_test "binary appendq"
-skip_test "binary prepend"
-skip_test "binary prependq"
echo
=== modified file 'storage/ndb/memcache/src/NdbInstance.cc'
--- a/storage/ndb/memcache/src/NdbInstance.cc 2011-04-06 04:31:28 +0000
+++ b/storage/ndb/memcache/src/NdbInstance.cc 2011-04-08 08:14:11 +0000
@@ -32,7 +32,6 @@ NdbInstance::NdbInstance(Ndb_cluster_con
int ntransactions) {
nplans = nprefixes;
db = new Ndb(c);
- in_use = false;
next = 0;
plans = new QueryPlan *[nplans];
memset(plans, 0, (nplans * sizeof(QueryPlan *)));
=== modified file 'storage/ndb/memcache/src/ndb_engine.c'
--- a/storage/ndb/memcache/src/ndb_engine.c 2011-04-07 11:20:56 +0000
+++ b/storage/ndb/memcache/src/ndb_engine.c 2011-04-08 08:14:11 +0000
@@ -614,14 +614,14 @@ static bool ndb_get_item_info(ENGINE_HAN
/* Use the workitem. */
item_info->cas = wqitem->cas ? *(wqitem->cas) : 0;
item_info->exptime = 0;
- item_info->nbytes = wqitem->value_size; /* +2? */
+ item_info->nbytes = wqitem->value_size;
item_info->flags = 0; /* FIXME: need to get flags from the workitem */
item_info->clsid = slabs_clsid(default_handle(ndb_eng), wqitem->value_size);
item_info->nkey = wqitem->base.nkey;
item_info->nvalue = 1; /* how many iovecs */
item_info->key = wqitem->key;
item_info->value[0].iov_base = wqitem->value_ptr;
- item_info->value[0].iov_len = wqitem->value_size; /* +2? */
+ item_info->value[0].iov_len = wqitem->value_size;
DEBUG_PRINT("workitem %d.%d [%s].", wqitem->pipeline->id, wqitem->id,
workitem_get_operation(wqitem));
return true;
@@ -631,16 +631,21 @@ static bool ndb_get_item_info(ENGINE_HAN
hash_item *it = (hash_item*) item;
item_info->cas = hash_item_get_cas(it);
item_info->exptime = it->exptime;
- item_info->nbytes = wqitem ? wqitem->value_size : it->nbytes;
+ item_info->nbytes = wqitem ? wqitem->value_size : 0;
item_info->flags = it->flags;
item_info->clsid = it->slabs_clsid;
item_info->nkey = it->nkey;
item_info->nvalue = 1;
item_info->key = hash_item_get_key(it);
item_info->value[0].iov_base = hash_item_get_data(it);
- item_info->value[0].iov_len = it->nbytes;
- DEBUG_PRINT("hash_item [KEY: %.20s][CAS: %llu].",
- hash_item_get_key(it), hash_item_get_cas(it));
+ item_info->value[0].iov_len = item_info->nbytes;
+ if(item_info->nbytes) {
+ DEBUG_PRINT("hash_item [KEY: %.20s][CAS: %llu][nbytes: %d].",
+ hash_item_get_key(it), item_info->cas, item_info->nbytes);
+ }
+ else {
+ DEBUG_PRINT(" new hash_item");
+ }
return true;
}
}
=== modified file 'storage/ndb/memcache/src/ndb_worker.cc'
--- a/storage/ndb/memcache/src/ndb_worker.cc 2011-04-06 04:31:28 +0000
+++ b/storage/ndb/memcache/src/ndb_worker.cc 2011-04-08 08:14:11 +0000
@@ -25,6 +25,7 @@
#include <assert.h>
#include <string.h>
#include <inttypes.h>
+#include <ctype.h>
/* Memcache headers */
#include "memcached/types.h"
@@ -49,10 +50,12 @@
#include "hash_item_util.h"
#include "ndb_worker.h"
-void incrCallback(int, NdbTransaction *, void *); // callback for incr/decr
-void flush_scan(int result, NdbTransaction *tx, void *itemptr); // callback for flush
-void DBcallback(int, NdbTransaction *, void *); // callback for all others
+typedef void ndb_async_callback(int, NdbTransaction *, void *);
+ndb_async_callback incrCallback; // callback for incr/decr
+ndb_async_callback rewriteCallback; // callback for append/prepend
+ndb_async_callback DBcallback; // callback for all others
+
bool worker_do_read(workitem *, bool with_cas);
bool worker_do_write(workitem *, bool with_cas);
bool worker_do_delete(workitem *, bool with_cas);
@@ -121,6 +124,8 @@ bool worker_prepare_operation(workitem *
/* Jump table */
switch(newitem->base.verb) {
case OP_READ:
+ case OPERATION_APPEND:
+ case OPERATION_PREPEND:
r = worker_do_read(newitem, server_cas);
break;
@@ -139,8 +144,6 @@ bool worker_prepare_operation(workitem *
r = worker_do_math(newitem, server_cas);
break;
- case OPERATION_APPEND:
- case OPERATION_PREPEND:
default:
return false; /* not supported */
}
@@ -190,8 +193,9 @@ bool worker_do_delete(workitem *wqitem,
bool worker_do_write(workitem *wqitem, bool server_cas) {
DEBUG_PRINT("%s", workitem_get_operation(wqitem));
- uint64_t cas_in = *wqitem->cas; // read old value
- worker_set_cas(wqitem->pipeline, wqitem->cas); // generate a new value
+ uint64_t cas_in = *wqitem->cas; // read old value
+ worker_set_cas(wqitem->pipeline, wqitem->cas); // generate a new value
+ hash_item_set_cas(wqitem->cache_item, * wqitem->cas); // store it
const NdbOperation *ndb_op = 0;
QueryPlan *plan = wqitem->plan;
@@ -202,7 +206,7 @@ bool worker_do_write(workitem *wqitem, b
op.clearKeyNullBits();
op.setKeyPart(COL_STORE_KEY, dbkey, wqitem->base.nsuffix);
- /* Allocate and encode the buffer for the row */
+ /* Allocate and encode the buVALUffer for the row */
workitem_allocate_rowbuffer_1(wqitem, op.requiredBuffer());
op.buffer = wqitem->row_buffer_1;
@@ -215,20 +219,32 @@ bool worker_do_write(workitem *wqitem, b
op.setColumnBigUnsigned(COL_STORE_CAS, * wqitem->cas); // the cas
}
if(wqitem->plan->dup_numbers) {
- uint64_t number;
- if(safe_strtoull(hash_item_get_data(wqitem->cache_item), &number)) {
- /* numeric value: also set the math column */
- op.setColumnBigUnsigned(COL_STORE_MATH, number);
+ if(isdigit(* hash_item_get_data(wqitem->cache_item)) &&
+ wqitem->cache_item->nbytes < 32) { // Copy string representation
+ uint64_t number;
+ const int len = wqitem->cache_item->nbytes;
+ char value[32];
+ for(size_t i = 0 ; i < len ; i++)
+ value[i] = * (hash_item_get_data(wqitem->cache_item) + i);
+ value[len] = 0;
+ if(safe_strtoull(value, &number)) { // numeric: set the math column
+ DEBUG_PRINT(" dup_numbers -- %d", (int) number );
+ op.setColumnBigUnsigned(COL_STORE_MATH, number);
+ }
+ else { // non-numeric
+ DEBUG_PRINT(" dup_numbers but non-numeric: %s [%d] *** ", value, len);
+ op.setColumnNull(COL_STORE_MATH);
+ }
}
- else {
- /* non-numeric: set the math column to null */
- op.setColumnNull(COL_STORE_MATH);
- }
+ else op.setColumnNull(COL_STORE_MATH);
}
/* Start the transaction */
NdbTransaction *tx = op.startTransaction();
- DEBUG_ASSERT(tx);
+ if(! tx) {
+ logger->log(LOG_WARNING, 0, "tx: %s \n", plan->db->getNdbError().message);
+ DEBUG_ASSERT(false);
+ }
if(wqitem->base.verb == OPERATION_REPLACE) {
DEBUG_PRINT(" [REPLACE] \"%.20s\"", wqitem->key);
@@ -301,10 +317,21 @@ bool worker_do_read(workitem *wqitem, bo
tx->close();
return false;
}
-
+
/* Save the workitem in the transaction and prepare for async execution */
- tx->executeAsynch(NdbTransaction::Commit, DBcallback, (void *) wqitem,
- NdbOperation::DefaultAbortOption, 1);
+ if(wqitem->base.verb == OPERATION_APPEND || wqitem->base.verb == OPERATION_PREPEND)
+ {
+ DEBUG_PRINT("In read() portion of APPEND. Value = %s",
+ hash_item_get_data(wqitem->cache_item));
+ tx->executeAsynch(NdbTransaction::NoCommit, rewriteCallback, (void *) wqitem,
+ NdbOperation::DefaultAbortOption, 1);
+ }
+ else
+ {
+ tx->executeAsynch(NdbTransaction::Commit, DBcallback, (void *) wqitem,
+ NdbOperation::DefaultAbortOption, 1);
+ }
+
return true;
}
@@ -507,6 +534,8 @@ void DBcallback(int result, NdbTransacti
case OPERATION_ADD:
case OPERATION_REPLACE:
case OPERATION_CAS:
+ case OPERATION_APPEND:
+ case OPERATION_PREPEND:
finalize_write(wqitem, tx_did_match);
break;
case OP_DELETE:
@@ -524,14 +553,100 @@ void DBcallback(int result, NdbTransacti
pipeline->engine->server.cookie->notify_io_complete(wqitem->cookie, io_status);
}
else {
- /* The workitem was allocated back in the engine thread; if it is used in
- a callback, it will be freed there, too. But otherwise we free it here.
- FIXME: also pop the workitem?
+ /* The workitem was allocated back in the engine thread; if used in a
+ callback, it would be freed there, too. But we must free it here.
*/
+ pipeline->engine->server.cookie->store_engine_specific(wqitem->cookie, wqitem->previous);
+ pipeline_io_completed(pipeline, wqitem);
workitem_free(wqitem);
}
}
+
+/* Middle-step callback for APPEND and PREPEND */
+void rewriteCallback(int result, NdbTransaction *tx, void *itemptr) {
+ workitem *item = (workitem *) itemptr;
+ DEBUG_PRINT("%d.%d", item->pipeline->id, item->id);
+
+ /* Check the transaction status */
+ if(tx->getNdbError().classification == NdbError::NoDataFound) {
+ item->status = & status_block_bad_replace;
+ tx->close();
+ item->pipeline->engine->server.cookie->store_engine_specific(item->cookie, item);
+ item->pipeline->engine->server.cookie->notify_io_complete(item->cookie, ENGINE_SUCCESS);
+ return;
+ }
+ else if(tx->getNdbError().classification != NdbError::NoError) {
+ return DBcallback(result, tx, itemptr);
+ }
+
+ /* Strings and lengths: */
+ char * current_val = 0;
+ size_t current_len = 0;
+ const char * affix_val = hash_item_get_data(item->cache_item);
+ const size_t affix_len = item->cache_item->nbytes;
+
+ /* worker_do_read() has already written the key into item->ndb_key_buffer.
+ The result is sitting in wqitem->row_buffer_1.
+ Read the value.
+ */
+ Operation readop(item->plan, OP_READ);
+ readop.buffer = item->row_buffer_1;
+ assert(readop.nValues() == 1);
+ readop.getStringValueNoCopy(COL_STORE_VALUE + 0, & current_val, & current_len);
+
+ /* Generate a new CAS */
+ worker_set_cas(item->pipeline, item->cas);
+ hash_item_set_cas(item->cache_item, * item->cas);
+
+ /* Prepare a write operation */
+ Operation op(item->plan, item->base.verb, item->ndb_key_buffer);
+ const NdbOperation *ndb_op = 0;
+
+ /* Allocate a buffer for the new value */
+ size_t max_len = op.requiredBuffer();
+ workitem_allocate_rowbuffer_2(item, max_len);
+ op.buffer = item->row_buffer_2;
+
+ /* Rewrite the value */
+ size_t total_len = affix_len + current_len;
+ if(total_len > max_len) total_len = max_len;
+ if(item->base.verb == OPERATION_APPEND) {
+ memcpy(current_val + current_len, affix_val, total_len - current_len);
+ }
+ else {
+ assert(item->base.verb == OPERATION_PREPEND);
+ memmove(current_val + affix_len, current_val, current_len);
+ memcpy(current_val, affix_val, affix_len);
+ }
+ * (current_val + total_len) = 0;
+ DEBUG_PRINT("New value: %s", current_val);
+
+ /* Set the row */
+ op.clearNullBits();
+ op.setColumn(COL_STORE_KEY, workitem_get_key_suffix(item), item->base.nsuffix);
+ op.setColumn(COL_STORE_VALUE, current_val, total_len);
+ if(item->prefix_info.has_cas_col)
+ op.setColumnBigUnsigned(COL_STORE_CAS, * item->cas);
+ ndb_op = op.updateTuple(tx);
+
+ /* Error case; operation has not been built */
+ if(ndb_op) {
+ tx->executeAsynch(NdbTransaction::Commit, DBcallback, (void *) item,
+ NdbOperation::DefaultAbortOption, 1);
+ // fixme: this should call back into the scheduler!
+ item->ndb_instance->db->pollNdb();
+ }
+ else {
+ DEBUG_PRINT("NDB operation failed. workitem %d.%d", item->pipeline->id,
+ item->id);
+ tx->close();
+ // pipeline->scheduler->close(item);
+ workitem_free(item);
+ }
+}
+
+
/* Dedicated callback function for INCR and DECR operations
*/
void incrCallback(int result, NdbTransaction *tx, void *itemptr) {
@@ -619,17 +734,16 @@ void incrCallback(int result, NdbTransac
tx->close();
- // If this was a synchronous call, the server is waiting for us
if(wqitem->base.is_sync) {
wqitem->status = return_status;
pipeline->engine->server.cookie->store_engine_specific(wqitem->cookie, wqitem);
pipeline->engine->server.cookie->notify_io_complete(wqitem->cookie, io_status);
}
else {
- /* The workitem was allocated back in the engine thread; if it is used in
- a callback, it will be freed there, too. But otherwise we free it here.
- FIXME: also pop the workitem?
- */
+ /* The workitem was allocated back in the engine thread; if used in a
+ callback, it would be freed there, too. But we must free it here. */
+ pipeline->engine->server.cookie->store_engine_specific(wqitem->cookie, wqitem->previous);
+ pipeline_io_completed(pipeline, wqitem);
workitem_free(wqitem);
}
}
@@ -700,6 +814,7 @@ bool build_hash_item(workitem *wqitem, O
&& ! (op.isNull(COL_STORE_MATH))) {
/* in dup_numbers mode, copy the math value */
ncopied = op.copyValue(COL_STORE_MATH, data_ptr);
+ ncopied-- ; // drop the trailing null
}
else {
/* Build a result containing each column */
@@ -712,6 +827,7 @@ bool build_hash_item(workitem *wqitem, O
/* pad the value with \r\n -- memcached expects it there. */
* (data_ptr + ncopied) = '\r';
* (data_ptr + ncopied + 1) = '\n';
+ * (data_ptr + ncopied + 2) = '\0';
DEBUG_PRINT("nbytes: %d ncopied: %d", nbytes, ncopied + 2);
/* Point to it in the workitem */
@@ -809,6 +925,11 @@ bool scan_delete(NdbInstance *inst, Quer
NdbScanOperation *scan = tx->getNdbScanOperation(plan->table);
scan->readTuplesExclusive();
+ /* Notes: To securely scan a whole table, use an outer transaction only
+ for the scan, but take over each lock in an inner transaction (with a row
+ count) that deletes 1000 rows per transaction
+ */
+
/* execute NoCommit */
if((res = tx->execute(NdbTransaction::NoCommit)) != 0)
logger->log(LOG_WARNING, 0, "execute(NoCommit): %s\n", tx->getNdbError().message);
=== modified file 'storage/ndb/memcache/src/schedulers/Flex_cluster.cc'
--- a/storage/ndb/memcache/src/schedulers/Flex_cluster.cc 2011-04-06 05:45:04 +0000
+++ b/storage/ndb/memcache/src/schedulers/Flex_cluster.cc 2011-04-08 08:14:11 +0000
@@ -216,9 +216,6 @@ void * Scheduler_flex::Cluster::run_comm
/* Poll */
inst->db->pollNdb();
-
- /* Now we are done with the instance */
- atomic_cmp_swap_int(& inst->in_use, true, false); // in_use = false
}
}
No bundle (reason: useless for push emails).
| Thread |
|---|
| • bzr push into mysql-5.1-telco-7.2 branch (john.duncan:4156 to 4157) | John David Duncan | 8 Apr |