From: John David Duncan Date: April 8 2011 8:16am Subject: bzr push into mysql-5.1-telco-7.2 branch (john.duncan:4156 to 4157) List-Archive: http://lists.mysql.com/commits/135029 Message-Id: <201104080816.p388GTF8025299@acsmt357.oracle.com> MIME-Version: 1.0 Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit 4157 John David Duncan 2011-04-08 Implement append & prepend; run-tests.sh now passes all tests. modified: storage/ndb/memcache/include/NdbInstance.h storage/ndb/memcache/include/Scheduler.h storage/ndb/memcache/run-tests.sh storage/ndb/memcache/src/NdbInstance.cc storage/ndb/memcache/src/ndb_engine.c storage/ndb/memcache/src/ndb_worker.cc storage/ndb/memcache/src/schedulers/Flex_cluster.cc 4156 Bernhard Ocklin 2011-04-08 fix memcached ndb_engine symbol clash modified: storage/ndb/memcache/unit/Makefile.am storage/ndb/memcache/unit/harness.cc === modified file 'storage/ndb/memcache/include/NdbInstance.h' --- a/storage/ndb/memcache/include/NdbInstance.h 2011-04-07 11:20:56 +0000 +++ b/storage/ndb/memcache/include/NdbInstance.h 2011-04-08 08:14:11 +0000 @@ -41,7 +41,6 @@ public: /* Public Instance Variables */ Ndb *db; - ndbmc_atomic32_t in_use; NdbInstance *next; protected: === modified file 'storage/ndb/memcache/include/Scheduler.h' --- a/storage/ndb/memcache/include/Scheduler.h 2011-04-06 04:31:28 +0000 +++ b/storage/ndb/memcache/include/Scheduler.h 2011-04-08 08:14:11 +0000 @@ -49,8 +49,9 @@ public: at pipeline initialization time. */ virtual void attach_thread(thread_identifier *) = 0; - /** schedule() is called from the NDB Engine thread when an operation - is ready to be queued for further async processing */ + /** schedule() is called from the NDB Engine thread when a workitem + is ready to be queued for further async processing. It will obtain + an Ndb object for the operation and send the workitem to be executed. */ virtual ENGINE_ERROR_CODE schedule(workitem *) = 0; /** io_completed() is called from the NDB Engine thread when an IO === modified file 'storage/ndb/memcache/run-tests.sh' --- a/storage/ndb/memcache/run-tests.sh 2011-03-30 06:54:53 +0000 +++ b/storage/ndb/memcache/run-tests.sh 2011-04-08 08:14:11 +0000 @@ -8,12 +8,14 @@ do_test() { if memcapable -T "$1" >/dev/null 2>&1 then - echo "[pass] $1" + r="pass" let npass+=1 else - echo "*** [FAIL] $1 ***" + r="FAIL" let nfail+=1 fi + + printf "%-40s[%s]\n" "$1" "$r" } skip_test() { @@ -28,58 +30,59 @@ do_test "ascii set" do_test "ascii set noreply" do_test "ascii get" do_test "ascii gets" +do_test "ascii mget" +do_test "ascii flush" +do_test "ascii flush noreply" do_test "ascii add" do_test "ascii add noreply" do_test "ascii replace" do_test "ascii replace noreply" +do_test "ascii cas" +do_test "ascii cas noreply" do_test "ascii delete" do_test "ascii delete noreply" +do_test "ascii incr" +do_test "ascii incr noreply" +do_test "ascii decr" +do_test "ascii decr noreply" +do_test "ascii append" +do_test "ascii append noreply" +do_test "ascii prepend" +do_test "ascii prepend noreply" do_test "ascii stat" -do_test "ascii mget" -do_test "ascii cas" # Passing Tests -- binary protocol do_test "binary noop" do_test "binary quit" do_test "binary quitq" -do_test "binary stat" -do_test "binary version" -do_test "binary get" -do_test "binary getq" -do_test "binary getk" -do_test "binary getkq" -do_test "binary delete" -do_test "binary deleteq" do_test "binary set" -do_test "binary setq" +do_test "binary setq" +do_test "binary flush" +do_test "binary flushq" do_test "binary add" do_test "binary addq" do_test "binary replace" do_test "binary replaceq" +do_test "binary delete" +do_test "binary deleteq" +do_test "binary get" +do_test "binary getq" +do_test "binary getk" +do_test "binary getkq" do_test "binary incr" do_test "binary incrq" do_test "binary decr" do_test "binary decrq" +do_test "binary version" +do_test "binary append" +do_test "binary appendq" +do_test "binary prepend" +do_test "binary prependq" +do_test "binary stat" -skip_test "ascii flush" -skip_test "ascii flush noreply" -skip_test "ascii incr" -skip_test "ascii incr noreply" -skip_test "ascii decr" -skip_test "ascii decr noreply" -skip_test "ascii append" -skip_test "ascii append noreply" -skip_test "ascii prepend" -skip_test "ascii prepend noreply" -skip_test "binary flush" -skip_test "binary flushq" -skip_test "binary append" -skip_test "binary appendq" -skip_test "binary prepend" -skip_test "binary prependq" echo === modified file 'storage/ndb/memcache/src/NdbInstance.cc' --- a/storage/ndb/memcache/src/NdbInstance.cc 2011-04-06 04:31:28 +0000 +++ b/storage/ndb/memcache/src/NdbInstance.cc 2011-04-08 08:14:11 +0000 @@ -32,7 +32,6 @@ NdbInstance::NdbInstance(Ndb_cluster_con int ntransactions) { nplans = nprefixes; db = new Ndb(c); - in_use = false; next = 0; plans = new QueryPlan *[nplans]; memset(plans, 0, (nplans * sizeof(QueryPlan *))); === modified file 'storage/ndb/memcache/src/ndb_engine.c' --- a/storage/ndb/memcache/src/ndb_engine.c 2011-04-07 11:20:56 +0000 +++ b/storage/ndb/memcache/src/ndb_engine.c 2011-04-08 08:14:11 +0000 @@ -614,14 +614,14 @@ static bool ndb_get_item_info(ENGINE_HAN /* Use the workitem. */ item_info->cas = wqitem->cas ? *(wqitem->cas) : 0; item_info->exptime = 0; - item_info->nbytes = wqitem->value_size; /* +2? */ + item_info->nbytes = wqitem->value_size; item_info->flags = 0; /* FIXME: need to get flags from the workitem */ item_info->clsid = slabs_clsid(default_handle(ndb_eng), wqitem->value_size); item_info->nkey = wqitem->base.nkey; item_info->nvalue = 1; /* how many iovecs */ item_info->key = wqitem->key; item_info->value[0].iov_base = wqitem->value_ptr; - item_info->value[0].iov_len = wqitem->value_size; /* +2? */ + item_info->value[0].iov_len = wqitem->value_size; DEBUG_PRINT("workitem %d.%d [%s].", wqitem->pipeline->id, wqitem->id, workitem_get_operation(wqitem)); return true; @@ -631,16 +631,21 @@ static bool ndb_get_item_info(ENGINE_HAN hash_item *it = (hash_item*) item; item_info->cas = hash_item_get_cas(it); item_info->exptime = it->exptime; - item_info->nbytes = wqitem ? wqitem->value_size : it->nbytes; + item_info->nbytes = wqitem ? wqitem->value_size : 0; item_info->flags = it->flags; item_info->clsid = it->slabs_clsid; item_info->nkey = it->nkey; item_info->nvalue = 1; item_info->key = hash_item_get_key(it); item_info->value[0].iov_base = hash_item_get_data(it); - item_info->value[0].iov_len = it->nbytes; - DEBUG_PRINT("hash_item [KEY: %.20s][CAS: %llu].", - hash_item_get_key(it), hash_item_get_cas(it)); + item_info->value[0].iov_len = item_info->nbytes; + if(item_info->nbytes) { + DEBUG_PRINT("hash_item [KEY: %.20s][CAS: %llu][nbytes: %d].", + hash_item_get_key(it), item_info->cas, item_info->nbytes); + } + else { + DEBUG_PRINT(" new hash_item"); + } return true; } } === modified file 'storage/ndb/memcache/src/ndb_worker.cc' --- a/storage/ndb/memcache/src/ndb_worker.cc 2011-04-06 04:31:28 +0000 +++ b/storage/ndb/memcache/src/ndb_worker.cc 2011-04-08 08:14:11 +0000 @@ -25,6 +25,7 @@ #include #include #include +#include /* Memcache headers */ #include "memcached/types.h" @@ -49,10 +50,12 @@ #include "hash_item_util.h" #include "ndb_worker.h" -void incrCallback(int, NdbTransaction *, void *); // callback for incr/decr -void flush_scan(int result, NdbTransaction *tx, void *itemptr); // callback for flush -void DBcallback(int, NdbTransaction *, void *); // callback for all others +typedef void ndb_async_callback(int, NdbTransaction *, void *); +ndb_async_callback incrCallback; // callback for incr/decr +ndb_async_callback rewriteCallback; // callback for append/prepend +ndb_async_callback DBcallback; // callback for all others + bool worker_do_read(workitem *, bool with_cas); bool worker_do_write(workitem *, bool with_cas); bool worker_do_delete(workitem *, bool with_cas); @@ -121,6 +124,8 @@ bool worker_prepare_operation(workitem * /* Jump table */ switch(newitem->base.verb) { case OP_READ: + case OPERATION_APPEND: + case OPERATION_PREPEND: r = worker_do_read(newitem, server_cas); break; @@ -139,8 +144,6 @@ bool worker_prepare_operation(workitem * r = worker_do_math(newitem, server_cas); break; - case OPERATION_APPEND: - case OPERATION_PREPEND: default: return false; /* not supported */ } @@ -190,8 +193,9 @@ bool worker_do_delete(workitem *wqitem, bool worker_do_write(workitem *wqitem, bool server_cas) { DEBUG_PRINT("%s", workitem_get_operation(wqitem)); - uint64_t cas_in = *wqitem->cas; // read old value - worker_set_cas(wqitem->pipeline, wqitem->cas); // generate a new value + uint64_t cas_in = *wqitem->cas; // read old value + worker_set_cas(wqitem->pipeline, wqitem->cas); // generate a new value + hash_item_set_cas(wqitem->cache_item, * wqitem->cas); // store it const NdbOperation *ndb_op = 0; QueryPlan *plan = wqitem->plan; @@ -202,7 +206,7 @@ bool worker_do_write(workitem *wqitem, b op.clearKeyNullBits(); op.setKeyPart(COL_STORE_KEY, dbkey, wqitem->base.nsuffix); - /* Allocate and encode the buffer for the row */ + /* Allocate and encode the buVALUffer for the row */ workitem_allocate_rowbuffer_1(wqitem, op.requiredBuffer()); op.buffer = wqitem->row_buffer_1; @@ -215,20 +219,32 @@ bool worker_do_write(workitem *wqitem, b op.setColumnBigUnsigned(COL_STORE_CAS, * wqitem->cas); // the cas } if(wqitem->plan->dup_numbers) { - uint64_t number; - if(safe_strtoull(hash_item_get_data(wqitem->cache_item), &number)) { - /* numeric value: also set the math column */ - op.setColumnBigUnsigned(COL_STORE_MATH, number); + if(isdigit(* hash_item_get_data(wqitem->cache_item)) && + wqitem->cache_item->nbytes < 32) { // Copy string representation + uint64_t number; + const int len = wqitem->cache_item->nbytes; + char value[32]; + for(size_t i = 0 ; i < len ; i++) + value[i] = * (hash_item_get_data(wqitem->cache_item) + i); + value[len] = 0; + if(safe_strtoull(value, &number)) { // numeric: set the math column + DEBUG_PRINT(" dup_numbers -- %d", (int) number ); + op.setColumnBigUnsigned(COL_STORE_MATH, number); + } + else { // non-numeric + DEBUG_PRINT(" dup_numbers but non-numeric: %s [%d] *** ", value, len); + op.setColumnNull(COL_STORE_MATH); + } } - else { - /* non-numeric: set the math column to null */ - op.setColumnNull(COL_STORE_MATH); - } + else op.setColumnNull(COL_STORE_MATH); } /* Start the transaction */ NdbTransaction *tx = op.startTransaction(); - DEBUG_ASSERT(tx); + if(! tx) { + logger->log(LOG_WARNING, 0, "tx: %s \n", plan->db->getNdbError().message); + DEBUG_ASSERT(false); + } if(wqitem->base.verb == OPERATION_REPLACE) { DEBUG_PRINT(" [REPLACE] \"%.20s\"", wqitem->key); @@ -301,10 +317,21 @@ bool worker_do_read(workitem *wqitem, bo tx->close(); return false; } - + /* Save the workitem in the transaction and prepare for async execution */ - tx->executeAsynch(NdbTransaction::Commit, DBcallback, (void *) wqitem, - NdbOperation::DefaultAbortOption, 1); + if(wqitem->base.verb == OPERATION_APPEND || wqitem->base.verb == OPERATION_PREPEND) + { + DEBUG_PRINT("In read() portion of APPEND. Value = %s", + hash_item_get_data(wqitem->cache_item)); + tx->executeAsynch(NdbTransaction::NoCommit, rewriteCallback, (void *) wqitem, + NdbOperation::DefaultAbortOption, 1); + } + else + { + tx->executeAsynch(NdbTransaction::Commit, DBcallback, (void *) wqitem, + NdbOperation::DefaultAbortOption, 1); + } + return true; } @@ -507,6 +534,8 @@ void DBcallback(int result, NdbTransacti case OPERATION_ADD: case OPERATION_REPLACE: case OPERATION_CAS: + case OPERATION_APPEND: + case OPERATION_PREPEND: finalize_write(wqitem, tx_did_match); break; case OP_DELETE: @@ -524,14 +553,100 @@ void DBcallback(int result, NdbTransacti pipeline->engine->server.cookie->notify_io_complete(wqitem->cookie, io_status); } else { - /* The workitem was allocated back in the engine thread; if it is used in - a callback, it will be freed there, too. But otherwise we free it here. - FIXME: also pop the workitem? + /* The workitem was allocated back in the engine thread; if used in a + callback, it would be freed there, too. But we must free it here. */ + pipeline->engine->server.cookie->store_engine_specific(wqitem->cookie, wqitem->previous); + pipeline_io_completed(pipeline, wqitem); workitem_free(wqitem); } } + +/* Middle-step callback for APPEND and PREPEND */ +void rewriteCallback(int result, NdbTransaction *tx, void *itemptr) { + workitem *item = (workitem *) itemptr; + DEBUG_PRINT("%d.%d", item->pipeline->id, item->id); + + /* Check the transaction status */ + if(tx->getNdbError().classification == NdbError::NoDataFound) { + item->status = & status_block_bad_replace; + tx->close(); + item->pipeline->engine->server.cookie->store_engine_specific(item->cookie, item); + item->pipeline->engine->server.cookie->notify_io_complete(item->cookie, ENGINE_SUCCESS); + return; + } + else if(tx->getNdbError().classification != NdbError::NoError) { + return DBcallback(result, tx, itemptr); + } + + /* Strings and lengths: */ + char * current_val = 0; + size_t current_len = 0; + const char * affix_val = hash_item_get_data(item->cache_item); + const size_t affix_len = item->cache_item->nbytes; + + /* worker_do_read() has already written the key into item->ndb_key_buffer. + The result is sitting in wqitem->row_buffer_1. + Read the value. + */ + Operation readop(item->plan, OP_READ); + readop.buffer = item->row_buffer_1; + assert(readop.nValues() == 1); + readop.getStringValueNoCopy(COL_STORE_VALUE + 0, & current_val, & current_len); + + /* Generate a new CAS */ + worker_set_cas(item->pipeline, item->cas); + hash_item_set_cas(item->cache_item, * item->cas); + + /* Prepare a write operation */ + Operation op(item->plan, item->base.verb, item->ndb_key_buffer); + const NdbOperation *ndb_op = 0; + + /* Allocate a buffer for the new value */ + size_t max_len = op.requiredBuffer(); + workitem_allocate_rowbuffer_2(item, max_len); + op.buffer = item->row_buffer_2; + + /* Rewrite the value */ + size_t total_len = affix_len + current_len; + if(total_len > max_len) total_len = max_len; + if(item->base.verb == OPERATION_APPEND) { + memcpy(current_val + current_len, affix_val, total_len - current_len); + } + else { + assert(item->base.verb == OPERATION_PREPEND); + memmove(current_val + affix_len, current_val, current_len); + memcpy(current_val, affix_val, affix_len); + } + * (current_val + total_len) = 0; + DEBUG_PRINT("New value: %s", current_val); + + /* Set the row */ + op.clearNullBits(); + op.setColumn(COL_STORE_KEY, workitem_get_key_suffix(item), item->base.nsuffix); + op.setColumn(COL_STORE_VALUE, current_val, total_len); + if(item->prefix_info.has_cas_col) + op.setColumnBigUnsigned(COL_STORE_CAS, * item->cas); + ndb_op = op.updateTuple(tx); + + /* Error case; operation has not been built */ + if(ndb_op) { + tx->executeAsynch(NdbTransaction::Commit, DBcallback, (void *) item, + NdbOperation::DefaultAbortOption, 1); + // fixme: this should call back into the scheduler! + item->ndb_instance->db->pollNdb(); + } + else { + DEBUG_PRINT("NDB operation failed. workitem %d.%d", item->pipeline->id, + item->id); + tx->close(); + // pipeline->scheduler->close(item); + workitem_free(item); + } +} + + /* Dedicated callback function for INCR and DECR operations */ void incrCallback(int result, NdbTransaction *tx, void *itemptr) { @@ -619,17 +734,16 @@ void incrCallback(int result, NdbTransac tx->close(); - // If this was a synchronous call, the server is waiting for us if(wqitem->base.is_sync) { wqitem->status = return_status; pipeline->engine->server.cookie->store_engine_specific(wqitem->cookie, wqitem); pipeline->engine->server.cookie->notify_io_complete(wqitem->cookie, io_status); } else { - /* The workitem was allocated back in the engine thread; if it is used in - a callback, it will be freed there, too. But otherwise we free it here. - FIXME: also pop the workitem? - */ + /* The workitem was allocated back in the engine thread; if used in a + callback, it would be freed there, too. But we must free it here. */ + pipeline->engine->server.cookie->store_engine_specific(wqitem->cookie, wqitem->previous); + pipeline_io_completed(pipeline, wqitem); workitem_free(wqitem); } } @@ -700,6 +814,7 @@ bool build_hash_item(workitem *wqitem, O && ! (op.isNull(COL_STORE_MATH))) { /* in dup_numbers mode, copy the math value */ ncopied = op.copyValue(COL_STORE_MATH, data_ptr); + ncopied-- ; // drop the trailing null } else { /* Build a result containing each column */ @@ -712,6 +827,7 @@ bool build_hash_item(workitem *wqitem, O /* pad the value with \r\n -- memcached expects it there. */ * (data_ptr + ncopied) = '\r'; * (data_ptr + ncopied + 1) = '\n'; + * (data_ptr + ncopied + 2) = '\0'; DEBUG_PRINT("nbytes: %d ncopied: %d", nbytes, ncopied + 2); /* Point to it in the workitem */ @@ -809,6 +925,11 @@ bool scan_delete(NdbInstance *inst, Quer NdbScanOperation *scan = tx->getNdbScanOperation(plan->table); scan->readTuplesExclusive(); + /* Notes: To securely scan a whole table, use an outer transaction only + for the scan, but take over each lock in an inner transaction (with a row + count) that deletes 1000 rows per transaction + */ + /* execute NoCommit */ if((res = tx->execute(NdbTransaction::NoCommit)) != 0) logger->log(LOG_WARNING, 0, "execute(NoCommit): %s\n", tx->getNdbError().message); === modified file 'storage/ndb/memcache/src/schedulers/Flex_cluster.cc' --- a/storage/ndb/memcache/src/schedulers/Flex_cluster.cc 2011-04-06 05:45:04 +0000 +++ b/storage/ndb/memcache/src/schedulers/Flex_cluster.cc 2011-04-08 08:14:11 +0000 @@ -216,9 +216,6 @@ void * Scheduler_flex::Cluster::run_comm /* Poll */ inst->db->pollNdb(); - - /* Now we are done with the instance */ - atomic_cmp_swap_int(& inst->in_use, true, false); // in_use = false } } No bundle (reason: useless for push emails).