3578 John David Duncan 2011-09-30
Online reconfiguration.
This revision compiles and passes the test suite on my laptop.
modified:
mysql-test/lib/My/Memcache.pm
mysql-test/suite/ndb_memcache/include/datatypes_tables.inc
mysql-test/suite/ndb_memcache/t/type_char.test
storage/ndb/memcache/sandbox.sh.in
storage/ndb/memcache/src/ConnQueryPlanSet.cc
storage/ndb/memcache/src/ndb_worker.cc
storage/ndb/memcache/src/schedulers/S_sched.cc
storage/ndb/memcache/src/schedulers/S_sched.h
3577 John David Duncan 2011-09-30
ndb/memcache Configuration -- small fixes
modified:
storage/ndb/memcache/include/Configuration.h
storage/ndb/memcache/src/Configuration.cc
3576 John David Duncan 2011-09-30
Changes to online reconfiguration in S Scheduler.
Online reconfig is actually thread-safe now, protected by a read/write lock.
Online reconfig in the scheduler now *only* udpates the Configuration pointer, the generation number, and the list of QueryPlans.
So it is not currently possible to add a cluster or to grow the pool of Ndb objects using online reconfig.
This change also adapts both schedulers to use ConnQueryPlanSet.
modified:
storage/ndb/memcache/src/schedulers/S_sched.cc
storage/ndb/memcache/src/schedulers/S_sched.h
storage/ndb/memcache/src/schedulers/Stockholm.cc
storage/ndb/memcache/src/schedulers/Stockholm.h
3575 John David Duncan 2011-09-30
New class ConnQueryPlanSet.
Each NdbInstance no longer keeps a list of QueryPlans;
now, for each cluster connection, a ConnQueryPlanSet holds the QueryPlans.
This revision does not compile.
added:
storage/ndb/memcache/include/ConnQueryPlanSet.h
storage/ndb/memcache/src/ConnQueryPlanSet.cc
modified:
storage/ndb/memcache/CMakeLists.txt
storage/ndb/memcache/include/Configuration.h
storage/ndb/memcache/include/NdbInstance.h
storage/ndb/memcache/src/Configuration.cc
storage/ndb/memcache/src/NdbInstance.cc
3574 John David Duncan 2011-09-30
QueryPlan::db is now private.
modified:
storage/ndb/memcache/include/Configuration.h
storage/ndb/memcache/include/Operation.h
storage/ndb/memcache/include/QueryPlan.h
storage/ndb/memcache/src/Configuration.cc
storage/ndb/memcache/src/Operation.cc
storage/ndb/memcache/src/ndb_worker.cc
storage/ndb/memcache/unit/all_tests.h
storage/ndb/memcache/unit/alloc.cc
storage/ndb/memcache/unit/cas.cc
storage/ndb/memcache/unit/casbits.cc
storage/ndb/memcache/unit/connpool.cc
storage/ndb/memcache/unit/harness.cc
storage/ndb/memcache/unit/incr.cc
storage/ndb/memcache/unit/queue.cc
storage/ndb/memcache/unit/tsv.cc
3573 John David Duncan 2011-09-30
Some QueryPlan methods are now const.
modified:
storage/ndb/memcache/include/QueryPlan.h
storage/ndb/memcache/src/QueryPlan.cc
3572 John David Duncan 2011-09-30
memcache: Rather than keeping an Ndb object (and eventually leaking it), Configuration and Config_v1 allocate an Ndb on the stack when they need one.
modified:
storage/ndb/memcache/include/Configuration.h
storage/ndb/memcache/src/Config_v1.cc
storage/ndb/memcache/src/Configuration.cc
3571 John David Duncan 2011-09-29
Column->getSizeInBytes() seems to work for all cases of getColumnRecordSize().
It also does better for determining alignment; for instance NdbApi seems to treat
a timestamp column as an array of 4 chars, so getSize() was 1.
modified:
storage/ndb/memcache/include/DataTypeHandler.h
storage/ndb/memcache/include/QueryPlan.h
storage/ndb/memcache/include/Record.h
storage/ndb/memcache/src/DataTypeHandler.cc
storage/ndb/memcache/src/QueryPlan.cc
storage/ndb/memcache/src/Record.cc
=== modified file 'mysql-test/lib/My/Memcache.pm'
--- a/mysql-test/lib/My/Memcache.pm 2011-09-29 03:29:32 +0000
+++ b/mysql-test/lib/My/Memcache.pm 2011-09-30 20:46:27 +0000
@@ -127,6 +127,9 @@ sub wait_for_reconf {
if($new_gen > 0) {
$self->{cf_gen} = $new_gen;
}
+ else {
+ print STDERR "Timed out.\n";
+ }
return $new_gen;
}
=== modified file 'mysql-test/suite/ndb_memcache/include/datatypes_tables.inc'
--- a/mysql-test/suite/ndb_memcache/include/datatypes_tables.inc 2011-09-29 03:29:32 +0000
+++ b/mysql-test/suite/ndb_memcache/include/datatypes_tables.inc 2011-09-30 20:46:27 +0000
@@ -50,7 +50,6 @@ UPDATE memcache_server_roles set update_
DROP VIEW tv_tablist;
---real_sleep 1
--source suite/ndb_memcache/include/wait_for_reconf.inc
}
=== modified file 'mysql-test/suite/ndb_memcache/t/type_char.test'
--- a/mysql-test/suite/ndb_memcache/t/type_char.test 2011-09-29 03:29:32 +0000
+++ b/mysql-test/suite/ndb_memcache/t/type_char.test 2011-09-30 20:46:27 +0000
@@ -35,6 +35,7 @@ UPDATE memcache_server_roles set update_
--perl
use strict;
+use Carp;
use lib "lib";
use My::Memcache;
# Use a binary protocol connection (so keys can contain spaces)
@@ -42,36 +43,31 @@ my $mc = My::Memcache::Binary->new();
my $port = $ENV{MTR_BUILD_THREAD} * 10 + 10000 + 8;
my $r = $mc->connect("localhost",$port);
-# sleep(1) because there is still an undiagnosed race condition in
-# online reconfiguration -- jdd, 14 Aug. 2011
-sleep(1);
-
my $cf_gen = $mc->wait_for_reconf();
-if($cf_gen > 0) {
- # test CHAR key with VARCHAR value
- $mc->set("tck:a","fred") || die;
- $mc->set("tck:1","frederick") || die;
- $mc->set("tck:aa","frederica") || die;
- $mc->set("tck:a b c d","freddy") || die;
-
- ($mc->get("tck:aa") == "frederica") || die;
- ($mc->get("tck:a b c d") == "freddy") || die;
-
- # test VARCHAR key with CHAR value
- $mc->set("tcv:a", "al") || die;
- $mc->set("tcv:b", "alphonse") || die;
- $mc->set("tcv:c", "allen") || die;
- $mc->set("tcv:d", "alien invasion") || die;
-
- ($mc->get("tcv:d") == "alien invasion") || die;
- ($mc->get("tcv:a") == "al") || die;
- ($mc->get("tcv:ee") == "NOT_FOUND") || die;
-}
-else {
- print " ***** Skipped test [$r/$cf_gen] \n";
+if($cf_gen == 0) {
+ Carp::confess("FAILED WAIT_FOR_RECONF");
}
+# test CHAR key with VARCHAR value
+$mc->set("tck:a","fred") || Carp::confess("FAILED # 01 (SET)");
+$mc->set("tck:1","frederick") || Carp::confess("FAILED # 02 (SET)");
+$mc->set("tck:aa","frederica") || Carp::confess("FAILED # 03 (SET)");
+$mc->set("tck:a b c d","freddy") || Carp::confess("FAILED # 04 (SET)");
+
+($mc->get("tck:aa") == "frederica") || Carp::confess("FAILED # 05 (GET)");
+($mc->get("tck:a b c d") == "freddy") || Carp::confess("FAILED # 06 (GET)");
+
+# test VARCHAR key with CHAR value
+$mc->set("tcv:a", "al") || Carp::confess("FAILED # 07 (SET)");
+$mc->set("tcv:b", "alphonse") || Carp::confess("FAILED # 08 (SET)");
+$mc->set("tcv:c", "allen") || Carp::confess("FAILED # 09 (SET)");
+$mc->set("tcv:d", "alien invasion") || Carp::confess("FAILED # 10 (SET)");
+
+($mc->get("tcv:d") == "alien invasion")|| Carp::confess("FAILED # 11 (GET)");
+($mc->get("tcv:a") == "al") || Carp::confess("FAILED # 12 (GET)");
+($mc->get("tcv:ee") == "NOT_FOUND") || Carp::confess("FAILED # 13 (GET)");
+
EOF
=== modified file 'storage/ndb/memcache/CMakeLists.txt'
--- a/storage/ndb/memcache/CMakeLists.txt 2011-09-25 07:19:37 +0000
+++ b/storage/ndb/memcache/CMakeLists.txt 2011-09-30 17:04:30 +0000
@@ -56,6 +56,7 @@ set(NDB_MEMCACHE_SOURCE_FILES
src/ClusterConnectionPool.cc
src/Configuration.cc
src/Config_v1.cc
+ src/ConnQueryPlanSet.cc
src/DataTypeHandler.cc
src/KeyPrefix.cc
src/NdbInstance.cc
=== modified file 'storage/ndb/memcache/include/Configuration.h'
--- a/storage/ndb/memcache/include/Configuration.h 2011-09-22 18:27:10 +0000
+++ b/storage/ndb/memcache/include/Configuration.h 2011-09-30 20:34:55 +0000
@@ -87,7 +87,7 @@ class Configuration {
const KeyPrefix * getPrefixForKey(const char *key, int nkey) const;
const KeyPrefix * getPrefixByInfo(const prefix_info_t info) const;
const KeyPrefix * getPrefix(int id) const; // inlined
- const KeyPrefix * getNextPrefixForCluster(uint cluster_id, KeyPrefix *) const;
+ const KeyPrefix * getNextPrefixForCluster(uint cluster_id, const KeyPrefix *) const;
void setPrimaryConnectString(const char *); // inlined
void setServerRole(const char *); // inlined
const char * getServerRole(); // inlined
@@ -113,7 +113,6 @@ class Configuration {
int storePrefix(KeyPrefix &prefix);
void storeCAS(uint64_t ndb_engine_cas, uint64_t default_engine_cas);
- Ndb *db;
const char *primary_connect_string;
int onlineReloadFlag;
int (*reload_waiter)(Ndb_cluster_connection *, const char *);
@@ -122,7 +121,7 @@ class Configuration {
/* private methods */
void store_default_prefix();
config_ver_enum get_supported_version();
- bool fetch_meta_record(QueryPlan *plan, const char *version);
+ bool fetch_meta_record(QueryPlan *plan, Ndb *db, const char *version);
/* private instance variables */
const char *server_role;
=== added file 'storage/ndb/memcache/include/ConnQueryPlanSet.h'
--- a/storage/ndb/memcache/include/ConnQueryPlanSet.h 1970-01-01 00:00:00 +0000
+++ b/storage/ndb/memcache/include/ConnQueryPlanSet.h 2011-09-30 17:04:30 +0000
@@ -0,0 +1,49 @@
+/*
+ Copyright (c) 2011, Oracle and/or its affiliates. All rights
+ reserved.
+
+ This program is free software; you can redistribute it and/or
+ modify it under the terms of the GNU General Public License
+ as published by the Free Software Foundation; version 2 of
+ the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ 02110-1301 USA
+ */
+
+#ifndef NDBMEMCACHE_CONNQUERYPLANSET_H
+#define NDBMEMCACHE_CONNQUERYPLANSET_H
+
+#ifndef __cplusplus
+#error "This file is for C++ only"
+#endif
+
+
+#include "Configuration.h"
+#include "QueryPlan.h"
+
+class ConnQueryPlanSet {
+public:
+ ConnQueryPlanSet(Ndb_cluster_connection *, int n_plans);
+ ~ConnQueryPlanSet();
+
+ bool buildSetForConfiguration(const Configuration *, int cluster_id);
+ QueryPlan * getPlanForPrefix(const KeyPrefix *);
+
+private:
+ Ndb *db;
+ int nplans;
+ QueryPlan **plans;
+};
+
+
+
+#endif
+
=== modified file 'storage/ndb/memcache/include/NdbInstance.h'
--- a/storage/ndb/memcache/include/NdbInstance.h 2011-09-12 10:05:07 +0000
+++ b/storage/ndb/memcache/include/NdbInstance.h 2011-09-30 17:04:30 +0000
@@ -34,27 +34,22 @@
struct workitem;
#define VPSZ sizeof(void *)
-#define ISZ sizeof(int)
-#define TOTAL_SZ (ISZ + (4 * VPSZ))
+#define TOTAL_SZ (3 * VPSZ)
#define PADDING (64 - TOTAL_SZ)
class NdbInstance {
public:
/* Public Methods */
- NdbInstance(Ndb_cluster_connection *, int, int);
+ NdbInstance(Ndb_cluster_connection *, int);
~NdbInstance();
- QueryPlan * getPlanForPrefix(const KeyPrefix *);
/* Public Instance Variables */
Ndb *db;
NdbInstance *next;
workitem *wqitem;
- int sched_gen_number;
private:
- int nplans;
- QueryPlan **plans;
char cache_line_padding[PADDING];
};
=== modified file 'storage/ndb/memcache/include/Operation.h'
--- a/storage/ndb/memcache/include/Operation.h 2011-09-12 10:05:07 +0000
+++ b/storage/ndb/memcache/include/Operation.h 2011-09-30 16:22:30 +0000
@@ -83,7 +83,7 @@ public:
/* NdbTransaction method wrappers */
// startTransaction
- NdbTransaction *startTransaction() const;
+ NdbTransaction *startTransaction(Ndb *) const;
// read
const NdbOperation *readTuple(NdbTransaction *tx,
=== modified file 'storage/ndb/memcache/include/QueryPlan.h'
--- a/storage/ndb/memcache/include/QueryPlan.h 2011-09-29 19:13:43 +0000
+++ b/storage/ndb/memcache/include/QueryPlan.h 2011-09-30 16:22:30 +0000
@@ -47,13 +47,12 @@ class QueryPlan {
QueryPlan() : initialized(0) {};
QueryPlan(Ndb *, const TableSpec *, PlanOpts opts = NoOptions);
~QueryPlan();
- bool keyIsPrimaryKey();
- void debug_dump();
+ bool keyIsPrimaryKey() const;
+ void debug_dump() const;
/* public instance variables */
bool initialized;
bool dup_numbers;
- Ndb *db;
const TableSpec *spec;
NdbDictionary::Dictionary *dict;
const NdbDictionary::Table *table;
@@ -77,6 +76,7 @@ class QueryPlan {
const NdbDictionary::Index * chooseIndex();
/* Private instance variables */
+ Ndb *db;
};
=== modified file 'storage/ndb/memcache/sandbox.sh.in'
--- a/storage/ndb/memcache/sandbox.sh.in 2011-09-28 20:28:34 +0000
+++ b/storage/ndb/memcache/sandbox.sh.in 2011-09-30 20:46:27 +0000
@@ -1,7 +1,5 @@
#!/bin/sh
-# FIXME: Solaris doesn't have whoami and "kill" doesn't work as used here.
-
prefix=@CMAKE_INSTALL_PREFIX@
bindir=@INSTALL_BINDIR@
libexecdir=@INSTALL_SBINDIR@
@@ -12,6 +10,7 @@ sourcetree=@CMAKE_CURRENT_SOURCE_DIR@
installtree=@CMAKE_INSTALL_PREFIX@/memcache-api
memcached_binary=@MEMCACHED_BIN_PATH@
+MYSELF=`who am i | awk '{print $1}'`
MYSQL_PREFIX=$prefix
MYSQL_BIN=$prefix/$bindir
MYSQL_LIBEXEC=$prefix/$libexecdir
@@ -20,7 +19,11 @@ MYSQL_LIB=$prefix/$libdir
MEMCACHE_BASE=$memcachedir
SOURCE_TREE=$sourcetree
HOME_BASE=$sourcetree # fallback to source tree
-test -d $installtree && HOME_BASE=$installtree # prefer installed tree
+if test $HOME_BASE != `pwd`
+ then
+ # prefer installed tree unless we are currently in the source tree
+ test -d $installtree && HOME_BASE=$installtree
+fi
test_paths() {
test_path $MYSQL_BIN ndb_mgm
@@ -58,7 +61,7 @@ write_my_cnf() {
echo "ndbcluster"
echo "datadir=$HOME_BASE/sandbox/data"
echo "pid-file=$HOME_BASE/sandbox/mysqld.pid"
- echo "user="`whoami`
+ echo "user=$MYSELF"
echo "innodb_log_file_size=1M"
echo
) > $HOME_BASE/sandbox/my.cnf
@@ -94,7 +97,7 @@ write_cluster_ini() {
do_install_db() {
$MYSQL_SCRIPTS/mysql_install_db \
--basedir=$MYSQL_PREFIX --datadir=$HOME_BASE/sandbox/data \
- --skip-name-resolve --user=`whoami` > /dev/null
+ --skip-name-resolve --user=$MYSELF > /dev/null
if test ! -d sandbox/data/mysql
then echo "Failed: mysql_install_db did not work." && exit
else echo "Created MySQL System Tables"
@@ -159,7 +162,8 @@ start_memcached() {
}
stop_memcached() {
- kill `cat $HOME_BASE/sandbox/memcached.pid`
+ SERVERPID=`cat $HOME_BASE/sandbox/memcached.pid`
+ test -n "$SERVERPID" && kill $SERVERPID
}
stop_mysqld() {
@@ -173,6 +177,9 @@ stop_mgm_server() {
final_message() {
sleep 2
echo
+ echo "Sandbox directory is $HOME_BASE/sandbox"
+ echo "Memcached is $memcached_binary"
+ echo
echo "Use \"sh sandbox.sh stop\" to stop memcached & MySQL Cluster"
echo
}
=== modified file 'storage/ndb/memcache/src/Config_v1.cc'
--- a/storage/ndb/memcache/src/Config_v1.cc 2011-09-29 03:26:04 +0000
+++ b/storage/ndb/memcache/src/Config_v1.cc 2011-09-30 07:03:26 +0000
@@ -195,14 +195,16 @@ bool config_v1::read_configuration() {
int config_v1::get_server_role_id() {
uint32_t val = -1;
+ Ndb db(conf.primary_conn);
+ db.init(2);
TableSpec spec("ndbmemcache.memcache_server_roles",
"role_name", "role_id,max_tps");
- QueryPlan plan(conf.db, &spec);
+ QueryPlan plan(&db, &spec);
Operation op(&plan, OP_READ);
op.key_buffer = (char *) malloc(op.requiredKeyBuffer());
op.buffer = (char *) malloc(op.requiredBuffer());
- NdbTransaction *tx = conf.db->startTransaction();
+ NdbTransaction *tx = db.startTransaction();
op.clearKeyNullBits();
op.setKeyPart(COL_STORE_KEY, conf.server_role, strlen(conf.server_role));
@@ -240,13 +242,15 @@ bool config_v1::get_policies() {
DEBUG_ENTER_METHOD("config_v1::get_policies");
bool success = true;
int res;
+ Ndb db(conf.primary_conn);
+ db.init(4);
TableSpec spec("ndbmemcache.cache_policies",
"policy_name",
"get_policy,set_policy,delete_policy,flush_from_db");
- QueryPlan plan(conf.db, &spec);
+ QueryPlan plan(&db, &spec);
Operation op(&plan, OP_SCAN);
- NdbTransaction *tx = conf.db->startTransaction();
+ NdbTransaction *tx = db.startTransaction();
NdbScanOperation *scan = op.scanTable(tx);
if(! scan) {
logger->log(LOG_WARNING, 0, tx->getNdbError().message);
@@ -311,14 +315,16 @@ bool config_v1::get_connections() {
DEBUG_ENTER_METHOD("config_v1::get_connections");
bool success = true;
int res;
+ Ndb db(conf.primary_conn);
+ db.init(4);
TableSpec spec("ndbmemcache.ndb_clusters",
"cluster_id",
"ndb_connectstring,microsec_rtt");
/* Scan the ndb_clusters table */
- QueryPlan plan(conf.db, &spec);
+ QueryPlan plan(&db, &spec);
Operation op(&plan, OP_SCAN);
- NdbTransaction *tx = conf.db->startTransaction();
+ NdbTransaction *tx = db.startTransaction();
NdbScanOperation *scan = op.scanTable(tx);
if(! scan) {
logger->log(LOG_WARNING, 0, tx->getNdbError().message);
@@ -383,16 +389,18 @@ TableSpec * config_v1::get_container(cha
TableSpec * config_v1::get_container_record(char *name) {
TableSpec *container;
+ Ndb db(conf.primary_conn);
+ db.init(1);
TableSpec spec("ndbmemcache.containers",
"name",
"db_schema,db_table,key_columns,value_columns,flags,"
- "increment_column,cas_column,expire_time_column");
- QueryPlan plan(conf.db, &spec);
+ "increment_column,cas_column,expire_time_column");
+ QueryPlan plan(&db, &spec);
Operation op(&plan, OP_READ);
op.key_buffer = (char *) malloc(op.requiredKeyBuffer());
op.buffer = (char *) malloc(op.requiredBuffer());
- NdbTransaction *tx = conf.db->startTransaction();
+ NdbTransaction *tx = db.startTransaction();
op.clearKeyNullBits();
op.setKeyPart(COL_STORE_KEY, name, strlen(name));
@@ -478,7 +486,9 @@ bool config_v1::get_prefixes(int role_id
TableSpec spec("ndbmemcache.key_prefixes",
"server_role_id,key_prefix",
"cluster_id,policy,container");
- QueryPlan plan(conf.db, &spec, PKScan);
+ Ndb db(conf.primary_conn);
+ db.init(4);
+ QueryPlan plan(&db, &spec, PKScan);
Operation op(&plan, OP_SCAN);
// `server_role_id` INT UNSIGNED NOT NULL DEFAULT 0,
@@ -492,7 +502,7 @@ bool config_v1::get_prefixes(int role_id
bound.low_inclusive = bound.high_inclusive = true;
bound.range_no = 0;
- NdbTransaction *tx = conf.db->startTransaction();
+ NdbTransaction *tx = db.startTransaction();
NdbIndexScanOperation *scan = op.scanIndex(tx, &bound);
if(! scan) {
logger->log(LOG_WARNING, 0, "scanIndex(): %s\n", tx->getNdbError().message);
@@ -629,21 +639,23 @@ void config_v1::log_signon() {
DEBUG_ENTER_METHOD("config_v1::log_signon");
char my_hostname[256];
gethostname(my_hostname, 256);
+ Ndb db(conf.primary_conn);
+ db.init(1);
TableSpec spec("ndbmemcache.last_memcached_signon",
"ndb_node_id", "hostname,server_role,signon_time");
- QueryPlan plan(conf.db, &spec);
+ QueryPlan plan(&db, &spec);
Operation op(&plan, OPERATION_SET);
op.buffer = (char *) malloc(op.requiredBuffer());
op.key_buffer = (char *) malloc(op.requiredKeyBuffer());
op.clearNullBits();
- op.setKeyPartInt(COL_STORE_KEY, conf.db->getNodeId()); // node ID (in key)
- op.setColumnInt(COL_STORE_KEY, conf.db->getNodeId()); // node ID (in row)
+ op.setKeyPartInt(COL_STORE_KEY, db.getNodeId()); // node ID (in key)
+ op.setColumnInt(COL_STORE_KEY, db.getNodeId()); // node ID (in row)
op.setColumn(COL_STORE_VALUE+0, my_hostname, strlen(my_hostname)); // hostname
op.setColumn(COL_STORE_VALUE+1, conf.server_role, strlen(conf.server_role)); // role
op.setColumnInt(COL_STORE_VALUE+2,time(NULL)); // timestamp
- NdbTransaction *tx = conf.db->startTransaction(); // TODO: node selection
+ NdbTransaction *tx = db.startTransaction(); // TODO: node selection
op.writeTuple(tx);
tx->execute(NdbTransaction::Commit);
tx->getGCI(&signon_gci);
@@ -666,10 +678,12 @@ void config_v1::set_initial_cas() {
| | + 8bit | |
| | NodeId | |
---------------------------------------------------------------- */
+ Ndb db(conf.primary_conn);
+ db.init(1);
const uint64_t MASK_GCI = 0x07FFFFFF00000000LLU; // Use these 27 bits of GCI
const uint64_t ENGINE_BIT = 0x0000001000000000LLU; // bit 36
- uint64_t node_id = ((uint64_t) conf.db->getNodeId()) << 28;
+ uint64_t node_id = ((uint64_t) db.getNodeId()) << 28;
uint64_t gci_bits = (signon_gci & MASK_GCI) << 5;
uint64_t def_eng_cas = gci_bits | node_id;
uint64_t ndb_eng_cas = gci_bits | ENGINE_BIT | node_id;
@@ -677,7 +691,7 @@ void config_v1::set_initial_cas() {
// void storeCAS(uint64_t ndb_engine_cas, uint64_t default_engine_cas);
conf.storeCAS(ndb_eng_cas, def_eng_cas);
DEBUG_PRINT("Sign On GCI: 0x%llx | Node Id: [%d] 0x%llx | Engine bit: 0x%llx",
- signon_gci, conf.db->getNodeId(), node_id, ENGINE_BIT);
+ signon_gci, db.getNodeId(), node_id, ENGINE_BIT);
DEBUG_PRINT("Initial CAS: %llu 0x%llx ", ndb_eng_cas, ndb_eng_cas);
return;
=== modified file 'storage/ndb/memcache/src/Configuration.cc'
--- a/storage/ndb/memcache/src/Configuration.cc 2011-09-29 03:26:04 +0000
+++ b/storage/ndb/memcache/src/Configuration.cc 2011-09-30 20:34:55 +0000
@@ -54,12 +54,7 @@ Configuration::Configuration(Configurati
primary_connect_string(old->primary_connect_string),
server_role(old->server_role),
config_version(CONFIG_VER_UNKNOWN),
- primary_conn(old->primary_conn)
-{
- db = new Ndb(primary_conn);
- db->init();
-}
-
+ primary_conn(old->primary_conn) {};
bool Configuration::connectToPrimary() {
@@ -84,12 +79,12 @@ bool Configuration::connectToPrimary() {
primary_conn = ClusterConnectionPool::connect(primary_connect_string);
if(primary_conn) {
- db = new Ndb(primary_conn); // This Ndb is used only to read the "meta" table.
- db->init();
return true;
}
- logger->log(LOG_WARNING, 0, "FAILED.\n");
- return false;
+ else {
+ logger->log(LOG_WARNING, 0, "FAILED.\n");
+ return false;
+ }
}
@@ -201,13 +196,17 @@ const KeyPrefix * Configuration::getPref
const KeyPrefix * Configuration::getNextPrefixForCluster(unsigned int cluster_id,
- KeyPrefix *k) const {
+ const KeyPrefix *k) const {
unsigned int i = 0;
- if(k) while(prefixes[i] != k && i < nprefixes) i++;
+ if(k) {
+ while(prefixes[i] != k && i < nprefixes) i++; // find k in the list
+ i++; // then advance one more
+ }
+
while(i < nprefixes && prefixes[i]->info.cluster_id != cluster_id) i++;
- if(i == nprefixes) return 0;
+ if(i >= nprefixes) return 0;
else return prefixes[i];
}
@@ -283,19 +282,21 @@ void Configuration::storeCAS(uint64_t nd
config_ver_enum Configuration::get_supported_version() {
+ Ndb db(primary_conn);
+ db.init(1);
TableSpec ts_meta("ndbmemcache.meta", "application,metadata_version", "");
- QueryPlan plan(db, &ts_meta);
+ QueryPlan plan(& db, &ts_meta);
// "initialized" is set only if the ndbmemcache.meta table exists:
if(plan.initialized) {
- if(fetch_meta_record(&plan, "1.1")) {
+ if(fetch_meta_record(&plan, &db, "1.1")) {
DEBUG_PRINT("1.1");
return CONFIG_VER_1_1;
}
- if(fetch_meta_record(&plan, "1.0")) {
+ if(fetch_meta_record(&plan, &db, "1.0")) {
DEBUG_PRINT("1.0");
return CONFIG_VER_1_0;
}
- if(fetch_meta_record(&plan, "1.0a")) {
+ if(fetch_meta_record(&plan, &db, "1.0a")) {
DEBUG_PRINT("1.0a");
logger->log(LOG_WARNING, 0, "\nThe configuration schema from prototype2 is"
" no longer supported.\nPlease drop your ndbmemcache database,"
@@ -307,7 +308,8 @@ config_ver_enum Configuration::get_suppo
}
-bool Configuration::fetch_meta_record(QueryPlan *plan, const char *version) {
+bool Configuration::fetch_meta_record(QueryPlan *plan, Ndb *db,
+ const char *version) {
DEBUG_ENTER_METHOD("Configuration::fetch_meta_record");
bool result = false;
=== added file 'storage/ndb/memcache/src/ConnQueryPlanSet.cc'
--- a/storage/ndb/memcache/src/ConnQueryPlanSet.cc 1970-01-01 00:00:00 +0000
+++ b/storage/ndb/memcache/src/ConnQueryPlanSet.cc 2011-09-30 20:46:27 +0000
@@ -0,0 +1,64 @@
+/*
+ Copyright (c) 2011, Oracle and/or its affiliates. All rights
+ reserved.
+
+ This program is free software; you can redistribute it and/or
+ modify it under the terms of the GNU General Public License
+ as published by the Free Software Foundation; version 2 of
+ the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ 02110-1301 USA
+ */
+
+#include "debug.h"
+#include "ConnQueryPlanSet.h"
+
+ConnQueryPlanSet::ConnQueryPlanSet(Ndb_cluster_connection *conn, int n) :
+ nplans(n),
+ plans(new QueryPlan *[n])
+{
+ memset(plans, 0, (nplans * sizeof(QueryPlan *)));
+ db = new Ndb(conn);
+ db->init();
+}
+
+
+ConnQueryPlanSet::~ConnQueryPlanSet()
+{
+ delete db;
+ delete plans;
+}
+
+
+bool ConnQueryPlanSet:: buildSetForConfiguration(const Configuration *cf,
+ int cluster_id)
+{
+ const KeyPrefix *k = cf->getNextPrefixForCluster(cluster_id, NULL);
+ while(k) {
+ DEBUG_PRINT("Building plan for prefix %d", k->info.prefix_id);
+ getPlanForPrefix(k);
+ k = cf->getNextPrefixForCluster(cluster_id, k);
+ }
+}
+
+
+QueryPlan * ConnQueryPlanSet::getPlanForPrefix(const KeyPrefix *prefix) {
+ int plan_id = prefix->info.prefix_id;
+
+ if(plans[plan_id] == 0) {
+ plans[plan_id] = new QueryPlan(db, prefix->table);
+ }
+
+ if(plans[plan_id]->initialized)
+ return plans[plan_id];
+ else
+ return 0;
+}
=== modified file 'storage/ndb/memcache/src/NdbInstance.cc'
--- a/storage/ndb/memcache/src/NdbInstance.cc 2011-09-25 05:06:46 +0000
+++ b/storage/ndb/memcache/src/NdbInstance.cc 2011-09-30 17:04:30 +0000
@@ -28,13 +28,9 @@
------------- NdbInstance ----------------
------------------------------------------ */
-NdbInstance::NdbInstance(Ndb_cluster_connection *c, int nprefixes,
- int ntransactions) :
+NdbInstance::NdbInstance(Ndb_cluster_connection *c, int ntransactions) :
next(0), wqitem(0)
{
- nplans = nprefixes;
- plans = new QueryPlan *[nplans];
- memset(plans, 0, (nplans * sizeof(QueryPlan *)));
if(c) {
db = new Ndb(c);
db->init(ntransactions);
@@ -49,23 +45,6 @@ NdbInstance::NdbInstance(Ndb_cluster_con
NdbInstance::~NdbInstance() {
if(db) delete db;
-
- for(int i = 0 ; i < nplans ; i++)
- if(plans[i])
- delete plans[i];
}
-QueryPlan * NdbInstance::getPlanForPrefix(const KeyPrefix *prefix) {
- int plan_id = prefix->info.prefix_id;
-
- if(plans[plan_id] == 0) {
- plans[plan_id] = new QueryPlan(db, prefix->table);
- }
-
- if(plans[plan_id]->initialized)
- return plans[plan_id];
- else
- return 0;
-}
-
=== modified file 'storage/ndb/memcache/src/Operation.cc'
--- a/storage/ndb/memcache/src/Operation.cc 2011-09-22 18:27:10 +0000
+++ b/storage/ndb/memcache/src/Operation.cc 2011-09-30 16:22:30 +0000
@@ -70,10 +70,10 @@ size_t Operation::copyValue(int idx, cha
/* NdbTransaction method wrappers */
-NdbTransaction * Operation::startTransaction() const {
+NdbTransaction * Operation::startTransaction(Ndb *db) const {
char hash_buffer[512];
- return plan->db->startTransaction(plan->key_record->ndb_record, key_buffer,
- hash_buffer, 512);
+ return db->startTransaction(plan->key_record->ndb_record, key_buffer,
+ hash_buffer, 512);
}
NdbIndexScanOperation * Operation::scanIndex(NdbTransaction *tx,
=== modified file 'storage/ndb/memcache/src/QueryPlan.cc'
--- a/storage/ndb/memcache/src/QueryPlan.cc 2011-09-29 19:13:43 +0000
+++ b/storage/ndb/memcache/src/QueryPlan.cc 2011-09-30 15:34:37 +0000
@@ -217,7 +217,7 @@ QueryPlan::~QueryPlan() {
}
-void QueryPlan::debug_dump() {
+void QueryPlan::debug_dump() const {
if(key_record) {
DEBUG_PRINT("Key record:");
key_record->debug_dump();
@@ -232,7 +232,7 @@ void QueryPlan::debug_dump() {
}
}
-bool QueryPlan::keyIsPrimaryKey() {
+bool QueryPlan::keyIsPrimaryKey() const {
if(spec->nkeycols == table->getNoOfPrimaryKeys()) {
for(int i = 0 ; i < spec->nkeycols ; i++)
if(strcmp(spec->key_columns[i], table->getPrimaryKey(i)))
=== modified file 'storage/ndb/memcache/src/ndb_worker.cc'
--- a/storage/ndb/memcache/src/ndb_worker.cc 2011-09-28 03:58:19 +0000
+++ b/storage/ndb/memcache/src/ndb_worker.cc 2011-09-30 20:46:27 +0000
@@ -174,7 +174,7 @@ op_status_t worker_do_delete(workitem *w
Operation op(plan, OP_DELETE, wqitem->ndb_key_buffer);
const NdbOperation *ndb_op = 0;
- NdbTransaction *tx = op.startTransaction();
+ NdbTransaction *tx = op.startTransaction(wqitem->ndb_instance->db);
DEBUG_ASSERT(tx);
op.clearKeyNullBits();
@@ -278,9 +278,10 @@ op_status_t worker_do_write(workitem *wq
}
/* Start the transaction */
- NdbTransaction *tx = op.startTransaction();
+ NdbTransaction *tx = op.startTransaction(wqitem->ndb_instance->db);
if(! tx) {
- logger->log(LOG_WARNING, 0, "tx: %s \n", plan->db->getNdbError().message);
+ logger->log(LOG_WARNING, 0, "tx: %s \n",
+ wqitem->ndb_instance->db->getNdbError().message);
DEBUG_ASSERT(false);
}
@@ -346,7 +347,7 @@ op_status_t worker_do_read(workitem *wqi
op.setKeyPart(COL_STORE_KEY, dbkey, wqitem->base.nsuffix);
/* Start a transaction, and call NdbTransaction::readTuple() */
- NdbTransaction *tx = op.startTransaction();
+ NdbTransaction *tx = op.startTransaction(wqitem->ndb_instance->db);
DEBUG_ASSERT(tx);
if(! op.readTuple(tx)) {
@@ -436,7 +437,7 @@ op_status_t worker_do_math(workitem *wqi
}
/* Use an op (either one) to start the transaction */
- NdbTransaction *tx = op1.startTransaction();
+ NdbTransaction *tx = op1.startTransaction(wqitem->ndb_instance->db);
/* NdbOperation #1: READ */
{
@@ -910,25 +911,23 @@ ENGINE_ERROR_CODE ndb_flush_all(ndb_pipe
DEBUG_PRINT(" %d prefixes", conf.nprefixes);
for(int i = 0 ; i < conf.nprefixes ; i++) {
- NdbInstance *inst = 0;
- const KeyPrefix *p = conf.getPrefix(i);
- if(p->info.use_ndb && p->info.do_db_flush) {
- ClusterConnectionPool *pool = conf.getConnectionPoolById(p->info.cluster_id);
+ const KeyPrefix *pfx = conf.getPrefix(i);
+ if(pfx->info.use_ndb && pfx->info.do_db_flush) {
+ ClusterConnectionPool *pool = conf.getConnectionPoolById(pfx->info.cluster_id);
Ndb_cluster_connection *conn = pool->getMainConnection();
- inst = new NdbInstance(conn, conf.nprefixes, 128);
- QueryPlan *plan = inst->getPlanForPrefix(p);
- if(plan->keyIsPrimaryKey()) {
- /* To flush, scan the table and delete every row */
- DEBUG_PRINT("prefix %d - deleting from %s", i, p->table->table_name);
- scan_delete(inst, plan);
+ NdbInstance inst(conn, 128);
+ QueryPlan plan(inst.db, pfx->table);
+ if(plan.keyIsPrimaryKey()) {
+ // To flush, scan the table and delete every row
+ DEBUG_PRINT("prefix %d - deleting from %s", i, pfx->table->table_name);
+ scan_delete(&inst, &plan);
}
else DEBUG_PRINT("prefix %d - not scanning table %s -- accees path "
- "is not primary key", i, p->table->table_name);
+ "is not primary key", i, pfx->table->table_name);
}
else DEBUG_PRINT("prefix %d - not scanning table %s -- use_ndb:%d flush:%d",
- i, p->table ? p->table->table_name : "",
- p->info.use_ndb, p->info.do_db_flush);
- if(inst) delete inst;
+ i, pfx->table ? pfx->table->table_name : "",
+ pfx->info.use_ndb, pfx->info.do_db_flush);
}
return ENGINE_SUCCESS;
=== modified file 'storage/ndb/memcache/src/schedulers/S_sched.cc'
--- a/storage/ndb/memcache/src/schedulers/S_sched.cc 2011-09-28 03:29:26 +0000
+++ b/storage/ndb/memcache/src/schedulers/S_sched.cc 2011-09-30 20:46:27 +0000
@@ -46,10 +46,10 @@ extern "C" {
extern EXTENSION_LOGGER_DESCRIPTOR *logger;
/* Lock that protects online reconfiguration */
-pthread_mutex_t reconf_lock = PTHREAD_MUTEX_INITIALIZER;
+pthread_rwlock_t reconf_lock = PTHREAD_RWLOCK_INITIALIZER;
/* Scheduler Global singleton */
-S::SchedulerGlobal * volatile s_global;
+S::SchedulerGlobal * s_global;
/* Global scheduler generation number */
int sched_generation_number;
@@ -110,6 +110,19 @@ void S::SchedulerGlobal::init(int _nthre
}
+void S::SchedulerGlobal::reconfigure(Configuration * new_cf) {
+ conf = new_cf;
+ generation++;
+
+ for(int i = 0; i < nclusters ; i++) {
+ for(int j = 0; j < options.n_worker_threads; j++) {
+ WorkerConnection *wc = * (getWorkerConnectionPtr(j, i));
+ wc->reconfigure(new_cf);
+ }
+ }
+}
+
+
void S::SchedulerGlobal::shutdown() {
if(running) {
logger->log(LOG_WARNING, 0, "Shutting down scheduler.");
@@ -210,10 +223,10 @@ void S::SchedulerGlobal::add_stats(const
const char *status;
char *gen = gen_number_buffer;
- if(pthread_mutex_trylock(& reconf_lock) == 0) {
+ if(pthread_rwlock_tryrdlock(& reconf_lock) == 0) {
status = "Running";
snprintf(gen, 16, "%d", generation);
- pthread_mutex_unlock(& reconf_lock);
+ pthread_rwlock_unlock(& reconf_lock);
}
else {
status = "Loading";
@@ -224,12 +237,8 @@ void S::SchedulerGlobal::add_stats(const
}
else {
DEBUG_PRINT(" scheduler");
- /* Aggregate scheduler statistics, so long as reconf is not in progress */
- if(pthread_mutex_trylock(& reconf_lock) == 0) {
- for(int c = 0 ; c < nclusters ; c++) {
- clusters[c]->add_stats(stat_key, add_stat, cookie);
- }
- pthread_mutex_unlock(& reconf_lock);
+ for(int c = 0 ; c < nclusters ; c++) {
+ clusters[c]->add_stats(stat_key, add_stat, cookie);
}
}
}
@@ -261,47 +270,6 @@ void S::SchedulerWorker::shutdown() {
}
-bool S::SchedulerWorker::global_reconfigure(Configuration *new_conf) {
- DEBUG_ENTER();
- bool result = false;
-
- if(pthread_mutex_lock(& reconf_lock) == 0) {
- sched_generation_number++;
- SchedulerGlobal *old_global = s_global;
- SchedulerGlobal *new_global = new SchedulerGlobal(new_conf);
-
- /* Get the new scheduler up and running.
- * The primary cluster will have its reference count increased
- * (and be re-used). A Cluster and its Connections are durable,
- * but WorkerConnections are configuration-specific and transient.
- */
- new_global->init(old_global->nthreads, old_global->config_string);
-
- /* If attach_thread() has already happened, set the pipeline: */
- if(pipeline) {
- new_global->engine = pipeline->engine;
- }
-
- /* Set s_global. Releasing the mutex will issue a memory barrier,
- * making the new value visible to other threads.
- */
- s_global = new_global;
-
- /* We are done with the lock */
- pthread_mutex_unlock(& reconf_lock);
-
- /* Now shut down the old scheduler.
- * Clusters that are unused in the new config will be gracefully shut down.
- */
- old_global->shutdown();
-
- result = true;
- }
-
- return result;
-}
-
-
void S::SchedulerWorker::attach_thread(thread_identifier *parent) {
DEBUG_ENTER();
@@ -316,19 +284,27 @@ void S::SchedulerWorker::attach_thread(t
ENGINE_ERROR_CODE S::SchedulerWorker::schedule(workitem *item) {
- int c;
+ int c = item->prefix_info.cluster_id;
NdbInstance *inst;
+ S::WorkerConnection *wc;
+ const KeyPrefix *pfx;
+
DEBUG_PRINT("SchedulerWorker / config gen. %d", s_global->generation);
- const KeyPrefix * pfx = s_global->conf->getPrefixByInfo(item->prefix_info);
+ /* ACQUIRE READ LOCK */
+ if(pthread_rwlock_rdlock(& reconf_lock) == 0) {
+ wc = * (s_global->getWorkerConnectionPtr(id, c));
+ pfx = s_global->conf->getPrefixByInfo(item->prefix_info);
+ pthread_rwlock_unlock(& reconf_lock);
+ }
+ else {
+ return ENGINE_TMPFAIL;
+ }
+ /* READ LOCK RELEASED */
item->base.nsuffix = item->base.nkey - pfx->prefix_len;
if(item->base.nsuffix == 0) return ENGINE_EINVAL; // key too short
- c = item->prefix_info.cluster_id;
-
- S::WorkerConnection *wc = * (s_global->getWorkerConnectionPtr(id, c));
-
if(wc && wc->freelist) {
inst = wc->freelist;
wc->freelist = inst->next;
@@ -347,7 +323,7 @@ ENGINE_ERROR_CODE S::SchedulerWorker::sc
workitem_set_NdbInstance(item, inst);
// Fetch the query plan for this prefix.
- item->plan = inst->getPlanForPrefix(pfx);
+ item->plan = wc->plan_set->getPlanForPrefix(pfx);
if(! item->plan) {
DEBUG_PRINT("getPlanForPrefix() failure");
return ENGINE_FAILED;
@@ -410,7 +386,7 @@ void S::SchedulerWorker::io_completed(wo
int c = item->prefix_info.cluster_id;
S::WorkerConnection * wc = * (s_global->getWorkerConnectionPtr(id, c));
if(wc && ! wc->sendqueue->is_aborted()) {
- assert(inst->sched_gen_number == s_global->generation);
+ // assert(inst->sched_gen_number == s_global->generation);
inst->wqitem = NULL;
inst->next = wc->freelist;
wc->freelist = inst;
@@ -424,6 +400,23 @@ void S::SchedulerWorker::io_completed(wo
}
+/* This is a partial implementation of online reconfiguration.
+ It can replace KeyPrefix mappings, but not add a cluster at runtime
+ (nor will it catch an attempt to do so -- which will eventually lead to
+ a crash after a getWorkerConnectionPtr()).
+*/
+bool S::SchedulerWorker::global_reconfigure(Configuration *new_cf) {
+ bool r = false;
+
+ if(pthread_rwlock_wrlock(& reconf_lock) == 0) {
+ s_global->reconfigure(new_cf);
+ pthread_rwlock_unlock(& reconf_lock);
+ r = true;
+ }
+ return r;
+}
+
+
void S::SchedulerWorker::add_stats(const char *stat_key,
ADD_STAT add_stat,
const void *cookie) {
@@ -531,12 +524,16 @@ S::WorkerConnection::WorkerConnection(Sc
conn = cl->connections[id.conn];
id.node = conn->node_id;
+ /* Build the plan_set and all QueryPlans */
+ old_plan_set = 0;
+ plan_set = new ConnQueryPlanSet(conn->conn, conf->nprefixes);
+ plan_set->buildSetForConfiguration(conf, cluster_id);
+
/* Build the freelist */
freelist = 0;
int my_ndb_inst = conn->nInst / global->options.n_worker_threads;
for(int j = 0 ; j < my_ndb_inst ; j++ ) {
- NdbInstance *inst = new NdbInstance(conn->conn, conf->nprefixes, 2);
- inst->sched_gen_number = global->generation;
+ NdbInstance *inst = new NdbInstance(conn->conn, 2);
inst->next = freelist;
freelist = inst;
}
@@ -561,7 +558,7 @@ S::WorkerConnection::WorkerConnection(Sc
// Open them all.
for(NdbInstance *inst = freelist; inst != 0 ;inst=inst->next, i++) {
NdbTransaction *tx;
- plan = inst->getPlanForPrefix(prefix);
+ plan = plan_set->getPlanForPrefix(prefix);
tx = inst->db->startTransaction();
if(! tx) logger->log(LOG_WARNING, 0, inst->db->getNdbError().message);
txlist[i] = tx;
@@ -575,7 +572,20 @@ S::WorkerConnection::WorkerConnection(Sc
// Free the list.
delete[] txlist;
}
+}
+
+
+void S::WorkerConnection::reconfigure(Configuration *new_cf) {
+ if(old_plan_set) { /* Garbage collect the old old plans */
+ delete old_plan_set;
+ }
+ old_plan_set = plan_set;
+
+ ConnQueryPlanSet *new_plans =
+ new ConnQueryPlanSet(conn->conn, new_cf->nprefixes);
+ new_plans->buildSetForConfiguration(new_cf, id.cluster);
+ plan_set = new_plans;
}
@@ -592,6 +602,12 @@ S::WorkerConnection::~WorkerConnection()
/* Delete the sendqueue */
delete sendqueue;
+
+ /* Delete the current QueryPlans (and maybe the previous ones, too) */
+ delete plan_set;
+ if(old_plan_set) {
+ delete old_plan_set;
+ }
}
=== modified file 'storage/ndb/memcache/src/schedulers/S_sched.h'
--- a/storage/ndb/memcache/src/schedulers/S_sched.h 2011-09-26 21:12:27 +0000
+++ b/storage/ndb/memcache/src/schedulers/S_sched.h 2011-09-30 20:46:27 +0000
@@ -32,6 +32,7 @@
#include "ndbmemcache_config.h"
#include "Scheduler.h"
#include "KeyPrefix.h"
+#include "ConnQueryPlanSet.h"
#include "Queue.h"
/*
@@ -50,7 +51,7 @@ public:
class SchedulerWorker; // one object per memcached worker thread
class Cluster; // one object for each cluster
class Connection; // one object per connection to a cluster
- class WorkerConnection; // one object per {worker,conncetion} pair
+ class WorkerConnection; // one object per {worker,connection} pair
};
@@ -62,6 +63,7 @@ public:
~SchedulerGlobal() {};
void init(int threads, const char *config_string);
void add_stats(const char *, ADD_STAT, const void *);
+ void reconfigure(Configuration *);
void shutdown();
WorkerConnection ** getWorkerConnectionPtr(int thd, int cluster) const {
return & workerConnections[(thd * nclusters) + cluster];
@@ -184,6 +186,7 @@ public:
WorkerConnection(SchedulerGlobal *, int thd_id, int cluster_id);
~WorkerConnection();
void shutdown();
+ void reconfigure(Configuration *);
struct {
int thd : 8;
@@ -192,6 +195,7 @@ public:
unsigned int node : 8;
} id;
S::Connection *conn;
+ ConnQueryPlanSet *plan_set, *old_plan_set;
NdbInstance *freelist;
Queue<NdbInstance> * sendqueue;
};
=== modified file 'storage/ndb/memcache/src/schedulers/Stockholm.cc'
--- a/storage/ndb/memcache/src/schedulers/Stockholm.cc 2011-09-22 18:27:10 +0000
+++ b/storage/ndb/memcache/src/schedulers/Stockholm.cc 2011-09-30 18:04:25 +0000
@@ -58,7 +58,7 @@ void Scheduler_stockholm::init(int my_th
c, conf.max_tps, pool->usec_rtt, cluster[c].nInst);
}
- // Get the NDB instances.
+ // Get the ConnQueryPlanSet and NDB instances for each cluster.
for(unsigned int c = 0 ; c < conf.nclusters ; c++) {
cluster[c].instances = (NdbInstance**)
calloc(cluster[c].nInst, sizeof(NdbInstance *));
@@ -66,9 +66,12 @@ void Scheduler_stockholm::init(int my_th
ClusterConnectionPool *pool = conf.getConnectionPoolById(c);
Ndb_cluster_connection *conn = pool->getPooledConnection(my_thread);
+ cluster[c].plan_set = new ConnQueryPlanSet(conn, conf.nprefixes);
+ cluster[c].plan_set->buildSetForConfiguration(&conf, c);
+
cluster[c].nextFree = NULL;
for(int i = 0; i < cluster[c].nInst ; i++) {
- NdbInstance *inst = new NdbInstance(conn, conf.nprefixes, 1);
+ NdbInstance *inst = new NdbInstance(conn, 1);
cluster[c].instances[i] = inst;
inst->next = cluster[c].nextFree;
cluster[c].nextFree = inst;
@@ -77,11 +80,11 @@ void Scheduler_stockholm::init(int my_th
logger->log(LOG_WARNING, 0, "Pipeline %d using %u Ndb instances for Cluster %u.\n",
my_thread, cluster[c].nInst, c);
}
-
+
+
/* Hoard a transaction (an API connect record) for each Ndb object. This
first call to startTransaction() will send TC_SEIZEREQ and wait for a
reply, but later at runtime startTransaction() should return immediately.
- Also, for each NDB instance, pre-build the QueryPlan for the default key prefix.
TODO? Start one tx on each data node.
*/
QueryPlan *plan;
@@ -93,7 +96,7 @@ void Scheduler_stockholm::init(int my_th
txlist = ( NdbTransaction **) calloc(cluster[c].nInst, sizeof(NdbTransaction *));
// Open them all.
for(int i = 0 ; i < cluster[c].nInst ; i++) {
- plan = cluster[c].instances[i]->getPlanForPrefix(prefix);
+ plan = cluster[c].plan_set->getPlanForPrefix(prefix);
txlist[i] = cluster[c].instances[i]->db->startTransaction();
}
// Close them all.
@@ -185,7 +188,7 @@ ENGINE_ERROR_CODE Scheduler_stockholm::s
workitem_set_NdbInstance(newitem, inst);
// Fetch the query plan for this prefix.
- newitem->plan = inst->getPlanForPrefix(pfx);
+ newitem->plan = cluster[c].plan_set->getPlanForPrefix(pfx);
if(! newitem->plan) return ENGINE_FAILED;
// Build the NDB transaction
=== modified file 'storage/ndb/memcache/src/schedulers/Stockholm.h'
--- a/storage/ndb/memcache/src/schedulers/Stockholm.h 2011-09-12 10:05:07 +0000
+++ b/storage/ndb/memcache/src/schedulers/Stockholm.h 2011-09-30 18:04:25 +0000
@@ -31,6 +31,7 @@
#include "workitem.h"
#include "Scheduler.h"
#include "KeyPrefix.h"
+#include "ConnQueryPlanSet.h"
/*
@@ -62,6 +63,7 @@ private:
uint64_t commit_thread_vtime;
} stats;
pthread_t commit_thread_id;
+ ConnQueryPlanSet * plan_set;
NdbInstance **instances;
int nInst;
NdbInstance *nextFree;
=== modified file 'storage/ndb/memcache/unit/all_tests.h'
--- a/storage/ndb/memcache/unit/all_tests.h 2011-09-22 05:02:00 +0000
+++ b/storage/ndb/memcache/unit/all_tests.h 2011-09-30 16:22:30 +0000
@@ -34,9 +34,9 @@
#define REQ_NDB_CONNECTION 1
#define REQ_DEMO_TABLE 2
-void delete_row(QueryPlan *plan, const char * key, int verbose);
+void delete_row(QueryPlan *plan, Ndb *db, const char * key, int verbose);
-typedef int TESTCASE(QueryPlan *plan, int verbose);
+typedef int TESTCASE(QueryPlan *plan, Ndb *db, int verbose);
struct test_item {
int enabled;
=== modified file 'storage/ndb/memcache/unit/alloc.cc'
--- a/storage/ndb/memcache/unit/alloc.cc 2011-09-22 18:27:10 +0000
+++ b/storage/ndb/memcache/unit/alloc.cc 2011-09-30 16:22:30 +0000
@@ -27,7 +27,7 @@
#include "all_tests.h"
-int run_allocator_test(QueryPlan *, int v) {
+int run_allocator_test(QueryPlan *, Ndb *, int v) {
struct request_pipeline *p = get_request_pipeline();
memory_pool *p1 = pipeline_create_memory_pool(p);
=== modified file 'storage/ndb/memcache/unit/cas.cc'
--- a/storage/ndb/memcache/unit/cas.cc 2011-09-12 10:05:07 +0000
+++ b/storage/ndb/memcache/unit/cas.cc 2011-09-30 16:22:30 +0000
@@ -21,35 +21,36 @@
#include "all_tests.h"
-int set_row(int, QueryPlan *, const char *, Uint64 , Uint64 );
+int set_row(int, QueryPlan *, Ndb *, const char *, Uint64 , Uint64 );
void build_cas_routine(NdbInterpretedCode *, QueryPlan *plan, Uint64 *cas);
-int run_cas_test(QueryPlan *plan, int v) {
+int run_cas_test(QueryPlan *plan, Ndb *db, int v) {
const Uint64 cas = 30090000000000003ULL;
int r;
- r = set_row(v, plan, "cas_unit_test_1", 0ULL, cas); // a normal update
+ r = set_row(v, plan, db, "cas_unit_test_1", 0ULL, cas); // a normal update
detail(v, "(1): %d\n", r);
require(r == 0);
- r = set_row(v, plan, "cas_unit_test_1", cas, cas + 1); // an interpreted update
+ r = set_row(v, plan, db, "cas_unit_test_1", cas, cas + 1); // an interpreted update
detail(v, "(2): %d\n", r);
require(r == 0);
- r = set_row(v, plan, "cas_unit_test_2", 0ULL, cas); // a normal update
+ r = set_row(v, plan, db, "cas_unit_test_2", 0ULL, cas); // a normal update
detail(v, "(2): %d\n", r);
require(r == 0);
- r = set_row(v, plan, "cas_unit_test_2", cas-1, cas+1); // this should fail
+ r = set_row(v, plan, db, "cas_unit_test_2", cas-1, cas+1); // this should fail
detail(v, "(2): %d\n", r);
require(r == 899);
pass;
}
-int set_row(int v, QueryPlan *plan, const char *akey, Uint64 old_cas, Uint64 new_cas) {
+int set_row(int v, QueryPlan *plan, Ndb *db,
+ const char *akey, Uint64 old_cas, Uint64 new_cas) {
NdbOperation::OperationOptions options;
NdbInterpretedCode code(plan->table);
char key[50];
@@ -76,7 +77,7 @@ int set_row(int v, QueryPlan *plan, cons
op.setColumn(COL_STORE_VALUE, value, strlen(value));
op.setColumnBigUnsigned(COL_STORE_CAS, new_cas);
- NdbTransaction *tx = op.startTransaction();
+ NdbTransaction *tx = op.startTransaction(db);
if(old_cas) {
/* This is an interpreted update */
=== modified file 'storage/ndb/memcache/unit/casbits.cc'
--- a/storage/ndb/memcache/unit/casbits.cc 2011-09-22 18:27:10 +0000
+++ b/storage/ndb/memcache/unit/casbits.cc 2011-09-30 16:22:30 +0000
@@ -31,7 +31,7 @@ ndbmc_atomic32_t engine_cas_lo = 0xb0000
void worker_set_cas(int verbose, Uint64 *cas);
-int test_cas_bitshifts(QueryPlan *, int v) {
+int test_cas_bitshifts(QueryPlan *, Ndb *, int v) {
Uint64 cas = 0ULL;
worker_set_cas(v, &cas);
require(cas == 0x00717530B0000065ULL);
=== modified file 'storage/ndb/memcache/unit/connpool.cc'
--- a/storage/ndb/memcache/unit/connpool.cc 2011-09-12 10:05:07 +0000
+++ b/storage/ndb/memcache/unit/connpool.cc 2011-09-30 16:22:30 +0000
@@ -23,10 +23,10 @@
#include "all_tests.h"
-int run_pool_test(QueryPlan *plan, int v) {
+int run_pool_test(QueryPlan *plan, Ndb *db1, int v) {
int conn_retries = 0;
int r;
- Ndb_cluster_connection &main_conn = plan->db->get_ndb_cluster_connection();
+ Ndb_cluster_connection &main_conn = db1->get_ndb_cluster_connection();
/* The pooled connection */
Ndb_cluster_connection * nc = new Ndb_cluster_connection(connect_string, & main_conn);
@@ -75,13 +75,13 @@ int run_pool_test(QueryPlan *plan, int v
nc->node_id(), nc->get_connected_host(), nc->get_connected_port());
require(nc->node_id());
- Ndb *db = new Ndb(nc);
- require(db);
+ Ndb *db2 = new Ndb(nc);
+ require(db2);
detail(v, "Created an Ndb object.\n");
- db->init(4);
+ db2->init(4);
- NdbTransaction *tx = db->startTransaction();
+ NdbTransaction *tx = db2->startTransaction();
require(tx);
detail(v, "Started a transaction.\n");
=== modified file 'storage/ndb/memcache/unit/harness.cc'
--- a/storage/ndb/memcache/unit/harness.cc 2011-09-12 10:05:07 +0000
+++ b/storage/ndb/memcache/unit/harness.cc 2011-09-30 16:22:30 +0000
@@ -113,7 +113,7 @@ int main(int argc, char *argv[]) {
if(test_number >= 0) { /* Run a particular test */
printf("%s\n", all_tests[test_number].name);
- int r = all_tests[test_number].function(plan, 1); //verbose
+ int r = all_tests[test_number].function(plan, db, 1); //verbose
if(r) {
printf(" [FAIL] at line %d\n", r); nfail++;
} else {
@@ -124,7 +124,7 @@ int main(int argc, char *argv[]) {
for(int i = 0; all_tests[i].name; i++) {
if(all_tests[i].enabled) {
printf("%-30s", all_tests[i].name);
- int r = all_tests[i].function(plan, 0); // quiet
+ int r = all_tests[i].function(plan, db, 0); // quiet
printf(" %s\n", r ? "[FAIL]" : "[PASS]");
if(r) nfail++;
else npass++;
@@ -200,7 +200,7 @@ Ndb_cluster_connection * connect(const c
}
-void delete_row(QueryPlan *plan, const char * key, int verbose) {
+void delete_row(QueryPlan *plan, Ndb *db, const char * key, int verbose) {
char ndbkeybuffer[300];
Operation op(plan, OP_DELETE, ndbkeybuffer);
@@ -208,7 +208,7 @@ void delete_row(QueryPlan *plan, const c
op.clearKeyNullBits();
op.setKeyPart(COL_STORE_KEY, key, strlen(key));
- NdbTransaction * tx = op.startTransaction();
+ NdbTransaction * tx = op.startTransaction(db);
op.deleteTuple(tx);
tx->execute(NdbTransaction::Commit);
=== modified file 'storage/ndb/memcache/unit/incr.cc'
--- a/storage/ndb/memcache/unit/incr.cc 2011-09-22 18:27:10 +0000
+++ b/storage/ndb/memcache/unit/incr.cc 2011-09-30 16:22:30 +0000
@@ -19,47 +19,48 @@
*/
#include "all_tests.h"
-int do_incr(int v, QueryPlan *plan, const char *akey, bool, bool, Uint64 *val);
+int do_incr(int v, QueryPlan *plan, Ndb *db,
+ const char *akey, bool, bool, Uint64 *val);
-int run_incr_test(QueryPlan *plan, int v) {
+int run_incr_test(QueryPlan *plan, Ndb *db, int v) {
int r = 0;
Uint64 val = 33;
- delete_row(plan, "incr_unit_test_1", v);
- delete_row(plan, "incr_unit_test_2", v);
+ delete_row(plan, db, "incr_unit_test_1", v);
+ delete_row(plan, db, "incr_unit_test_2", v);
detail(v, "Test 1: INCR non-existing row, create=false\n");
- r = do_incr(v, plan, "incr_unit_test_1", false, true, &val);
+ r = do_incr(v, plan, db, "incr_unit_test_1", false, true, &val);
detail(v, "Result - NDB=%d Val=%llu \n\n", r, (long long unsigned) val);
require(r == 626);
require(val == 33);
detail(v, "Test 2: INCR non-existing row, create=true, update = false\n");
- r = do_incr(v, plan, "incr_unit_test_1", true, false, &val);
+ r = do_incr(v, plan, db, "incr_unit_test_1", true, false, &val);
detail(v, "Result - NDB=%d Val=%llu \n\n", r, val);
require(r == 626); // the transaction gets a 626 even if the insert succeeds
require(val == -1ULL);
detail(v, "test 3: READ row created in test 2\n");
- r = do_incr(v, plan, "incr_unit_test_1", false, false, &val);
+ r = do_incr(v, plan, db, "incr_unit_test_1", false, false, &val);
detail(v, "Result - NDB=%d Val=%llu \n\n", r, (long long unsigned) val);
require(r == 0);
require(val == -1ULL);
detail(v, "Test 4: INCR non-existing row, create=true\n");
- r = do_incr(v, plan, "incr_unit_test_2", true, true, &val);
+ r = do_incr(v, plan, db, "incr_unit_test_2", true, true, &val);
detail(v, "Result - NDB=%d Val=%llu \n\n", r, (long long unsigned) val);
require(r == 626);
require(val == 0);
detail(v, "Test 5: INCR existing row, create=false\n");
- r = do_incr(v, plan, "incr_unit_test_2", false, true, &val);
+ r = do_incr(v, plan, db, "incr_unit_test_2", false, true, &val);
detail(v, "Result - NDB=%d Val=%llu \n\n", r, (long long unsigned) val);
require(r == 0);
require(val == 1);
detail(v, "Test 6: INCR existing row, create=true\n");
- r = do_incr(v, plan, "incr_unit_test_2", true, true, &val);
+ r = do_incr(v, plan, db, "incr_unit_test_2", true, true, &val);
detail(v, "Result - NDB=%d Val=%llu \n\n", r, (long long unsigned) val);
require(r == 630); // the insert failed but the update succeeded
require(val == 2);
@@ -68,7 +69,7 @@ int run_incr_test(QueryPlan *plan, int v
}
-int do_incr(int v, QueryPlan *plan, const char *akey,
+int do_incr(int v, QueryPlan *plan, Ndb *db, const char *akey,
bool create, bool update, Uint64 *val) {
int r;
char key[50];
@@ -96,9 +97,9 @@ int do_incr(int v, QueryPlan *plan, cons
op.setColumn(COL_STORE_KEY, key, strlen(key));
op.setColumnBigUnsigned(COL_STORE_CAS, 0ULL);
- NdbTransaction *tx = op.startTransaction();
+ NdbTransaction *tx = op.startTransaction(db);
if(! tx) {
- int r = plan->db->RESULT;
+ int r = db->RESULT;
detail(v, " get tx: %d \n", r);
return r;
}
=== modified file 'storage/ndb/memcache/unit/queue.cc'
--- a/storage/ndb/memcache/unit/queue.cc 2011-09-12 10:05:07 +0000
+++ b/storage/ndb/memcache/unit/queue.cc 2011-09-30 16:22:30 +0000
@@ -41,7 +41,7 @@ q_test_obj * get(Queue<q_test_obj> &q, i
const int n_loop_items = 50000;
-int run_queue_test(QueryPlan *, int v) {
+int run_queue_test(QueryPlan *, Ndb *, int v) {
Queue<q_test_obj> q(n_loop_items);
/* nothing there */
=== modified file 'storage/ndb/memcache/unit/tsv.cc'
--- a/storage/ndb/memcache/unit/tsv.cc 2011-09-12 10:05:07 +0000
+++ b/storage/ndb/memcache/unit/tsv.cc 2011-09-30 16:22:30 +0000
@@ -25,7 +25,7 @@
#include "all_tests.h"
-int run_tsv_test(QueryPlan *, int v) {
+int run_tsv_test(QueryPlan *, Ndb *, int v) {
{
TabSeparatedValues t1("frodo.xxx", 4, 5);
require(t1.getLength() == 5);
No bundle (reason: useless for push emails).
| Thread |
|---|
| • bzr push into mysql-5.5-cluster branch (john.duncan:3571 to 3578) | John David Duncan | 2 Oct |