List: Commits — « Previous Message | Next Message »
From: John David Duncan   Date: April 19 2011 1:32am
Subject: bzr push into mysql-5.1-telco-7.2 branch (john.duncan:4160 to 4161)
View as plain text  
 4161 John David Duncan	2011-04-18 [merge]
      Merge

    removed:
      storage/ndb/memcache/src/schedulers/3thread.cc
      storage/ndb/memcache/src/schedulers/3thread.h
    added:
      storage/ndb/memcache/src/schedulers/Bulk.cc
      storage/ndb/memcache/src/schedulers/Bulk.h
    modified:
      storage/ndb/memcache/Makefile.am
      storage/ndb/memcache/include/NdbInstance.h
      storage/ndb/memcache/src/NdbInstance.cc
      storage/ndb/memcache/src/ndb_pipeline.cc
      storage/ndb/memcache/src/ndb_worker.cc
      storage/ndb/memcache/src/schedulers/Stockholm.cc
      storage/ndb/memcache/src/workqueue.c
      storage/ndb/memcache/unit/Makefile.am
      storage/ndb/memcache/unit/harness.cc
 4160 John David Duncan	2011-04-18
      Add Ndb Custom Data ptr

    modified:
      storage/ndb/include/ndbapi/Ndb.hpp
      storage/ndb/src/ndbapi/Ndb.cpp
      storage/ndb/src/ndbapi/NdbImpl.hpp
      storage/ndb/src/ndbapi/Ndbinit.cpp
=== modified file 'storage/ndb/memcache/Makefile.am'
--- a/storage/ndb/memcache/Makefile.am	2011-04-09 07:36:24 +0000
+++ b/storage/ndb/memcache/Makefile.am	2011-04-19 01:23:01 +0000
@@ -34,8 +34,8 @@ ndb_engine_la_SOURCES= \
                     src/workqueue.c \
                     src/schedulers/Stockholm.h \
                     src/schedulers/Stockholm.cc \
-                    src/schedulers/3thread.h \
-                    src/schedulers/3thread.cc \
+                    src/schedulers/Bulk.h \
+                    src/schedulers/Bulk.cc \
                     src/schedulers/Flex.h \
                     src/schedulers/Flex.cc \
                     src/schedulers/Flex_broker.h \

=== modified file 'storage/ndb/memcache/include/NdbInstance.h'
--- a/storage/ndb/memcache/include/NdbInstance.h	2011-04-08 08:14:11 +0000
+++ b/storage/ndb/memcache/include/NdbInstance.h	2011-04-19 01:16:36 +0000
@@ -62,6 +62,7 @@ public:
   Uint64 batches;          // number of pollNdb() calls that got results.
   Uint64 transactions;     // total polled transactions from pollNdb().
   Uint64 locks_failed;     // number of failed lock attempts.
+  int npending;
   bool is_locked;
   
 private:
@@ -74,6 +75,7 @@ inline void LockableNdbInstance::addPoll
   if(n) {
     batches++;
     transactions += n;
+    npending -= n;
   }
 }
 

=== modified file 'storage/ndb/memcache/src/NdbInstance.cc'
--- a/storage/ndb/memcache/src/NdbInstance.cc	2011-04-08 08:14:11 +0000
+++ b/storage/ndb/memcache/src/NdbInstance.cc	2011-04-19 01:16:36 +0000
@@ -75,6 +75,7 @@ LockableNdbInstance::LockableNdbInstance
   batches = 0;
   transactions = 0;
   next = 0;
+  npending = 0;
 }
 
 

=== modified file 'storage/ndb/memcache/src/ndb_pipeline.cc'
--- a/storage/ndb/memcache/src/ndb_pipeline.cc	2011-04-06 04:31:28 +0000
+++ b/storage/ndb/memcache/src/ndb_pipeline.cc	2011-04-19 01:16:36 +0000
@@ -35,9 +35,9 @@
 #include "thread_identifier.h"
 #include "ndb_worker.h"
 
-#include "schedulers/3thread.h"
-#include "schedulers/Stockholm.h"
 #include "schedulers/Flex.h"
+#include "schedulers/Bulk.h"
+#include "schedulers/Stockholm.h"
 
 #define DEFAULT_SCHEDULER Scheduler_flex
 
@@ -181,8 +181,8 @@ void * initialize_scheduler(const char *
       s = new Scheduler_flex;
     if(!strncasecmp(cf, "stockholm", 10))
       s = new Scheduler_stockholm;
-    else if(!strncasecmp(cf, "3-thread", 9))
-      s = new Scheduler_3thread;
+    else if(!strncasecmp(cf, "bulk", 5))
+      s = new Scheduler_bulk;
     else {
       /* pass the config string to the default scheduler */
       sched_options = cf;

=== modified file 'storage/ndb/memcache/src/ndb_worker.cc'
--- a/storage/ndb/memcache/src/ndb_worker.cc	2011-04-09 07:36:24 +0000
+++ b/storage/ndb/memcache/src/ndb_worker.cc	2011-04-19 01:16:36 +0000
@@ -853,9 +853,13 @@ bool finalize_write(workitem *wqitem, bo
   se = (struct default_engine *) pipeline->engine->m_default_engine;      
 
   /* If the write was succesful, update the local cache */
+  /* Possible bugs here: 
+     (1) store_item will store nbytes as length, which is wrong.
+     (2) The CAS may be incorrect.
+  */
   if(wqitem->prefix_info.do_mc_write && tx_did_match) {
     return (store_item(se, wqitem->cache_item, 
-                       hash_item_get_cas_ptr(wqitem->cache_item),  // is that right?
+                       hash_item_get_cas_ptr(wqitem->cache_item),
                        OPERATION_SET, wqitem->cookie) == ENGINE_SUCCESS);
   }
   return true;

=== removed file 'storage/ndb/memcache/src/schedulers/3thread.cc'
--- a/storage/ndb/memcache/src/schedulers/3thread.cc	2011-04-06 04:31:28 +0000
+++ b/storage/ndb/memcache/src/schedulers/3thread.cc	1970-01-01 00:00:00 +0000
@@ -1,274 +0,0 @@
-/*
- Copyright (c) 2011, Oracle and/or its affiliates. All rights
- reserved.
- 
- This program is free software; you can redistribute it and/or
- modify it under the terms of the GNU General Public License
- as published by the Free Software Foundation; version 2 of
- the License.
- 
- This program is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
- 
- You should have received a copy of the GNU General Public License
- along with this program; if not, write to the Free Software
- Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- 02110-1301  USA
- */
-/* System headers */
-/* C++ files must define __STDC_FORMAT_MACROS in order to get PRIu64 */
-#define __STDC_FORMAT_MACROS 
-#include <inttypes.h>
-#include <stdio.h>
-#include <unistd.h>
-
-/* Memcache headers */
-#include "memcached/types.h"
-#include <memcached/extension_loggers.h>
-
-/* NDB Memcache headers */
-#include "3thread.h"
-#include "workitem.h"
-#include "ndb_worker.h"
-#include "status_block.h"
-
-extern EXTENSION_LOGGER_DESCRIPTOR *logger;
-
-extern "C" {
-  void * run_3thd_worker_thread(void *);
-  void * run_3thd_commit_thread(void *);
-}
-
-status_block status_block_unsupported = { ENGINE_ENOTSUP, "Not supported." };
-
-void Scheduler_3thread::init(int my_thread, int, const char *) {
-  const Configuration & conf = get_Configuration();
-  ClusterConnectionPool *pool;
-  Ndb_cluster_connection *conn;
-  LockableNdbInstance *inst;
-
-  /* Allocate and initialize the workqueue.  
-   The engine thread will add items to this queue, and the worker thread will 
-   consume them. 
-   */
-  queue = (struct workqueue *) malloc(sizeof(struct workqueue));
-  workqueue_init(queue, 8192, 1);
-  
-  /* Do initialization steps that are configuration-dependent */
-  nclusters = conf.nclusters;
-   
-  /* Get the NDB instances. 
-   For now, the plan is that we get 2 NDB instances for cluster 0,
-   and one instance for each additional cluster.  
-   (This is open to refinement later).
-   
-   We maintain the instances in an array of lists, so  
-   instances[n] is the head list of instances for cluster n.
-   
-   We also have to decide how many transactions to accommodate on each 
-   instance.  For now we'll use WORK_QUEUE_SIZE on *each* instance, though
-   actually WORK_QUEUE_SIZE is the upper bound on transactions for all 
-   instances combined.
-   */  
-  instances = new LockableNdbInstance *[nclusters];  
-  for(int i = 0 ; i < nclusters ; i++) {
-    pool = conf.getConnectionById(i);
-    conn = pool->getPooledConnection(my_thread);
-    inst = new LockableNdbInstance(conn, conf.nprefixes, 8192);
-    instances[i] = inst;    
-  }
-  
-  /* Plus the extra instance for cluster 0: */
-  pool = conf.getConnectionById(0);
-  conn = pool->getPooledConnection(my_thread);
-  inst = new LockableNdbInstance(conn, conf.nprefixes, 8192);
-  instances[0]->next = inst;
-    
-  // Launch the worker thread
-  pthread_create(& worker_thread_id, NULL, run_3thd_worker_thread, (void *) this);  
-  
-  // Launch the commit thread
-  pthread_create(& commit_thread_id, NULL, run_3thd_commit_thread, (void *) this);  
-}
-
-
-void Scheduler_3thread::attach_thread(thread_identifier * parent) {
-  
-  pipeline = parent->pipeline;
-  logger->log(LOG_WARNING, 0, "Pipeline %d attached to 3-thread scheduler.\n",
-              pipeline->id);
-  stats.cycles = 0;
-  stats.worker_thread_vtime = 0;
-  stats.commit_thread_vtime = 0;
-}
-
-
-ENGINE_ERROR_CODE Scheduler_3thread::schedule(workitem *newitem) {
-  workqueue_add(queue, newitem);
-  return ENGINE_EWOULDBLOCK;
-}
-
-
-void Scheduler_3thread::add_stats(ADD_STAT add_stat, 
-                                  const void * cookie) {  
-  char key[128];
-  char val[128];
-  const char *units[4] = { "ns", "us", "ms", "s" };
-  int klen, vlen, p;
-  char id;
-  LockableNdbInstance *inst;
-  
-  klen = sprintf(key, "pipeline_%d_commit_cycles", pipeline->id);
-  vlen = sprintf(val, "%"PRIu64, stats.cycles);
-  add_stat(key, klen, val, vlen, cookie);
-  
-  if(stats.worker_thread_vtime != 0) {
-    klen = sprintf(key, "pipeline_%d_ndb_worker_thread_time", pipeline->id);
-    vlen = sprintf(val, "%"PRIu64, stats.worker_thread_vtime);
-    add_stat(key, klen, val, vlen, cookie);
-  }
-  if(stats.commit_thread_vtime != 0) {  
-    klen = sprintf(key, "pipeline_%d_commit_thread_time", pipeline->id);
-    vlen = sprintf(val, "%"PRIu64, stats.commit_thread_vtime);
-    add_stat(key, klen, val, vlen, cookie);
-  }
-  
-  for(int i = 0; i < nclusters; i++) {
-    for(id = 'a', inst = instances[i] ; inst ; id++, inst = inst->next) {
-      
-      /* transactions */
-      klen = sprintf(key, "pipeline_%d_db%d%c_transactions", pipeline->id, i, id);
-      vlen = sprintf(val, "%"PRIu64, inst->transactions);
-      add_stat(key, klen, val, vlen, cookie);
-      
-      /* batches */
-      klen = sprintf(key, "pipeline_%d_db%d%c_batches", pipeline->id, i, id);
-      vlen = sprintf(val, "%"PRIu64, inst->batches);
-      add_stat(key, klen, val, vlen, cookie);
-      
-      /* locks_failed */
-      klen = sprintf(key, "pipeline_%d_db%d%c_locks_failed", pipeline->id, i, id);
-      vlen = sprintf(val, "%"PRIu64, inst->locks_failed);
-      add_stat(key, klen, val, vlen, cookie);
-    }
-  }
-}
-
-// ------------------------------- The worker thread ----------------------
-
-#define STAT_INTERVAL 50
-
-
-/* OPEN QUESTION:  How does the worker thread get terminated? */
-/* NOTE: "conf = getConfigration()" will not be safe for hot-swapping; 
-   we'll have to ensure that the worker thread and the pipeline use
-   the same config.
-*/
-
-
-void * run_3thd_worker_thread(void *s) {
-  return ((Scheduler_3thread *) s)->run_ndb_worker_thread();
-}
-
-
-void * Scheduler_3thread::run_ndb_worker_thread() {
-  DEBUG_ENTER();
-  workitem *newitem;
-  const Configuration & conf = get_Configuration();     // see note
-  int lockerr;
-  unsigned int ncycles = 0;
-  
-  do {
-    /* Wait for something to appear on the queue */
-    newitem = (workitem *) workqueue_consumer_wait(queue);
-    
-    /* Fetch the config for its key prefix */
-    const KeyPrefix *pfx = conf.getPrefixByInfo(newitem->prefix_info);
-    
-    /* From here on we will work solely with the suffix part of the key. */
-    newitem->base.nsuffix = newitem->base.nkey - pfx->prefix_len;
-    DEBUG_ASSERT(newitem->base.nsuffix > 0);
-    
-    /* Get the head NdbInstance for the correct cluster */
-    LockableNdbInstance *inst = instances[pfx->info.cluster_id];
-    
-    // A: Walk the list of instances trying to lock one.    
-    lockerr = - 1;
-    while(lockerr && inst->next) {
-      lockerr = inst->trylock();
-      if(lockerr) inst = inst->next;
-    }
-    // B: If you still don't have a lock, wait on the lock for the last instance.
-    if(lockerr) 
-      lockerr = inst->lock();
-    // C: If you *still* don't have a lock, that's either EINVAL or EDEADLK,
-    // and you will hit the assertion in getPlanForPrefix().
-    
-    // Now we've got an NDB Instance. Fetch the query plan for this prefix.
-    newitem->plan = inst->getPlanForPrefix(pfx);
-    
-    // Build the NDB transaction
-    if(!worker_prepare_operation(newitem)) {
-      /******* Error handling.  The operation is not supported. */
-      if(newitem->base.is_sync) {
-        newitem->status = & status_block_unsupported;
-        pipeline->engine->server.cookie->store_engine_specific(newitem->cookie, newitem); 
-        pipeline->engine->server.cookie->notify_io_complete(newitem->cookie, ENGINE_ENOTSUP);
-      }
-    }
-    
-    inst->unlock();  
-    
-    if(ncycles++ % STAT_INTERVAL == 0)
-      stats.worker_thread_vtime = get_thread_vtime();
-    
-  } while(newitem); 
-}
-
-
-
-// ------------------------------- The commit thread ----------------------
-
-
-void * run_3thd_commit_thread(void *s) {
-  return ((Scheduler_3thread *) s)->run_ndb_commit_thread();
-}
-
-
-//FIXME: On shutdown, the commit thread tends to segfault inside sendPollNdb().
-
-/*  Note: You could use gethrtime() to keep the pace here. */
-/*  Note: latency will go up with the number of clusters unless you have 
- one commit thread per cluster */
-
-void * Scheduler_3thread::run_ndb_commit_thread() {
-  DEBUG_ENTER();
-  LockableNdbInstance *inst;
-  int transactions, sleep_usec, wait_msec;
-    
-  while(1) {
-    sleep_usec = 1000;
-    /* For all NDBs, send transactions and poll for results */
-    for(int i = 0; i < nclusters ; i++) {
-      for(inst = instances[i] ; inst ; inst = inst->next) {
-        /* Wait 1 ms, but just poll the last instance. */
-        wait_msec = inst->next ? 1 : 0;
-        inst->lock();   /* fixme: actually just pollNdb() now */
-        transactions = inst->db->sendPollNdb(
-            /* wait time (0 = just poll) */  wait_msec,  
-            /* minimum ops to wait for */    queue->depth,
-            /* force send */                 1);                  
-        inst->addPollStat(transactions);  // update statistics
-        if(transactions > 0) sleep_usec = 500;
-        inst->unlock();
-      }
-    }    
-    usleep(sleep_usec);
-    
-    stats.cycles++;
-    if(! (stats.cycles % STAT_INTERVAL)) 
-      stats.commit_thread_vtime = get_thread_vtime(); 
-  } 
-}

=== removed file 'storage/ndb/memcache/src/schedulers/3thread.h'
--- a/storage/ndb/memcache/src/schedulers/3thread.h	2011-04-07 11:20:56 +0000
+++ b/storage/ndb/memcache/src/schedulers/3thread.h	1970-01-01 00:00:00 +0000
@@ -1,66 +0,0 @@
-/*
- Copyright (c) 2011, Oracle and/or its affiliates. All rights
- reserved.
- 
- This program is free software; you can redistribute it and/or
- modify it under the terms of the GNU General Public License
- as published by the Free Software Foundation; version 2 of
- the License.
- 
- This program is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
- 
- You should have received a copy of the GNU General Public License
- along with this program; if not, write to the Free Software
- Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- 02110-1301  USA
- */
-#ifndef NDBMEMCACHE_3THREAD_SCHEDULER_H
-#define NDBMEMCACHE_3THREAD_SCHEDULER_H
-
-#ifndef __cplusplus
-#error "This file is for C++ only"
-#endif
-
-#include <memcached/types.h>
-
-#include "ndbmemcache_config.h"
-#include "Scheduler.h"
-#include "NdbInstance.h"
-
-
-/* 
- *              3-Thread Scheduler 
- *
- * The 3-thread scheduler launches an NDB worker thread and a separate 
- * commit thread; it uses a small number of Ndb objects.
- */
-class Scheduler_3thread : public Scheduler {
-public:
-  void init(int threadnum, int nthreads, const char *config_string);
-  void attach_thread(thread_identifier *);
-  ENGINE_ERROR_CODE schedule(workitem *);
-  void io_completed(workitem *) {};
-  void add_stats(ADD_STAT, const void *);
-  void * run_ndb_worker_thread();
-  void * run_ndb_commit_thread();
-
-  
-private:  
-  pthread_t worker_thread_id;
-  pthread_t commit_thread_id; 
-  int nclusters;
-  struct workqueue *queue; 
-  LockableNdbInstance **instances;  
-  struct sched_stats_3thread { 
-    uint64_t cycles;      /* total number of loops in the commit thread */
-    uint64_t worker_thread_vtime;
-    uint64_t commit_thread_vtime;
-  } stats;  
-};
-
-
-#endif
-

=== added file 'storage/ndb/memcache/src/schedulers/Bulk.cc'
--- a/storage/ndb/memcache/src/schedulers/Bulk.cc	1970-01-01 00:00:00 +0000
+++ b/storage/ndb/memcache/src/schedulers/Bulk.cc	2011-04-19 01:16:36 +0000
@@ -0,0 +1,282 @@
+/*
+ Copyright (c) 2011, Oracle and/or its affiliates. All rights
+ reserved.
+ 
+ This program is free software; you can redistribute it and/or
+ modify it under the terms of the GNU General Public License
+ as published by the Free Software Foundation; version 2 of
+ the License.
+ 
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+ 
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ 02110-1301  USA
+ */
+/* System headers */
+/* C++ files must define __STDC_FORMAT_MACROS in order to get PRIu64 */
+#define __STDC_FORMAT_MACROS 
+#include <inttypes.h>
+#include <stdio.h>
+#include <unistd.h>
+
+/* Memcache headers */
+#include "memcached/types.h"
+#include <memcached/extension_loggers.h>
+
+/* NDB Memcache headers */
+#include "bulk.h"
+#include "workitem.h"
+#include "ndb_worker.h"
+#include "status_block.h"
+
+extern EXTENSION_LOGGER_DESCRIPTOR *logger;
+
+class bulk_commit_thread_spec {
+public:
+  bulk_commit_thread_spec(Scheduler_bulk *s, int i): sched(s), cluster_id(i) {};
+  Scheduler_bulk *sched;
+  int cluster_id;
+};
+
+extern "C" {
+  void * run_bulk_commit_thread(void *);
+}
+
+status_block status_block_unsupported = { ENGINE_ENOTSUP, "Not supported." };
+
+
+void Scheduler_bulk::init(int my_thread, int nthreads, const char *) {
+  const Configuration & conf = get_Configuration();
+  ClusterConnectionPool *pool;
+  Ndb_cluster_connection *conn;
+  LockableNdbInstance *inst;
+  
+  /* Do initialization steps that are configuration-dependent */
+  nclusters = conf.nclusters;
+   
+  /* Get the NDB instances -- 2 instances for each cluster. */  
+  const int tx_per_inst = 8192;  // maximum transactions on each NDB Instance
+  for(int i = 0 ; i < nclusters ; i++) {
+    pool = conf.getConnectionById(i);
+    conn = pool->getPooledConnection(my_thread);
+    inst = new LockableNdbInstance(conn, conf.nprefixes, tx_per_inst);
+    inst->next = new LockableNdbInstance(conn, conf.nprefixes, tx_per_inst);
+    cluster[i].instance = inst;    
+  
+    /* also get a master NdbInstance for the connection */
+    pool->setNdbInstance(my_thread, new NdbInstance(conn, conf.nprefixes, 128));
+  }
+
+  /* Hoard a number of transactions for each NDB object.  How many?  Enough 
+     to meet the performance requested in the configuration. */
+  for(int c = 0 ; c < conf.nclusters ; c++) {
+    ClusterConnectionPool *pool = conf.getConnectionById(c);
+    cluster[c].usec_rtt = pool->usec_rtt;
+    double tx_time_in_usec = pool->usec_rtt * 5;
+    double tx_per_ndb_per_sec = 1000000 / tx_time_in_usec;
+    double prefetch = conf.max_tps / tx_per_ndb_per_sec;
+    cluster[c].nprefetch = (int) (prefetch / nthreads);
+    DEBUG_PRINT("cluster %d: prefetch %d transactions.", c, cluster[c].nprefetch);
+    
+    NdbTransaction ** txlist1 = (NdbTransaction **) 
+      calloc(cluster[c].nprefetch, sizeof(NdbTransaction *));
+    NdbTransaction ** txlist2 = (NdbTransaction **) 
+      calloc(cluster[c].nprefetch, sizeof(NdbTransaction *));
+    
+    const KeyPrefix *prefix = conf.getNextPrefixForCluster(c, NULL);
+    QueryPlan *plan = 0;
+  
+    /* open them all */
+    for(int i = 0; i < cluster[c].nprefetch ; i++) {
+      plan = cluster[c].instance->getPlanForPrefix(prefix);
+      txlist1[i] = cluster[c].instance->db->startTransaction();  // on instance 1
+      txlist2[i] = cluster[c].instance->next->db->startTransaction();  // on instance 2
+    }
+
+    /* close them all */
+    for(int i = 0; i < cluster[c].nprefetch ; i++) {
+      txlist1[i]->close();
+      txlist2[i]->close(); 
+    }
+    
+    /* free the lists */
+    delete[] txlist1;
+    delete[] txlist2;
+    
+    // Launch the commit thread    
+    bulk_commit_thread_spec * spec = new bulk_commit_thread_spec(this, c);  
+    pthread_create(& cluster[c].commit_thread_id, NULL, 
+                   run_bulk_commit_thread, spec);
+  }
+
+}
+
+
+void Scheduler_bulk::attach_thread(thread_identifier * parent) {
+  
+  pipeline = parent->pipeline;
+  logger->log(LOG_WARNING, 0, "Pipeline %d attached to bulk scheduler.\n",
+              pipeline->id);
+}
+
+
+ENGINE_ERROR_CODE Scheduler_bulk::schedule(workitem *newitem) {
+  DEBUG_ENTER();
+  const Configuration & conf = get_Configuration();     // see note
+  int lockerr;
+  ENGINE_ERROR_CODE response_code = ENGINE_EWOULDBLOCK;
+  /* Fetch the config for its key prefix */
+  const KeyPrefix *pfx = conf.getPrefixByInfo(newitem->prefix_info);
+
+  if(newitem->prefix_info.prefix_id) {
+    DEBUG_PRINT("prefix %d: \"%s\" Table: %s  Value Cols: %d", 
+                newitem->prefix_info.prefix_id, pfx->prefix, 
+                pfx->table->table_name, pfx->table->nvaluecols);
+  }
+  
+  /* From here on we will work mainly with the suffix part of the key. */
+  newitem->base.nsuffix = newitem->base.nkey - pfx->prefix_len;
+  DEBUG_ASSERT(newitem->base.nsuffix > 0);
+  
+  /* Get the head NdbInstance for the correct cluster */
+  LockableNdbInstance *inst = cluster[newitem->prefix_info.cluster_id].instance;
+  
+  // A: Walk the list of instances trying to lock one.    
+  lockerr = - 1;
+  while(lockerr && inst->next) {
+    lockerr = inst->trylock();
+    if(lockerr) inst = inst->next;
+  }
+  // B: If you still don't have a lock, wait on the lock for the last instance.
+  if(lockerr) 
+    lockerr = inst->lock();
+  // C: If you *still* don't have a lock, that's either EINVAL or EDEADLK,
+  // and you will hit the assertion in getPlanForPrefix().
+  
+  // Now we've got an NDB Instance. Fetch the query plan for this prefix.
+  newitem->plan = inst->getPlanForPrefix(pfx);
+  
+  workitem_set_NdbInstance(newitem, inst);
+
+  // Build the NDB transaction
+  if(worker_prepare_operation(newitem)) {
+    inst->npending++;
+  }
+  else {
+    /******* Error handling.  The operation is not supported. */
+    if(newitem->base.is_sync) {
+      newitem->status = & status_block_unsupported;
+      pipeline->engine->server.cookie->store_engine_specific(newitem->cookie, newitem); 
+      pipeline->engine->server.cookie->notify_io_complete(newitem->cookie, ENGINE_ENOTSUP);
+    }
+  }
+  
+  inst->unlock();
+
+  return response_code;
+}
+
+
+void Scheduler_bulk::io_completed(workitem *item) {
+  DEBUG_ENTER();
+  item->ndb_instance = NULL;
+}
+
+
+void Scheduler_bulk::add_stats(ADD_STAT add_stat, 
+                                  const void * cookie) {  
+  char key[128];
+  char val[128];
+  const char *units[4] = { "ns", "us", "ms", "s" };
+  int klen, vlen, p;
+  char id;
+  LockableNdbInstance *inst;
+    
+  for(int i = 0; i < nclusters; i++) {
+    /* per-cluster */
+    klen = sprintf(key, "t%d_db%d_commit_cycles", pipeline->id, i);
+    vlen = sprintf(val, "%"PRIu64, cluster[i].stats.cycles);
+    add_stat(key, klen, val, vlen, cookie);
+    
+    if(cluster[i].stats.vtime != 0) {  
+      klen = sprintf(key, "t%d_db%d_commit_thread_vtime", pipeline->id, i);
+      vlen = sprintf(val, "%"PRIu64, cluster[i].stats.vtime);
+      add_stat(key, klen, val, vlen, cookie);
+    }
+        
+    /* per-cluster, per-NdbInstance */
+    for(id = 'a', inst = cluster[i].instance ; inst ; id++, inst = inst->next) {      
+      /* transactions */
+      klen = sprintf(key, "t%d_db%d%c_transactions", pipeline->id, i, id);
+      vlen = sprintf(val, "%"PRIu64, inst->transactions);
+      add_stat(key, klen, val, vlen, cookie);
+      
+      /* batches */
+      klen = sprintf(key, "t%d_db%d%c_batches", pipeline->id, i, id);
+      vlen = sprintf(val, "%"PRIu64, inst->batches);
+      add_stat(key, klen, val, vlen, cookie);
+      
+      /* locks_failed */
+      klen = sprintf(key, "t%d_db%d%c_locks_failed", pipeline->id, i, id);
+      vlen = sprintf(val, "%"PRIu64, inst->locks_failed);
+      add_stat(key, klen, val, vlen, cookie);
+    }
+  }
+}
+
+
+// ------------------------------- The commit thread ----------------------
+
+#define STAT_INTERVAL 50
+
+
+void * run_bulk_commit_thread(void *s) {
+  bulk_commit_thread_spec *spec = (bulk_commit_thread_spec *) s;
+  
+  return spec->sched->run_ndb_commit_thread(spec->cluster_id);
+}
+
+//FIXME: On shutdown, the commit thread tends to segfault inside sendPollNdb().
+
+/*  Note: You could use gethrtime() to keep the pace here. */
+/*  Note: latency will go up with the number of clusters unless you have 
+ one commit thread per cluster */
+
+void * Scheduler_bulk::run_ndb_commit_thread(int c) {
+  DEBUG_ENTER();
+  LockableNdbInstance *inst;
+  int transactions, sleep_usec, wait_msec;
+  int rtt = cluster[c].usec_rtt;
+
+  cluster[c].stats.cycles = 0;
+  cluster[c].stats.vtime = 0;
+    
+  while(1) {
+    sleep_usec = rtt * 2;
+    /* For all NDBs, send transactions and poll for results */
+    for(inst = cluster[c].instance ; inst ; inst = inst->next) {
+      /* Wait with 1 ms timeout, but just poll the last instance. */
+      wait_msec = inst->next ? 1 : 0;
+      inst->lock(); 
+      if(inst->npending) {  // there are pending transactions 
+        transactions = inst->db->sendPollNdb(
+            /* wait time (0 = just poll) */  wait_msec,  
+            /* minimum ops to wait for */   (inst->npending + 1) / 2,
+            /* force send */                 1);                  
+        inst->addPollStat(transactions);  // update statistics
+        if(transactions > 0) sleep_usec = rtt;
+      }
+      inst->unlock();
+    }
+    usleep(sleep_usec);
+    
+    cluster[c].stats.cycles++;
+    if(! (cluster[c].stats.cycles % STAT_INTERVAL)) 
+      cluster[c].stats.vtime = get_thread_vtime(); 
+  } 
+}

=== added file 'storage/ndb/memcache/src/schedulers/Bulk.h'
--- a/storage/ndb/memcache/src/schedulers/Bulk.h	1970-01-01 00:00:00 +0000
+++ b/storage/ndb/memcache/src/schedulers/Bulk.h	2011-04-19 01:16:36 +0000
@@ -0,0 +1,65 @@
+/*
+ Copyright (c) 2011, Oracle and/or its affiliates. All rights
+ reserved.
+ 
+ This program is free software; you can redistribute it and/or
+ modify it under the terms of the GNU General Public License
+ as published by the Free Software Foundation; version 2 of
+ the License.
+ 
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+ 
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ 02110-1301  USA
+ */
+#ifndef NDBMEMCACHE_BULK_SCHEDULER_H
+#define NDBMEMCACHE_BULK_SCHEDULER_H
+
+#ifndef __cplusplus
+#error "This file is for C++ only"
+#endif
+
+#include <memcached/types.h>
+
+#include "config.h"
+#include "Scheduler.h"
+#include "NdbInstance.h"
+
+
+/* 
+ *              Bulk Scheduler 
+ *
+ * The bulk scheduler uses a small number of Ndb objects with many transactions
+ * on each.
+ */
+class Scheduler_bulk : public Scheduler {
+public:
+  void init(int threadnum, int nthreads, const char *config_string);
+  void attach_thread(thread_identifier *);
+  ENGINE_ERROR_CODE schedule(workitem *);
+  void io_completed(workitem *);
+  void add_stats(ADD_STAT, const void *);
+  void * run_ndb_commit_thread(int thread_id);
+
+private:  
+  int nclusters;
+  struct {
+    int usec_rtt;
+    int nprefetch;
+    pthread_t commit_thread_id; 
+    LockableNdbInstance *instance;
+    struct sched_stats_bulk { 
+      uint64_t cycles;      /* total number of loops in the commit thread */
+      uint64_t vtime;
+    } stats;
+  } cluster[MAX_CLUSTERS];
+};
+
+
+#endif
+

=== modified file 'storage/ndb/memcache/src/schedulers/Stockholm.cc'
--- a/storage/ndb/memcache/src/schedulers/Stockholm.cc	2011-04-09 07:36:24 +0000
+++ b/storage/ndb/memcache/src/schedulers/Stockholm.cc	2011-04-19 01:16:36 +0000
@@ -178,7 +178,6 @@ ENGINE_ERROR_CODE Scheduler_stockholm::s
     return ENGINE_TMPFAIL;
   }
 
-
   workitem_set_NdbInstance(newitem, inst);
   
   // Fetch the query plan for this prefix.

=== modified file 'storage/ndb/memcache/src/workqueue.c'
--- a/storage/ndb/memcache/src/workqueue.c	2011-04-07 11:20:56 +0000
+++ b/storage/ndb/memcache/src/workqueue.c	2011-04-19 01:16:36 +0000
@@ -50,7 +50,7 @@ int workqueue_init(struct workqueue *q, 
   q->depth = 0;                
   q->minfree = size / 16;     
   assert(nconsumers > 0);
-  q->threads = nconsumers - 1;  /* boolean 0 if there is just one consumer */
+  q->threads = nconsumers - 1;  /* boolean false if there is just one consumer */
     
   pthread_cond_init(& q->not_empty, NULL);  
   pthread_cond_init(& q->not_full, NULL);  

=== modified file 'storage/ndb/memcache/unit/Makefile.am'
--- a/storage/ndb/memcache/unit/Makefile.am	2011-04-08 08:01:14 +0000
+++ b/storage/ndb/memcache/unit/Makefile.am	2011-04-19 01:23:01 +0000
@@ -46,7 +46,7 @@ run_unit_tests_CFLAGS = -g -O0 \
 
 run_unit_test_CXXFLAGS = -g -O0 
 
-run_unit_tests_LDADD = ../ndb_engine_la-3thread.lo  \
+run_unit_tests_LDADD = ../ndb_engine_la-Bulk.lo  \
                        ../ndb_engine_la-ClusterConnectionPool.lo  \
                        ../ndb_engine_la-Configuration.lo  \
                        ../ndb_engine_la-DataTypeHandler.lo  \

=== modified file 'storage/ndb/memcache/unit/harness.cc'
--- a/storage/ndb/memcache/unit/harness.cc	2011-04-08 08:01:14 +0000
+++ b/storage/ndb/memcache/unit/harness.cc	2011-04-19 01:16:36 +0000
@@ -40,7 +40,6 @@ extern EXTENSION_LOGGER_DESCRIPTOR *logg
 Ndb_cluster_connection * connect(const char *);
 
 int main(int argc, char *argv[]) {
-
   int test_number = -1;
   char * test_name = 0;
   if(argc > 1) connect_string = argv[1];

No bundle (reason: useless for push emails).
Thread
bzr push into mysql-5.1-telco-7.2 branch (john.duncan:4160 to 4161) — John David Duncan, 19 Apr