List:Commits« Previous MessageNext Message »
From:John David Duncan Date:September 30 2011 9:30pm
Subject:bzr push into mysql-5.5-cluster branch (john.duncan:3571 to 3578)
View as plain text  
 3578 John David Duncan	2011-09-30
      Online reconfiguration.  
      This revision compiles and passes the test suite on my laptop.

    modified:
      mysql-test/lib/My/Memcache.pm
      mysql-test/suite/ndb_memcache/include/datatypes_tables.inc
      mysql-test/suite/ndb_memcache/t/type_char.test
      storage/ndb/memcache/sandbox.sh.in
      storage/ndb/memcache/src/ConnQueryPlanSet.cc
      storage/ndb/memcache/src/ndb_worker.cc
      storage/ndb/memcache/src/schedulers/S_sched.cc
      storage/ndb/memcache/src/schedulers/S_sched.h
 3577 John David Duncan	2011-09-30
      ndb/memcache Configuration -- small fixes

    modified:
      storage/ndb/memcache/include/Configuration.h
      storage/ndb/memcache/src/Configuration.cc
 3576 John David Duncan	2011-09-30
      Changes to online reconfiguration in S Scheduler. 
      Online reconfig is actually thread-safe now, protected by a read/write lock.
      Online reconfig in the scheduler now *only* updates the Configuration pointer, the generation number, and the list of QueryPlans.  
      So it is not currently possible to add a cluster or to grow the pool of Ndb objects using online reconfig. 
      This change also adapts both schedulers to use ConnQueryPlanSet.

    modified:
      storage/ndb/memcache/src/schedulers/S_sched.cc
      storage/ndb/memcache/src/schedulers/S_sched.h
      storage/ndb/memcache/src/schedulers/Stockholm.cc
      storage/ndb/memcache/src/schedulers/Stockholm.h
 3575 John David Duncan	2011-09-30
      New class ConnQueryPlanSet. 
      Each NdbInstance no longer keeps a list of QueryPlans;
      now, for each cluster connection, a ConnQueryPlanSet holds the QueryPlans.
      This revision does not compile.

    added:
      storage/ndb/memcache/include/ConnQueryPlanSet.h
      storage/ndb/memcache/src/ConnQueryPlanSet.cc
    modified:
      storage/ndb/memcache/CMakeLists.txt
      storage/ndb/memcache/include/Configuration.h
      storage/ndb/memcache/include/NdbInstance.h
      storage/ndb/memcache/src/Configuration.cc
      storage/ndb/memcache/src/NdbInstance.cc
 3574 John David Duncan	2011-09-30
      QueryPlan::db is now private.

    modified:
      storage/ndb/memcache/include/Configuration.h
      storage/ndb/memcache/include/Operation.h
      storage/ndb/memcache/include/QueryPlan.h
      storage/ndb/memcache/src/Configuration.cc
      storage/ndb/memcache/src/Operation.cc
      storage/ndb/memcache/src/ndb_worker.cc
      storage/ndb/memcache/unit/all_tests.h
      storage/ndb/memcache/unit/alloc.cc
      storage/ndb/memcache/unit/cas.cc
      storage/ndb/memcache/unit/casbits.cc
      storage/ndb/memcache/unit/connpool.cc
      storage/ndb/memcache/unit/harness.cc
      storage/ndb/memcache/unit/incr.cc
      storage/ndb/memcache/unit/queue.cc
      storage/ndb/memcache/unit/tsv.cc
 3573 John David Duncan	2011-09-30
      Some QueryPlan methods are now const.

    modified:
      storage/ndb/memcache/include/QueryPlan.h
      storage/ndb/memcache/src/QueryPlan.cc
 3572 John David Duncan	2011-09-30
      memcache: Rather than keeping an Ndb object (and eventually leaking it), Configuration and Config_v1 allocate an Ndb on the stack when they need one. 

    modified:
      storage/ndb/memcache/include/Configuration.h
      storage/ndb/memcache/src/Config_v1.cc
      storage/ndb/memcache/src/Configuration.cc
 3571 John David Duncan	2011-09-29
      Column->getSizeInBytes() seems to work for all cases of getColumnRecordSize().
      It also does better for determining alignment; for instance NdbApi seems to treat
      a timestamp column as an array of 4 chars, so getSize() was 1.  

    modified:
      storage/ndb/memcache/include/DataTypeHandler.h
      storage/ndb/memcache/include/QueryPlan.h
      storage/ndb/memcache/include/Record.h
      storage/ndb/memcache/src/DataTypeHandler.cc
      storage/ndb/memcache/src/QueryPlan.cc
      storage/ndb/memcache/src/Record.cc
=== modified file 'mysql-test/lib/My/Memcache.pm'
--- a/mysql-test/lib/My/Memcache.pm	2011-09-29 03:29:32 +0000
+++ b/mysql-test/lib/My/Memcache.pm	2011-09-30 20:46:27 +0000
@@ -127,6 +127,9 @@ sub wait_for_reconf {
   if($new_gen > 0) {
     $self->{cf_gen} = $new_gen;
   }
+  else {
+    print STDERR "Timed out.\n";
+  }
   
   return $new_gen;
 }

=== modified file 'mysql-test/suite/ndb_memcache/include/datatypes_tables.inc'
--- a/mysql-test/suite/ndb_memcache/include/datatypes_tables.inc	2011-09-29 03:29:32 +0000
+++ b/mysql-test/suite/ndb_memcache/include/datatypes_tables.inc	2011-09-30 20:46:27 +0000
@@ -50,7 +50,6 @@ UPDATE memcache_server_roles set update_
   
 DROP VIEW tv_tablist;
 
---real_sleep 1
 --source suite/ndb_memcache/include/wait_for_reconf.inc
 }
 

=== modified file 'mysql-test/suite/ndb_memcache/t/type_char.test'
--- a/mysql-test/suite/ndb_memcache/t/type_char.test	2011-09-29 03:29:32 +0000
+++ b/mysql-test/suite/ndb_memcache/t/type_char.test	2011-09-30 20:46:27 +0000
@@ -35,6 +35,7 @@ UPDATE memcache_server_roles set update_
 --perl
 
 use strict;
+use Carp;
 use lib "lib";
 use My::Memcache;
 # Use a binary protocol connection (so keys can contain spaces)
@@ -42,36 +43,31 @@ my $mc = My::Memcache::Binary->new();  
 my $port = $ENV{MTR_BUILD_THREAD} * 10 + 10000 + 8;
 my $r = $mc->connect("localhost",$port);
 
-# sleep(1) because there is still an undiagnosed race condition in 
-# online reconfiguration -- jdd, 14 Aug. 2011
-sleep(1);
-
 my $cf_gen = $mc->wait_for_reconf();
 
-if($cf_gen > 0) {
-  # test CHAR key with VARCHAR value
-  $mc->set("tck:a","fred")              || die;
-  $mc->set("tck:1","frederick")         || die;
-  $mc->set("tck:aa","frederica")        || die;
-  $mc->set("tck:a b c d","freddy")      || die; 
-  
-  ($mc->get("tck:aa") == "frederica")   || die;
-  ($mc->get("tck:a b c d") == "freddy") || die;
-  
-  # test VARCHAR key with CHAR value
-  $mc->set("tcv:a", "al")               || die;
-  $mc->set("tcv:b", "alphonse")         || die;
-  $mc->set("tcv:c", "allen")            || die;
-  $mc->set("tcv:d", "alien invasion")   || die;
-  
-  ($mc->get("tcv:d") == "alien invasion") || die;
-  ($mc->get("tcv:a") == "al")           || die;
-  ($mc->get("tcv:ee") == "NOT_FOUND")   || die;
-}
-else {
-  print " ***** Skipped test  [$r/$cf_gen] \n";
+if($cf_gen == 0) {
+  Carp::confess("FAILED WAIT_FOR_RECONF");
 }
 
+# test CHAR key with VARCHAR value
+$mc->set("tck:a","fred")               || Carp::confess("FAILED # 01 (SET)");
+$mc->set("tck:1","frederick")          || Carp::confess("FAILED # 02 (SET)");
+$mc->set("tck:aa","frederica")         || Carp::confess("FAILED # 03 (SET)");
+$mc->set("tck:a b c d","freddy")       || Carp::confess("FAILED # 04 (SET)");
+  
+($mc->get("tck:aa") == "frederica")    || Carp::confess("FAILED # 05 (GET)");
+($mc->get("tck:a b c d") == "freddy")  || Carp::confess("FAILED # 06 (GET)");
+
+# test VARCHAR key with CHAR value
+$mc->set("tcv:a", "al")                || Carp::confess("FAILED # 07 (SET)");
+$mc->set("tcv:b", "alphonse")          || Carp::confess("FAILED # 08 (SET)");
+$mc->set("tcv:c", "allen")             || Carp::confess("FAILED # 09 (SET)");
+$mc->set("tcv:d", "alien invasion")    || Carp::confess("FAILED # 10 (SET)");
+
+($mc->get("tcv:d") == "alien invasion")|| Carp::confess("FAILED # 11 (GET)");
+($mc->get("tcv:a") == "al")            || Carp::confess("FAILED # 12 (GET)");
+($mc->get("tcv:ee") == "NOT_FOUND")    || Carp::confess("FAILED # 13 (GET)");
+
 EOF
 
 

=== modified file 'storage/ndb/memcache/CMakeLists.txt'
--- a/storage/ndb/memcache/CMakeLists.txt	2011-09-25 07:19:37 +0000
+++ b/storage/ndb/memcache/CMakeLists.txt	2011-09-30 17:04:30 +0000
@@ -56,6 +56,7 @@ set(NDB_MEMCACHE_SOURCE_FILES
     src/ClusterConnectionPool.cc
     src/Configuration.cc
     src/Config_v1.cc
+    src/ConnQueryPlanSet.cc
     src/DataTypeHandler.cc
     src/KeyPrefix.cc
     src/NdbInstance.cc

=== modified file 'storage/ndb/memcache/include/Configuration.h'
--- a/storage/ndb/memcache/include/Configuration.h	2011-09-22 18:27:10 +0000
+++ b/storage/ndb/memcache/include/Configuration.h	2011-09-30 20:34:55 +0000
@@ -87,7 +87,7 @@ class Configuration {
   const KeyPrefix * getPrefixForKey(const char *key, int nkey) const;
   const KeyPrefix * getPrefixByInfo(const prefix_info_t info) const;
   const KeyPrefix * getPrefix(int id) const;                          // inlined
-  const KeyPrefix * getNextPrefixForCluster(uint cluster_id, KeyPrefix *) const;
+  const KeyPrefix * getNextPrefixForCluster(uint cluster_id, const KeyPrefix *) const;
   void setPrimaryConnectString(const char *);                         // inlined
   void setServerRole(const char *);                                   // inlined
   const char * getServerRole();                                       // inlined
@@ -113,7 +113,6 @@ class Configuration {
   int storePrefix(KeyPrefix &prefix);
   void storeCAS(uint64_t ndb_engine_cas, uint64_t default_engine_cas);
 
-  Ndb *db;
   const char *primary_connect_string;
   int onlineReloadFlag;
   int (*reload_waiter)(Ndb_cluster_connection *, const char *);
@@ -122,7 +121,7 @@ class Configuration {
   /* private methods */
   void store_default_prefix();
   config_ver_enum get_supported_version();
-  bool fetch_meta_record(QueryPlan *plan, const char *version);
+  bool fetch_meta_record(QueryPlan *plan, Ndb *db, const char *version);
   
   /* private instance variables */
   const char *server_role;

=== added file 'storage/ndb/memcache/include/ConnQueryPlanSet.h'
--- a/storage/ndb/memcache/include/ConnQueryPlanSet.h	1970-01-01 00:00:00 +0000
+++ b/storage/ndb/memcache/include/ConnQueryPlanSet.h	2011-09-30 17:04:30 +0000
@@ -0,0 +1,49 @@
+/*
+ Copyright (c) 2011, Oracle and/or its affiliates. All rights
+ reserved.
+ 
+ This program is free software; you can redistribute it and/or
+ modify it under the terms of the GNU General Public License
+ as published by the Free Software Foundation; version 2 of
+ the License.
+ 
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+ 
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ 02110-1301  USA
+ */
+
+#ifndef NDBMEMCACHE_CONNQUERYPLANSET_H
+#define NDBMEMCACHE_CONNQUERYPLANSET_H
+
+#ifndef __cplusplus
+#error "This file is for C++ only"
+#endif
+
+
+#include "Configuration.h"
+#include "QueryPlan.h"
+
+class ConnQueryPlanSet {
+public:
+  ConnQueryPlanSet(Ndb_cluster_connection *, int n_plans);
+  ~ConnQueryPlanSet();
+
+  bool buildSetForConfiguration(const Configuration *, int cluster_id);
+  QueryPlan * getPlanForPrefix(const KeyPrefix *);
+
+private:
+  Ndb *db;
+  int nplans;
+  QueryPlan **plans;  
+};
+
+
+
+#endif
+

=== modified file 'storage/ndb/memcache/include/NdbInstance.h'
--- a/storage/ndb/memcache/include/NdbInstance.h	2011-09-12 10:05:07 +0000
+++ b/storage/ndb/memcache/include/NdbInstance.h	2011-09-30 17:04:30 +0000
@@ -34,27 +34,22 @@
 struct workitem;
 
 #define VPSZ sizeof(void *)
-#define ISZ  sizeof(int)
-#define TOTAL_SZ (ISZ + (4 * VPSZ))
+#define TOTAL_SZ (3 * VPSZ)
 #define PADDING (64 - TOTAL_SZ)
 
 
 class NdbInstance {
 public:
   /* Public Methods */
-  NdbInstance(Ndb_cluster_connection *, int, int);
+  NdbInstance(Ndb_cluster_connection *, int);
   ~NdbInstance();
-  QueryPlan * getPlanForPrefix(const KeyPrefix *);
 
   /* Public Instance Variables */  
   Ndb *db;
   NdbInstance *next;
   workitem *wqitem;
-  int sched_gen_number;
  
 private:
-  int nplans;
-  QueryPlan **plans;  
   char cache_line_padding[PADDING];
 };
 

=== modified file 'storage/ndb/memcache/include/Operation.h'
--- a/storage/ndb/memcache/include/Operation.h	2011-09-12 10:05:07 +0000
+++ b/storage/ndb/memcache/include/Operation.h	2011-09-30 16:22:30 +0000
@@ -83,7 +83,7 @@ public: 
 
   /* NdbTransaction method wrappers */
   // startTransaction
-  NdbTransaction *startTransaction() const;
+  NdbTransaction *startTransaction(Ndb *) const;
 
   // read
   const NdbOperation *readTuple(NdbTransaction *tx, 

=== modified file 'storage/ndb/memcache/include/QueryPlan.h'
--- a/storage/ndb/memcache/include/QueryPlan.h	2011-09-29 19:13:43 +0000
+++ b/storage/ndb/memcache/include/QueryPlan.h	2011-09-30 16:22:30 +0000
@@ -47,13 +47,12 @@ class QueryPlan {
   QueryPlan() : initialized(0)  {};  
   QueryPlan(Ndb *, const TableSpec *, PlanOpts opts = NoOptions); 
   ~QueryPlan();
-  bool keyIsPrimaryKey();
-  void debug_dump();
+  bool keyIsPrimaryKey() const;
+  void debug_dump() const;
    
   /* public instance variables */
   bool initialized;
   bool dup_numbers;
-  Ndb *db;
   const TableSpec *spec;
   NdbDictionary::Dictionary *dict;
   const NdbDictionary::Table *table;
@@ -77,6 +76,7 @@ class QueryPlan {
   const NdbDictionary::Index * chooseIndex();
 
   /* Private instance variables */
+  Ndb *db;
 };
 
 

=== modified file 'storage/ndb/memcache/sandbox.sh.in'
--- a/storage/ndb/memcache/sandbox.sh.in	2011-09-28 20:28:34 +0000
+++ b/storage/ndb/memcache/sandbox.sh.in	2011-09-30 20:46:27 +0000
@@ -1,7 +1,5 @@
 #!/bin/sh
 
-# FIXME:  Solaris doesn't have whoami and "kill" doesn't work as used here.
-
 prefix=@CMAKE_INSTALL_PREFIX@
 bindir=@INSTALL_BINDIR@
 libexecdir=@INSTALL_SBINDIR@
@@ -12,6 +10,7 @@ sourcetree=@CMAKE_CURRENT_SOURCE_DIR@
 installtree=@CMAKE_INSTALL_PREFIX@/memcache-api
 memcached_binary=@MEMCACHED_BIN_PATH@
 
+MYSELF=`who am i | awk '{print $1}'`
 MYSQL_PREFIX=$prefix
 MYSQL_BIN=$prefix/$bindir
 MYSQL_LIBEXEC=$prefix/$libexecdir
@@ -20,7 +19,11 @@ MYSQL_LIB=$prefix/$libdir
 MEMCACHE_BASE=$memcachedir
 SOURCE_TREE=$sourcetree
 HOME_BASE=$sourcetree   # fallback to source tree 
-test -d $installtree && HOME_BASE=$installtree  # prefer installed tree
+if test $HOME_BASE != `pwd`  
+ then
+   # prefer installed tree unless we are currently in the source tree
+   test -d $installtree && HOME_BASE=$installtree
+fi
 
 test_paths() {
   test_path $MYSQL_BIN ndb_mgm
@@ -58,7 +61,7 @@ write_my_cnf() {
     echo "ndbcluster"
     echo "datadir=$HOME_BASE/sandbox/data"
     echo "pid-file=$HOME_BASE/sandbox/mysqld.pid"
-    echo "user="`whoami`
+    echo "user=$MYSELF"
     echo "innodb_log_file_size=1M"
     echo
   ) > $HOME_BASE/sandbox/my.cnf
@@ -94,7 +97,7 @@ write_cluster_ini() {
 do_install_db() {
   $MYSQL_SCRIPTS/mysql_install_db \
     --basedir=$MYSQL_PREFIX --datadir=$HOME_BASE/sandbox/data \
-    --skip-name-resolve --user=`whoami` > /dev/null
+    --skip-name-resolve --user=$MYSELF > /dev/null
   if test ! -d sandbox/data/mysql 
     then echo "Failed: mysql_install_db did not work." && exit
     else echo "Created MySQL System Tables"
@@ -159,7 +162,8 @@ start_memcached() {
 }
 
 stop_memcached() {
-  kill `cat $HOME_BASE/sandbox/memcached.pid`
+  SERVERPID=`cat $HOME_BASE/sandbox/memcached.pid`
+  test -n "$SERVERPID" && kill $SERVERPID
 }
 
 stop_mysqld() {
@@ -173,6 +177,9 @@ stop_mgm_server() {
 final_message() {
   sleep 2
   echo
+  echo "Sandbox directory is $HOME_BASE/sandbox"
+  echo "Memcached is $memcached_binary"
+  echo 
   echo "Use \"sh sandbox.sh stop\" to stop memcached & MySQL Cluster"
   echo 
 }

=== modified file 'storage/ndb/memcache/src/Config_v1.cc'
--- a/storage/ndb/memcache/src/Config_v1.cc	2011-09-29 03:26:04 +0000
+++ b/storage/ndb/memcache/src/Config_v1.cc	2011-09-30 07:03:26 +0000
@@ -195,14 +195,16 @@ bool config_v1::read_configuration() {
 int config_v1::get_server_role_id() {
   uint32_t val = -1;
   
+  Ndb db(conf.primary_conn);
+  db.init(2);
   TableSpec spec("ndbmemcache.memcache_server_roles",
                  "role_name", "role_id,max_tps");
-  QueryPlan plan(conf.db, &spec); 
+  QueryPlan plan(&db, &spec);
   Operation op(&plan, OP_READ);
   
   op.key_buffer = (char *) malloc(op.requiredKeyBuffer());
   op.buffer =     (char *) malloc(op.requiredBuffer());
-  NdbTransaction *tx = conf.db->startTransaction();
+  NdbTransaction *tx = db.startTransaction();
   
   op.clearKeyNullBits();
   op.setKeyPart(COL_STORE_KEY, conf.server_role, strlen(conf.server_role));
@@ -240,13 +242,15 @@ bool config_v1::get_policies() {
   DEBUG_ENTER_METHOD("config_v1::get_policies");
   bool success = true;
   int res;
+  Ndb db(conf.primary_conn);
+  db.init(4);
   TableSpec spec("ndbmemcache.cache_policies",
                  "policy_name",                 
                  "get_policy,set_policy,delete_policy,flush_from_db");
-  QueryPlan plan(conf.db, &spec); 
+  QueryPlan plan(&db, &spec); 
   Operation op(&plan, OP_SCAN);
   
-  NdbTransaction *tx = conf.db->startTransaction();
+  NdbTransaction *tx = db.startTransaction();
   NdbScanOperation *scan = op.scanTable(tx);
   if(! scan) {
     logger->log(LOG_WARNING, 0, tx->getNdbError().message);
@@ -311,14 +315,16 @@ bool config_v1::get_connections() {
   DEBUG_ENTER_METHOD("config_v1::get_connections");
   bool success = true;
   int res;
+  Ndb db(conf.primary_conn);
+  db.init(4);
   TableSpec spec("ndbmemcache.ndb_clusters",
                  "cluster_id",
                  "ndb_connectstring,microsec_rtt");  
   /* Scan the ndb_clusters table */
-  QueryPlan plan(conf.db, &spec); 
+  QueryPlan plan(&db, &spec); 
   Operation op(&plan, OP_SCAN);
   
-  NdbTransaction *tx = conf.db->startTransaction();
+  NdbTransaction *tx = db.startTransaction();
   NdbScanOperation *scan = op.scanTable(tx);
   if(! scan) {
     logger->log(LOG_WARNING, 0, tx->getNdbError().message);
@@ -383,16 +389,18 @@ TableSpec * config_v1::get_container(cha
 
 TableSpec * config_v1::get_container_record(char *name) {
   TableSpec *container;
+  Ndb db(conf.primary_conn);
+  db.init(1);
   TableSpec spec("ndbmemcache.containers",
                  "name",  
                  "db_schema,db_table,key_columns,value_columns,flags,"
-                 "increment_column,cas_column,expire_time_column");
-  QueryPlan plan(conf.db, &spec); 
+                 "increment_column,cas_column,expire_time_column");                 
+  QueryPlan plan(&db, &spec); 
   Operation op(&plan, OP_READ);
   
   op.key_buffer = (char *) malloc(op.requiredKeyBuffer());
   op.buffer     = (char *) malloc(op.requiredBuffer());
-  NdbTransaction *tx = conf.db->startTransaction();
+  NdbTransaction *tx = db.startTransaction();
   
   op.clearKeyNullBits();
   op.setKeyPart(COL_STORE_KEY, name, strlen(name));
@@ -478,7 +486,9 @@ bool config_v1::get_prefixes(int role_id
   TableSpec spec("ndbmemcache.key_prefixes",
                  "server_role_id,key_prefix", 
                  "cluster_id,policy,container");
-  QueryPlan plan(conf.db, &spec, PKScan); 
+  Ndb db(conf.primary_conn);
+  db.init(4);
+  QueryPlan plan(&db, &spec, PKScan); 
   Operation op(&plan, OP_SCAN);
   
   // `server_role_id` INT UNSIGNED NOT NULL DEFAULT 0,
@@ -492,7 +502,7 @@ bool config_v1::get_prefixes(int role_id
   bound.low_inclusive = bound.high_inclusive = true;
   bound.range_no = 0;
   
-  NdbTransaction *tx = conf.db->startTransaction();
+  NdbTransaction *tx = db.startTransaction();
   NdbIndexScanOperation *scan = op.scanIndex(tx, &bound);
   if(! scan) {
     logger->log(LOG_WARNING, 0, "scanIndex(): %s\n", tx->getNdbError().message);
@@ -629,21 +639,23 @@ void config_v1::log_signon() {
   DEBUG_ENTER_METHOD("config_v1::log_signon");
   char my_hostname[256];
   gethostname(my_hostname, 256);  
+  Ndb db(conf.primary_conn);
+  db.init(1);
   TableSpec spec("ndbmemcache.last_memcached_signon",
                  "ndb_node_id", "hostname,server_role,signon_time");
-  QueryPlan plan(conf.db, &spec);
+  QueryPlan plan(&db, &spec);
   
   Operation op(&plan, OPERATION_SET);
   op.buffer     = (char *) malloc(op.requiredBuffer());
   op.key_buffer = (char *) malloc(op.requiredKeyBuffer());
   op.clearNullBits();
-  op.setKeyPartInt(COL_STORE_KEY,   conf.db->getNodeId());  // node ID (in key)
-  op.setColumnInt(COL_STORE_KEY,    conf.db->getNodeId());  // node ID (in row)
+  op.setKeyPartInt(COL_STORE_KEY,   db.getNodeId());  // node ID (in key)
+  op.setColumnInt(COL_STORE_KEY,    db.getNodeId());  // node ID (in row)
   op.setColumn(COL_STORE_VALUE+0,   my_hostname, strlen(my_hostname));           // hostname
   op.setColumn(COL_STORE_VALUE+1,   conf.server_role, strlen(conf.server_role)); // role
   op.setColumnInt(COL_STORE_VALUE+2,time(NULL));                                 // timestamp
   
-  NdbTransaction *tx = conf.db->startTransaction();   // TODO: node selection
+  NdbTransaction *tx = db.startTransaction();   // TODO: node selection
   op.writeTuple(tx);
   tx->execute(NdbTransaction::Commit);
   tx->getGCI(&signon_gci);
@@ -666,10 +678,12 @@ void config_v1::set_initial_cas() {
    |                          | + 8bit |                            | 
    |                          | NodeId |                            |
    ----------------------------------------------------------------   */
+  Ndb db(conf.primary_conn);
+  db.init(1);
   const uint64_t MASK_GCI   = 0x07FFFFFF00000000LLU; // Use these 27 bits of GCI 
   const uint64_t ENGINE_BIT = 0x0000001000000000LLU; // bit 36
   
-  uint64_t node_id = ((uint64_t) conf.db->getNodeId()) << 28;  
+  uint64_t node_id = ((uint64_t) db.getNodeId()) << 28;  
   uint64_t gci_bits = (signon_gci & MASK_GCI) << 5;
   uint64_t def_eng_cas = gci_bits | node_id;
   uint64_t ndb_eng_cas = gci_bits | ENGINE_BIT | node_id;
@@ -677,7 +691,7 @@ void config_v1::set_initial_cas() {
   //  void storeCAS(uint64_t ndb_engine_cas, uint64_t default_engine_cas);
   conf.storeCAS(ndb_eng_cas, def_eng_cas);
   DEBUG_PRINT("Sign On GCI: 0x%llx | Node Id: [%d] 0x%llx | Engine bit: 0x%llx", 
-              signon_gci, conf.db->getNodeId(), node_id, ENGINE_BIT);
+              signon_gci, db.getNodeId(), node_id, ENGINE_BIT);
   DEBUG_PRINT("Initial CAS: %llu 0x%llx ", ndb_eng_cas, ndb_eng_cas);
   
   return;

=== modified file 'storage/ndb/memcache/src/Configuration.cc'
--- a/storage/ndb/memcache/src/Configuration.cc	2011-09-29 03:26:04 +0000
+++ b/storage/ndb/memcache/src/Configuration.cc	2011-09-30 20:34:55 +0000
@@ -54,12 +54,7 @@ Configuration::Configuration(Configurati
   primary_connect_string(old->primary_connect_string),
   server_role(old->server_role),
   config_version(CONFIG_VER_UNKNOWN),
-  primary_conn(old->primary_conn)
-{
-  db = new Ndb(primary_conn);
-  db->init();
-}
-
+  primary_conn(old->primary_conn)  {};
 
 
 bool Configuration::connectToPrimary() {
@@ -84,12 +79,12 @@ bool Configuration::connectToPrimary() {
   primary_conn = ClusterConnectionPool::connect(primary_connect_string);
 
   if(primary_conn) {    
-    db = new Ndb(primary_conn);  // This Ndb is used only to read the "meta" table.
-    db->init();
     return true;
   }
-  logger->log(LOG_WARNING, 0, "FAILED.\n");
-  return false;
+  else {
+    logger->log(LOG_WARNING, 0, "FAILED.\n");
+    return false;
+  }
 }
 
 
@@ -201,13 +196,17 @@ const KeyPrefix * Configuration::getPref
 
 
 const KeyPrefix * Configuration::getNextPrefixForCluster(unsigned int cluster_id, 
-                                                         KeyPrefix *k) const {
+                                                         const KeyPrefix *k) const {
   unsigned int i = 0;
 
-  if(k) while(prefixes[i] != k && i < nprefixes) i++;
+  if(k) {
+    while(prefixes[i] != k && i < nprefixes) i++;  // find k in the list
+    i++;  // then advance one more
+  }
+    
   while(i < nprefixes && prefixes[i]->info.cluster_id != cluster_id) i++;
 
-  if(i == nprefixes) return 0;
+  if(i >= nprefixes) return 0;
   else return prefixes[i];
 }
 
@@ -283,19 +282,21 @@ void Configuration::storeCAS(uint64_t nd
 
 config_ver_enum Configuration::get_supported_version() {
 
+  Ndb db(primary_conn);
+  db.init(1);
   TableSpec ts_meta("ndbmemcache.meta", "application,metadata_version", "");
-  QueryPlan plan(db, &ts_meta);
+  QueryPlan plan(& db, &ts_meta);
   // "initialized" is set only if the ndbmemcache.meta table exists:
   if(plan.initialized) {
-    if(fetch_meta_record(&plan, "1.1")) {
+    if(fetch_meta_record(&plan, &db, "1.1")) {
       DEBUG_PRINT("1.1");
       return CONFIG_VER_1_1;
     }
-    if(fetch_meta_record(&plan, "1.0")) {
+    if(fetch_meta_record(&plan, &db, "1.0")) {
       DEBUG_PRINT("1.0");
       return CONFIG_VER_1_0;
     }
-    if(fetch_meta_record(&plan, "1.0a")) {
+    if(fetch_meta_record(&plan, &db, "1.0a")) {
       DEBUG_PRINT("1.0a");
       logger->log(LOG_WARNING, 0, "\nThe configuration schema from prototype2 is"
                   " no longer supported.\nPlease drop your ndbmemcache database,"
@@ -307,7 +308,8 @@ config_ver_enum Configuration::get_suppo
 }  
 
 
-bool Configuration::fetch_meta_record(QueryPlan *plan, const char *version) {
+bool Configuration::fetch_meta_record(QueryPlan *plan, Ndb *db,
+                                      const char *version) {
   DEBUG_ENTER_METHOD("Configuration::fetch_meta_record");
   bool result = false;
 

=== added file 'storage/ndb/memcache/src/ConnQueryPlanSet.cc'
--- a/storage/ndb/memcache/src/ConnQueryPlanSet.cc	1970-01-01 00:00:00 +0000
+++ b/storage/ndb/memcache/src/ConnQueryPlanSet.cc	2011-09-30 20:46:27 +0000
@@ -0,0 +1,64 @@
+/*
+ Copyright (c) 2011, Oracle and/or its affiliates. All rights
+ reserved.
+ 
+ This program is free software; you can redistribute it and/or
+ modify it under the terms of the GNU General Public License
+ as published by the Free Software Foundation; version 2 of
+ the License.
+ 
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+ 
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ 02110-1301  USA
+ */
+
+#include "debug.h"
+#include "ConnQueryPlanSet.h"
+
+ConnQueryPlanSet::ConnQueryPlanSet(Ndb_cluster_connection *conn, int n) :
+  nplans(n),
+  plans(new QueryPlan *[n])
+{
+  memset(plans, 0, (nplans * sizeof(QueryPlan *)));  
+  db = new Ndb(conn);
+  db->init();
+}
+
+
+ConnQueryPlanSet::~ConnQueryPlanSet() 
+{
+  delete db;
+  delete plans;
+}
+
+
+bool ConnQueryPlanSet:: buildSetForConfiguration(const Configuration *cf,
+                                                 int cluster_id)
+{
+  const KeyPrefix *k = cf->getNextPrefixForCluster(cluster_id, NULL);
+  while(k) {
+    DEBUG_PRINT("Building plan for prefix %d", k->info.prefix_id);
+    getPlanForPrefix(k);
+    k = cf->getNextPrefixForCluster(cluster_id, k);
+  }
+}
+
+
+QueryPlan * ConnQueryPlanSet::getPlanForPrefix(const KeyPrefix *prefix) { 
+  int plan_id = prefix->info.prefix_id;
+  
+  if(plans[plan_id] == 0) {
+    plans[plan_id] = new QueryPlan(db, prefix->table);
+  }
+  
+  if(plans[plan_id]->initialized)
+    return plans[plan_id];
+  else
+    return 0;
+}

=== modified file 'storage/ndb/memcache/src/NdbInstance.cc'
--- a/storage/ndb/memcache/src/NdbInstance.cc	2011-09-25 05:06:46 +0000
+++ b/storage/ndb/memcache/src/NdbInstance.cc	2011-09-30 17:04:30 +0000
@@ -28,13 +28,9 @@
    ------------- NdbInstance ----------------
    ------------------------------------------ */
  
-NdbInstance::NdbInstance(Ndb_cluster_connection *c, int nprefixes,
-                         int ntransactions) :
+NdbInstance::NdbInstance(Ndb_cluster_connection *c, int ntransactions) :
   next(0), wqitem(0)
 {
-  nplans = nprefixes;
-  plans = new QueryPlan *[nplans];
-  memset(plans, 0, (nplans * sizeof(QueryPlan *)));
   if(c) {
     db = new Ndb(c);
     db->init(ntransactions);
@@ -49,23 +45,6 @@ NdbInstance::NdbInstance(Ndb_cluster_con
 
 NdbInstance::~NdbInstance() {
   if(db) delete db;
-  
-  for(int i = 0 ; i < nplans ; i++) 
-    if(plans[i])
-      delete plans[i];
 }
 
 
-QueryPlan * NdbInstance::getPlanForPrefix(const KeyPrefix *prefix) {
-  int plan_id = prefix->info.prefix_id;
-
-  if(plans[plan_id] == 0) {
-    plans[plan_id] = new QueryPlan(db, prefix->table);
-  }
-
-  if(plans[plan_id]->initialized)
-    return plans[plan_id];
-  else
-    return 0;
-}
-

=== modified file 'storage/ndb/memcache/src/Operation.cc'
--- a/storage/ndb/memcache/src/Operation.cc	2011-09-22 18:27:10 +0000
+++ b/storage/ndb/memcache/src/Operation.cc	2011-09-30 16:22:30 +0000
@@ -70,10 +70,10 @@ size_t Operation::copyValue(int idx, cha
 
 /* NdbTransaction method wrappers */
 
-NdbTransaction * Operation::startTransaction() const {
+NdbTransaction * Operation::startTransaction(Ndb *db) const {
   char hash_buffer[512];
-  return plan->db->startTransaction(plan->key_record->ndb_record, key_buffer,
-                                    hash_buffer, 512);
+  return db->startTransaction(plan->key_record->ndb_record, key_buffer,
+                              hash_buffer, 512);
 }
 
 NdbIndexScanOperation * Operation::scanIndex(NdbTransaction *tx,

=== modified file 'storage/ndb/memcache/src/QueryPlan.cc'
--- a/storage/ndb/memcache/src/QueryPlan.cc	2011-09-29 19:13:43 +0000
+++ b/storage/ndb/memcache/src/QueryPlan.cc	2011-09-30 15:34:37 +0000
@@ -217,7 +217,7 @@ QueryPlan::~QueryPlan() {
 }
 
 
-void QueryPlan::debug_dump() {
+void QueryPlan::debug_dump() const {
   if(key_record) {
     DEBUG_PRINT("Key record:");
     key_record->debug_dump();
@@ -232,7 +232,7 @@ void QueryPlan::debug_dump() {
   }
 }
 
-bool QueryPlan::keyIsPrimaryKey() {
+bool QueryPlan::keyIsPrimaryKey() const {
   if(spec->nkeycols == table->getNoOfPrimaryKeys()) {
     for(int i = 0 ; i < spec->nkeycols ; i++) 
       if(strcmp(spec->key_columns[i], table->getPrimaryKey(i)))

=== modified file 'storage/ndb/memcache/src/ndb_worker.cc'
--- a/storage/ndb/memcache/src/ndb_worker.cc	2011-09-28 03:58:19 +0000
+++ b/storage/ndb/memcache/src/ndb_worker.cc	2011-09-30 20:46:27 +0000
@@ -174,7 +174,7 @@ op_status_t worker_do_delete(workitem *w
   Operation op(plan, OP_DELETE, wqitem->ndb_key_buffer);
   const NdbOperation *ndb_op = 0;
 
-  NdbTransaction *tx = op.startTransaction();
+  NdbTransaction *tx = op.startTransaction(wqitem->ndb_instance->db);
   DEBUG_ASSERT(tx);
   
   op.clearKeyNullBits();
@@ -278,9 +278,10 @@ op_status_t worker_do_write(workitem *wq
   }
 
   /* Start the transaction */
-  NdbTransaction *tx = op.startTransaction();
+  NdbTransaction *tx = op.startTransaction(wqitem->ndb_instance->db);
   if(! tx) {
-    logger->log(LOG_WARNING, 0, "tx: %s \n", plan->db->getNdbError().message);
+    logger->log(LOG_WARNING, 0, "tx: %s \n", 
+                wqitem->ndb_instance->db->getNdbError().message);
     DEBUG_ASSERT(false);
   }
   
@@ -346,7 +347,7 @@ op_status_t worker_do_read(workitem *wqi
   op.setKeyPart(COL_STORE_KEY, dbkey, wqitem->base.nsuffix);  
 
   /* Start a transaction, and call NdbTransaction::readTuple() */
-  NdbTransaction *tx = op.startTransaction();
+  NdbTransaction *tx = op.startTransaction(wqitem->ndb_instance->db);
   DEBUG_ASSERT(tx);
 
   if(! op.readTuple(tx)) {
@@ -436,7 +437,7 @@ op_status_t worker_do_math(workitem *wqi
   } 
   
   /* Use an op (either one) to start the transaction */
-  NdbTransaction *tx = op1.startTransaction();
+  NdbTransaction *tx = op1.startTransaction(wqitem->ndb_instance->db);
   
   /* NdbOperation #1: READ */
   {
@@ -910,25 +911,23 @@ ENGINE_ERROR_CODE ndb_flush_all(ndb_pipe
   
   DEBUG_PRINT(" %d prefixes", conf.nprefixes);
   for(int i = 0 ; i < conf.nprefixes ; i++) {
-    NdbInstance *inst = 0;
-    const KeyPrefix *p = conf.getPrefix(i);
-    if(p->info.use_ndb && p->info.do_db_flush) {
-      ClusterConnectionPool *pool = conf.getConnectionPoolById(p->info.cluster_id);
+    const KeyPrefix *pfx = conf.getPrefix(i);
+    if(pfx->info.use_ndb && pfx->info.do_db_flush) {
+      ClusterConnectionPool *pool = conf.getConnectionPoolById(pfx->info.cluster_id);
       Ndb_cluster_connection *conn = pool->getMainConnection();
-      inst = new NdbInstance(conn, conf.nprefixes, 128);
-      QueryPlan *plan = inst->getPlanForPrefix(p);
-      if(plan->keyIsPrimaryKey()) {
-        /* To flush, scan the table and delete every row */
-        DEBUG_PRINT("prefix %d - deleting from %s", i, p->table->table_name);
-        scan_delete(inst, plan);      
+      NdbInstance inst(conn, 128);
+      QueryPlan plan(inst.db, pfx->table);
+      if(plan.keyIsPrimaryKey()) {
+        // To flush, scan the table and delete every row
+        DEBUG_PRINT("prefix %d - deleting from %s", i, pfx->table->table_name);
+        scan_delete(&inst, &plan);
       }
       else DEBUG_PRINT("prefix %d - not scanning table %s -- accees path "
-                       "is not primary key", i, p->table->table_name);
+                       "is not primary key", i, pfx->table->table_name);
     }
     else DEBUG_PRINT("prefix %d - not scanning table %s -- use_ndb:%d flush:%d",
-                     i, p->table ? p->table->table_name : "",
-                     p->info.use_ndb, p->info.do_db_flush);
-    if(inst) delete inst;
+                     i, pfx->table ? pfx->table->table_name : "",
+                     pfx->info.use_ndb, pfx->info.do_db_flush);
   }
   
   return ENGINE_SUCCESS;

=== modified file 'storage/ndb/memcache/src/schedulers/S_sched.cc'
--- a/storage/ndb/memcache/src/schedulers/S_sched.cc	2011-09-28 03:29:26 +0000
+++ b/storage/ndb/memcache/src/schedulers/S_sched.cc	2011-09-30 20:46:27 +0000
@@ -46,10 +46,10 @@ extern "C" {
 extern EXTENSION_LOGGER_DESCRIPTOR *logger;
 
 /* Lock that protects online reconfiguration */
-pthread_mutex_t reconf_lock = PTHREAD_MUTEX_INITIALIZER;
+pthread_rwlock_t reconf_lock = PTHREAD_RWLOCK_INITIALIZER;
 
 /* Scheduler Global singleton */
-S::SchedulerGlobal * volatile s_global;
+S::SchedulerGlobal * s_global;
 
 /* Global scheduler generation number */
 int sched_generation_number;
@@ -110,6 +110,19 @@ void S::SchedulerGlobal::init(int _nthre
 }
 
 
+void S::SchedulerGlobal::reconfigure(Configuration * new_cf) {
+  conf = new_cf;
+  generation++;
+  
+  for(int i = 0; i < nclusters ; i++) {
+    for(int j = 0; j < options.n_worker_threads; j++) {
+      WorkerConnection *wc = * (getWorkerConnectionPtr(j, i));
+      wc->reconfigure(new_cf);
+    }
+  }
+}
+
+
 void S::SchedulerGlobal::shutdown() {
   if(running) {
     logger->log(LOG_WARNING, 0, "Shutting down scheduler.");
@@ -210,10 +223,10 @@ void S::SchedulerGlobal::add_stats(const
     const char *status;
     char *gen = gen_number_buffer;
         
-    if(pthread_mutex_trylock(& reconf_lock) == 0) {
+    if(pthread_rwlock_tryrdlock(& reconf_lock) == 0) {
       status = "Running";
       snprintf(gen, 16, "%d", generation);
-      pthread_mutex_unlock(& reconf_lock);
+      pthread_rwlock_unlock(& reconf_lock);
     }
     else {
       status = "Loading";
@@ -224,12 +237,8 @@ void S::SchedulerGlobal::add_stats(const
   }
   else {
     DEBUG_PRINT(" scheduler");
-    /* Aggregate scheduler statistics, so long as reconf is not in progress */
-    if(pthread_mutex_trylock(& reconf_lock) == 0) {
-      for(int c = 0 ; c < nclusters ; c++) {
-        clusters[c]->add_stats(stat_key, add_stat, cookie);
-      }
-      pthread_mutex_unlock(& reconf_lock);
+    for(int c = 0 ; c < nclusters ; c++) {
+      clusters[c]->add_stats(stat_key, add_stat, cookie);
     }
   }
 }
@@ -261,47 +270,6 @@ void S::SchedulerWorker::shutdown() {
 }
 
 
-bool S::SchedulerWorker::global_reconfigure(Configuration *new_conf) {
-  DEBUG_ENTER();
-  bool result = false;
-
-  if(pthread_mutex_lock(& reconf_lock) == 0) {
-    sched_generation_number++;
-    SchedulerGlobal *old_global = s_global;
-    SchedulerGlobal *new_global = new SchedulerGlobal(new_conf);
-
-    /* Get the new scheduler up and running. 
-     * The primary cluster will have its reference count increased 
-     * (and be re-used).  A Cluster and its Connections are durable, 
-     * but WorkerConnections are configuration-specific and transient. 
-     */
-    new_global->init(old_global->nthreads, old_global->config_string);
-
-    /* If attach_thread() has already happened, set the pipeline: */
-    if(pipeline) {
-      new_global->engine = pipeline->engine;  
-    }
-
-    /* Set s_global.  Releasing the mutex will issue a memory barrier, 
-     * making the new value visible to other threads. 
-     */      
-    s_global = new_global;
-
-    /* We are done with the lock */
-    pthread_mutex_unlock(& reconf_lock);
-
-    /* Now shut down the old scheduler.
-     * Clusters that are unused in the new config will be gracefully shut down.
-     */
-    old_global->shutdown();
-
-    result = true;
-  }
-  
-  return result;
-}
-
-
 void S::SchedulerWorker::attach_thread(thread_identifier *parent) {
   DEBUG_ENTER();
   
@@ -316,19 +284,27 @@ void S::SchedulerWorker::attach_thread(t
 
 
 ENGINE_ERROR_CODE S::SchedulerWorker::schedule(workitem *item) {
-  int c;
+  int c = item->prefix_info.cluster_id;
   NdbInstance *inst;
+  S::WorkerConnection *wc;
+  const KeyPrefix *pfx;
+  
   DEBUG_PRINT("SchedulerWorker / config gen. %d", s_global->generation);
-  const KeyPrefix * pfx = s_global->conf->getPrefixByInfo(item->prefix_info);
 
+  /* ACQUIRE READ LOCK */
+  if(pthread_rwlock_rdlock(& reconf_lock) == 0) {
+    wc = * (s_global->getWorkerConnectionPtr(id, c));
+    pfx = s_global->conf->getPrefixByInfo(item->prefix_info);
+    pthread_rwlock_unlock(& reconf_lock);
+  }
+  else {
+    return ENGINE_TMPFAIL;
+  }
+  /* READ LOCK RELEASED */
   
   item->base.nsuffix = item->base.nkey - pfx->prefix_len;
   if(item->base.nsuffix == 0) return ENGINE_EINVAL; // key too short
   
-  c = item->prefix_info.cluster_id;
-  
-  S::WorkerConnection *wc = * (s_global->getWorkerConnectionPtr(id, c));
-      
   if(wc && wc->freelist) {
     inst = wc->freelist;
     wc->freelist = inst->next;
@@ -347,7 +323,7 @@ ENGINE_ERROR_CODE S::SchedulerWorker::sc
   workitem_set_NdbInstance(item, inst);
   
   // Fetch the query plan for this prefix.
-  item->plan = inst->getPlanForPrefix(pfx);
+  item->plan = wc->plan_set->getPlanForPrefix(pfx);
   if(! item->plan) {
     DEBUG_PRINT("getPlanForPrefix() failure");
     return ENGINE_FAILED;
@@ -410,7 +386,7 @@ void S::SchedulerWorker::io_completed(wo
     int c = item->prefix_info.cluster_id;
     S::WorkerConnection * wc = * (s_global->getWorkerConnectionPtr(id, c));
     if(wc && ! wc->sendqueue->is_aborted()) {
-      assert(inst->sched_gen_number == s_global->generation);
+      // assert(inst->sched_gen_number == s_global->generation);
       inst->wqitem = NULL;
       inst->next = wc->freelist;
       wc->freelist = inst;
@@ -424,6 +400,23 @@ void S::SchedulerWorker::io_completed(wo
 }
 
 
+/* This is a partial implementation of online reconfiguration.
+   It can replace KeyPrefix mappings, but not add a cluster at runtime 
+   (nor will it catch an attempt to do so -- which will eventually lead to
+   a crash after a getWorkerConnectionPtr()).    
+*/
+bool S::SchedulerWorker::global_reconfigure(Configuration *new_cf) {
+  bool r = false;
+  
+  if(pthread_rwlock_wrlock(& reconf_lock) == 0) {
+    s_global->reconfigure(new_cf);
+    pthread_rwlock_unlock(& reconf_lock);
+    r = true;
+  }
+  return r;
+}
+
+
 void S::SchedulerWorker::add_stats(const char *stat_key,
                                    ADD_STAT add_stat, 
                                    const void *cookie) {
@@ -531,12 +524,16 @@ S::WorkerConnection::WorkerConnection(Sc
   conn = cl->connections[id.conn];
   id.node = conn->node_id;
 
+  /* Build the plan_set and all QueryPlans */
+  old_plan_set = 0;
+  plan_set = new ConnQueryPlanSet(conn->conn, conf->nprefixes);
+  plan_set->buildSetForConfiguration(conf, cluster_id);
+
   /* Build the freelist */
   freelist = 0;
   int my_ndb_inst = conn->nInst / global->options.n_worker_threads;
   for(int j = 0 ; j < my_ndb_inst ; j++ ) {
-    NdbInstance *inst = new NdbInstance(conn->conn, conf->nprefixes, 2);
-    inst->sched_gen_number = global->generation;
+    NdbInstance *inst = new NdbInstance(conn->conn, 2);
     inst->next = freelist;
     freelist = inst;
   }
@@ -561,7 +558,7 @@ S::WorkerConnection::WorkerConnection(Sc
     // Open them all.
     for(NdbInstance *inst = freelist; inst != 0 ;inst=inst->next, i++) {
       NdbTransaction *tx;
-      plan = inst->getPlanForPrefix(prefix);
+      plan = plan_set->getPlanForPrefix(prefix);
       tx = inst->db->startTransaction();
       if(! tx) logger->log(LOG_WARNING, 0, inst->db->getNdbError().message);
       txlist[i] = tx;
@@ -575,7 +572,20 @@ S::WorkerConnection::WorkerConnection(Sc
     // Free the list.
     delete[] txlist;
   }
+}
+
+
+void S::WorkerConnection::reconfigure(Configuration *new_cf) {
+  if(old_plan_set) {  /* Garbage collect plans from two configurations ago */
+    delete old_plan_set;
+  }
+  old_plan_set = plan_set;
+  
+  ConnQueryPlanSet *new_plans = 
+    new ConnQueryPlanSet(conn->conn, new_cf->nprefixes);
+  new_plans->buildSetForConfiguration(new_cf, id.cluster);
   
+  plan_set = new_plans;
 }
 
 
@@ -592,6 +602,12 @@ S::WorkerConnection::~WorkerConnection()
     
   /* Delete the sendqueue */
   delete sendqueue;
+  
+  /* Delete the current QueryPlans (and maybe the previous ones, too) */
+  delete plan_set;
+  if(old_plan_set) {
+    delete old_plan_set;
+  }
 }  
 
 

=== modified file 'storage/ndb/memcache/src/schedulers/S_sched.h'
--- a/storage/ndb/memcache/src/schedulers/S_sched.h	2011-09-26 21:12:27 +0000
+++ b/storage/ndb/memcache/src/schedulers/S_sched.h	2011-09-30 20:46:27 +0000
@@ -32,6 +32,7 @@
 #include "ndbmemcache_config.h"
 #include "Scheduler.h"
 #include "KeyPrefix.h"
+#include "ConnQueryPlanSet.h"
 #include "Queue.h"
 
 /* 
@@ -50,7 +51,7 @@ public:
   class SchedulerWorker;     // one object per memcached worker thread 
   class Cluster;             // one object for each cluster
   class Connection;          // one object per connection to a cluster
-  class WorkerConnection;    // one object per {worker,conncetion} pair
+  class WorkerConnection;    // one object per {worker,connection} pair
 };
 
 
@@ -62,6 +63,7 @@ public:
   ~SchedulerGlobal() {};
   void init(int threads, const char *config_string);
   void add_stats(const char *, ADD_STAT, const void *);
+  void reconfigure(Configuration *);
   void shutdown();
   WorkerConnection ** getWorkerConnectionPtr(int thd, int cluster) const {
     return & workerConnections[(thd * nclusters) + cluster];
@@ -184,6 +186,7 @@ public:
   WorkerConnection(SchedulerGlobal *, int thd_id, int cluster_id);
   ~WorkerConnection();
   void shutdown();
+  void reconfigure(Configuration *);
 
   struct { 
     int thd           : 8;
@@ -192,6 +195,7 @@ public:
     unsigned int node : 8;
   } id;
   S::Connection *conn;
+  ConnQueryPlanSet *plan_set, *old_plan_set;
   NdbInstance *freelist;
   Queue<NdbInstance> * sendqueue;
 };

=== modified file 'storage/ndb/memcache/src/schedulers/Stockholm.cc'
--- a/storage/ndb/memcache/src/schedulers/Stockholm.cc	2011-09-22 18:27:10 +0000
+++ b/storage/ndb/memcache/src/schedulers/Stockholm.cc	2011-09-30 18:04:25 +0000
@@ -58,7 +58,7 @@ void Scheduler_stockholm::init(int my_th
                 c, conf.max_tps, pool->usec_rtt, cluster[c].nInst);
   }
   
-  // Get the NDB instances. 
+  // Get the ConnQueryPlanSet and NDB instances for each cluster.
   for(unsigned int c = 0 ; c < conf.nclusters ; c++) {
     cluster[c].instances = (NdbInstance**) 
       calloc(cluster[c].nInst, sizeof(NdbInstance *));
@@ -66,9 +66,12 @@ void Scheduler_stockholm::init(int my_th
     ClusterConnectionPool *pool = conf.getConnectionPoolById(c);
     Ndb_cluster_connection *conn = pool->getPooledConnection(my_thread);
 
+    cluster[c].plan_set = new ConnQueryPlanSet(conn, conf.nprefixes);
+    cluster[c].plan_set->buildSetForConfiguration(&conf, c);
+
     cluster[c].nextFree = NULL;    
     for(int i = 0; i < cluster[c].nInst ; i++) {
-      NdbInstance *inst = new NdbInstance(conn, conf.nprefixes, 1);
+      NdbInstance *inst = new NdbInstance(conn, 1);
       cluster[c].instances[i] = inst;
       inst->next = cluster[c].nextFree;
       cluster[c].nextFree = inst;
@@ -77,11 +80,11 @@ void Scheduler_stockholm::init(int my_th
     logger->log(LOG_WARNING, 0, "Pipeline %d using %u Ndb instances for Cluster %u.\n",
                 my_thread, cluster[c].nInst, c);
   }
-  
+
+
   /* Hoard a transaction (an API connect record) for each Ndb object.  This
      first call to startTransaction() will send TC_SEIZEREQ and wait for a 
      reply, but later at runtime startTransaction() should return immediately.
-     Also, for each NDB instance, pre-build the QueryPlan for the default key prefix.
      TODO? Start one tx on each data node.
   */
   QueryPlan *plan;
@@ -93,7 +96,7 @@ void Scheduler_stockholm::init(int my_th
       txlist = ( NdbTransaction **) calloc(cluster[c].nInst, sizeof(NdbTransaction *));
       // Open them all.
       for(int i = 0 ; i < cluster[c].nInst ; i++) {
-        plan = cluster[c].instances[i]->getPlanForPrefix(prefix);
+        plan = cluster[c].plan_set->getPlanForPrefix(prefix);
         txlist[i] = cluster[c].instances[i]->db->startTransaction();
       }
       // Close them all.
@@ -185,7 +188,7 @@ ENGINE_ERROR_CODE Scheduler_stockholm::s
   workitem_set_NdbInstance(newitem, inst);
   
   // Fetch the query plan for this prefix.
-  newitem->plan = inst->getPlanForPrefix(pfx);
+  newitem->plan = cluster[c].plan_set->getPlanForPrefix(pfx);
   if(! newitem->plan) return ENGINE_FAILED;
   
   // Build the NDB transaction

=== modified file 'storage/ndb/memcache/src/schedulers/Stockholm.h'
--- a/storage/ndb/memcache/src/schedulers/Stockholm.h	2011-09-12 10:05:07 +0000
+++ b/storage/ndb/memcache/src/schedulers/Stockholm.h	2011-09-30 18:04:25 +0000
@@ -31,6 +31,7 @@
 #include "workitem.h"
 #include "Scheduler.h"
 #include "KeyPrefix.h"
+#include "ConnQueryPlanSet.h"
 
 
 /* 
@@ -62,6 +63,7 @@ private:  
       uint64_t commit_thread_vtime;
     } stats;      
     pthread_t commit_thread_id;
+    ConnQueryPlanSet * plan_set;
     NdbInstance **instances;
     int nInst;
     NdbInstance *nextFree;

=== modified file 'storage/ndb/memcache/unit/all_tests.h'
--- a/storage/ndb/memcache/unit/all_tests.h	2011-09-22 05:02:00 +0000
+++ b/storage/ndb/memcache/unit/all_tests.h	2011-09-30 16:22:30 +0000
@@ -34,9 +34,9 @@
 #define REQ_NDB_CONNECTION 1
 #define REQ_DEMO_TABLE     2
 
-void delete_row(QueryPlan *plan, const char * key, int verbose);
+void delete_row(QueryPlan *plan, Ndb *db, const char * key, int verbose);
 
-typedef int TESTCASE(QueryPlan *plan, int verbose);
+typedef int TESTCASE(QueryPlan *plan, Ndb *db, int verbose);
 
 struct test_item {
   int enabled;

=== modified file 'storage/ndb/memcache/unit/alloc.cc'
--- a/storage/ndb/memcache/unit/alloc.cc	2011-09-22 18:27:10 +0000
+++ b/storage/ndb/memcache/unit/alloc.cc	2011-09-30 16:22:30 +0000
@@ -27,7 +27,7 @@
 
 #include "all_tests.h"
 
-int run_allocator_test(QueryPlan *, int v) {
+int run_allocator_test(QueryPlan *, Ndb *, int v) {
   struct request_pipeline *p = get_request_pipeline();
   
   memory_pool *p1 = pipeline_create_memory_pool(p);

=== modified file 'storage/ndb/memcache/unit/cas.cc'
--- a/storage/ndb/memcache/unit/cas.cc	2011-09-12 10:05:07 +0000
+++ b/storage/ndb/memcache/unit/cas.cc	2011-09-30 16:22:30 +0000
@@ -21,35 +21,36 @@
 
 #include "all_tests.h"
 
-int set_row(int, QueryPlan *, const char *, Uint64 , Uint64 );
+int set_row(int, QueryPlan *, Ndb *, const char *, Uint64 , Uint64 );
 void build_cas_routine(NdbInterpretedCode *, QueryPlan *plan, Uint64 *cas);
 
 
 
-int run_cas_test(QueryPlan *plan, int v) {
+int run_cas_test(QueryPlan *plan, Ndb *db, int v) {
   const Uint64 cas = 30090000000000003ULL;
   int r;
   
-  r = set_row(v, plan, "cas_unit_test_1", 0ULL, cas);      // a normal update
+  r = set_row(v, plan, db, "cas_unit_test_1", 0ULL, cas);      // a normal update
   detail(v, "(1): %d\n", r);
   require(r == 0);  
 
-  r = set_row(v, plan, "cas_unit_test_1", cas, cas + 1);   // an interpreted update
+  r = set_row(v, plan, db, "cas_unit_test_1", cas, cas + 1);   // an interpreted update
   detail(v, "(2): %d\n", r);
   require(r == 0);  
   
-  r = set_row(v, plan, "cas_unit_test_2", 0ULL, cas);      // a normal update
+  r = set_row(v, plan, db, "cas_unit_test_2", 0ULL, cas);      // a normal update
   detail(v, "(2): %d\n", r);
   require(r == 0);  
 
-  r = set_row(v, plan, "cas_unit_test_2", cas-1, cas+1);   // this should fail  
+  r = set_row(v, plan, db, "cas_unit_test_2", cas-1, cas+1);   // this should fail  
   detail(v, "(2): %d\n", r);
   require(r == 899);  
   pass;
 }
 
 
-int set_row(int v, QueryPlan *plan, const char *akey, Uint64 old_cas, Uint64 new_cas) {
+int set_row(int v, QueryPlan *plan, Ndb *db,
+            const char *akey, Uint64 old_cas, Uint64 new_cas) {
   NdbOperation::OperationOptions options;
   NdbInterpretedCode code(plan->table);
   char key[50];
@@ -76,7 +77,7 @@ int set_row(int v, QueryPlan *plan, cons
   op.setColumn(COL_STORE_VALUE, value, strlen(value));
   op.setColumnBigUnsigned(COL_STORE_CAS, new_cas);
   
-  NdbTransaction *tx = op.startTransaction();
+  NdbTransaction *tx = op.startTransaction(db);
   
   if(old_cas) {
     /* This is an interpreted update */

=== modified file 'storage/ndb/memcache/unit/casbits.cc'
--- a/storage/ndb/memcache/unit/casbits.cc	2011-09-22 18:27:10 +0000
+++ b/storage/ndb/memcache/unit/casbits.cc	2011-09-30 16:22:30 +0000
@@ -31,7 +31,7 @@ ndbmc_atomic32_t engine_cas_lo = 0xb0000
 void worker_set_cas(int verbose, Uint64 *cas);
 
 
-int test_cas_bitshifts(QueryPlan *, int v) {
+int test_cas_bitshifts(QueryPlan *, Ndb *, int v) {
   Uint64 cas = 0ULL;
   worker_set_cas(v, &cas); 
   require(cas == 0x00717530B0000065ULL);

=== modified file 'storage/ndb/memcache/unit/connpool.cc'
--- a/storage/ndb/memcache/unit/connpool.cc	2011-09-12 10:05:07 +0000
+++ b/storage/ndb/memcache/unit/connpool.cc	2011-09-30 16:22:30 +0000
@@ -23,10 +23,10 @@
 #include "all_tests.h"
 
 
-int run_pool_test(QueryPlan *plan, int v) {
+int run_pool_test(QueryPlan *plan, Ndb *db1, int v) {
   int conn_retries = 0;
   int r;
-  Ndb_cluster_connection &main_conn = plan->db->get_ndb_cluster_connection();  
+  Ndb_cluster_connection &main_conn = db1->get_ndb_cluster_connection();  
 
   /* The pooled connection */  
   Ndb_cluster_connection * nc = new Ndb_cluster_connection(connect_string, & main_conn);
@@ -75,13 +75,13 @@ int run_pool_test(QueryPlan *plan, int v
          nc->node_id(), nc->get_connected_host(), nc->get_connected_port());  
   require(nc->node_id());
 
-  Ndb *db = new Ndb(nc);
-  require(db);
+  Ndb *db2 = new Ndb(nc);
+  require(db2);
   detail(v, "Created an Ndb object.\n");
   
-  db->init(4);
+  db2->init(4);
 
-  NdbTransaction *tx = db->startTransaction();
+  NdbTransaction *tx = db2->startTransaction();
   require(tx);
   detail(v, "Started a transaction.\n");
   

=== modified file 'storage/ndb/memcache/unit/harness.cc'
--- a/storage/ndb/memcache/unit/harness.cc	2011-09-12 10:05:07 +0000
+++ b/storage/ndb/memcache/unit/harness.cc	2011-09-30 16:22:30 +0000
@@ -113,7 +113,7 @@ int main(int argc, char *argv[]) {
 
   if(test_number >= 0) {   /* Run a particular test */
     printf("%s\n", all_tests[test_number].name);
-    int r = all_tests[test_number].function(plan, 1);  //verbose
+    int r = all_tests[test_number].function(plan, db, 1);  //verbose
     if(r) {
       printf(" [FAIL] at line %d\n", r); nfail++;
     } else {
@@ -124,7 +124,7 @@ int main(int argc, char *argv[]) {
     for(int i = 0; all_tests[i].name; i++) {
       if(all_tests[i].enabled) {
         printf("%-30s", all_tests[i].name);
-        int r = all_tests[i].function(plan, 0);   // quiet
+        int r = all_tests[i].function(plan, db, 0);   // quiet
         printf(" %s\n", r ? "[FAIL]" : "[PASS]");
         if(r) nfail++; 
         else npass++;
@@ -200,7 +200,7 @@ Ndb_cluster_connection * connect(const c
 }
 
 
-void delete_row(QueryPlan *plan, const char * key, int verbose) {
+void delete_row(QueryPlan *plan, Ndb *db, const char * key, int verbose) {
   char ndbkeybuffer[300];
     
   Operation op(plan, OP_DELETE, ndbkeybuffer);  
@@ -208,7 +208,7 @@ void delete_row(QueryPlan *plan, const c
   op.clearKeyNullBits();
   op.setKeyPart(COL_STORE_KEY, key, strlen(key));
   
-  NdbTransaction * tx = op.startTransaction();
+  NdbTransaction * tx = op.startTransaction(db);
   op.deleteTuple(tx);
   tx->execute(NdbTransaction::Commit);  
 

=== modified file 'storage/ndb/memcache/unit/incr.cc'
--- a/storage/ndb/memcache/unit/incr.cc	2011-09-22 18:27:10 +0000
+++ b/storage/ndb/memcache/unit/incr.cc	2011-09-30 16:22:30 +0000
@@ -19,47 +19,48 @@
  */
 #include "all_tests.h"
 
-int do_incr(int v, QueryPlan *plan, const char *akey, bool, bool, Uint64 *val);
+int do_incr(int v, QueryPlan *plan, Ndb *db, 
+            const char *akey, bool, bool, Uint64 *val);
 
-int run_incr_test(QueryPlan *plan, int v) {
+int run_incr_test(QueryPlan *plan, Ndb *db, int v) {
   int r = 0;
   Uint64 val = 33;
   
-  delete_row(plan, "incr_unit_test_1", v);
-  delete_row(plan, "incr_unit_test_2", v);
+  delete_row(plan, db, "incr_unit_test_1", v);
+  delete_row(plan, db, "incr_unit_test_2", v);
   
   detail(v, "Test 1: INCR non-existing row, create=false\n");
-  r = do_incr(v, plan, "incr_unit_test_1", false, true, &val);
+  r = do_incr(v, plan, db, "incr_unit_test_1", false, true, &val);
   detail(v, "Result - NDB=%d Val=%llu \n\n", r, (long long unsigned) val);
   require(r == 626);
   require(val == 33);
 
   detail(v, "Test 2: INCR non-existing row, create=true, update = false\n");
-  r = do_incr(v, plan, "incr_unit_test_1", true, false, &val);
+  r = do_incr(v, plan, db, "incr_unit_test_1", true, false, &val);
   detail(v, "Result - NDB=%d Val=%llu \n\n", r, val);
   require(r == 626); // the transaction gets a 626 even if the insert succeeds
   require(val == -1ULL);
 
   detail(v, "test 3: READ row created in test 2\n");
-  r = do_incr(v, plan, "incr_unit_test_1", false, false, &val);
+  r = do_incr(v, plan, db, "incr_unit_test_1", false, false, &val);
   detail(v, "Result - NDB=%d Val=%llu \n\n", r, (long long unsigned) val);
   require(r == 0);
   require(val == -1ULL);
 
   detail(v, "Test 4: INCR non-existing row, create=true\n");
-  r = do_incr(v, plan, "incr_unit_test_2", true, true, &val);
+  r = do_incr(v, plan, db, "incr_unit_test_2", true, true, &val);
   detail(v, "Result - NDB=%d Val=%llu \n\n", r, (long long unsigned) val);
   require(r == 626);
   require(val == 0);
   
   detail(v, "Test 5: INCR existing row, create=false\n");
-  r = do_incr(v, plan, "incr_unit_test_2", false, true, &val);
+  r = do_incr(v, plan, db, "incr_unit_test_2", false, true, &val);
   detail(v, "Result - NDB=%d Val=%llu \n\n", r, (long long unsigned) val);
   require(r == 0);
   require(val == 1);  
 
   detail(v, "Test 6: INCR existing row, create=true\n");
-  r = do_incr(v, plan, "incr_unit_test_2", true, true, &val);
+  r = do_incr(v, plan, db, "incr_unit_test_2", true, true, &val);
   detail(v, "Result - NDB=%d Val=%llu \n\n", r, (long long unsigned) val);
   require(r == 630);   // the insert failed but the update succeeded
   require(val == 2);
@@ -68,7 +69,7 @@ int run_incr_test(QueryPlan *plan, int v
 }
 
 
-int do_incr(int v, QueryPlan *plan, const char *akey, 
+int do_incr(int v, QueryPlan *plan, Ndb *db, const char *akey, 
             bool create, bool update, Uint64 *val) {
   int r;
   char key[50];
@@ -96,9 +97,9 @@ int do_incr(int v, QueryPlan *plan, cons
   op.setColumn(COL_STORE_KEY, key, strlen(key));
   op.setColumnBigUnsigned(COL_STORE_CAS, 0ULL);
 
-  NdbTransaction *tx = op.startTransaction();
+  NdbTransaction *tx = op.startTransaction(db);
   if(! tx) {
-    int r =  plan->db->RESULT;
+    int r =  db->RESULT;
     detail(v, " get tx: %d \n", r);
     return r;
   }

=== modified file 'storage/ndb/memcache/unit/queue.cc'
--- a/storage/ndb/memcache/unit/queue.cc	2011-09-12 10:05:07 +0000
+++ b/storage/ndb/memcache/unit/queue.cc	2011-09-30 16:22:30 +0000
@@ -41,7 +41,7 @@ q_test_obj * get(Queue<q_test_obj> &q, i
 const int n_loop_items = 50000;
 
 
-int run_queue_test(QueryPlan *, int v) {
+int run_queue_test(QueryPlan *, Ndb *, int v) {
   Queue<q_test_obj> q(n_loop_items);
   
   /* nothing there */

=== modified file 'storage/ndb/memcache/unit/tsv.cc'
--- a/storage/ndb/memcache/unit/tsv.cc	2011-09-12 10:05:07 +0000
+++ b/storage/ndb/memcache/unit/tsv.cc	2011-09-30 16:22:30 +0000
@@ -25,7 +25,7 @@
 #include "all_tests.h"
 
 
-int run_tsv_test(QueryPlan *, int v) {
+int run_tsv_test(QueryPlan *, Ndb *, int v) {
   {
     TabSeparatedValues t1("frodo.xxx", 4, 5);
     require(t1.getLength() == 5);

No bundle (reason: useless for push emails).
Thread
bzr push into mysql-5.5-cluster branch (john.duncan:3571 to 3578) John David Duncan2 Oct