4161 John David Duncan 2011-04-18 [merge]
Merge
removed:
storage/ndb/memcache/src/schedulers/3thread.cc
storage/ndb/memcache/src/schedulers/3thread.h
added:
storage/ndb/memcache/src/schedulers/Bulk.cc
storage/ndb/memcache/src/schedulers/Bulk.h
modified:
storage/ndb/memcache/Makefile.am
storage/ndb/memcache/include/NdbInstance.h
storage/ndb/memcache/src/NdbInstance.cc
storage/ndb/memcache/src/ndb_pipeline.cc
storage/ndb/memcache/src/ndb_worker.cc
storage/ndb/memcache/src/schedulers/Stockholm.cc
storage/ndb/memcache/src/workqueue.c
storage/ndb/memcache/unit/Makefile.am
storage/ndb/memcache/unit/harness.cc
4160 John David Duncan 2011-04-18
Add Ndb Custom Data ptr
modified:
storage/ndb/include/ndbapi/Ndb.hpp
storage/ndb/src/ndbapi/Ndb.cpp
storage/ndb/src/ndbapi/NdbImpl.hpp
storage/ndb/src/ndbapi/Ndbinit.cpp
=== modified file 'storage/ndb/memcache/Makefile.am'
--- a/storage/ndb/memcache/Makefile.am 2011-04-09 07:36:24 +0000
+++ b/storage/ndb/memcache/Makefile.am 2011-04-19 01:23:01 +0000
@@ -34,8 +34,8 @@ ndb_engine_la_SOURCES= \
src/workqueue.c \
src/schedulers/Stockholm.h \
src/schedulers/Stockholm.cc \
- src/schedulers/3thread.h \
- src/schedulers/3thread.cc \
+ src/schedulers/Bulk.h \
+ src/schedulers/Bulk.cc \
src/schedulers/Flex.h \
src/schedulers/Flex.cc \
src/schedulers/Flex_broker.h \
=== modified file 'storage/ndb/memcache/include/NdbInstance.h'
--- a/storage/ndb/memcache/include/NdbInstance.h 2011-04-08 08:14:11 +0000
+++ b/storage/ndb/memcache/include/NdbInstance.h 2011-04-19 01:16:36 +0000
@@ -62,6 +62,7 @@ public:
Uint64 batches; // number of pollNdb() calls that got results.
Uint64 transactions; // total polled transactions from pollNdb().
Uint64 locks_failed; // number of failed lock attempts.
+ int npending;
bool is_locked;
private:
@@ -74,6 +75,7 @@ inline void LockableNdbInstance::addPoll
if(n) {
batches++;
transactions += n;
+ npending -= n;
}
}
=== modified file 'storage/ndb/memcache/src/NdbInstance.cc'
--- a/storage/ndb/memcache/src/NdbInstance.cc 2011-04-08 08:14:11 +0000
+++ b/storage/ndb/memcache/src/NdbInstance.cc 2011-04-19 01:16:36 +0000
@@ -75,6 +75,7 @@ LockableNdbInstance::LockableNdbInstance
batches = 0;
transactions = 0;
next = 0;
+ npending = 0;
}
=== modified file 'storage/ndb/memcache/src/ndb_pipeline.cc'
--- a/storage/ndb/memcache/src/ndb_pipeline.cc 2011-04-06 04:31:28 +0000
+++ b/storage/ndb/memcache/src/ndb_pipeline.cc 2011-04-19 01:16:36 +0000
@@ -35,9 +35,9 @@
#include "thread_identifier.h"
#include "ndb_worker.h"
-#include "schedulers/3thread.h"
-#include "schedulers/Stockholm.h"
#include "schedulers/Flex.h"
+#include "schedulers/Bulk.h"
+#include "schedulers/Stockholm.h"
#define DEFAULT_SCHEDULER Scheduler_flex
@@ -181,8 +181,8 @@ void * initialize_scheduler(const char *
s = new Scheduler_flex;
if(!strncasecmp(cf, "stockholm", 10))
s = new Scheduler_stockholm;
- else if(!strncasecmp(cf, "3-thread", 9))
- s = new Scheduler_3thread;
+ else if(!strncasecmp(cf, "bulk", 5))
+ s = new Scheduler_bulk;
else {
/* pass the config string to the default scheduler */
sched_options = cf;
=== modified file 'storage/ndb/memcache/src/ndb_worker.cc'
--- a/storage/ndb/memcache/src/ndb_worker.cc 2011-04-09 07:36:24 +0000
+++ b/storage/ndb/memcache/src/ndb_worker.cc 2011-04-19 01:16:36 +0000
@@ -853,9 +853,13 @@ bool finalize_write(workitem *wqitem, bo
se = (struct default_engine *) pipeline->engine->m_default_engine;
/* If the write was succesful, update the local cache */
+ /* Possible bugs here:
+ (1) store_item will store nbytes as length, which is wrong.
+ (2) The CAS may be incorrect.
+ */
if(wqitem->prefix_info.do_mc_write && tx_did_match) {
return (store_item(se, wqitem->cache_item,
- hash_item_get_cas_ptr(wqitem->cache_item), // is that right?
+ hash_item_get_cas_ptr(wqitem->cache_item),
OPERATION_SET, wqitem->cookie) == ENGINE_SUCCESS);
}
return true;
=== removed file 'storage/ndb/memcache/src/schedulers/3thread.cc'
--- a/storage/ndb/memcache/src/schedulers/3thread.cc 2011-04-06 04:31:28 +0000
+++ b/storage/ndb/memcache/src/schedulers/3thread.cc 1970-01-01 00:00:00 +0000
@@ -1,274 +0,0 @@
-/*
- Copyright (c) 2011, Oracle and/or its affiliates. All rights
- reserved.
-
- This program is free software; you can redistribute it and/or
- modify it under the terms of the GNU General Public License
- as published by the Free Software Foundation; version 2 of
- the License.
-
- This program is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with this program; if not, write to the Free Software
- Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- 02110-1301 USA
- */
-/* System headers */
-/* C++ files must define __STDC_FORMAT_MACROS in order to get PRIu64 */
-#define __STDC_FORMAT_MACROS
-#include <inttypes.h>
-#include <stdio.h>
-#include <unistd.h>
-
-/* Memcache headers */
-#include "memcached/types.h"
-#include <memcached/extension_loggers.h>
-
-/* NDB Memcache headers */
-#include "3thread.h"
-#include "workitem.h"
-#include "ndb_worker.h"
-#include "status_block.h"
-
-extern EXTENSION_LOGGER_DESCRIPTOR *logger;
-
-extern "C" {
- void * run_3thd_worker_thread(void *);
- void * run_3thd_commit_thread(void *);
-}
-
-status_block status_block_unsupported = { ENGINE_ENOTSUP, "Not supported." };
-
-void Scheduler_3thread::init(int my_thread, int, const char *) {
- const Configuration & conf = get_Configuration();
- ClusterConnectionPool *pool;
- Ndb_cluster_connection *conn;
- LockableNdbInstance *inst;
-
- /* Allocate and initialize the workqueue.
- The engine thread will add items to this queue, and the worker thread will
- consume them.
- */
- queue = (struct workqueue *) malloc(sizeof(struct workqueue));
- workqueue_init(queue, 8192, 1);
-
- /* Do initialization steps that are configuration-dependent */
- nclusters = conf.nclusters;
-
- /* Get the NDB instances.
- For now, the plan is that we get 2 NDB instances for cluster 0,
- and one instance for each additional cluster.
- (This is open to refinement later).
-
- We maintain the instances in an array of lists, so
- instances[n] is the head list of instances for cluster n.
-
- We also have to decide how many transactions to accommodate on each
- instance. For now we'll use WORK_QUEUE_SIZE on *each* instance, though
- actually WORK_QUEUE_SIZE is the upper bound on transactions for all
- instances combined.
- */
- instances = new LockableNdbInstance *[nclusters];
- for(int i = 0 ; i < nclusters ; i++) {
- pool = conf.getConnectionById(i);
- conn = pool->getPooledConnection(my_thread);
- inst = new LockableNdbInstance(conn, conf.nprefixes, 8192);
- instances[i] = inst;
- }
-
- /* Plus the extra instance for cluster 0: */
- pool = conf.getConnectionById(0);
- conn = pool->getPooledConnection(my_thread);
- inst = new LockableNdbInstance(conn, conf.nprefixes, 8192);
- instances[0]->next = inst;
-
- // Launch the worker thread
- pthread_create(& worker_thread_id, NULL, run_3thd_worker_thread, (void *) this);
-
- // Launch the commit thread
- pthread_create(& commit_thread_id, NULL, run_3thd_commit_thread, (void *) this);
-}
-
-
-void Scheduler_3thread::attach_thread(thread_identifier * parent) {
-
- pipeline = parent->pipeline;
- logger->log(LOG_WARNING, 0, "Pipeline %d attached to 3-thread scheduler.\n",
- pipeline->id);
- stats.cycles = 0;
- stats.worker_thread_vtime = 0;
- stats.commit_thread_vtime = 0;
-}
-
-
-ENGINE_ERROR_CODE Scheduler_3thread::schedule(workitem *newitem) {
- workqueue_add(queue, newitem);
- return ENGINE_EWOULDBLOCK;
-}
-
-
-void Scheduler_3thread::add_stats(ADD_STAT add_stat,
- const void * cookie) {
- char key[128];
- char val[128];
- const char *units[4] = { "ns", "us", "ms", "s" };
- int klen, vlen, p;
- char id;
- LockableNdbInstance *inst;
-
- klen = sprintf(key, "pipeline_%d_commit_cycles", pipeline->id);
- vlen = sprintf(val, "%"PRIu64, stats.cycles);
- add_stat(key, klen, val, vlen, cookie);
-
- if(stats.worker_thread_vtime != 0) {
- klen = sprintf(key, "pipeline_%d_ndb_worker_thread_time", pipeline->id);
- vlen = sprintf(val, "%"PRIu64, stats.worker_thread_vtime);
- add_stat(key, klen, val, vlen, cookie);
- }
- if(stats.commit_thread_vtime != 0) {
- klen = sprintf(key, "pipeline_%d_commit_thread_time", pipeline->id);
- vlen = sprintf(val, "%"PRIu64, stats.commit_thread_vtime);
- add_stat(key, klen, val, vlen, cookie);
- }
-
- for(int i = 0; i < nclusters; i++) {
- for(id = 'a', inst = instances[i] ; inst ; id++, inst = inst->next) {
-
- /* transactions */
- klen = sprintf(key, "pipeline_%d_db%d%c_transactions", pipeline->id, i, id);
- vlen = sprintf(val, "%"PRIu64, inst->transactions);
- add_stat(key, klen, val, vlen, cookie);
-
- /* batches */
- klen = sprintf(key, "pipeline_%d_db%d%c_batches", pipeline->id, i, id);
- vlen = sprintf(val, "%"PRIu64, inst->batches);
- add_stat(key, klen, val, vlen, cookie);
-
- /* locks_failed */
- klen = sprintf(key, "pipeline_%d_db%d%c_locks_failed", pipeline->id, i, id);
- vlen = sprintf(val, "%"PRIu64, inst->locks_failed);
- add_stat(key, klen, val, vlen, cookie);
- }
- }
-}
-
-// ------------------------------- The worker thread ----------------------
-
-#define STAT_INTERVAL 50
-
-
-/* OPEN QUESTION: How does the worker thread get terminated? */
-/* NOTE: "conf = getConfigration()" will not be safe for hot-swapping;
- we'll have to ensure that the worker thread and the pipeline use
- the same config.
-*/
-
-
-void * run_3thd_worker_thread(void *s) {
- return ((Scheduler_3thread *) s)->run_ndb_worker_thread();
-}
-
-
-void * Scheduler_3thread::run_ndb_worker_thread() {
- DEBUG_ENTER();
- workitem *newitem;
- const Configuration & conf = get_Configuration(); // see note
- int lockerr;
- unsigned int ncycles = 0;
-
- do {
- /* Wait for something to appear on the queue */
- newitem = (workitem *) workqueue_consumer_wait(queue);
-
- /* Fetch the config for its key prefix */
- const KeyPrefix *pfx = conf.getPrefixByInfo(newitem->prefix_info);
-
- /* From here on we will work solely with the suffix part of the key. */
- newitem->base.nsuffix = newitem->base.nkey - pfx->prefix_len;
- DEBUG_ASSERT(newitem->base.nsuffix > 0);
-
- /* Get the head NdbInstance for the correct cluster */
- LockableNdbInstance *inst = instances[pfx->info.cluster_id];
-
- // A: Walk the list of instances trying to lock one.
- lockerr = - 1;
- while(lockerr && inst->next) {
- lockerr = inst->trylock();
- if(lockerr) inst = inst->next;
- }
- // B: If you still don't have a lock, wait on the lock for the last instance.
- if(lockerr)
- lockerr = inst->lock();
- // C: If you *still* don't have a lock, that's either EINVAL or EDEADLK,
- // and you will hit the assertion in getPlanForPrefix().
-
- // Now we've got an NDB Instance. Fetch the query plan for this prefix.
- newitem->plan = inst->getPlanForPrefix(pfx);
-
- // Build the NDB transaction
- if(!worker_prepare_operation(newitem)) {
- /******* Error handling. The operation is not supported. */
- if(newitem->base.is_sync) {
- newitem->status = & status_block_unsupported;
- pipeline->engine->server.cookie->store_engine_specific(newitem->cookie, newitem);
- pipeline->engine->server.cookie->notify_io_complete(newitem->cookie, ENGINE_ENOTSUP);
- }
- }
-
- inst->unlock();
-
- if(ncycles++ % STAT_INTERVAL == 0)
- stats.worker_thread_vtime = get_thread_vtime();
-
- } while(newitem);
-}
-
-
-
-// ------------------------------- The commit thread ----------------------
-
-
-void * run_3thd_commit_thread(void *s) {
- return ((Scheduler_3thread *) s)->run_ndb_commit_thread();
-}
-
-
-//FIXME: On shutdown, the commit thread tends to segfault inside sendPollNdb().
-
-/* Note: You could use gethrtime() to keep the pace here. */
-/* Note: latency will go up with the number of clusters unless you have
- one commit thread per cluster */
-
-void * Scheduler_3thread::run_ndb_commit_thread() {
- DEBUG_ENTER();
- LockableNdbInstance *inst;
- int transactions, sleep_usec, wait_msec;
-
- while(1) {
- sleep_usec = 1000;
- /* For all NDBs, send transactions and poll for results */
- for(int i = 0; i < nclusters ; i++) {
- for(inst = instances[i] ; inst ; inst = inst->next) {
- /* Wait 1 ms, but just poll the last instance. */
- wait_msec = inst->next ? 1 : 0;
- inst->lock(); /* fixme: actually just pollNdb() now */
- transactions = inst->db->sendPollNdb(
- /* wait time (0 = just poll) */ wait_msec,
- /* minimum ops to wait for */ queue->depth,
- /* force send */ 1);
- inst->addPollStat(transactions); // update statistics
- if(transactions > 0) sleep_usec = 500;
- inst->unlock();
- }
- }
- usleep(sleep_usec);
-
- stats.cycles++;
- if(! (stats.cycles % STAT_INTERVAL))
- stats.commit_thread_vtime = get_thread_vtime();
- }
-}
=== removed file 'storage/ndb/memcache/src/schedulers/3thread.h'
--- a/storage/ndb/memcache/src/schedulers/3thread.h 2011-04-07 11:20:56 +0000
+++ b/storage/ndb/memcache/src/schedulers/3thread.h 1970-01-01 00:00:00 +0000
@@ -1,66 +0,0 @@
-/*
- Copyright (c) 2011, Oracle and/or its affiliates. All rights
- reserved.
-
- This program is free software; you can redistribute it and/or
- modify it under the terms of the GNU General Public License
- as published by the Free Software Foundation; version 2 of
- the License.
-
- This program is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with this program; if not, write to the Free Software
- Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- 02110-1301 USA
- */
-#ifndef NDBMEMCACHE_3THREAD_SCHEDULER_H
-#define NDBMEMCACHE_3THREAD_SCHEDULER_H
-
-#ifndef __cplusplus
-#error "This file is for C++ only"
-#endif
-
-#include <memcached/types.h>
-
-#include "ndbmemcache_config.h"
-#include "Scheduler.h"
-#include "NdbInstance.h"
-
-
-/*
- * 3-Thread Scheduler
- *
- * The 3-thread scheduler launches an NDB worker thread and a separate
- * commit thread; it uses a small number of Ndb objects.
- */
-class Scheduler_3thread : public Scheduler {
-public:
- void init(int threadnum, int nthreads, const char *config_string);
- void attach_thread(thread_identifier *);
- ENGINE_ERROR_CODE schedule(workitem *);
- void io_completed(workitem *) {};
- void add_stats(ADD_STAT, const void *);
- void * run_ndb_worker_thread();
- void * run_ndb_commit_thread();
-
-
-private:
- pthread_t worker_thread_id;
- pthread_t commit_thread_id;
- int nclusters;
- struct workqueue *queue;
- LockableNdbInstance **instances;
- struct sched_stats_3thread {
- uint64_t cycles; /* total number of loops in the commit thread */
- uint64_t worker_thread_vtime;
- uint64_t commit_thread_vtime;
- } stats;
-};
-
-
-#endif
-
=== added file 'storage/ndb/memcache/src/schedulers/Bulk.cc'
--- a/storage/ndb/memcache/src/schedulers/Bulk.cc 1970-01-01 00:00:00 +0000
+++ b/storage/ndb/memcache/src/schedulers/Bulk.cc 2011-04-19 01:16:36 +0000
@@ -0,0 +1,282 @@
+/*
+ Copyright (c) 2011, Oracle and/or its affiliates. All rights
+ reserved.
+
+ This program is free software; you can redistribute it and/or
+ modify it under the terms of the GNU General Public License
+ as published by the Free Software Foundation; version 2 of
+ the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ 02110-1301 USA
+ */
+/* System headers */
+/* C++ files must define __STDC_FORMAT_MACROS in order to get PRIu64 */
+#define __STDC_FORMAT_MACROS
+#include <inttypes.h>
+#include <stdio.h>
+#include <unistd.h>
+
+/* Memcache headers */
+#include "memcached/types.h"
+#include <memcached/extension_loggers.h>
+
+/* NDB Memcache headers */
+#include "bulk.h"
+#include "workitem.h"
+#include "ndb_worker.h"
+#include "status_block.h"
+
+extern EXTENSION_LOGGER_DESCRIPTOR *logger;
+
+class bulk_commit_thread_spec {
+public:
+ bulk_commit_thread_spec(Scheduler_bulk *s, int i): sched(s), cluster_id(i) {};
+ Scheduler_bulk *sched;
+ int cluster_id;
+};
+
+extern "C" {
+ void * run_bulk_commit_thread(void *);
+}
+
+status_block status_block_unsupported = { ENGINE_ENOTSUP, "Not supported." };
+
+
+void Scheduler_bulk::init(int my_thread, int nthreads, const char *) {
+ const Configuration & conf = get_Configuration();
+ ClusterConnectionPool *pool;
+ Ndb_cluster_connection *conn;
+ LockableNdbInstance *inst;
+
+ /* Do initialization steps that are configuration-dependent */
+ nclusters = conf.nclusters;
+
+ /* Get the NDB instances -- 2 instances for each cluster. */
+ const int tx_per_inst = 8192; // maximum transactions on each NDB Instance
+ for(int i = 0 ; i < nclusters ; i++) {
+ pool = conf.getConnectionById(i);
+ conn = pool->getPooledConnection(my_thread);
+ inst = new LockableNdbInstance(conn, conf.nprefixes, tx_per_inst);
+ inst->next = new LockableNdbInstance(conn, conf.nprefixes, tx_per_inst);
+ cluster[i].instance = inst;
+
+ /* also get a master NdbInstance for the connection */
+ pool->setNdbInstance(my_thread, new NdbInstance(conn, conf.nprefixes, 128));
+ }
+
+ /* Hoard a number of transactions for each NDB object. How many? Enough
+ to meet the performance requested in the configuration. */
+ for(int c = 0 ; c < conf.nclusters ; c++) {
+ ClusterConnectionPool *pool = conf.getConnectionById(c);
+ cluster[c].usec_rtt = pool->usec_rtt;
+ double tx_time_in_usec = pool->usec_rtt * 5;
+ double tx_per_ndb_per_sec = 1000000 / tx_time_in_usec;
+ double prefetch = conf.max_tps / tx_per_ndb_per_sec;
+ cluster[c].nprefetch = (int) (prefetch / nthreads);
+ DEBUG_PRINT("cluster %d: prefetch %d transactions.", c, cluster[c].nprefetch);
+
+ NdbTransaction ** txlist1 = (NdbTransaction **)
+ calloc(cluster[c].nprefetch, sizeof(NdbTransaction *));
+ NdbTransaction ** txlist2 = (NdbTransaction **)
+ calloc(cluster[c].nprefetch, sizeof(NdbTransaction *));
+
+ const KeyPrefix *prefix = conf.getNextPrefixForCluster(c, NULL);
+ QueryPlan *plan = 0;
+
+ /* open them all */
+ for(int i = 0; i < cluster[c].nprefetch ; i++) {
+ plan = cluster[c].instance->getPlanForPrefix(prefix);
+ txlist1[i] = cluster[c].instance->db->startTransaction(); // on instance 1
+ txlist2[i] = cluster[c].instance->next->db->startTransaction(); // on instance 2
+ }
+
+ /* close them all */
+ for(int i = 0; i < cluster[c].nprefetch ; i++) {
+ txlist1[i]->close();
+ txlist2[i]->close();
+ }
+
+ /* free the lists */
+ delete[] txlist1;
+ delete[] txlist2;
+
+ // Launch the commit thread
+ bulk_commit_thread_spec * spec = new bulk_commit_thread_spec(this, c);
+ pthread_create(& cluster[c].commit_thread_id, NULL,
+ run_bulk_commit_thread, spec);
+ }
+
+}
+
+
+void Scheduler_bulk::attach_thread(thread_identifier * parent) {
+
+ pipeline = parent->pipeline;
+ logger->log(LOG_WARNING, 0, "Pipeline %d attached to bulk scheduler.\n",
+ pipeline->id);
+}
+
+
+ENGINE_ERROR_CODE Scheduler_bulk::schedule(workitem *newitem) {
+ DEBUG_ENTER();
+ const Configuration & conf = get_Configuration(); // see note
+ int lockerr;
+ ENGINE_ERROR_CODE response_code = ENGINE_EWOULDBLOCK;
+ /* Fetch the config for its key prefix */
+ const KeyPrefix *pfx = conf.getPrefixByInfo(newitem->prefix_info);
+
+ if(newitem->prefix_info.prefix_id) {
+ DEBUG_PRINT("prefix %d: \"%s\" Table: %s Value Cols: %d",
+ newitem->prefix_info.prefix_id, pfx->prefix,
+ pfx->table->table_name, pfx->table->nvaluecols);
+ }
+
+ /* From here on we will work mainly with the suffix part of the key. */
+ newitem->base.nsuffix = newitem->base.nkey - pfx->prefix_len;
+ DEBUG_ASSERT(newitem->base.nsuffix > 0);
+
+ /* Get the head NdbInstance for the correct cluster */
+ LockableNdbInstance *inst = cluster[newitem->prefix_info.cluster_id].instance;
+
+ // A: Walk the list of instances trying to lock one.
+ lockerr = - 1;
+ while(lockerr && inst->next) {
+ lockerr = inst->trylock();
+ if(lockerr) inst = inst->next;
+ }
+ // B: If you still don't have a lock, wait on the lock for the last instance.
+ if(lockerr)
+ lockerr = inst->lock();
+ // C: If you *still* don't have a lock, that's either EINVAL or EDEADLK,
+ // and you will hit the assertion in getPlanForPrefix().
+
+ // Now we've got an NDB Instance. Fetch the query plan for this prefix.
+ newitem->plan = inst->getPlanForPrefix(pfx);
+
+ workitem_set_NdbInstance(newitem, inst);
+
+ // Build the NDB transaction
+ if(worker_prepare_operation(newitem)) {
+ inst->npending++;
+ }
+ else {
+ /******* Error handling. The operation is not supported. */
+ if(newitem->base.is_sync) {
+ newitem->status = & status_block_unsupported;
+ pipeline->engine->server.cookie->store_engine_specific(newitem->cookie, newitem);
+ pipeline->engine->server.cookie->notify_io_complete(newitem->cookie, ENGINE_ENOTSUP);
+ }
+ }
+
+ inst->unlock();
+
+ return response_code;
+}
+
+
+void Scheduler_bulk::io_completed(workitem *item) {
+ DEBUG_ENTER();
+ item->ndb_instance = NULL;
+}
+
+
+void Scheduler_bulk::add_stats(ADD_STAT add_stat,
+ const void * cookie) {
+ char key[128];
+ char val[128];
+ const char *units[4] = { "ns", "us", "ms", "s" };
+ int klen, vlen, p;
+ char id;
+ LockableNdbInstance *inst;
+
+ for(int i = 0; i < nclusters; i++) {
+ /* per-cluster */
+ klen = sprintf(key, "t%d_db%d_commit_cycles", pipeline->id, i);
+ vlen = sprintf(val, "%"PRIu64, cluster[i].stats.cycles);
+ add_stat(key, klen, val, vlen, cookie);
+
+ if(cluster[i].stats.vtime != 0) {
+ klen = sprintf(key, "t%d_db%d_commit_thread_vtime", pipeline->id, i);
+ vlen = sprintf(val, "%"PRIu64, cluster[i].stats.vtime);
+ add_stat(key, klen, val, vlen, cookie);
+ }
+
+ /* per-cluster, per-NdbInstance */
+ for(id = 'a', inst = cluster[i].instance ; inst ; id++, inst = inst->next) {
+ /* transactions */
+ klen = sprintf(key, "t%d_db%d%c_transactions", pipeline->id, i, id);
+ vlen = sprintf(val, "%"PRIu64, inst->transactions);
+ add_stat(key, klen, val, vlen, cookie);
+
+ /* batches */
+ klen = sprintf(key, "t%d_db%d%c_batches", pipeline->id, i, id);
+ vlen = sprintf(val, "%"PRIu64, inst->batches);
+ add_stat(key, klen, val, vlen, cookie);
+
+ /* locks_failed */
+ klen = sprintf(key, "t%d_db%d%c_locks_failed", pipeline->id, i, id);
+ vlen = sprintf(val, "%"PRIu64, inst->locks_failed);
+ add_stat(key, klen, val, vlen, cookie);
+ }
+ }
+}
+
+
+// ------------------------------- The commit thread ----------------------
+
+#define STAT_INTERVAL 50
+
+
+void * run_bulk_commit_thread(void *s) {
+ bulk_commit_thread_spec *spec = (bulk_commit_thread_spec *) s;
+
+ return spec->sched->run_ndb_commit_thread(spec->cluster_id);
+}
+
+//FIXME: On shutdown, the commit thread tends to segfault inside sendPollNdb().
+
+/* Note: You could use gethrtime() to keep the pace here. */
+/* Note: latency will go up with the number of clusters unless you have
+ one commit thread per cluster */
+
+void * Scheduler_bulk::run_ndb_commit_thread(int c) {
+ DEBUG_ENTER();
+ LockableNdbInstance *inst;
+ int transactions, sleep_usec, wait_msec;
+ int rtt = cluster[c].usec_rtt;
+
+ cluster[c].stats.cycles = 0;
+ cluster[c].stats.vtime = 0;
+
+ while(1) {
+ sleep_usec = rtt * 2;
+ /* For all NDBs, send transactions and poll for results */
+ for(inst = cluster[c].instance ; inst ; inst = inst->next) {
+ /* Wait with 1 ms timeout, but just poll the last instance. */
+ wait_msec = inst->next ? 1 : 0;
+ inst->lock();
+ if(inst->npending) { // there are pending transactions
+ transactions = inst->db->sendPollNdb(
+ /* wait time (0 = just poll) */ wait_msec,
+ /* minimum ops to wait for */ (inst->npending + 1) / 2,
+ /* force send */ 1);
+ inst->addPollStat(transactions); // update statistics
+ if(transactions > 0) sleep_usec = rtt;
+ }
+ inst->unlock();
+ }
+ usleep(sleep_usec);
+
+ cluster[c].stats.cycles++;
+ if(! (cluster[c].stats.cycles % STAT_INTERVAL))
+ cluster[c].stats.vtime = get_thread_vtime();
+ }
+}
=== added file 'storage/ndb/memcache/src/schedulers/Bulk.h'
--- a/storage/ndb/memcache/src/schedulers/Bulk.h 1970-01-01 00:00:00 +0000
+++ b/storage/ndb/memcache/src/schedulers/Bulk.h 2011-04-19 01:16:36 +0000
@@ -0,0 +1,65 @@
+/*
+ Copyright (c) 2011, Oracle and/or its affiliates. All rights
+ reserved.
+
+ This program is free software; you can redistribute it and/or
+ modify it under the terms of the GNU General Public License
+ as published by the Free Software Foundation; version 2 of
+ the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ 02110-1301 USA
+ */
+#ifndef NDBMEMCACHE_BULK_SCHEDULER_H
+#define NDBMEMCACHE_BULK_SCHEDULER_H
+
+#ifndef __cplusplus
+#error "This file is for C++ only"
+#endif
+
+#include <memcached/types.h>
+
+#include "config.h"
+#include "Scheduler.h"
+#include "NdbInstance.h"
+
+
+/*
+ * Bulk Scheduler
+ *
+ * The bulk scheduler uses a small number of Ndb objects with many transactions
+ * on each.
+ */
+class Scheduler_bulk : public Scheduler {
+public:
+ void init(int threadnum, int nthreads, const char *config_string);
+ void attach_thread(thread_identifier *);
+ ENGINE_ERROR_CODE schedule(workitem *);
+ void io_completed(workitem *);
+ void add_stats(ADD_STAT, const void *);
+ void * run_ndb_commit_thread(int thread_id);
+
+private:
+ int nclusters;
+ struct {
+ int usec_rtt;
+ int nprefetch;
+ pthread_t commit_thread_id;
+ LockableNdbInstance *instance;
+ struct sched_stats_bulk {
+ uint64_t cycles; /* total number of loops in the commit thread */
+ uint64_t vtime;
+ } stats;
+ } cluster[MAX_CLUSTERS];
+};
+
+
+#endif
+
=== modified file 'storage/ndb/memcache/src/schedulers/Stockholm.cc'
--- a/storage/ndb/memcache/src/schedulers/Stockholm.cc 2011-04-09 07:36:24 +0000
+++ b/storage/ndb/memcache/src/schedulers/Stockholm.cc 2011-04-19 01:16:36 +0000
@@ -178,7 +178,6 @@ ENGINE_ERROR_CODE Scheduler_stockholm::s
return ENGINE_TMPFAIL;
}
-
workitem_set_NdbInstance(newitem, inst);
// Fetch the query plan for this prefix.
=== modified file 'storage/ndb/memcache/src/workqueue.c'
--- a/storage/ndb/memcache/src/workqueue.c 2011-04-07 11:20:56 +0000
+++ b/storage/ndb/memcache/src/workqueue.c 2011-04-19 01:16:36 +0000
@@ -50,7 +50,7 @@ int workqueue_init(struct workqueue *q,
q->depth = 0;
q->minfree = size / 16;
assert(nconsumers > 0);
- q->threads = nconsumers - 1; /* boolean 0 if there is just one consumer */
+ q->threads = nconsumers - 1; /* boolean false if there is just one consumer */
pthread_cond_init(& q->not_empty, NULL);
pthread_cond_init(& q->not_full, NULL);
=== modified file 'storage/ndb/memcache/unit/Makefile.am'
--- a/storage/ndb/memcache/unit/Makefile.am 2011-04-08 08:01:14 +0000
+++ b/storage/ndb/memcache/unit/Makefile.am 2011-04-19 01:23:01 +0000
@@ -46,7 +46,7 @@ run_unit_tests_CFLAGS = -g -O0 \
run_unit_test_CXXFLAGS = -g -O0
-run_unit_tests_LDADD = ../ndb_engine_la-3thread.lo \
+run_unit_tests_LDADD = ../ndb_engine_la-Bulk.lo \
../ndb_engine_la-ClusterConnectionPool.lo \
../ndb_engine_la-Configuration.lo \
../ndb_engine_la-DataTypeHandler.lo \
=== modified file 'storage/ndb/memcache/unit/harness.cc'
--- a/storage/ndb/memcache/unit/harness.cc 2011-04-08 08:01:14 +0000
+++ b/storage/ndb/memcache/unit/harness.cc 2011-04-19 01:16:36 +0000
@@ -40,7 +40,6 @@ extern EXTENSION_LOGGER_DESCRIPTOR *logg
Ndb_cluster_connection * connect(const char *);
int main(int argc, char *argv[]) {
-
int test_number = -1;
char * test_name = 0;
if(argc > 1) connect_string = argv[1];
No bundle (reason: useless for push emails).
| Thread |
|---|
| • bzr push into mysql-5.1-telco-7.2 branch (john.duncan:4160 to 4161) | John David Duncan | 19 Apr |