From: John David Duncan Date: September 30 2011 9:30pm Subject: bzr push into mysql-5.5-cluster branch (john.duncan:3571 to 3578) List-Archive: http://lists.mysql.com/commits/141252 Message-Id: <201109302130.p8ULUvJn006913@acsmt356.oracle.com> MIME-Version: 1.0 Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit 3578 John David Duncan 2011-09-30 Online reconfiguration. This revision compiles and passes the test suite on my laptop. modified: mysql-test/lib/My/Memcache.pm mysql-test/suite/ndb_memcache/include/datatypes_tables.inc mysql-test/suite/ndb_memcache/t/type_char.test storage/ndb/memcache/sandbox.sh.in storage/ndb/memcache/src/ConnQueryPlanSet.cc storage/ndb/memcache/src/ndb_worker.cc storage/ndb/memcache/src/schedulers/S_sched.cc storage/ndb/memcache/src/schedulers/S_sched.h 3577 John David Duncan 2011-09-30 ndb/memcache Configuration -- small fixes modified: storage/ndb/memcache/include/Configuration.h storage/ndb/memcache/src/Configuration.cc 3576 John David Duncan 2011-09-30 Changes to online reconfiguration in S Scheduler. Online reconfig is actually thread-safe now, protected by a read/write lock. Online reconfig in the scheduler now *only* udpates the Configuration pointer, the generation number, and the list of QueryPlans. So it is not currently possible to add a cluster or to grow the pool of Ndb objects using online reconfig. This change also adapts both schedulers to use ConnQueryPlanSet. modified: storage/ndb/memcache/src/schedulers/S_sched.cc storage/ndb/memcache/src/schedulers/S_sched.h storage/ndb/memcache/src/schedulers/Stockholm.cc storage/ndb/memcache/src/schedulers/Stockholm.h 3575 John David Duncan 2011-09-30 New class ConnQueryPlanSet. Each NdbInstance no longer keeps a list of QueryPlans; now, for each cluster connection, a ConnQueryPlanSet holds the QueryPlans. This revision does not compile. added: storage/ndb/memcache/include/ConnQueryPlanSet.h storage/ndb/memcache/src/ConnQueryPlanSet.cc modified: storage/ndb/memcache/CMakeLists.txt storage/ndb/memcache/include/Configuration.h storage/ndb/memcache/include/NdbInstance.h storage/ndb/memcache/src/Configuration.cc storage/ndb/memcache/src/NdbInstance.cc 3574 John David Duncan 2011-09-30 QueryPlan::db is now private. modified: storage/ndb/memcache/include/Configuration.h storage/ndb/memcache/include/Operation.h storage/ndb/memcache/include/QueryPlan.h storage/ndb/memcache/src/Configuration.cc storage/ndb/memcache/src/Operation.cc storage/ndb/memcache/src/ndb_worker.cc storage/ndb/memcache/unit/all_tests.h storage/ndb/memcache/unit/alloc.cc storage/ndb/memcache/unit/cas.cc storage/ndb/memcache/unit/casbits.cc storage/ndb/memcache/unit/connpool.cc storage/ndb/memcache/unit/harness.cc storage/ndb/memcache/unit/incr.cc storage/ndb/memcache/unit/queue.cc storage/ndb/memcache/unit/tsv.cc 3573 John David Duncan 2011-09-30 Some QueryPlan methods are now const. modified: storage/ndb/memcache/include/QueryPlan.h storage/ndb/memcache/src/QueryPlan.cc 3572 John David Duncan 2011-09-30 memcache: Rather than keeping an Ndb object (and eventually leaking it), Configuration and Config_v1 allocate an Ndb on the stack when they need one. modified: storage/ndb/memcache/include/Configuration.h storage/ndb/memcache/src/Config_v1.cc storage/ndb/memcache/src/Configuration.cc 3571 John David Duncan 2011-09-29 Column->getSizeInBytes() seems to work for all cases of getColumnRecordSize(). It also does better for determining alignment; for instance NdbApi seems to treat a timestamp column as an array of 4 chars, so getSize() was 1. modified: storage/ndb/memcache/include/DataTypeHandler.h storage/ndb/memcache/include/QueryPlan.h storage/ndb/memcache/include/Record.h storage/ndb/memcache/src/DataTypeHandler.cc storage/ndb/memcache/src/QueryPlan.cc storage/ndb/memcache/src/Record.cc === modified file 'mysql-test/lib/My/Memcache.pm' --- a/mysql-test/lib/My/Memcache.pm 2011-09-29 03:29:32 +0000 +++ b/mysql-test/lib/My/Memcache.pm 2011-09-30 20:46:27 +0000 @@ -127,6 +127,9 @@ sub wait_for_reconf { if($new_gen > 0) { $self->{cf_gen} = $new_gen; } + else { + print STDERR "Timed out.\n"; + } return $new_gen; } === modified file 'mysql-test/suite/ndb_memcache/include/datatypes_tables.inc' --- a/mysql-test/suite/ndb_memcache/include/datatypes_tables.inc 2011-09-29 03:29:32 +0000 +++ b/mysql-test/suite/ndb_memcache/include/datatypes_tables.inc 2011-09-30 20:46:27 +0000 @@ -50,7 +50,6 @@ UPDATE memcache_server_roles set update_ DROP VIEW tv_tablist; ---real_sleep 1 --source suite/ndb_memcache/include/wait_for_reconf.inc } === modified file 'mysql-test/suite/ndb_memcache/t/type_char.test' --- a/mysql-test/suite/ndb_memcache/t/type_char.test 2011-09-29 03:29:32 +0000 +++ b/mysql-test/suite/ndb_memcache/t/type_char.test 2011-09-30 20:46:27 +0000 @@ -35,6 +35,7 @@ UPDATE memcache_server_roles set update_ --perl use strict; +use Carp; use lib "lib"; use My::Memcache; # Use a binary protocol connection (so keys can contain spaces) @@ -42,36 +43,31 @@ my $mc = My::Memcache::Binary->new(); my $port = $ENV{MTR_BUILD_THREAD} * 10 + 10000 + 8; my $r = $mc->connect("localhost",$port); -# sleep(1) because there is still an undiagnosed race condition in -# online reconfiguration -- jdd, 14 Aug. 2011 -sleep(1); - my $cf_gen = $mc->wait_for_reconf(); -if($cf_gen > 0) { - # test CHAR key with VARCHAR value - $mc->set("tck:a","fred") || die; - $mc->set("tck:1","frederick") || die; - $mc->set("tck:aa","frederica") || die; - $mc->set("tck:a b c d","freddy") || die; - - ($mc->get("tck:aa") == "frederica") || die; - ($mc->get("tck:a b c d") == "freddy") || die; - - # test VARCHAR key with CHAR value - $mc->set("tcv:a", "al") || die; - $mc->set("tcv:b", "alphonse") || die; - $mc->set("tcv:c", "allen") || die; - $mc->set("tcv:d", "alien invasion") || die; - - ($mc->get("tcv:d") == "alien invasion") || die; - ($mc->get("tcv:a") == "al") || die; - ($mc->get("tcv:ee") == "NOT_FOUND") || die; -} -else { - print " ***** Skipped test [$r/$cf_gen] \n"; +if($cf_gen == 0) { + Carp::confess("FAILED WAIT_FOR_RECONF"); } +# test CHAR key with VARCHAR value +$mc->set("tck:a","fred") || Carp::confess("FAILED # 01 (SET)"); +$mc->set("tck:1","frederick") || Carp::confess("FAILED # 02 (SET)"); +$mc->set("tck:aa","frederica") || Carp::confess("FAILED # 03 (SET)"); +$mc->set("tck:a b c d","freddy") || Carp::confess("FAILED # 04 (SET)"); + +($mc->get("tck:aa") == "frederica") || Carp::confess("FAILED # 05 (GET)"); +($mc->get("tck:a b c d") == "freddy") || Carp::confess("FAILED # 06 (GET)"); + +# test VARCHAR key with CHAR value +$mc->set("tcv:a", "al") || Carp::confess("FAILED # 07 (SET)"); +$mc->set("tcv:b", "alphonse") || Carp::confess("FAILED # 08 (SET)"); +$mc->set("tcv:c", "allen") || Carp::confess("FAILED # 09 (SET)"); +$mc->set("tcv:d", "alien invasion") || Carp::confess("FAILED # 10 (SET)"); + +($mc->get("tcv:d") == "alien invasion")|| Carp::confess("FAILED # 11 (GET)"); +($mc->get("tcv:a") == "al") || Carp::confess("FAILED # 12 (GET)"); +($mc->get("tcv:ee") == "NOT_FOUND") || Carp::confess("FAILED # 13 (GET)"); + EOF === modified file 'storage/ndb/memcache/CMakeLists.txt' --- a/storage/ndb/memcache/CMakeLists.txt 2011-09-25 07:19:37 +0000 +++ b/storage/ndb/memcache/CMakeLists.txt 2011-09-30 17:04:30 +0000 @@ -56,6 +56,7 @@ set(NDB_MEMCACHE_SOURCE_FILES src/ClusterConnectionPool.cc src/Configuration.cc src/Config_v1.cc + src/ConnQueryPlanSet.cc src/DataTypeHandler.cc src/KeyPrefix.cc src/NdbInstance.cc === modified file 'storage/ndb/memcache/include/Configuration.h' --- a/storage/ndb/memcache/include/Configuration.h 2011-09-22 18:27:10 +0000 +++ b/storage/ndb/memcache/include/Configuration.h 2011-09-30 20:34:55 +0000 @@ -87,7 +87,7 @@ class Configuration { const KeyPrefix * getPrefixForKey(const char *key, int nkey) const; const KeyPrefix * getPrefixByInfo(const prefix_info_t info) const; const KeyPrefix * getPrefix(int id) const; // inlined - const KeyPrefix * getNextPrefixForCluster(uint cluster_id, KeyPrefix *) const; + const KeyPrefix * getNextPrefixForCluster(uint cluster_id, const KeyPrefix *) const; void setPrimaryConnectString(const char *); // inlined void setServerRole(const char *); // inlined const char * getServerRole(); // inlined @@ -113,7 +113,6 @@ class Configuration { int storePrefix(KeyPrefix &prefix); void storeCAS(uint64_t ndb_engine_cas, uint64_t default_engine_cas); - Ndb *db; const char *primary_connect_string; int onlineReloadFlag; int (*reload_waiter)(Ndb_cluster_connection *, const char *); @@ -122,7 +121,7 @@ class Configuration { /* private methods */ void store_default_prefix(); config_ver_enum get_supported_version(); - bool fetch_meta_record(QueryPlan *plan, const char *version); + bool fetch_meta_record(QueryPlan *plan, Ndb *db, const char *version); /* private instance variables */ const char *server_role; === added file 'storage/ndb/memcache/include/ConnQueryPlanSet.h' --- a/storage/ndb/memcache/include/ConnQueryPlanSet.h 1970-01-01 00:00:00 +0000 +++ b/storage/ndb/memcache/include/ConnQueryPlanSet.h 2011-09-30 17:04:30 +0000 @@ -0,0 +1,49 @@ +/* + Copyright (c) 2011, Oracle and/or its affiliates. All rights + reserved. + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA + 02110-1301 USA + */ + +#ifndef NDBMEMCACHE_CONNQUERYPLANSET_H +#define NDBMEMCACHE_CONNQUERYPLANSET_H + +#ifndef __cplusplus +#error "This file is for C++ only" +#endif + + +#include "Configuration.h" +#include "QueryPlan.h" + +class ConnQueryPlanSet { +public: + ConnQueryPlanSet(Ndb_cluster_connection *, int n_plans); + ~ConnQueryPlanSet(); + + bool buildSetForConfiguration(const Configuration *, int cluster_id); + QueryPlan * getPlanForPrefix(const KeyPrefix *); + +private: + Ndb *db; + int nplans; + QueryPlan **plans; +}; + + + +#endif + === modified file 'storage/ndb/memcache/include/NdbInstance.h' --- a/storage/ndb/memcache/include/NdbInstance.h 2011-09-12 10:05:07 +0000 +++ b/storage/ndb/memcache/include/NdbInstance.h 2011-09-30 17:04:30 +0000 @@ -34,27 +34,22 @@ struct workitem; #define VPSZ sizeof(void *) -#define ISZ sizeof(int) -#define TOTAL_SZ (ISZ + (4 * VPSZ)) +#define TOTAL_SZ (3 * VPSZ) #define PADDING (64 - TOTAL_SZ) class NdbInstance { public: /* Public Methods */ - NdbInstance(Ndb_cluster_connection *, int, int); + NdbInstance(Ndb_cluster_connection *, int); ~NdbInstance(); - QueryPlan * getPlanForPrefix(const KeyPrefix *); /* Public Instance Variables */ Ndb *db; NdbInstance *next; workitem *wqitem; - int sched_gen_number; private: - int nplans; - QueryPlan **plans; char cache_line_padding[PADDING]; }; === modified file 'storage/ndb/memcache/include/Operation.h' --- a/storage/ndb/memcache/include/Operation.h 2011-09-12 10:05:07 +0000 +++ b/storage/ndb/memcache/include/Operation.h 2011-09-30 16:22:30 +0000 @@ -83,7 +83,7 @@ public: /* NdbTransaction method wrappers */ // startTransaction - NdbTransaction *startTransaction() const; + NdbTransaction *startTransaction(Ndb *) const; // read const NdbOperation *readTuple(NdbTransaction *tx, === modified file 'storage/ndb/memcache/include/QueryPlan.h' --- a/storage/ndb/memcache/include/QueryPlan.h 2011-09-29 19:13:43 +0000 +++ b/storage/ndb/memcache/include/QueryPlan.h 2011-09-30 16:22:30 +0000 @@ -47,13 +47,12 @@ class QueryPlan { QueryPlan() : initialized(0) {}; QueryPlan(Ndb *, const TableSpec *, PlanOpts opts = NoOptions); ~QueryPlan(); - bool keyIsPrimaryKey(); - void debug_dump(); + bool keyIsPrimaryKey() const; + void debug_dump() const; /* public instance variables */ bool initialized; bool dup_numbers; - Ndb *db; const TableSpec *spec; NdbDictionary::Dictionary *dict; const NdbDictionary::Table *table; @@ -77,6 +76,7 @@ class QueryPlan { const NdbDictionary::Index * chooseIndex(); /* Private instance variables */ + Ndb *db; }; === modified file 'storage/ndb/memcache/sandbox.sh.in' --- a/storage/ndb/memcache/sandbox.sh.in 2011-09-28 20:28:34 +0000 +++ b/storage/ndb/memcache/sandbox.sh.in 2011-09-30 20:46:27 +0000 @@ -1,7 +1,5 @@ #!/bin/sh -# FIXME: Solaris doesn't have whoami and "kill" doesn't work as used here. - prefix=@CMAKE_INSTALL_PREFIX@ bindir=@INSTALL_BINDIR@ libexecdir=@INSTALL_SBINDIR@ @@ -12,6 +10,7 @@ sourcetree=@CMAKE_CURRENT_SOURCE_DIR@ installtree=@CMAKE_INSTALL_PREFIX@/memcache-api memcached_binary=@MEMCACHED_BIN_PATH@ +MYSELF=`who am i | awk '{print $1}'` MYSQL_PREFIX=$prefix MYSQL_BIN=$prefix/$bindir MYSQL_LIBEXEC=$prefix/$libexecdir @@ -20,7 +19,11 @@ MYSQL_LIB=$prefix/$libdir MEMCACHE_BASE=$memcachedir SOURCE_TREE=$sourcetree HOME_BASE=$sourcetree # fallback to source tree -test -d $installtree && HOME_BASE=$installtree # prefer installed tree +if test $HOME_BASE != `pwd` + then + # prefer installed tree unless we are currently in the source tree + test -d $installtree && HOME_BASE=$installtree +fi test_paths() { test_path $MYSQL_BIN ndb_mgm @@ -58,7 +61,7 @@ write_my_cnf() { echo "ndbcluster" echo "datadir=$HOME_BASE/sandbox/data" echo "pid-file=$HOME_BASE/sandbox/mysqld.pid" - echo "user="`whoami` + echo "user=$MYSELF" echo "innodb_log_file_size=1M" echo ) > $HOME_BASE/sandbox/my.cnf @@ -94,7 +97,7 @@ write_cluster_ini() { do_install_db() { $MYSQL_SCRIPTS/mysql_install_db \ --basedir=$MYSQL_PREFIX --datadir=$HOME_BASE/sandbox/data \ - --skip-name-resolve --user=`whoami` > /dev/null + --skip-name-resolve --user=$MYSELF > /dev/null if test ! -d sandbox/data/mysql then echo "Failed: mysql_install_db did not work." && exit else echo "Created MySQL System Tables" @@ -159,7 +162,8 @@ start_memcached() { } stop_memcached() { - kill `cat $HOME_BASE/sandbox/memcached.pid` + SERVERPID=`cat $HOME_BASE/sandbox/memcached.pid` + test -n "$SERVERPID" && kill $SERVERPID } stop_mysqld() { @@ -173,6 +177,9 @@ stop_mgm_server() { final_message() { sleep 2 echo + echo "Sandbox directory is $HOME_BASE/sandbox" + echo "Memcached is $memcached_binary" + echo echo "Use \"sh sandbox.sh stop\" to stop memcached & MySQL Cluster" echo } === modified file 'storage/ndb/memcache/src/Config_v1.cc' --- a/storage/ndb/memcache/src/Config_v1.cc 2011-09-29 03:26:04 +0000 +++ b/storage/ndb/memcache/src/Config_v1.cc 2011-09-30 07:03:26 +0000 @@ -195,14 +195,16 @@ bool config_v1::read_configuration() { int config_v1::get_server_role_id() { uint32_t val = -1; + Ndb db(conf.primary_conn); + db.init(2); TableSpec spec("ndbmemcache.memcache_server_roles", "role_name", "role_id,max_tps"); - QueryPlan plan(conf.db, &spec); + QueryPlan plan(&db, &spec); Operation op(&plan, OP_READ); op.key_buffer = (char *) malloc(op.requiredKeyBuffer()); op.buffer = (char *) malloc(op.requiredBuffer()); - NdbTransaction *tx = conf.db->startTransaction(); + NdbTransaction *tx = db.startTransaction(); op.clearKeyNullBits(); op.setKeyPart(COL_STORE_KEY, conf.server_role, strlen(conf.server_role)); @@ -240,13 +242,15 @@ bool config_v1::get_policies() { DEBUG_ENTER_METHOD("config_v1::get_policies"); bool success = true; int res; + Ndb db(conf.primary_conn); + db.init(4); TableSpec spec("ndbmemcache.cache_policies", "policy_name", "get_policy,set_policy,delete_policy,flush_from_db"); - QueryPlan plan(conf.db, &spec); + QueryPlan plan(&db, &spec); Operation op(&plan, OP_SCAN); - NdbTransaction *tx = conf.db->startTransaction(); + NdbTransaction *tx = db.startTransaction(); NdbScanOperation *scan = op.scanTable(tx); if(! scan) { logger->log(LOG_WARNING, 0, tx->getNdbError().message); @@ -311,14 +315,16 @@ bool config_v1::get_connections() { DEBUG_ENTER_METHOD("config_v1::get_connections"); bool success = true; int res; + Ndb db(conf.primary_conn); + db.init(4); TableSpec spec("ndbmemcache.ndb_clusters", "cluster_id", "ndb_connectstring,microsec_rtt"); /* Scan the ndb_clusters table */ - QueryPlan plan(conf.db, &spec); + QueryPlan plan(&db, &spec); Operation op(&plan, OP_SCAN); - NdbTransaction *tx = conf.db->startTransaction(); + NdbTransaction *tx = db.startTransaction(); NdbScanOperation *scan = op.scanTable(tx); if(! scan) { logger->log(LOG_WARNING, 0, tx->getNdbError().message); @@ -383,16 +389,18 @@ TableSpec * config_v1::get_container(cha TableSpec * config_v1::get_container_record(char *name) { TableSpec *container; + Ndb db(conf.primary_conn); + db.init(1); TableSpec spec("ndbmemcache.containers", "name", "db_schema,db_table,key_columns,value_columns,flags," - "increment_column,cas_column,expire_time_column"); - QueryPlan plan(conf.db, &spec); + "increment_column,cas_column,expire_time_column"); + QueryPlan plan(&db, &spec); Operation op(&plan, OP_READ); op.key_buffer = (char *) malloc(op.requiredKeyBuffer()); op.buffer = (char *) malloc(op.requiredBuffer()); - NdbTransaction *tx = conf.db->startTransaction(); + NdbTransaction *tx = db.startTransaction(); op.clearKeyNullBits(); op.setKeyPart(COL_STORE_KEY, name, strlen(name)); @@ -478,7 +486,9 @@ bool config_v1::get_prefixes(int role_id TableSpec spec("ndbmemcache.key_prefixes", "server_role_id,key_prefix", "cluster_id,policy,container"); - QueryPlan plan(conf.db, &spec, PKScan); + Ndb db(conf.primary_conn); + db.init(4); + QueryPlan plan(&db, &spec, PKScan); Operation op(&plan, OP_SCAN); // `server_role_id` INT UNSIGNED NOT NULL DEFAULT 0, @@ -492,7 +502,7 @@ bool config_v1::get_prefixes(int role_id bound.low_inclusive = bound.high_inclusive = true; bound.range_no = 0; - NdbTransaction *tx = conf.db->startTransaction(); + NdbTransaction *tx = db.startTransaction(); NdbIndexScanOperation *scan = op.scanIndex(tx, &bound); if(! scan) { logger->log(LOG_WARNING, 0, "scanIndex(): %s\n", tx->getNdbError().message); @@ -629,21 +639,23 @@ void config_v1::log_signon() { DEBUG_ENTER_METHOD("config_v1::log_signon"); char my_hostname[256]; gethostname(my_hostname, 256); + Ndb db(conf.primary_conn); + db.init(1); TableSpec spec("ndbmemcache.last_memcached_signon", "ndb_node_id", "hostname,server_role,signon_time"); - QueryPlan plan(conf.db, &spec); + QueryPlan plan(&db, &spec); Operation op(&plan, OPERATION_SET); op.buffer = (char *) malloc(op.requiredBuffer()); op.key_buffer = (char *) malloc(op.requiredKeyBuffer()); op.clearNullBits(); - op.setKeyPartInt(COL_STORE_KEY, conf.db->getNodeId()); // node ID (in key) - op.setColumnInt(COL_STORE_KEY, conf.db->getNodeId()); // node ID (in row) + op.setKeyPartInt(COL_STORE_KEY, db.getNodeId()); // node ID (in key) + op.setColumnInt(COL_STORE_KEY, db.getNodeId()); // node ID (in row) op.setColumn(COL_STORE_VALUE+0, my_hostname, strlen(my_hostname)); // hostname op.setColumn(COL_STORE_VALUE+1, conf.server_role, strlen(conf.server_role)); // role op.setColumnInt(COL_STORE_VALUE+2,time(NULL)); // timestamp - NdbTransaction *tx = conf.db->startTransaction(); // TODO: node selection + NdbTransaction *tx = db.startTransaction(); // TODO: node selection op.writeTuple(tx); tx->execute(NdbTransaction::Commit); tx->getGCI(&signon_gci); @@ -666,10 +678,12 @@ void config_v1::set_initial_cas() { | | + 8bit | | | | NodeId | | ---------------------------------------------------------------- */ + Ndb db(conf.primary_conn); + db.init(1); const uint64_t MASK_GCI = 0x07FFFFFF00000000LLU; // Use these 27 bits of GCI const uint64_t ENGINE_BIT = 0x0000001000000000LLU; // bit 36 - uint64_t node_id = ((uint64_t) conf.db->getNodeId()) << 28; + uint64_t node_id = ((uint64_t) db.getNodeId()) << 28; uint64_t gci_bits = (signon_gci & MASK_GCI) << 5; uint64_t def_eng_cas = gci_bits | node_id; uint64_t ndb_eng_cas = gci_bits | ENGINE_BIT | node_id; @@ -677,7 +691,7 @@ void config_v1::set_initial_cas() { // void storeCAS(uint64_t ndb_engine_cas, uint64_t default_engine_cas); conf.storeCAS(ndb_eng_cas, def_eng_cas); DEBUG_PRINT("Sign On GCI: 0x%llx | Node Id: [%d] 0x%llx | Engine bit: 0x%llx", - signon_gci, conf.db->getNodeId(), node_id, ENGINE_BIT); + signon_gci, db.getNodeId(), node_id, ENGINE_BIT); DEBUG_PRINT("Initial CAS: %llu 0x%llx ", ndb_eng_cas, ndb_eng_cas); return; === modified file 'storage/ndb/memcache/src/Configuration.cc' --- a/storage/ndb/memcache/src/Configuration.cc 2011-09-29 03:26:04 +0000 +++ b/storage/ndb/memcache/src/Configuration.cc 2011-09-30 20:34:55 +0000 @@ -54,12 +54,7 @@ Configuration::Configuration(Configurati primary_connect_string(old->primary_connect_string), server_role(old->server_role), config_version(CONFIG_VER_UNKNOWN), - primary_conn(old->primary_conn) -{ - db = new Ndb(primary_conn); - db->init(); -} - + primary_conn(old->primary_conn) {}; bool Configuration::connectToPrimary() { @@ -84,12 +79,12 @@ bool Configuration::connectToPrimary() { primary_conn = ClusterConnectionPool::connect(primary_connect_string); if(primary_conn) { - db = new Ndb(primary_conn); // This Ndb is used only to read the "meta" table. - db->init(); return true; } - logger->log(LOG_WARNING, 0, "FAILED.\n"); - return false; + else { + logger->log(LOG_WARNING, 0, "FAILED.\n"); + return false; + } } @@ -201,13 +196,17 @@ const KeyPrefix * Configuration::getPref const KeyPrefix * Configuration::getNextPrefixForCluster(unsigned int cluster_id, - KeyPrefix *k) const { + const KeyPrefix *k) const { unsigned int i = 0; - if(k) while(prefixes[i] != k && i < nprefixes) i++; + if(k) { + while(prefixes[i] != k && i < nprefixes) i++; // find k in the list + i++; // then advance one more + } + while(i < nprefixes && prefixes[i]->info.cluster_id != cluster_id) i++; - if(i == nprefixes) return 0; + if(i >= nprefixes) return 0; else return prefixes[i]; } @@ -283,19 +282,21 @@ void Configuration::storeCAS(uint64_t nd config_ver_enum Configuration::get_supported_version() { + Ndb db(primary_conn); + db.init(1); TableSpec ts_meta("ndbmemcache.meta", "application,metadata_version", ""); - QueryPlan plan(db, &ts_meta); + QueryPlan plan(& db, &ts_meta); // "initialized" is set only if the ndbmemcache.meta table exists: if(plan.initialized) { - if(fetch_meta_record(&plan, "1.1")) { + if(fetch_meta_record(&plan, &db, "1.1")) { DEBUG_PRINT("1.1"); return CONFIG_VER_1_1; } - if(fetch_meta_record(&plan, "1.0")) { + if(fetch_meta_record(&plan, &db, "1.0")) { DEBUG_PRINT("1.0"); return CONFIG_VER_1_0; } - if(fetch_meta_record(&plan, "1.0a")) { + if(fetch_meta_record(&plan, &db, "1.0a")) { DEBUG_PRINT("1.0a"); logger->log(LOG_WARNING, 0, "\nThe configuration schema from prototype2 is" " no longer supported.\nPlease drop your ndbmemcache database," @@ -307,7 +308,8 @@ config_ver_enum Configuration::get_suppo } -bool Configuration::fetch_meta_record(QueryPlan *plan, const char *version) { +bool Configuration::fetch_meta_record(QueryPlan *plan, Ndb *db, + const char *version) { DEBUG_ENTER_METHOD("Configuration::fetch_meta_record"); bool result = false; === added file 'storage/ndb/memcache/src/ConnQueryPlanSet.cc' --- a/storage/ndb/memcache/src/ConnQueryPlanSet.cc 1970-01-01 00:00:00 +0000 +++ b/storage/ndb/memcache/src/ConnQueryPlanSet.cc 2011-09-30 20:46:27 +0000 @@ -0,0 +1,64 @@ +/* + Copyright (c) 2011, Oracle and/or its affiliates. All rights + reserved. + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA + 02110-1301 USA + */ + +#include "debug.h" +#include "ConnQueryPlanSet.h" + +ConnQueryPlanSet::ConnQueryPlanSet(Ndb_cluster_connection *conn, int n) : + nplans(n), + plans(new QueryPlan *[n]) +{ + memset(plans, 0, (nplans * sizeof(QueryPlan *))); + db = new Ndb(conn); + db->init(); +} + + +ConnQueryPlanSet::~ConnQueryPlanSet() +{ + delete db; + delete plans; +} + + +bool ConnQueryPlanSet:: buildSetForConfiguration(const Configuration *cf, + int cluster_id) +{ + const KeyPrefix *k = cf->getNextPrefixForCluster(cluster_id, NULL); + while(k) { + DEBUG_PRINT("Building plan for prefix %d", k->info.prefix_id); + getPlanForPrefix(k); + k = cf->getNextPrefixForCluster(cluster_id, k); + } +} + + +QueryPlan * ConnQueryPlanSet::getPlanForPrefix(const KeyPrefix *prefix) { + int plan_id = prefix->info.prefix_id; + + if(plans[plan_id] == 0) { + plans[plan_id] = new QueryPlan(db, prefix->table); + } + + if(plans[plan_id]->initialized) + return plans[plan_id]; + else + return 0; +} === modified file 'storage/ndb/memcache/src/NdbInstance.cc' --- a/storage/ndb/memcache/src/NdbInstance.cc 2011-09-25 05:06:46 +0000 +++ b/storage/ndb/memcache/src/NdbInstance.cc 2011-09-30 17:04:30 +0000 @@ -28,13 +28,9 @@ ------------- NdbInstance ---------------- ------------------------------------------ */ -NdbInstance::NdbInstance(Ndb_cluster_connection *c, int nprefixes, - int ntransactions) : +NdbInstance::NdbInstance(Ndb_cluster_connection *c, int ntransactions) : next(0), wqitem(0) { - nplans = nprefixes; - plans = new QueryPlan *[nplans]; - memset(plans, 0, (nplans * sizeof(QueryPlan *))); if(c) { db = new Ndb(c); db->init(ntransactions); @@ -49,23 +45,6 @@ NdbInstance::NdbInstance(Ndb_cluster_con NdbInstance::~NdbInstance() { if(db) delete db; - - for(int i = 0 ; i < nplans ; i++) - if(plans[i]) - delete plans[i]; } -QueryPlan * NdbInstance::getPlanForPrefix(const KeyPrefix *prefix) { - int plan_id = prefix->info.prefix_id; - - if(plans[plan_id] == 0) { - plans[plan_id] = new QueryPlan(db, prefix->table); - } - - if(plans[plan_id]->initialized) - return plans[plan_id]; - else - return 0; -} - === modified file 'storage/ndb/memcache/src/Operation.cc' --- a/storage/ndb/memcache/src/Operation.cc 2011-09-22 18:27:10 +0000 +++ b/storage/ndb/memcache/src/Operation.cc 2011-09-30 16:22:30 +0000 @@ -70,10 +70,10 @@ size_t Operation::copyValue(int idx, cha /* NdbTransaction method wrappers */ -NdbTransaction * Operation::startTransaction() const { +NdbTransaction * Operation::startTransaction(Ndb *db) const { char hash_buffer[512]; - return plan->db->startTransaction(plan->key_record->ndb_record, key_buffer, - hash_buffer, 512); + return db->startTransaction(plan->key_record->ndb_record, key_buffer, + hash_buffer, 512); } NdbIndexScanOperation * Operation::scanIndex(NdbTransaction *tx, === modified file 'storage/ndb/memcache/src/QueryPlan.cc' --- a/storage/ndb/memcache/src/QueryPlan.cc 2011-09-29 19:13:43 +0000 +++ b/storage/ndb/memcache/src/QueryPlan.cc 2011-09-30 15:34:37 +0000 @@ -217,7 +217,7 @@ QueryPlan::~QueryPlan() { } -void QueryPlan::debug_dump() { +void QueryPlan::debug_dump() const { if(key_record) { DEBUG_PRINT("Key record:"); key_record->debug_dump(); @@ -232,7 +232,7 @@ void QueryPlan::debug_dump() { } } -bool QueryPlan::keyIsPrimaryKey() { +bool QueryPlan::keyIsPrimaryKey() const { if(spec->nkeycols == table->getNoOfPrimaryKeys()) { for(int i = 0 ; i < spec->nkeycols ; i++) if(strcmp(spec->key_columns[i], table->getPrimaryKey(i))) === modified file 'storage/ndb/memcache/src/ndb_worker.cc' --- a/storage/ndb/memcache/src/ndb_worker.cc 2011-09-28 03:58:19 +0000 +++ b/storage/ndb/memcache/src/ndb_worker.cc 2011-09-30 20:46:27 +0000 @@ -174,7 +174,7 @@ op_status_t worker_do_delete(workitem *w Operation op(plan, OP_DELETE, wqitem->ndb_key_buffer); const NdbOperation *ndb_op = 0; - NdbTransaction *tx = op.startTransaction(); + NdbTransaction *tx = op.startTransaction(wqitem->ndb_instance->db); DEBUG_ASSERT(tx); op.clearKeyNullBits(); @@ -278,9 +278,10 @@ op_status_t worker_do_write(workitem *wq } /* Start the transaction */ - NdbTransaction *tx = op.startTransaction(); + NdbTransaction *tx = op.startTransaction(wqitem->ndb_instance->db); if(! tx) { - logger->log(LOG_WARNING, 0, "tx: %s \n", plan->db->getNdbError().message); + logger->log(LOG_WARNING, 0, "tx: %s \n", + wqitem->ndb_instance->db->getNdbError().message); DEBUG_ASSERT(false); } @@ -346,7 +347,7 @@ op_status_t worker_do_read(workitem *wqi op.setKeyPart(COL_STORE_KEY, dbkey, wqitem->base.nsuffix); /* Start a transaction, and call NdbTransaction::readTuple() */ - NdbTransaction *tx = op.startTransaction(); + NdbTransaction *tx = op.startTransaction(wqitem->ndb_instance->db); DEBUG_ASSERT(tx); if(! op.readTuple(tx)) { @@ -436,7 +437,7 @@ op_status_t worker_do_math(workitem *wqi } /* Use an op (either one) to start the transaction */ - NdbTransaction *tx = op1.startTransaction(); + NdbTransaction *tx = op1.startTransaction(wqitem->ndb_instance->db); /* NdbOperation #1: READ */ { @@ -910,25 +911,23 @@ ENGINE_ERROR_CODE ndb_flush_all(ndb_pipe DEBUG_PRINT(" %d prefixes", conf.nprefixes); for(int i = 0 ; i < conf.nprefixes ; i++) { - NdbInstance *inst = 0; - const KeyPrefix *p = conf.getPrefix(i); - if(p->info.use_ndb && p->info.do_db_flush) { - ClusterConnectionPool *pool = conf.getConnectionPoolById(p->info.cluster_id); + const KeyPrefix *pfx = conf.getPrefix(i); + if(pfx->info.use_ndb && pfx->info.do_db_flush) { + ClusterConnectionPool *pool = conf.getConnectionPoolById(pfx->info.cluster_id); Ndb_cluster_connection *conn = pool->getMainConnection(); - inst = new NdbInstance(conn, conf.nprefixes, 128); - QueryPlan *plan = inst->getPlanForPrefix(p); - if(plan->keyIsPrimaryKey()) { - /* To flush, scan the table and delete every row */ - DEBUG_PRINT("prefix %d - deleting from %s", i, p->table->table_name); - scan_delete(inst, plan); + NdbInstance inst(conn, 128); + QueryPlan plan(inst.db, pfx->table); + if(plan.keyIsPrimaryKey()) { + // To flush, scan the table and delete every row + DEBUG_PRINT("prefix %d - deleting from %s", i, pfx->table->table_name); + scan_delete(&inst, &plan); } else DEBUG_PRINT("prefix %d - not scanning table %s -- accees path " - "is not primary key", i, p->table->table_name); + "is not primary key", i, pfx->table->table_name); } else DEBUG_PRINT("prefix %d - not scanning table %s -- use_ndb:%d flush:%d", - i, p->table ? p->table->table_name : "", - p->info.use_ndb, p->info.do_db_flush); - if(inst) delete inst; + i, pfx->table ? pfx->table->table_name : "", + pfx->info.use_ndb, pfx->info.do_db_flush); } return ENGINE_SUCCESS; === modified file 'storage/ndb/memcache/src/schedulers/S_sched.cc' --- a/storage/ndb/memcache/src/schedulers/S_sched.cc 2011-09-28 03:29:26 +0000 +++ b/storage/ndb/memcache/src/schedulers/S_sched.cc 2011-09-30 20:46:27 +0000 @@ -46,10 +46,10 @@ extern "C" { extern EXTENSION_LOGGER_DESCRIPTOR *logger; /* Lock that protects online reconfiguration */ -pthread_mutex_t reconf_lock = PTHREAD_MUTEX_INITIALIZER; +pthread_rwlock_t reconf_lock = PTHREAD_RWLOCK_INITIALIZER; /* Scheduler Global singleton */ -S::SchedulerGlobal * volatile s_global; +S::SchedulerGlobal * s_global; /* Global scheduler generation number */ int sched_generation_number; @@ -110,6 +110,19 @@ void S::SchedulerGlobal::init(int _nthre } +void S::SchedulerGlobal::reconfigure(Configuration * new_cf) { + conf = new_cf; + generation++; + + for(int i = 0; i < nclusters ; i++) { + for(int j = 0; j < options.n_worker_threads; j++) { + WorkerConnection *wc = * (getWorkerConnectionPtr(j, i)); + wc->reconfigure(new_cf); + } + } +} + + void S::SchedulerGlobal::shutdown() { if(running) { logger->log(LOG_WARNING, 0, "Shutting down scheduler."); @@ -210,10 +223,10 @@ void S::SchedulerGlobal::add_stats(const const char *status; char *gen = gen_number_buffer; - if(pthread_mutex_trylock(& reconf_lock) == 0) { + if(pthread_rwlock_tryrdlock(& reconf_lock) == 0) { status = "Running"; snprintf(gen, 16, "%d", generation); - pthread_mutex_unlock(& reconf_lock); + pthread_rwlock_unlock(& reconf_lock); } else { status = "Loading"; @@ -224,12 +237,8 @@ void S::SchedulerGlobal::add_stats(const } else { DEBUG_PRINT(" scheduler"); - /* Aggregate scheduler statistics, so long as reconf is not in progress */ - if(pthread_mutex_trylock(& reconf_lock) == 0) { - for(int c = 0 ; c < nclusters ; c++) { - clusters[c]->add_stats(stat_key, add_stat, cookie); - } - pthread_mutex_unlock(& reconf_lock); + for(int c = 0 ; c < nclusters ; c++) { + clusters[c]->add_stats(stat_key, add_stat, cookie); } } } @@ -261,47 +270,6 @@ void S::SchedulerWorker::shutdown() { } -bool S::SchedulerWorker::global_reconfigure(Configuration *new_conf) { - DEBUG_ENTER(); - bool result = false; - - if(pthread_mutex_lock(& reconf_lock) == 0) { - sched_generation_number++; - SchedulerGlobal *old_global = s_global; - SchedulerGlobal *new_global = new SchedulerGlobal(new_conf); - - /* Get the new scheduler up and running. - * The primary cluster will have its reference count increased - * (and be re-used). A Cluster and its Connections are durable, - * but WorkerConnections are configuration-specific and transient. - */ - new_global->init(old_global->nthreads, old_global->config_string); - - /* If attach_thread() has already happened, set the pipeline: */ - if(pipeline) { - new_global->engine = pipeline->engine; - } - - /* Set s_global. Releasing the mutex will issue a memory barrier, - * making the new value visible to other threads. - */ - s_global = new_global; - - /* We are done with the lock */ - pthread_mutex_unlock(& reconf_lock); - - /* Now shut down the old scheduler. - * Clusters that are unused in the new config will be gracefully shut down. - */ - old_global->shutdown(); - - result = true; - } - - return result; -} - - void S::SchedulerWorker::attach_thread(thread_identifier *parent) { DEBUG_ENTER(); @@ -316,19 +284,27 @@ void S::SchedulerWorker::attach_thread(t ENGINE_ERROR_CODE S::SchedulerWorker::schedule(workitem *item) { - int c; + int c = item->prefix_info.cluster_id; NdbInstance *inst; + S::WorkerConnection *wc; + const KeyPrefix *pfx; + DEBUG_PRINT("SchedulerWorker / config gen. %d", s_global->generation); - const KeyPrefix * pfx = s_global->conf->getPrefixByInfo(item->prefix_info); + /* ACQUIRE READ LOCK */ + if(pthread_rwlock_rdlock(& reconf_lock) == 0) { + wc = * (s_global->getWorkerConnectionPtr(id, c)); + pfx = s_global->conf->getPrefixByInfo(item->prefix_info); + pthread_rwlock_unlock(& reconf_lock); + } + else { + return ENGINE_TMPFAIL; + } + /* READ LOCK RELEASED */ item->base.nsuffix = item->base.nkey - pfx->prefix_len; if(item->base.nsuffix == 0) return ENGINE_EINVAL; // key too short - c = item->prefix_info.cluster_id; - - S::WorkerConnection *wc = * (s_global->getWorkerConnectionPtr(id, c)); - if(wc && wc->freelist) { inst = wc->freelist; wc->freelist = inst->next; @@ -347,7 +323,7 @@ ENGINE_ERROR_CODE S::SchedulerWorker::sc workitem_set_NdbInstance(item, inst); // Fetch the query plan for this prefix. - item->plan = inst->getPlanForPrefix(pfx); + item->plan = wc->plan_set->getPlanForPrefix(pfx); if(! item->plan) { DEBUG_PRINT("getPlanForPrefix() failure"); return ENGINE_FAILED; @@ -410,7 +386,7 @@ void S::SchedulerWorker::io_completed(wo int c = item->prefix_info.cluster_id; S::WorkerConnection * wc = * (s_global->getWorkerConnectionPtr(id, c)); if(wc && ! wc->sendqueue->is_aborted()) { - assert(inst->sched_gen_number == s_global->generation); + // assert(inst->sched_gen_number == s_global->generation); inst->wqitem = NULL; inst->next = wc->freelist; wc->freelist = inst; @@ -424,6 +400,23 @@ void S::SchedulerWorker::io_completed(wo } +/* This is a partial implementation of online reconfiguration. + It can replace KeyPrefix mappings, but not add a cluster at runtime + (nor will it catch an attempt to do so -- which will eventually lead to + a crash after a getWorkerConnectionPtr()). +*/ +bool S::SchedulerWorker::global_reconfigure(Configuration *new_cf) { + bool r = false; + + if(pthread_rwlock_wrlock(& reconf_lock) == 0) { + s_global->reconfigure(new_cf); + pthread_rwlock_unlock(& reconf_lock); + r = true; + } + return r; +} + + void S::SchedulerWorker::add_stats(const char *stat_key, ADD_STAT add_stat, const void *cookie) { @@ -531,12 +524,16 @@ S::WorkerConnection::WorkerConnection(Sc conn = cl->connections[id.conn]; id.node = conn->node_id; + /* Build the plan_set and all QueryPlans */ + old_plan_set = 0; + plan_set = new ConnQueryPlanSet(conn->conn, conf->nprefixes); + plan_set->buildSetForConfiguration(conf, cluster_id); + /* Build the freelist */ freelist = 0; int my_ndb_inst = conn->nInst / global->options.n_worker_threads; for(int j = 0 ; j < my_ndb_inst ; j++ ) { - NdbInstance *inst = new NdbInstance(conn->conn, conf->nprefixes, 2); - inst->sched_gen_number = global->generation; + NdbInstance *inst = new NdbInstance(conn->conn, 2); inst->next = freelist; freelist = inst; } @@ -561,7 +558,7 @@ S::WorkerConnection::WorkerConnection(Sc // Open them all. for(NdbInstance *inst = freelist; inst != 0 ;inst=inst->next, i++) { NdbTransaction *tx; - plan = inst->getPlanForPrefix(prefix); + plan = plan_set->getPlanForPrefix(prefix); tx = inst->db->startTransaction(); if(! tx) logger->log(LOG_WARNING, 0, inst->db->getNdbError().message); txlist[i] = tx; @@ -575,7 +572,20 @@ S::WorkerConnection::WorkerConnection(Sc // Free the list. delete[] txlist; } +} + + +void S::WorkerConnection::reconfigure(Configuration *new_cf) { + if(old_plan_set) { /* Garbage collect the old old plans */ + delete old_plan_set; + } + old_plan_set = plan_set; + + ConnQueryPlanSet *new_plans = + new ConnQueryPlanSet(conn->conn, new_cf->nprefixes); + new_plans->buildSetForConfiguration(new_cf, id.cluster); + plan_set = new_plans; } @@ -592,6 +602,12 @@ S::WorkerConnection::~WorkerConnection() /* Delete the sendqueue */ delete sendqueue; + + /* Delete the current QueryPlans (and maybe the previous ones, too) */ + delete plan_set; + if(old_plan_set) { + delete old_plan_set; + } } === modified file 'storage/ndb/memcache/src/schedulers/S_sched.h' --- a/storage/ndb/memcache/src/schedulers/S_sched.h 2011-09-26 21:12:27 +0000 +++ b/storage/ndb/memcache/src/schedulers/S_sched.h 2011-09-30 20:46:27 +0000 @@ -32,6 +32,7 @@ #include "ndbmemcache_config.h" #include "Scheduler.h" #include "KeyPrefix.h" +#include "ConnQueryPlanSet.h" #include "Queue.h" /* @@ -50,7 +51,7 @@ public: class SchedulerWorker; // one object per memcached worker thread class Cluster; // one object for each cluster class Connection; // one object per connection to a cluster - class WorkerConnection; // one object per {worker,conncetion} pair + class WorkerConnection; // one object per {worker,connection} pair }; @@ -62,6 +63,7 @@ public: ~SchedulerGlobal() {}; void init(int threads, const char *config_string); void add_stats(const char *, ADD_STAT, const void *); + void reconfigure(Configuration *); void shutdown(); WorkerConnection ** getWorkerConnectionPtr(int thd, int cluster) const { return & workerConnections[(thd * nclusters) + cluster]; @@ -184,6 +186,7 @@ public: WorkerConnection(SchedulerGlobal *, int thd_id, int cluster_id); ~WorkerConnection(); void shutdown(); + void reconfigure(Configuration *); struct { int thd : 8; @@ -192,6 +195,7 @@ public: unsigned int node : 8; } id; S::Connection *conn; + ConnQueryPlanSet *plan_set, *old_plan_set; NdbInstance *freelist; Queue * sendqueue; }; === modified file 'storage/ndb/memcache/src/schedulers/Stockholm.cc' --- a/storage/ndb/memcache/src/schedulers/Stockholm.cc 2011-09-22 18:27:10 +0000 +++ b/storage/ndb/memcache/src/schedulers/Stockholm.cc 2011-09-30 18:04:25 +0000 @@ -58,7 +58,7 @@ void Scheduler_stockholm::init(int my_th c, conf.max_tps, pool->usec_rtt, cluster[c].nInst); } - // Get the NDB instances. + // Get the ConnQueryPlanSet and NDB instances for each cluster. for(unsigned int c = 0 ; c < conf.nclusters ; c++) { cluster[c].instances = (NdbInstance**) calloc(cluster[c].nInst, sizeof(NdbInstance *)); @@ -66,9 +66,12 @@ void Scheduler_stockholm::init(int my_th ClusterConnectionPool *pool = conf.getConnectionPoolById(c); Ndb_cluster_connection *conn = pool->getPooledConnection(my_thread); + cluster[c].plan_set = new ConnQueryPlanSet(conn, conf.nprefixes); + cluster[c].plan_set->buildSetForConfiguration(&conf, c); + cluster[c].nextFree = NULL; for(int i = 0; i < cluster[c].nInst ; i++) { - NdbInstance *inst = new NdbInstance(conn, conf.nprefixes, 1); + NdbInstance *inst = new NdbInstance(conn, 1); cluster[c].instances[i] = inst; inst->next = cluster[c].nextFree; cluster[c].nextFree = inst; @@ -77,11 +80,11 @@ void Scheduler_stockholm::init(int my_th logger->log(LOG_WARNING, 0, "Pipeline %d using %u Ndb instances for Cluster %u.\n", my_thread, cluster[c].nInst, c); } - + + /* Hoard a transaction (an API connect record) for each Ndb object. This first call to startTransaction() will send TC_SEIZEREQ and wait for a reply, but later at runtime startTransaction() should return immediately. - Also, for each NDB instance, pre-build the QueryPlan for the default key prefix. TODO? Start one tx on each data node. */ QueryPlan *plan; @@ -93,7 +96,7 @@ void Scheduler_stockholm::init(int my_th txlist = ( NdbTransaction **) calloc(cluster[c].nInst, sizeof(NdbTransaction *)); // Open them all. for(int i = 0 ; i < cluster[c].nInst ; i++) { - plan = cluster[c].instances[i]->getPlanForPrefix(prefix); + plan = cluster[c].plan_set->getPlanForPrefix(prefix); txlist[i] = cluster[c].instances[i]->db->startTransaction(); } // Close them all. @@ -185,7 +188,7 @@ ENGINE_ERROR_CODE Scheduler_stockholm::s workitem_set_NdbInstance(newitem, inst); // Fetch the query plan for this prefix. - newitem->plan = inst->getPlanForPrefix(pfx); + newitem->plan = cluster[c].plan_set->getPlanForPrefix(pfx); if(! newitem->plan) return ENGINE_FAILED; // Build the NDB transaction === modified file 'storage/ndb/memcache/src/schedulers/Stockholm.h' --- a/storage/ndb/memcache/src/schedulers/Stockholm.h 2011-09-12 10:05:07 +0000 +++ b/storage/ndb/memcache/src/schedulers/Stockholm.h 2011-09-30 18:04:25 +0000 @@ -31,6 +31,7 @@ #include "workitem.h" #include "Scheduler.h" #include "KeyPrefix.h" +#include "ConnQueryPlanSet.h" /* @@ -62,6 +63,7 @@ private: uint64_t commit_thread_vtime; } stats; pthread_t commit_thread_id; + ConnQueryPlanSet * plan_set; NdbInstance **instances; int nInst; NdbInstance *nextFree; === modified file 'storage/ndb/memcache/unit/all_tests.h' --- a/storage/ndb/memcache/unit/all_tests.h 2011-09-22 05:02:00 +0000 +++ b/storage/ndb/memcache/unit/all_tests.h 2011-09-30 16:22:30 +0000 @@ -34,9 +34,9 @@ #define REQ_NDB_CONNECTION 1 #define REQ_DEMO_TABLE 2 -void delete_row(QueryPlan *plan, const char * key, int verbose); +void delete_row(QueryPlan *plan, Ndb *db, const char * key, int verbose); -typedef int TESTCASE(QueryPlan *plan, int verbose); +typedef int TESTCASE(QueryPlan *plan, Ndb *db, int verbose); struct test_item { int enabled; === modified file 'storage/ndb/memcache/unit/alloc.cc' --- a/storage/ndb/memcache/unit/alloc.cc 2011-09-22 18:27:10 +0000 +++ b/storage/ndb/memcache/unit/alloc.cc 2011-09-30 16:22:30 +0000 @@ -27,7 +27,7 @@ #include "all_tests.h" -int run_allocator_test(QueryPlan *, int v) { +int run_allocator_test(QueryPlan *, Ndb *, int v) { struct request_pipeline *p = get_request_pipeline(); memory_pool *p1 = pipeline_create_memory_pool(p); === modified file 'storage/ndb/memcache/unit/cas.cc' --- a/storage/ndb/memcache/unit/cas.cc 2011-09-12 10:05:07 +0000 +++ b/storage/ndb/memcache/unit/cas.cc 2011-09-30 16:22:30 +0000 @@ -21,35 +21,36 @@ #include "all_tests.h" -int set_row(int, QueryPlan *, const char *, Uint64 , Uint64 ); +int set_row(int, QueryPlan *, Ndb *, const char *, Uint64 , Uint64 ); void build_cas_routine(NdbInterpretedCode *, QueryPlan *plan, Uint64 *cas); -int run_cas_test(QueryPlan *plan, int v) { +int run_cas_test(QueryPlan *plan, Ndb *db, int v) { const Uint64 cas = 30090000000000003ULL; int r; - r = set_row(v, plan, "cas_unit_test_1", 0ULL, cas); // a normal update + r = set_row(v, plan, db, "cas_unit_test_1", 0ULL, cas); // a normal update detail(v, "(1): %d\n", r); require(r == 0); - r = set_row(v, plan, "cas_unit_test_1", cas, cas + 1); // an interpreted update + r = set_row(v, plan, db, "cas_unit_test_1", cas, cas + 1); // an interpreted update detail(v, "(2): %d\n", r); require(r == 0); - r = set_row(v, plan, "cas_unit_test_2", 0ULL, cas); // a normal update + r = set_row(v, plan, db, "cas_unit_test_2", 0ULL, cas); // a normal update detail(v, "(2): %d\n", r); require(r == 0); - r = set_row(v, plan, "cas_unit_test_2", cas-1, cas+1); // this should fail + r = set_row(v, plan, db, "cas_unit_test_2", cas-1, cas+1); // this should fail detail(v, "(2): %d\n", r); require(r == 899); pass; } -int set_row(int v, QueryPlan *plan, const char *akey, Uint64 old_cas, Uint64 new_cas) { +int set_row(int v, QueryPlan *plan, Ndb *db, + const char *akey, Uint64 old_cas, Uint64 new_cas) { NdbOperation::OperationOptions options; NdbInterpretedCode code(plan->table); char key[50]; @@ -76,7 +77,7 @@ int set_row(int v, QueryPlan *plan, cons op.setColumn(COL_STORE_VALUE, value, strlen(value)); op.setColumnBigUnsigned(COL_STORE_CAS, new_cas); - NdbTransaction *tx = op.startTransaction(); + NdbTransaction *tx = op.startTransaction(db); if(old_cas) { /* This is an interpreted update */ === modified file 'storage/ndb/memcache/unit/casbits.cc' --- a/storage/ndb/memcache/unit/casbits.cc 2011-09-22 18:27:10 +0000 +++ b/storage/ndb/memcache/unit/casbits.cc 2011-09-30 16:22:30 +0000 @@ -31,7 +31,7 @@ ndbmc_atomic32_t engine_cas_lo = 0xb0000 void worker_set_cas(int verbose, Uint64 *cas); -int test_cas_bitshifts(QueryPlan *, int v) { +int test_cas_bitshifts(QueryPlan *, Ndb *, int v) { Uint64 cas = 0ULL; worker_set_cas(v, &cas); require(cas == 0x00717530B0000065ULL); === modified file 'storage/ndb/memcache/unit/connpool.cc' --- a/storage/ndb/memcache/unit/connpool.cc 2011-09-12 10:05:07 +0000 +++ b/storage/ndb/memcache/unit/connpool.cc 2011-09-30 16:22:30 +0000 @@ -23,10 +23,10 @@ #include "all_tests.h" -int run_pool_test(QueryPlan *plan, int v) { +int run_pool_test(QueryPlan *plan, Ndb *db1, int v) { int conn_retries = 0; int r; - Ndb_cluster_connection &main_conn = plan->db->get_ndb_cluster_connection(); + Ndb_cluster_connection &main_conn = db1->get_ndb_cluster_connection(); /* The pooled connection */ Ndb_cluster_connection * nc = new Ndb_cluster_connection(connect_string, & main_conn); @@ -75,13 +75,13 @@ int run_pool_test(QueryPlan *plan, int v nc->node_id(), nc->get_connected_host(), nc->get_connected_port()); require(nc->node_id()); - Ndb *db = new Ndb(nc); - require(db); + Ndb *db2 = new Ndb(nc); + require(db2); detail(v, "Created an Ndb object.\n"); - db->init(4); + db2->init(4); - NdbTransaction *tx = db->startTransaction(); + NdbTransaction *tx = db2->startTransaction(); require(tx); detail(v, "Started a transaction.\n"); === modified file 'storage/ndb/memcache/unit/harness.cc' --- a/storage/ndb/memcache/unit/harness.cc 2011-09-12 10:05:07 +0000 +++ b/storage/ndb/memcache/unit/harness.cc 2011-09-30 16:22:30 +0000 @@ -113,7 +113,7 @@ int main(int argc, char *argv[]) { if(test_number >= 0) { /* Run a particular test */ printf("%s\n", all_tests[test_number].name); - int r = all_tests[test_number].function(plan, 1); //verbose + int r = all_tests[test_number].function(plan, db, 1); //verbose if(r) { printf(" [FAIL] at line %d\n", r); nfail++; } else { @@ -124,7 +124,7 @@ int main(int argc, char *argv[]) { for(int i = 0; all_tests[i].name; i++) { if(all_tests[i].enabled) { printf("%-30s", all_tests[i].name); - int r = all_tests[i].function(plan, 0); // quiet + int r = all_tests[i].function(plan, db, 0); // quiet printf(" %s\n", r ? "[FAIL]" : "[PASS]"); if(r) nfail++; else npass++; @@ -200,7 +200,7 @@ Ndb_cluster_connection * connect(const c } -void delete_row(QueryPlan *plan, const char * key, int verbose) { +void delete_row(QueryPlan *plan, Ndb *db, const char * key, int verbose) { char ndbkeybuffer[300]; Operation op(plan, OP_DELETE, ndbkeybuffer); @@ -208,7 +208,7 @@ void delete_row(QueryPlan *plan, const c op.clearKeyNullBits(); op.setKeyPart(COL_STORE_KEY, key, strlen(key)); - NdbTransaction * tx = op.startTransaction(); + NdbTransaction * tx = op.startTransaction(db); op.deleteTuple(tx); tx->execute(NdbTransaction::Commit); === modified file 'storage/ndb/memcache/unit/incr.cc' --- a/storage/ndb/memcache/unit/incr.cc 2011-09-22 18:27:10 +0000 +++ b/storage/ndb/memcache/unit/incr.cc 2011-09-30 16:22:30 +0000 @@ -19,47 +19,48 @@ */ #include "all_tests.h" -int do_incr(int v, QueryPlan *plan, const char *akey, bool, bool, Uint64 *val); +int do_incr(int v, QueryPlan *plan, Ndb *db, + const char *akey, bool, bool, Uint64 *val); -int run_incr_test(QueryPlan *plan, int v) { +int run_incr_test(QueryPlan *plan, Ndb *db, int v) { int r = 0; Uint64 val = 33; - delete_row(plan, "incr_unit_test_1", v); - delete_row(plan, "incr_unit_test_2", v); + delete_row(plan, db, "incr_unit_test_1", v); + delete_row(plan, db, "incr_unit_test_2", v); detail(v, "Test 1: INCR non-existing row, create=false\n"); - r = do_incr(v, plan, "incr_unit_test_1", false, true, &val); + r = do_incr(v, plan, db, "incr_unit_test_1", false, true, &val); detail(v, "Result - NDB=%d Val=%llu \n\n", r, (long long unsigned) val); require(r == 626); require(val == 33); detail(v, "Test 2: INCR non-existing row, create=true, update = false\n"); - r = do_incr(v, plan, "incr_unit_test_1", true, false, &val); + r = do_incr(v, plan, db, "incr_unit_test_1", true, false, &val); detail(v, "Result - NDB=%d Val=%llu \n\n", r, val); require(r == 626); // the transaction gets a 626 even if the insert succeeds require(val == -1ULL); detail(v, "test 3: READ row created in test 2\n"); - r = do_incr(v, plan, "incr_unit_test_1", false, false, &val); + r = do_incr(v, plan, db, "incr_unit_test_1", false, false, &val); detail(v, "Result - NDB=%d Val=%llu \n\n", r, (long long unsigned) val); require(r == 0); require(val == -1ULL); detail(v, "Test 4: INCR non-existing row, create=true\n"); - r = do_incr(v, plan, "incr_unit_test_2", true, true, &val); + r = do_incr(v, plan, db, "incr_unit_test_2", true, true, &val); detail(v, "Result - NDB=%d Val=%llu \n\n", r, (long long unsigned) val); require(r == 626); require(val == 0); detail(v, "Test 5: INCR existing row, create=false\n"); - r = do_incr(v, plan, "incr_unit_test_2", false, true, &val); + r = do_incr(v, plan, db, "incr_unit_test_2", false, true, &val); detail(v, "Result - NDB=%d Val=%llu \n\n", r, (long long unsigned) val); require(r == 0); require(val == 1); detail(v, "Test 6: INCR existing row, create=true\n"); - r = do_incr(v, plan, "incr_unit_test_2", true, true, &val); + r = do_incr(v, plan, db, "incr_unit_test_2", true, true, &val); detail(v, "Result - NDB=%d Val=%llu \n\n", r, (long long unsigned) val); require(r == 630); // the insert failed but the update succeeded require(val == 2); @@ -68,7 +69,7 @@ int run_incr_test(QueryPlan *plan, int v } -int do_incr(int v, QueryPlan *plan, const char *akey, +int do_incr(int v, QueryPlan *plan, Ndb *db, const char *akey, bool create, bool update, Uint64 *val) { int r; char key[50]; @@ -96,9 +97,9 @@ int do_incr(int v, QueryPlan *plan, cons op.setColumn(COL_STORE_KEY, key, strlen(key)); op.setColumnBigUnsigned(COL_STORE_CAS, 0ULL); - NdbTransaction *tx = op.startTransaction(); + NdbTransaction *tx = op.startTransaction(db); if(! tx) { - int r = plan->db->RESULT; + int r = db->RESULT; detail(v, " get tx: %d \n", r); return r; } === modified file 'storage/ndb/memcache/unit/queue.cc' --- a/storage/ndb/memcache/unit/queue.cc 2011-09-12 10:05:07 +0000 +++ b/storage/ndb/memcache/unit/queue.cc 2011-09-30 16:22:30 +0000 @@ -41,7 +41,7 @@ q_test_obj * get(Queue &q, i const int n_loop_items = 50000; -int run_queue_test(QueryPlan *, int v) { +int run_queue_test(QueryPlan *, Ndb *, int v) { Queue q(n_loop_items); /* nothing there */ === modified file 'storage/ndb/memcache/unit/tsv.cc' --- a/storage/ndb/memcache/unit/tsv.cc 2011-09-12 10:05:07 +0000 +++ b/storage/ndb/memcache/unit/tsv.cc 2011-09-30 16:22:30 +0000 @@ -25,7 +25,7 @@ #include "all_tests.h" -int run_tsv_test(QueryPlan *, int v) { +int run_tsv_test(QueryPlan *, Ndb *, int v) { { TabSeparatedValues t1("frodo.xxx", 4, 5); require(t1.getLength() == 5); No bundle (reason: useless for push emails).