List: Commits
From: John David Duncan   Date: April 8 2011 8:14am
Subject: bzr commit into mysql-5.1-telco-7.2 branch (john.duncan:4157)
#At file:///Users/jdd/bzr-repo/working/cluster-7.2-bernd/ based on revid:bocklin@stripped

 4157 John David Duncan	2011-04-08
      Implement append & prepend; run-tests.sh now passes all tests.

    modified:
      storage/ndb/memcache/include/NdbInstance.h
      storage/ndb/memcache/include/Scheduler.h
      storage/ndb/memcache/run-tests.sh
      storage/ndb/memcache/src/NdbInstance.cc
      storage/ndb/memcache/src/ndb_engine.c
      storage/ndb/memcache/src/ndb_worker.cc
      storage/ndb/memcache/src/schedulers/Flex_cluster.cc
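
For context, the append and prepend verbs concatenate new data onto the tail or the head of an existing value on the server. A minimal client-side sketch of the behaviour this patch adds to the NDB engine, written against the standard libmemcached API (the host, port, key, and values here are illustrative assumptions, not part of the commit):

    /* Illustrative only: exercises the append/prepend verbs implemented by
       this patch, through a stock libmemcached client. */
    #include <libmemcached/memcached.h>
    #include <stdio.h>
    #include <stdlib.h>

    int main(void) {
      memcached_st *memc = memcached_create(NULL);
      memcached_server_add(memc, "127.0.0.1", 11211);    /* assumed host/port */

      memcached_set(memc, "k", 1, "bbb", 3, 0, 0);       /* k = "bbb"         */
      memcached_append(memc, "k", 1, "ccc", 3, 0, 0);    /* k = "bbbccc"      */
      memcached_prepend(memc, "k", 1, "aaa", 3, 0, 0);   /* k = "aaabbbccc"   */

      size_t len;
      uint32_t flags;
      memcached_return_t rc;
      char *val = memcached_get(memc, "k", 1, &len, &flags, &rc);
      if(val) {
        printf("%.*s\n", (int) len, val);                /* prints aaabbbccc  */
        free(val);
      }
      memcached_free(memc);
      return 0;
    }
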
=== modified file 'storage/ndb/memcache/include/NdbInstance.h'
--- a/storage/ndb/memcache/include/NdbInstance.h	2011-04-07 11:20:56 +0000
+++ b/storage/ndb/memcache/include/NdbInstance.h	2011-04-08 08:14:11 +0000
@@ -41,7 +41,6 @@ public:
 
   /* Public Instance Variables */  
   Ndb *db;
-  ndbmc_atomic32_t in_use;
   NdbInstance *next;
  
 protected:

=== modified file 'storage/ndb/memcache/include/Scheduler.h'
--- a/storage/ndb/memcache/include/Scheduler.h	2011-04-06 04:31:28 +0000
+++ b/storage/ndb/memcache/include/Scheduler.h	2011-04-08 08:14:11 +0000
@@ -49,8 +49,9 @@ public:
       at pipeline initialization time. */
   virtual void attach_thread(thread_identifier *) = 0;
 
-  /** schedule() is called from the NDB Engine thread when an operation
-      is ready to be queued for further async processing */
+  /** schedule() is called from the NDB Engine thread when a workitem
+      is ready to be queued for further async processing.  It will obtain
+      an Ndb object for the operation and send the workitem to be executed. */ 
   virtual ENGINE_ERROR_CODE schedule(workitem *) = 0;
 
   /** io_completed() is called from the NDB Engine thread when an IO

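To make the revised schedule() contract concrete, here is a hypothetical sketch of what one concrete scheduler might now do. Only Scheduler, workitem, NdbInstance (its db and next members) and worker_prepare_operation() appear elsewhere in this changeset; the class name, the free_list/list_lock pool bookkeeping, and the particular error codes returned are assumptions made for illustration.

    /* HYPOTHETICAL sketch of the schedule() contract documented above; not
       part of this commit.  free_list and list_lock are invented pool members. */
    ENGINE_ERROR_CODE Example_scheduler::schedule(workitem *item) {
      NdbInstance *inst;

      pthread_mutex_lock(& list_lock);        /* assumed: pool of idle instances */
      inst = free_list;
      if(inst) free_list = inst->next;
      pthread_mutex_unlock(& list_lock);

      if(inst == 0)
        return ENGINE_FAILED;                 /* no Ndb object available         */

      item->ndb_instance = inst;              /* hand the Ndb object to the item */

      /* Build the NDB operation and send it for asynchronous execution */
      if(! worker_prepare_operation(item))
        return ENGINE_ENOTSUP;

      return ENGINE_EWOULDBLOCK;              /* completion via io_completed()   */
    }
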
=== modified file 'storage/ndb/memcache/run-tests.sh'
--- a/storage/ndb/memcache/run-tests.sh	2011-03-30 06:54:53 +0000
+++ b/storage/ndb/memcache/run-tests.sh	2011-04-08 08:14:11 +0000
@@ -8,12 +8,14 @@ do_test() {
   if 
     memcapable -T "$1" >/dev/null 2>&1
   then 
-    echo "[pass]   $1"
+    r="pass"
     let npass+=1
   else
-    echo "*** [FAIL]   $1 ***"
+    r="FAIL"
     let nfail+=1
   fi
+  
+  printf "%-40s[%s]\n" "$1" "$r"
 }
 
 skip_test() { 
@@ -28,58 +30,59 @@ do_test          "ascii set"
 do_test          "ascii set noreply"
 do_test          "ascii get"
 do_test          "ascii gets"
+do_test          "ascii mget"
+do_test          "ascii flush"
+do_test          "ascii flush noreply"
 do_test          "ascii add"
 do_test          "ascii add noreply"
 do_test          "ascii replace"
 do_test          "ascii replace noreply"
+do_test          "ascii cas"  
+do_test          "ascii cas noreply" 
 do_test          "ascii delete"
 do_test          "ascii delete noreply"
+do_test          "ascii incr"                        
+do_test          "ascii incr noreply"                 
+do_test          "ascii decr"                              
+do_test          "ascii decr noreply"                      
+do_test          "ascii append"                            
+do_test          "ascii append noreply"                    
+do_test          "ascii prepend"                   
+do_test          "ascii prepend noreply"           
 do_test          "ascii stat"
-do_test          "ascii mget"
-do_test          "ascii cas"   
 
 
 # Passing Tests -- binary protocol
 do_test          "binary noop"
 do_test          "binary quit"
 do_test          "binary quitq"
-do_test          "binary stat"
-do_test          "binary version"
-do_test          "binary get"                 
-do_test          "binary getq"                 
-do_test          "binary getk"                 
-do_test          "binary getkq"    
-do_test          "binary delete"                   
-do_test          "binary deleteq"                   
 do_test          "binary set"                              
-do_test          "binary setq"      
+do_test          "binary setq"
+do_test          "binary flush"                           
+do_test          "binary flushq"           
 do_test          "binary add"                       
 do_test          "binary addq"                        
 do_test          "binary replace"                     
 do_test          "binary replaceq"
+do_test          "binary delete"                   
+do_test          "binary deleteq"
+do_test          "binary get"
+do_test          "binary getq"
+do_test          "binary getk"
+do_test          "binary getkq"
 do_test          "binary incr"
 do_test          "binary incrq"
 do_test          "binary decr"
 do_test          "binary decrq"
+do_test          "binary version"
+do_test          "binary append"
+do_test          "binary appendq"
+do_test          "binary prepend"
+do_test          "binary prependq"
+do_test          "binary stat"
 
-skip_test "ascii flush"                 
-skip_test "ascii flush noreply"            
-skip_test "ascii incr"                        
-skip_test "ascii incr noreply"                 
-skip_test "ascii decr"                              
-skip_test "ascii decr noreply"                      
-skip_test "ascii append"                            
-skip_test "ascii append noreply"                    
-skip_test "ascii prepend"                   
-skip_test "ascii prepend noreply"                   
 
                
-skip_test "binary flush"                           
-skip_test "binary flushq"      
-skip_test "binary append"               
-skip_test "binary appendq"              
-skip_test "binary prepend"             
-skip_test "binary prependq"     
 
 
 echo 

=== modified file 'storage/ndb/memcache/src/NdbInstance.cc'
--- a/storage/ndb/memcache/src/NdbInstance.cc	2011-04-06 04:31:28 +0000
+++ b/storage/ndb/memcache/src/NdbInstance.cc	2011-04-08 08:14:11 +0000
@@ -32,7 +32,6 @@ NdbInstance::NdbInstance(Ndb_cluster_con
                          int ntransactions) {
   nplans = nprefixes;
   db = new Ndb(c);
-  in_use = false;
   next = 0;
   plans = new QueryPlan *[nplans];
   memset(plans, 0, (nplans * sizeof(QueryPlan *)));

=== modified file 'storage/ndb/memcache/src/ndb_engine.c'
--- a/storage/ndb/memcache/src/ndb_engine.c	2011-04-07 11:20:56 +0000
+++ b/storage/ndb/memcache/src/ndb_engine.c	2011-04-08 08:14:11 +0000
@@ -614,14 +614,14 @@ static bool ndb_get_item_info(ENGINE_HAN
     /* Use the workitem. */
     item_info->cas = wqitem->cas ? *(wqitem->cas) : 0;
     item_info->exptime = 0;  
-    item_info->nbytes = wqitem->value_size; /* +2? */
+    item_info->nbytes = wqitem->value_size;
     item_info->flags = 0;  /* FIXME: need to get flags from the workitem */
     item_info->clsid = slabs_clsid(default_handle(ndb_eng), wqitem->value_size);
     item_info->nkey = wqitem->base.nkey;
     item_info->nvalue = 1;  /* how many iovecs */
     item_info->key = wqitem->key;
     item_info->value[0].iov_base = wqitem->value_ptr;
-    item_info->value[0].iov_len = wqitem->value_size;  /* +2? */    
+    item_info->value[0].iov_len = wqitem->value_size;   
     DEBUG_PRINT("workitem %d.%d [%s].", wqitem->pipeline->id, wqitem->id, 
                        workitem_get_operation(wqitem));
     return true;
@@ -631,16 +631,21 @@ static bool ndb_get_item_info(ENGINE_HAN
     hash_item *it = (hash_item*) item;
     item_info->cas = hash_item_get_cas(it);
     item_info->exptime = it->exptime;
-    item_info->nbytes = wqitem ? wqitem->value_size : it->nbytes;
+    item_info->nbytes = wqitem ? wqitem->value_size : 0;
     item_info->flags = it->flags;
     item_info->clsid = it->slabs_clsid;
     item_info->nkey = it->nkey;
     item_info->nvalue = 1;
     item_info->key = hash_item_get_key(it);
     item_info->value[0].iov_base = hash_item_get_data(it);
-    item_info->value[0].iov_len = it->nbytes;    
-    DEBUG_PRINT("hash_item [KEY: %.20s][CAS: %llu].", 
-                       hash_item_get_key(it), hash_item_get_cas(it));
+    item_info->value[0].iov_len = item_info->nbytes;
+    if(item_info->nbytes) {
+      DEBUG_PRINT("hash_item [KEY: %.20s][CAS: %llu][nbytes: %d].", 
+                  hash_item_get_key(it), item_info->cas, item_info->nbytes);
+    }
+    else {
+      DEBUG_PRINT(" new hash_item");
+    }
     return true;
   }
 }

=== modified file 'storage/ndb/memcache/src/ndb_worker.cc'
--- a/storage/ndb/memcache/src/ndb_worker.cc	2011-04-06 04:31:28 +0000
+++ b/storage/ndb/memcache/src/ndb_worker.cc	2011-04-08 08:14:11 +0000
@@ -25,6 +25,7 @@
 #include <assert.h>
 #include <string.h>
 #include <inttypes.h>
+#include <ctype.h>
 
 /* Memcache headers */
 #include "memcached/types.h"
@@ -49,10 +50,12 @@
 #include "hash_item_util.h"
 #include "ndb_worker.h"
 
-void incrCallback(int, NdbTransaction *, void *); // callback for incr/decr
-void flush_scan(int result, NdbTransaction *tx, void *itemptr);  // callback for flush
-void DBcallback(int, NdbTransaction *, void *);   // callback for all others
+typedef void ndb_async_callback(int, NdbTransaction *, void *);
 
+ndb_async_callback incrCallback;     // callback for incr/decr
+ndb_async_callback rewriteCallback;  // callback for append/prepend
+ndb_async_callback DBcallback;       // callback for all others
+ 
 bool worker_do_read(workitem *, bool with_cas); 
 bool worker_do_write(workitem *, bool with_cas); 
 bool worker_do_delete(workitem *, bool with_cas); 
@@ -121,6 +124,8 @@ bool worker_prepare_operation(workitem *
   /* Jump table */
   switch(newitem->base.verb) {
     case OP_READ:
+    case OPERATION_APPEND:
+    case OPERATION_PREPEND:
       r = worker_do_read(newitem, server_cas);
       break;
 
@@ -139,8 +144,6 @@ bool worker_prepare_operation(workitem *
       r = worker_do_math(newitem, server_cas);
       break;
       
-    case OPERATION_APPEND:
-    case OPERATION_PREPEND:
     default:
       return false;   /* not supported */
   }
@@ -190,8 +193,9 @@ bool worker_do_delete(workitem *wqitem, 
 bool worker_do_write(workitem *wqitem, bool server_cas) {
   DEBUG_PRINT("%s", workitem_get_operation(wqitem));
 
-  uint64_t cas_in = *wqitem->cas;                      // read old value
-  worker_set_cas(wqitem->pipeline, wqitem->cas);       // generate a new value
+  uint64_t cas_in = *wqitem->cas;                       // read old value
+  worker_set_cas(wqitem->pipeline, wqitem->cas);        // generate a new value
+  hash_item_set_cas(wqitem->cache_item, * wqitem->cas); // store it
 
   const NdbOperation *ndb_op = 0;
   QueryPlan *plan = wqitem->plan;
@@ -202,7 +206,7 @@ bool worker_do_write(workitem *wqitem, b
   op.clearKeyNullBits();
   op.setKeyPart(COL_STORE_KEY, dbkey, wqitem->base.nsuffix);
 
-  /* Allocate and encode the buffer for the row */ 
+  /* Allocate and encode the buffer for the row */ 
   workitem_allocate_rowbuffer_1(wqitem, op.requiredBuffer());
   op.buffer = wqitem->row_buffer_1;
 
@@ -215,20 +219,32 @@ bool worker_do_write(workitem *wqitem, b
     op.setColumnBigUnsigned(COL_STORE_CAS, * wqitem->cas);   // the cas
   }
   if(wqitem->plan->dup_numbers) {
-    uint64_t number;
-    if(safe_strtoull(hash_item_get_data(wqitem->cache_item), &number)) {
-      /* numeric value: also set the math column */
-      op.setColumnBigUnsigned(COL_STORE_MATH, number);
+    if(isdigit(* hash_item_get_data(wqitem->cache_item)) && 
+       wqitem->cache_item->nbytes < 32) {      // Copy string representation 
+      uint64_t number;
+      const int len = wqitem->cache_item->nbytes;
+      char value[32];
+      for(size_t i = 0 ; i  < len ; i++) 
+        value[i] = * (hash_item_get_data(wqitem->cache_item) + i); 
+      value[len] = 0;
+      if(safe_strtoull(value, &number)) { // numeric: set the math column
+        DEBUG_PRINT(" dup_numbers -- %d", (int) number );
+        op.setColumnBigUnsigned(COL_STORE_MATH, number);
+      }
+      else {  // non-numeric
+        DEBUG_PRINT(" dup_numbers but non-numeric: %s [%d] *** ", value, len);
+        op.setColumnNull(COL_STORE_MATH);
+      }
     }
-    else {
-      /* non-numeric: set the math column to null */
-      op.setColumnNull(COL_STORE_MATH);
-    }  
+    else op.setColumnNull(COL_STORE_MATH);      
   }
 
   /* Start the transaction */
   NdbTransaction *tx = op.startTransaction();
-  DEBUG_ASSERT(tx);
+  if(! tx) {
+    logger->log(LOG_WARNING, 0, "tx: %s \n", plan->db->getNdbError().message);
+    DEBUG_ASSERT(false);
+  }
   
   if(wqitem->base.verb == OPERATION_REPLACE) {
     DEBUG_PRINT(" [REPLACE] \"%.20s\"", wqitem->key);
@@ -301,10 +317,21 @@ bool worker_do_read(workitem *wqitem, bo
     tx->close();
     return false;
   }
-    
+
   /* Save the workitem in the transaction and prepare for async execution */   
-  tx->executeAsynch(NdbTransaction::Commit, DBcallback, (void *) wqitem,
-                    NdbOperation::DefaultAbortOption, 1);
+  if(wqitem->base.verb == OPERATION_APPEND || wqitem->base.verb == OPERATION_PREPEND) 
+  {
+    DEBUG_PRINT("In read() portion of APPEND.  Value = %s",  
+                hash_item_get_data(wqitem->cache_item));
+    tx->executeAsynch(NdbTransaction::NoCommit, rewriteCallback, (void *) wqitem,
+                      NdbOperation::DefaultAbortOption, 1);
+  }
+  else 
+  {
+    tx->executeAsynch(NdbTransaction::Commit, DBcallback, (void *) wqitem,
+                      NdbOperation::DefaultAbortOption, 1);
+  }
+
   return true;
 }
 
@@ -507,6 +534,8 @@ void DBcallback(int result, NdbTransacti
     case OPERATION_ADD:
     case OPERATION_REPLACE:
     case OPERATION_CAS:
+    case OPERATION_APPEND:
+    case OPERATION_PREPEND:
       finalize_write(wqitem, tx_did_match);
       break;
     case OP_DELETE:
@@ -524,14 +553,100 @@ void DBcallback(int result, NdbTransacti
     pipeline->engine->server.cookie->notify_io_complete(wqitem->cookie, io_status);    
   }
   else {
-    /* The workitem was allocated back in the engine thread; if it is used in
-       a callback, it will be freed there, too.  But otherwise we free it here.
-       FIXME: also pop the workitem?
+    /* The workitem was allocated back in the engine thread.  On this
+       asynchronous path no one else will release it, so pop and free it here.
     */
+    pipeline->engine->server.cookie->store_engine_specific(wqitem->cookie, wqitem->previous);
+    pipeline_io_completed(pipeline, wqitem);
     workitem_free(wqitem);
   }
 }
 
+
+/* Middle-step callback for APPEND and PREPEND */
+void rewriteCallback(int result, NdbTransaction *tx, void *itemptr) {
+  workitem *item = (workitem *) itemptr;
+  DEBUG_PRINT("%d.%d", item->pipeline->id, item->id);
+ 
+  /* Check the transaction status */
+  if(tx->getNdbError().classification == NdbError::NoDataFound) {
+    item->status = & status_block_bad_replace;
+    tx->close();
+    item->pipeline->engine->server.cookie->store_engine_specific(item->cookie, item); 
+    item->pipeline->engine->server.cookie->notify_io_complete(item->cookie, ENGINE_SUCCESS); 
+    return;
+  }
+  else if(tx->getNdbError().classification != NdbError::NoError) {
+    return DBcallback(result, tx, itemptr);
+  }  
+
+  /* Strings and lengths: */
+  char * current_val = 0; 
+  size_t current_len = 0;
+  const char * affix_val = hash_item_get_data(item->cache_item);
+  const size_t affix_len = item->cache_item->nbytes;
+  
+  /* worker_do_read() has already written the key into item->ndb_key_buffer. 
+     The result is sitting in wqitem->row_buffer_1. 
+     Read the value.
+  */  
+  Operation readop(item->plan, OP_READ);
+  readop.buffer = item->row_buffer_1;
+  assert(readop.nValues() == 1);
+  readop.getStringValueNoCopy(COL_STORE_VALUE + 0, & current_val, & current_len);
+    
+  /* Generate a new CAS */
+  worker_set_cas(item->pipeline, item->cas);  
+  hash_item_set_cas(item->cache_item, * item->cas);
+
+  /* Prepare a write operation */
+  Operation op(item->plan, item->base.verb, item->ndb_key_buffer);
+  const NdbOperation *ndb_op = 0;  
+  
+  /* Allocate a buffer for the new value */ 
+  size_t max_len = op.requiredBuffer();
+  workitem_allocate_rowbuffer_2(item, max_len);
+  op.buffer = item->row_buffer_2;
+
+  /* Rewrite the value */
+  size_t total_len = affix_len + current_len;
+  if(total_len > max_len) total_len = max_len;
+  if(item->base.verb == OPERATION_APPEND) {
+    memcpy(current_val + current_len, affix_val, total_len - current_len);
+  }
+  else {
+    assert(item->base.verb == OPERATION_PREPEND);
+    memmove(current_val + affix_len, current_val, current_len);
+    memcpy(current_val, affix_val, affix_len); 
+  }
+  * (current_val + total_len) = 0;
+  DEBUG_PRINT("New value: %s", current_val);
+  
+  /* Set the row */
+  op.clearNullBits();
+  op.setColumn(COL_STORE_KEY, workitem_get_key_suffix(item), item->base.nsuffix);
+  op.setColumn(COL_STORE_VALUE, current_val, total_len);
+  if(item->prefix_info.has_cas_col) 
+    op.setColumnBigUnsigned(COL_STORE_CAS, * item->cas);
+  ndb_op = op.updateTuple(tx);
+
+  /* Execute the operation if it was built; the else branch handles the error */
+  if(ndb_op) {
+    tx->executeAsynch(NdbTransaction::Commit, DBcallback, (void *) item,
+                      NdbOperation::DefaultAbortOption, 1);  
+    // fixme: this should call back into the scheduler!
+    item->ndb_instance->db->pollNdb();
+  }
+  else {
+    DEBUG_PRINT("NDB operation failed.  workitem %d.%d", item->pipeline->id,
+                item->id);
+    tx->close();
+    // pipeline->scheduler->close(item);
+    workitem_free(item);
+  }
+}
+
+
 /* Dedicated callback function for INCR and DECR operations
 */
 void incrCallback(int result, NdbTransaction *tx, void *itemptr) {
@@ -619,17 +734,16 @@ void incrCallback(int result, NdbTransac
   
   tx->close();
   
-  // If this was a synchronous call, the server is waiting for us 
   if(wqitem->base.is_sync) {
     wqitem->status = return_status;
     pipeline->engine->server.cookie->store_engine_specific(wqitem->cookie, wqitem); 
     pipeline->engine->server.cookie->notify_io_complete(wqitem->cookie, io_status);    
   }
   else {
-    /* The workitem was allocated back in the engine thread; if it is used in
-     a callback, it will be freed there, too.  But otherwise we free it here.
-     FIXME: also pop the workitem?
-     */
+    /* The workitem was allocated back in the engine thread.  On this
+       asynchronous path no one else will release it, so pop and free it here. */
+    pipeline->engine->server.cookie->store_engine_specific(wqitem->cookie, wqitem->previous);
+    pipeline_io_completed(pipeline, wqitem);
     workitem_free(wqitem);
   }
 }
@@ -700,6 +814,7 @@ bool build_hash_item(workitem *wqitem, O
        && ! (op.isNull(COL_STORE_MATH))) {
       /* in dup_numbers mode, copy the math value */
       ncopied = op.copyValue(COL_STORE_MATH, data_ptr);
+      ncopied-- ; // drop the trailing null
     }
     else {
       /* Build a result containing each column */
@@ -712,6 +827,7 @@ bool build_hash_item(workitem *wqitem, O
     /* pad the value with \r\n -- memcached expects it there. */
     * (data_ptr + ncopied)     = '\r';
     * (data_ptr + ncopied + 1) = '\n';
+    * (data_ptr + ncopied + 2) = '\0';
     DEBUG_PRINT("nbytes: %d   ncopied: %d", nbytes, ncopied + 2);
     
     /* Point to it in the workitem */
@@ -809,6 +925,11 @@ bool scan_delete(NdbInstance *inst, Quer
   NdbScanOperation *scan = tx->getNdbScanOperation(plan->table);
   scan->readTuplesExclusive();
   
+  /* Note: to delete every row of a table safely, use an outer transaction
+     only for the scan itself, and take over each scan lock into an inner
+     transaction (with a row count) that deletes up to 1000 rows per commit.
+  */
+  
   /* execute NoCommit */
   if((res = tx->execute(NdbTransaction::NoCommit)) != 0) 
     logger->log(LOG_WARNING, 0, "execute(NoCommit): %s\n", tx->getNdbError().message);

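The note added to scan_delete() above describes a batched scan-and-takeover delete. A hedged sketch of that pattern against the public NDB API follows; it is a fragment that would sit inside a function like scan_delete() with inst and plan in scope, and the 1000-row batch size, variable names, and the takeover calls are assumptions rather than code from this commit.

    /* Sketch of the scan/takeover pattern described in the note above. */
    NdbTransaction *scanTx = inst->db->startTransaction();     /* outer: scan only    */
    NdbScanOperation *scan = scanTx->getNdbScanOperation(plan->table);
    scan->readTuplesExclusive();
    scanTx->execute(NdbTransaction::NoCommit);

    int check;
    while((check = scan->nextResult(true)) == 0) {              /* fetch rows          */
      NdbTransaction *delTx = inst->db->startTransaction();     /* inner transaction   */
      int rows = 0;
      do {
        scan->deleteCurrentTuple(delTx);                        /* take over the lock  */
      } while(++rows < 1000 && (check = scan->nextResult(false)) == 0);
      delTx->execute(NdbTransaction::Commit);                   /* delete this batch   */
      inst->db->closeTransaction(delTx);
      if(check == 1 || check == -1) break;                      /* end of scan / error */
    }
    inst->db->closeTransaction(scanTx);
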
=== modified file 'storage/ndb/memcache/src/schedulers/Flex_cluster.cc'
--- a/storage/ndb/memcache/src/schedulers/Flex_cluster.cc	2011-04-06 05:45:04 +0000
+++ b/storage/ndb/memcache/src/schedulers/Flex_cluster.cc	2011-04-08 08:14:11 +0000
@@ -216,9 +216,6 @@ void * Scheduler_flex::Cluster::run_comm
     
     /* Poll */
     inst->db->pollNdb();
-    
-    /* Now we are done with the instance */
-    atomic_cmp_swap_int(& inst->in_use, true, false);  // in_use = false
   } 
 }
 


Attachment: [text/bzr-bundle] bzr/john.duncan@oracle.com-20110408081411-gaajc0adu1tx2tgy.bundle