List: Commits
From: Pekka Nousiainen
Date: August 31 2011 9:09am
Subject: bzr push into mysql-5.1-telco-7.0 branch (pekka.nousiainen:4471 to 4490)
 4490 jonas oreland	2011-08-30
      ndb - fix windows problems in mt_thr_config

    modified:
      storage/ndb/src/kernel/vm/mt_thr_config.cpp
 4489 jonas oreland	2011-08-30
      ndb - change to use THRConfig for thread-config (not yet cpu-bindings)

    modified:
      storage/ndb/src/kernel/ndbd.cpp
      storage/ndb/src/kernel/vm/CMakeLists.txt
      storage/ndb/src/kernel/vm/Configuration.cpp
      storage/ndb/src/kernel/vm/Configuration.hpp
      storage/ndb/src/kernel/vm/Makefile.am
      storage/ndb/src/kernel/vm/mt_thr_config.cpp
      storage/ndb/src/kernel/vm/mt_thr_config.hpp
 4488 jonas oreland	2011-08-30
      ndb - setupConfiguration prior to get_multithreaded_config (prep for THRConfig usage)

    modified:
      storage/ndb/src/kernel/ndbd.cpp
 4487 jonas oreland	2011-08-30
      ndb - remove hard-coded block-juggling when selecting which block to send STOP_FOR_CRASH to... just use the first in the block-list (that now exists)

    modified:
      storage/ndb/src/kernel/vm/mt.cpp
 4486 jonas oreland	2011-08-30
      ndb - add new config parameter for unified thread-config

    modified:
      storage/ndb/include/mgmapi/mgmapi_config_parameters.h
 4485 Mauritz Sundell	2011-08-28
      Regenerated (again) test result for ndb_statistics1 with notes.
      
      Wrong result committed - stupid me.
      
      Three query plans changed but are equally good as the previous ones.
      A test related to bug #59519 seems broken even before the
      previous commit. The query plan should have >1 row in the last join.
      
      In the same query the index for (I,J) should be chosen, but
      in this case the resulting index (J,K) is equally good.

    modified:
      mysql-test/suite/ndb/r/ndb_statistics1.result
 4484 Mauritz Sundell	2011-08-28
      Regenerated test result for ndb_statistics1 with notes.
      
      Three query plans changed but are equally good as the previous ones.
      A test related to bug #59519 seems broken even before the
      previous commit. The query plan should have >1 row in the last join.
      
      In the same query the index for (I,J) should be chosen, but
      in this case the resulting index (J,K) is equally good.

    modified:
      mysql-test/suite/ndb/r/ndb_statistics1.result
 4483 Mauritz Sundell	2011-08-28
      ndb - index statistics for partial keys of unique/primary ordered index

    modified:
      mysql-test/suite/ndb/r/ndb_index_stat.result
      mysql-test/suite/ndb/t/ndb_index_stat.test
      sql/ha_ndbcluster.cc
 4482 jonas oreland	2011-08-27
      ndb - still memory leak in binlog code if using more than 128 columns...

    modified:
      sql/ha_ndbcluster_binlog.cc
 4481 jonas oreland	2011-08-27
      ndb - still memory leak in error case in ha_ndbinfo::open

    modified:
      sql/ha_ndbinfo.cc
 4480 jonas oreland	2011-08-27
      ndbmtd - remove usage of g_map_instance
        and instead create duplicate thr_map instances when needed.

    modified:
      storage/ndb/src/kernel/SimBlockList.cpp
      storage/ndb/src/kernel/vm/SimulatedBlock.cpp
      storage/ndb/src/kernel/vm/SimulatedBlock.hpp
      storage/ndb/src/kernel/vm/dummy_nonmt.cpp
      storage/ndb/src/kernel/vm/mt.cpp
      storage/ndb/src/kernel/vm/mt.hpp
 4479 Jonas Oreland	2011-08-26
      ndb - move thread configuration out into own class.
        currently it's in ndbd.cpp and mt.cpp
        new class implements old behaviour as well as method suggested on dev-ndb
          (which everyone who has spoken seems to be happy with)
      
        new class is intended to be used by both ndbmtd and ndb_mgmd
          so that thread-configuration can be verified by ndb_mgmd
          when reading config.ini (currently it's just ignored in ndbmtd
          if it's incorrect)
      
        This patch just introduces the class, including a tap-test,
          but does not use it in either ndbmtd or ndb_mgmd...
          that is for the next patch(es)

    added:
      storage/ndb/src/kernel/vm/mt_thr_config.cpp
      storage/ndb/src/kernel/vm/mt_thr_config.hpp
    modified:
      storage/ndb/src/kernel/vm/Makefile.am
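
A minimal sketch of the intended THRConfig usage, based only on the calls
visible later in this push (do_parse(), getErrorMessage(), getConfigString()
and getThreadCount() in the Configuration.cpp and ndbd.cpp hunks); the
wrapper function itself is an illustration, not part of the push:

#include "mt_thr_config.hpp"
#include <cstdio>

// Validate a ThreadConfig string the way ndb_mgmd could when reading
// config.ini, instead of the value being silently ignored by ndbmtd.
static bool validate_thread_config(const char* thrconfigstring)
{
  THRConfig conf;
  if (conf.do_parse(thrconfigstring) != 0)
  {
    fprintf(stderr, "invalid ThreadConfig: %s\n", conf.getErrorMessage());
    return false;
  }
  printf("ThreadConfig: input: %s parsed: %s (%u threads)\n",
         thrconfigstring, conf.getConfigString(), conf.getThreadCount());
  return true;
}
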
 4478 Jonas Oreland	2011-08-26
      ndb - add handy methods to SparseBitmask (equal, overlaps, str, getBitNo)

    modified:
      storage/ndb/include/util/SparseBitmask.hpp
 4477 Jonas Oreland	2011-08-25
      ndb - remove some obfuscation in Configuration.cpp wrt lock-to-cpu handling

    modified:
      storage/ndb/src/kernel/vm/Configuration.cpp
      storage/ndb/src/kernel/vm/Configuration.hpp
 4476 Jonas Oreland	2011-08-25
      ndb - set ndb_index_stat_enable=TRUE in >= 7.2.0

    modified:
      sql/ha_ndbcluster.cc
 4475 Ole John Aske	2011-08-22
      Fix for SPJ regression introduced by revno4456 to branch 'mysql-5.1-telco-7.0'
      
      My previous 'correlation.patch' stores a TupleCorrelation (Uint32) together
      with each received row directly in the NdbReceiver buffer.
      
      This causes problems when there are RecAttrs in the received data.
      NdbReceiver assumes that everything in this buffer that is not an
      NdbRecord is an NdbRecAttr. It therefore tries to parse this
      TupleCorrelation as an NdbRecAttr, which fails.
      
      Instead of storing the TupleCorrelations together with the row, we now
      allocate a separate array of TupleCorrelations as part of each NdbResultSet
      where the TupleCorrelations are stored.

    modified:
      storage/ndb/src/ndbapi/NdbQueryOperation.cpp
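
A minimal sketch of the layout change described above; the real code lives in
NdbQueryOperation.cpp, and the struct and member names here are assumptions
for illustration only:

// Before: each TupleCorrelation (an Uint32) was interleaved with the row
// data in the NdbReceiver buffer, where the RecAttr parser choked on it.
// After: correlations live in a separate array, parallel to the rows, so
// the receive buffer holds nothing but NdbRecord/NdbRecAttr data.
typedef Uint32 TupleCorrelation;

struct ResultSetSketch              // hypothetical stand-in for NdbResultSet
{
  char*             m_rowBuffer;    // packed rows, exactly as before
  TupleCorrelation* m_correlations; // one entry per received row
  Uint32            m_rowCount;
};
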
 4474 Ole John Aske	2011-08-22
      Fixed SPJ regression:
      
      After push of 'revno: 4448' to branch 'mysql-5.1-telco-7.0', the ndbapi test 
      'testSpj' started failing.
      
      This is a fix for the first of these problems ('testSpj -n scanJoin' still fails)
      which reverts part of the changes in revno 4448.
      
      The problem fixed - or rather compensated for - is that retrieving ATTR values
      from the receive buffer relies on the underlying assumption that the NdbReceiver
      refers to the next row (or the end of the current one).
      
      Therefore this fix increments the 'current' row by using ::get_row instead of ::peek_row
      and removes a verifySortOrder from a place where the current row has been advanced, and
      before the fragment has been reorganized(). 

    modified:
      storage/ndb/src/ndbapi/NdbQueryOperation.cpp
 4473 Jan Wedvik	2011-08-22
      This commit fixes (hopefully) compiler warnings in the previous commit.

    modified:
      storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp
 4472 Jan Wedvik	2011-08-22
      This commit implements adaptive parallelism for (non-root) scan operations
      (i.e. index scans). So far, such scans have always been executed with
      maximal parallelism, such that each fragment was scanned in parallel.
      This also meant that the available batch size would have to be divided by the
      number of fragments. For index scans with large result sets, this was 
      inefficient, since it meant asking for many small batches.
      
      As an example, assume that a table scan of t1 is followed by an index scan on 
      t2 where the index has poor selectivity. The query is then effectively a cross
      join. When such queries were tested on distributed clusters with 
      multi-threaded ndbd, the queries would typically run five to six times slower 
      than with query pushdown disabled.
      
      This commit will therefore try to set an optimal parallelism, depending on the
      size of the scan result set. This eliminates the performance regression in 
      the test cases described above.
      
      This works as follows:
      
      * The first time an index scan is started, it starts with parallelism=1 for 
      the first batch.
      
      * For the subsequent batches, the SPJ block will try to set the parallelism 
      such that all rows from a single fragment will fit in one batch. If this
      is not possible, parallelism will remain 1.
      
      * Whenever the index scan is started again (typically after receiving a new
      batch for its parent), the SPJ block will try to guess the optimal parallelism
      based on statistics from previous runs. 'Optimal' means that one can expect
      all rows from a given fragment to be retrieved in one batch if possible.
      
      See also quilt series published on 2011-08-18.

    modified:
      storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp
      storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp
      storage/ndb/src/ndbapi/NdbQueryOperation.cpp
      storage/ndb/src/ndbapi/NdbQueryOperation.hpp
      storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp
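
A condensed sketch of the initial-parallelism heuristic from the DbspjMain.cpp
hunk below (scanIndex_parent_batch_complete); wrapping it in a helper function
is an illustration only:

// Start from mean - 2*stddev of the observed optimal parallelism (erring
// low is cheaper than erring high), clamp to at least 1, and round down so
// that the remaining fragments split into round trips of similar size
// (e.g. 8 remaining fragments are fetched as 4+4 rather than 7+1).
static Uint32 initial_parallelism(double mean, double stdDev,
                                  Uint32 fragsRemaining)
{
  Int32 parallelism = static_cast<Int32>(mean - 2 * stdDev);
  if (parallelism < 1)
    parallelism = 1;
  else if (fragsRemaining % parallelism != 0)
  {
    const Int32 roundTrips = 1 + fragsRemaining / parallelism;
    parallelism = fragsRemaining / roundTrips;
  }
  return static_cast<Uint32>(parallelism);
}
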
 4471 jonas oreland	2011-08-19 [merge]
      ndb - merge 63 to 70

    modified:
      storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp
      storage/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp
=== modified file 'mysql-test/suite/ndb/r/ndb_index_stat.result'
--- a/mysql-test/suite/ndb/r/ndb_index_stat.result	2011-07-23 14:38:08 +0000
+++ b/mysql-test/suite/ndb/r/ndb_index_stat.result	2011-08-28 13:29:22 +0000
@@ -475,6 +475,18 @@ select count(*) from t1 where f > '222';
 count(*)
 1
 drop table t1;
+create table t1 (a1 int, b1 int, primary key(b1), key(a1)) engine=ndbcluster partition by key() partitions 1;
+create table t2 (b2 int, c2 int, primary key(b2,c2)) engine=ndbcluster partition by key() partitions 1;
+# table t1 is only for forcing records per key count for table t2 that should be near 50 (not 1)
+analyze table t1, t2;
+Table	Op	Msg_type	Msg_text
+test.t1	analyze	status	OK
+test.t2	analyze	status	OK
+explain select * from t1, t2 where b2 = b1 and a1 = 1;
+id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
+1	SIMPLE	t1	ref	PRIMARY,a1	a1	5	const	2	#
+1	SIMPLE	t2	ref	PRIMARY	PRIMARY	4	test.t1.b1	50	#
+drop table t1, t2;
 set @is_enable = @is_enable_default;
 set @is_enable = NULL;
 # is_enable_on=0 is_enable_off=1

=== modified file 'mysql-test/suite/ndb/r/ndb_statistics1.result'
--- a/mysql-test/suite/ndb/r/ndb_statistics1.result	2011-07-25 17:11:41 +0000
+++ b/mysql-test/suite/ndb/r/ndb_statistics1.result	2011-08-28 18:21:59 +0000
@@ -77,7 +77,7 @@ SELECT * FROM t10000 AS X JOIN t10000 AS
 ON Y.I=X.I AND Y.J = X.I;
 id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
 1	SIMPLE	X	ALL	I	NULL	NULL	NULL	10000	
-1	SIMPLE	Y	ref	J,I	I	10	test.X.I,test.X.I	1	Using where
+1	SIMPLE	Y	ref	J,I	J	5	test.X.I	1	Using where
 EXPLAIN
 SELECT * FROM t100 WHERE k < 42;
 id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
@@ -145,11 +145,11 @@ id	select_type	table	type	possible_keys
 EXPLAIN
 SELECT * FROM t10000 WHERE J = 0 AND K < 1;
 id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
-1	SIMPLE	t10000	range	PRIMARY,J	PRIMARY	4	NULL	2	Using where with pushed condition
+1	SIMPLE	t10000	ref	PRIMARY,J	J	5	const	2	Using where with pushed condition
 EXPLAIN
 SELECT * FROM t10000 WHERE J = 0 AND K BETWEEN 1 AND 10;
 id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
-1	SIMPLE	t10000	range	PRIMARY,J	PRIMARY	4	NULL	2	Using where with pushed condition
+1	SIMPLE	t10000	ref	PRIMARY,J	J	5	const	2	Using where with pushed condition
 EXPLAIN
 SELECT * FROM t10000 WHERE J = 0 AND K = 1;
 id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra

=== modified file 'mysql-test/suite/ndb/t/ndb_index_stat.test'
--- a/mysql-test/suite/ndb/t/ndb_index_stat.test	2011-07-23 14:35:37 +0000
+++ b/mysql-test/suite/ndb/t/ndb_index_stat.test	2011-08-28 13:29:22 +0000
@@ -309,5 +309,30 @@ while ($i)
 }
 drop table t1;
 
+#
+# Check estimates of records per key for partial keys using unique/primary ordered index
+#
+
+create table t1 (a1 int, b1 int, primary key(b1), key(a1)) engine=ndbcluster partition by key() partitions 1;
+create table t2 (b2 int, c2 int, primary key(b2,c2)) engine=ndbcluster partition by key() partitions 1;
+
+--disable_query_log
+let $i = 100;
+while ($i)
+{
+  eval insert into t1 (a1,b1) values ($i,$i);
+  eval insert into t2 (b2,c2) values ($i mod 2, $i div 2);
+  dec $i;
+}
+--enable_query_log
+
+--echo # table t1 is only for forcing records per key count for table t2 that should be near 50 (not 1)
+analyze table t1, t2;
+# Hide Extra column
+--replace_column 10 #
+explain select * from t1, t2 where b2 = b1 and a1 = 1;
+
+drop table t1, t2;
+
 set @is_enable = @is_enable_default;
 source ndb_index_stat_enable.inc;

=== modified file 'sql/ha_ndbcluster.cc'
--- a/sql/ha_ndbcluster.cc	2011-07-23 14:38:08 +0000
+++ b/sql/ha_ndbcluster.cc	2011-08-28 13:29:22 +0000
@@ -164,6 +164,11 @@ static MYSQL_THDVAR_ULONG(
   0                                  /* block */
 );
 
+#if NDB_VERSION_D < NDB_MAKE_VERSION(7,2,0)
+#define DEFAULT_NDB_INDEX_STAT_ENABLE FALSE
+#else
+#define DEFAULT_NDB_INDEX_STAT_ENABLE TRUE
+#endif
 
 static MYSQL_THDVAR_BOOL(
   index_stat_enable,                 /* name */
@@ -171,7 +176,7 @@ static MYSQL_THDVAR_BOOL(
   "Use ndb index statistics in query optimization.",
   NULL,                              /* check func. */
   NULL,                              /* update func. */
-  FALSE                              /* default */
+  DEFAULT_NDB_INDEX_STAT_ENABLE      /* default */
 );
 
 
@@ -1353,19 +1358,22 @@ void ha_ndbcluster::set_rec_per_key()
   */
   for (uint i=0 ; i < table_share->keys ; i++)
   {
+    bool is_unique_index= false;
     KEY* key_info= table->key_info + i;
     switch (get_index_type(i))
     {
-    case UNIQUE_ORDERED_INDEX:
-    case PRIMARY_KEY_ORDERED_INDEX:
     case UNIQUE_INDEX:
     case PRIMARY_KEY_INDEX:
     {
       // Index is unique when all 'key_parts' are specified,
       // else distribution is unknown and not specified here.
-      key_info->rec_per_key[key_info->key_parts-1]= 1;
+      is_unique_index= true;
       break;
     }
+    case UNIQUE_ORDERED_INDEX:
+    case PRIMARY_KEY_ORDERED_INDEX:
+      is_unique_index= true;
+      // intentional fall thru to logic for ordered index
     case ORDERED_INDEX:
       // 'Records pr. key' are unknown for non-unique indexes.
       // (May change when we get better index statistics.)
@@ -1395,6 +1403,11 @@ void ha_ndbcluster::set_rec_per_key()
     default:
       DBUG_ASSERT(false);
     }
+    // set rows per key to 1 for complete key given for unique/primary index
+    if (is_unique_index)
+    {
+      key_info->rec_per_key[key_info->key_parts-1]= 1;
+    }
   }
   DBUG_VOID_RETURN;
 }

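
For reference, what the hunk above computes for the two-part primary key
t2(b2,c2) in the ndb_index_stat test earlier in this push (100 rows, b2
taking two values); the concrete numbers are illustrative assumptions:

KEY* key_info = table->key_info + i;
key_info->rec_per_key[0] = 50;  // partial key (b2): ~100 rows / 2 values
key_info->rec_per_key[1] = 1;   // complete unique key: exactly one row
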
=== modified file 'sql/ha_ndbcluster_binlog.cc'
--- a/sql/ha_ndbcluster_binlog.cc	2011-07-08 12:28:37 +0000
+++ b/sql/ha_ndbcluster_binlog.cc	2011-08-27 12:12:27 +0000
@@ -6299,8 +6299,8 @@ ndb_binlog_thread_handle_data_event(Ndb
   MY_BITMAP b;
   /* Potential buffer for the bitmap */
   uint32 bitbuf[128 / (sizeof(uint32) * 8)];
-  bitmap_init(&b, n_fields <= sizeof(bitbuf) * 8 ? bitbuf : NULL, 
-              n_fields, FALSE);
+  const bool own_buffer = n_fields <= sizeof(bitbuf) * 8;
+  bitmap_init(&b, own_buffer ? bitbuf : NULL, n_fields, FALSE); 
   bitmap_set_all(&b);
 
   /*
@@ -6463,6 +6463,11 @@ ndb_binlog_thread_handle_data_event(Ndb
     my_free(blobs_buffer[1], MYF(MY_ALLOW_ZERO_PTR));
   }
 
+  if (!own_buffer)
+  {
+    bitmap_free(&b);
+  }
+
   return 0;
 }
 

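
The fix above reduces to an ownership rule for MY_BITMAP: bitmap_init()
allocates storage only when handed a NULL buffer, and only that allocation
needs a matching bitmap_free(). A minimal sketch of the pattern, mirroring
the patch (n_fields as in the surrounding function):

uint32 bitbuf[128 / (sizeof(uint32) * 8)];  // stack buffer for <= 128 columns
const bool own_buffer = n_fields <= sizeof(bitbuf) * 8;
MY_BITMAP b;
bitmap_init(&b, own_buffer ? bitbuf : NULL, n_fields, FALSE);
bitmap_set_all(&b);
/* ... use the bitmap ... */
if (!own_buffer)
  bitmap_free(&b);  // free only when bitmap_init() had to allocate
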
=== modified file 'sql/ha_ndbinfo.cc'
--- a/sql/ha_ndbinfo.cc	2011-06-30 15:59:25 +0000
+++ b/sql/ha_ndbinfo.cc	2011-08-27 09:54:26 +0000
@@ -367,6 +367,7 @@ int ha_ndbinfo::open(const char *name, i
   int err = g_ndbinfo->openTable(name, &m_impl.m_table);
   if (err)
   {
+    assert(m_impl.m_table == 0);
     if (err == NdbInfo::ERR_NoSuchTable)
       DBUG_RETURN(HA_ERR_NO_SUCH_TABLE);
     DBUG_RETURN(err2mysql(err));
@@ -390,6 +391,7 @@ int ha_ndbinfo::open(const char *name, i
       warn_incompatible(ndb_tab, true,
                         "column '%s' is NOT NULL",
                         field->field_name);
+      delete m_impl.m_table; m_impl.m_table= 0;
       DBUG_RETURN(ERR_INCOMPAT_TABLE_DEF);
     }
 
@@ -427,6 +429,7 @@ int ha_ndbinfo::open(const char *name, i
       warn_incompatible(ndb_tab, true,
                         "column '%s' is not compatible",
                         field->field_name);
+      delete m_impl.m_table; m_impl.m_table= 0;
       DBUG_RETURN(ERR_INCOMPAT_TABLE_DEF);
     }
   }

=== modified file 'storage/ndb/include/mgmapi/mgmapi_config_parameters.h'
--- a/storage/ndb/include/mgmapi/mgmapi_config_parameters.h	2011-07-04 13:37:56 +0000
+++ b/storage/ndb/include/mgmapi/mgmapi_config_parameters.h	2011-08-30 09:40:52 +0000
@@ -195,6 +195,7 @@
 #define CFG_DB_INDEX_STAT_UPDATE_DELAY   626
 
 #define CFG_DB_MAX_DML_OPERATIONS_PER_TRANSACTION 627
+#define CFG_DB_MT_THREAD_CONFIG          628
 
 #define CFG_NODE_ARBIT_RANK           200
 #define CFG_NODE_ARBIT_DELAY          201

=== modified file 'storage/ndb/include/util/SparseBitmask.hpp'
--- a/storage/ndb/include/util/SparseBitmask.hpp	2010-08-28 09:37:09 +0000
+++ b/storage/ndb/include/util/SparseBitmask.hpp	2011-08-26 09:20:08 +0000
@@ -20,6 +20,7 @@
 
 #include <ndb_global.h>
 #include <util/Vector.hpp>
+#include <util/BaseString.hpp>
 
 class SparseBitmask {
   unsigned m_max_size;
@@ -102,6 +103,11 @@ public:
 
   bool isclear() const { return count() == 0; }
 
+  unsigned getBitNo(unsigned n) const {
+    assert(n < m_vec.size());
+    return m_vec[n];
+  }
+
   void print(void) const {
     for (unsigned i = 0; i < m_vec.size(); i++)
     {
@@ -110,6 +116,38 @@ public:
     }
   }
 
+  bool equal(const SparseBitmask& obj) const {
+    if (obj.count() != count())
+      return false;
+
+    for (unsigned i = 0; i<count(); i++)
+      if (!obj.get(m_vec[i]))
+        return false;
+
+    return true;
+  }
+
+  bool overlaps(const SparseBitmask& obj) const {
+    for (unsigned i = 0; i<count(); i++)
+      if (!obj.get(m_vec[i]))
+        return true;
+
+    for (unsigned i = 0; i<obj.count(); i++)
+      if (!get(obj.getBitNo(i)))
+        return true;
+    return false;
+  }
+
+  BaseString str() const {
+    BaseString tmp;
+    const char* sep="";
+    for (unsigned i = 0; i<m_vec.size(); i++)
+    {
+      tmp.appfmt("%s%u", sep, m_vec[i]);
+      sep=",";
+    }
+    return tmp;
+  }
 };
 
 #endif
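
A small usage sketch for the new helpers; SparseBitmask::set() from the
existing class is assumed, and the ascending bit order follows from m_vec
being kept sorted (as print() and str() suggest):

SparseBitmask a, b;
a.set(1);  a.set(17);  a.set(300);
b.set(1);  b.set(17);  b.set(300);

assert(a.equal(b));            // identical sets of bits
assert(a.getBitNo(2) == 300);  // n:th set bit, in ascending order
BaseString s = a.str();        // "1,17,300"

Note that overlaps() as committed returns true when either mask has a bit the
other lacks, i.e. it is false only for equal masks, so callers expecting an
intersection test should double-check it.
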

=== modified file 'storage/ndb/src/kernel/SimBlockList.cpp'
--- a/storage/ndb/src/kernel/SimBlockList.cpp	2011-06-30 15:59:25 +0000
+++ b/storage/ndb/src/kernel/SimBlockList.cpp	2011-08-27 06:06:02 +0000
@@ -160,6 +160,7 @@ SimBlockList::load(EmulatorData& data){
       for (int i = 0; i < noOfBlocks; i++)
         theList[i]->loadWorkers();
     }
+    finalize_thr_map();
   }
 
   // Check that all blocks could be created

=== modified file 'storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp'
--- a/storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp	2011-07-04 13:37:56 +0000
+++ b/storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp	2011-08-22 08:35:35 +0000
@@ -530,21 +530,87 @@ public:
   typedef SLFifoListImpl<ScanFragHandle_pool, ScanFragHandle> ScanFragHandle_list;
   typedef LocalSLFifoListImpl<ScanFragHandle_pool, ScanFragHandle> Local_ScanFragHandle_list;
 
+  /**
+   * This class computes mean and standard deviation incrementally for a series
+   * of samples.
+   */
+  class IncrementalStatistics
+  {
+  public:
+    /**
+     * We cannot have a (non-trivial) constructor, since this class is used in
+     * unions.
+     */
+    void init()
+    {
+      m_mean = m_sumSquare = 0.0;
+      m_noOfSamples = 0;
+    }
+
+    // Add another sample.
+    void update(double sample);
+
+    double getMean() const { return m_mean; }
+
+    double getStdDev() const { 
+      return m_noOfSamples < 2 ? 0.0 : sqrt(m_sumSquare/(m_noOfSamples - 1));
+    }
+
+  private:
+    // Mean of all samples
+    double m_mean;
+    //Sum of square of differences from the current mean.
+    double m_sumSquare;
+    Uint32 m_noOfSamples;
+  }; // IncrementalStatistics
+
   struct ScanIndexData
   {
     Uint16 m_frags_complete;
     Uint16 m_frags_outstanding;
+    /**
+     * The number of fragments for which we have not yet sent SCAN_FRAGREQ but
+     * will eventually do so.
+     */
+    Uint16 m_frags_not_started;
     Uint32 m_rows_received;  // #execTRANSID_AI
     Uint32 m_rows_expecting; // Sum(ScanFragConf)
     Uint32 m_batch_chunks;   // #SCAN_FRAGREQ + #SCAN_NEXTREQ to retrieve batch
     Uint32 m_scanCookie;
     Uint32 m_fragCount;
+    // The number of fragments that we scan in parallel.
+    Uint32 m_parallelism;
+    /**
+     * True if this is the first instantiation of this operation. A child
+     * operation will be instantiated once for each batch of its parent.
+     */
+    bool m_firstExecution;
+    /**
+     * Mean and standard deviation for the optimal parallelism for earlier
+     * executions of this operation.
+     */
+    IncrementalStatistics m_parallelismStat;
+    // Total number of rows for the current execution of this operation.
+    Uint32 m_totalRows;
+    // Total number of bytes for the current execution of this operation.
+    Uint32 m_totalBytes;
+
     ScanFragHandle_list::HeadPOD m_fragments; // ScanFrag states
     union
     {
       PatternStore::HeadPOD m_prunePattern;
       Uint32 m_constPrunePtrI;
     };
+    /**
+     * Max number of rows seen in a batch. Used for calculating the number of
+     * rows per fragment in the next batch when using adaptive batch size.
+     */
+    Uint32 m_largestBatchRows;
+    /**
+     * Max number of bytes seen in a batch. Used for calculating the number of
+     * rows per fragment in the next next batch when using adaptive batch size.
+     */
+    Uint32 m_largestBatchBytes;
     Uint32 m_scanFragReq[ScanFragReq::SignalLength + 2];
   };
 
@@ -1164,6 +1230,13 @@ private:
   void scanIndex_parent_row(Signal*,Ptr<Request>,Ptr<TreeNode>, const RowPtr&);
   void scanIndex_fixupBound(Ptr<ScanFragHandle> fragPtr, Uint32 ptrI, Uint32);
   void scanIndex_send(Signal*,Ptr<Request>,Ptr<TreeNode>);
+  void scanIndex_send(Signal* signal,
+                      Ptr<Request> requestPtr,
+                      Ptr<TreeNode> treeNodePtr,
+                      Uint32 noOfFrags,
+                      Uint32 bs_bytes,
+                      Uint32 bs_rows,
+                      Uint32& batchRange);
   void scanIndex_batchComplete(Signal* signal);
   Uint32 scanIndex_findFrag(Local_ScanFragHandle_list &, Ptr<ScanFragHandle>&,
                             Uint32 fragId);

=== modified file 'storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp	2011-07-04 13:37:56 +0000
+++ b/storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp	2011-08-22 11:50:41 +0000
@@ -4450,6 +4450,9 @@ Dbspj::parseScanIndex(Build_context& ctx
     data.m_fragments.init();
     data.m_frags_outstanding = 0;
     data.m_frags_complete = 0;
+    data.m_frags_not_started = 0;
+    data.m_parallelismStat.init();
+    data.m_firstExecution = true;
     data.m_batch_chunks = 0;
 
     err = parseDA(ctx, requestPtr, treeNodePtr,
@@ -5002,6 +5005,7 @@ Dbspj::scanIndex_parent_batch_complete(S
       }
     }
   }
+  data.m_frags_not_started = data.m_fragCount - data.m_frags_complete;
 
   if (data.m_frags_complete == data.m_fragCount)
   {
@@ -5015,7 +5019,120 @@ Dbspj::scanIndex_parent_batch_complete(S
   /**
    * When parent's batch is complete, we send our batch
    */
-  scanIndex_send(signal, requestPtr, treeNodePtr);
+  const ScanFragReq * org = (const ScanFragReq*)data.m_scanFragReq;
+  ndbassert(org->batch_size_rows >= data.m_fragCount - data.m_frags_complete);
+
+  if (treeNodePtr.p->m_bits & TreeNode::T_SCAN_PARALLEL)
+  {
+    jam();
+    data.m_parallelism = data.m_fragCount - data.m_frags_complete;
+  }
+  else if (data.m_firstExecution)
+  {
+    /**
+     * Having a high parallelism would allow us to fetch data from many
+     * fragments in parallel and thus reduce the number of round trips.
+     * On the other hand, we should set parallelism so low that we can fetch
+     * all data from a fragment in one batch if possible.
+     * Since this is the first execution, we do not know how many rows or bytes
+     * this operation is likely to return. Therefore we set parallelism to 1,
+     * since this gives the lowest penalty if our guess is wrong.
+     */
+    jam();
+    data.m_parallelism = 1;
+  }
+  else
+  {
+    jam();
+    /**
+     * Use statistics from earlier runs of this operation to estimate the
+     * initial parallelism. We use the mean minus two times the standard
+     * deviation to have a low risk of setting parallelism to high (as erring
+     * in the other direction is more costly).
+     */
+    Int32 parallelism = 
+      static_cast<Int32>(data.m_parallelismStat.getMean()
+                         - 2 * data.m_parallelismStat.getStdDev());
+
+    if (parallelism < 1)
+    {
+      jam();
+      parallelism = 1;
+    }
+    else if ((data.m_fragCount - data.m_frags_complete) % parallelism != 0)
+    {
+      jam();
+      /**
+       * Set parallelism such that we can expect to have similar
+       * parallelism in each batch. For example if there are 8 remaining
+       * fragments, then we should fetch 2 times 4 fragments rather than
+       * 7+1.
+       */
+      const Int32 roundTrips =
+        1 + (data.m_fragCount - data.m_frags_complete) / parallelism;
+      parallelism = (data.m_fragCount - data.m_frags_complete) / roundTrips;
+    }
+
+    data.m_parallelism = static_cast<Uint32>(parallelism);
+
+#ifdef DEBUG_SCAN_FRAGREQ
+    DEBUG("::scanIndex_send() starting index scan with parallelism="
+          << data.m_parallelism);
+#endif
+  }
+  ndbrequire(data.m_parallelism > 0);
+
+  const Uint32 bs_rows = org->batch_size_rows/ data.m_parallelism;
+  const Uint32 bs_bytes = org->batch_size_bytes / data.m_parallelism;
+  ndbassert(bs_rows > 0);
+  ndbassert(bs_bytes > 0);
+
+  data.m_largestBatchRows = 0;
+  data.m_largestBatchBytes = 0;
+  data.m_totalRows = 0;
+  data.m_totalBytes = 0;
+
+  {
+    Local_ScanFragHandle_list list(m_scanfraghandle_pool, data.m_fragments);
+    Ptr<ScanFragHandle> fragPtr;
+    list.first(fragPtr);
+
+    while(!fragPtr.isNull())
+    {
+      ndbassert(fragPtr.p->m_state == ScanFragHandle::SFH_NOT_STARTED ||
+                fragPtr.p->m_state == ScanFragHandle::SFH_COMPLETE);
+      fragPtr.p->m_state = ScanFragHandle::SFH_NOT_STARTED;
+      list.next(fragPtr);
+    }
+  }
+
+  Uint32 batchRange = 0;
+  scanIndex_send(signal,
+                 requestPtr,
+                 treeNodePtr,
+                 data.m_parallelism,
+                 bs_bytes,
+                 bs_rows,
+                 batchRange);
+
+  data.m_firstExecution = false;
+
+  if (treeNodePtr.p->m_bits & TreeNode::T_SCAN_PARALLEL)
+  {
+    ndbrequire((data.m_frags_outstanding + data.m_frags_complete) ==
+               data.m_fragCount);
+  }
+  else
+  {
+    ndbrequire(static_cast<Uint32>(data.m_frags_outstanding + 
+                                   data.m_frags_complete) <=
+               data.m_fragCount);
+  }
+
+  data.m_batch_chunks = 1;
+  requestPtr.p->m_cnt_active++;
+  requestPtr.p->m_outstanding++;
+  treeNodePtr.p->m_state = TreeNode::TN_ACTIVE;
 }
 
 void
@@ -5045,77 +5162,91 @@ Dbspj::scanIndex_parent_batch_repeat(Sig
   }
 }
 
+/**
+ * Ask for the first batch for a number of fragments.
+ */
 void
 Dbspj::scanIndex_send(Signal* signal,
                       Ptr<Request> requestPtr,
-                      Ptr<TreeNode> treeNodePtr)
+                      Ptr<TreeNode> treeNodePtr,
+                      Uint32 noOfFrags,
+                      Uint32 bs_bytes,
+                      Uint32 bs_rows,
+                      Uint32& batchRange)
 {
-  jam();
-
-  ScanIndexData& data = treeNodePtr.p->m_scanindex_data;
-  const ScanFragReq * org = (const ScanFragReq*)data.m_scanFragReq;
-
-  Uint32 cnt = 1;
-  Uint32 bs_rows = org->batch_size_rows;
-  Uint32 bs_bytes = org->batch_size_bytes;
-  if (treeNodePtr.p->m_bits & TreeNode::T_SCAN_PARALLEL)
-  {
-    jam();
-    cnt = data.m_fragCount - data.m_frags_complete;
-    ndbrequire(cnt > 0);
-
-    bs_rows /= cnt;
-    bs_bytes /= cnt;
-    ndbassert(bs_rows > 0);
-  }
-
   /**
    * if (m_bits & prunemask):
    * - Range keys sliced out to each ScanFragHandle
    * - Else, range keys kept on first (and only) ScanFragHandle
    */
-  Uint32 prunemask = TreeNode::T_PRUNE_PATTERN | TreeNode::T_CONST_PRUNE;
+  const bool prune = treeNodePtr.p->m_bits &
+    (TreeNode::T_PRUNE_PATTERN | TreeNode::T_CONST_PRUNE);
 
   /**
-   * Don't release keyInfo if it may be sent multiple times, eiter:
-   *   - Not pruned -> same keyInfo goes to all datanodes.
-   *   - Result handling is REPEAT_SCAN_RESULT and same batch may be 
-   *     repeated multiple times due to incomplete bushy X-scans.
-   *     (by ::scanIndex_parent_batch_repeat())
-   *
-   * When not released, ::scanIndex_parent_batch_cleanup() will 
-   * eventually release them when preparing arrival of a new parent batch.
+   * If the scan is repeatable, we must make sure not to release range keys so
+   * that we can use them again in the next repetition.
    */
-  const bool release = ((treeNodePtr.p->m_bits & prunemask) != 0 &&
-                        (treeNodePtr.p->m_bits & TreeNode::T_SCAN_REPEATABLE) == 0);
+  const bool repeatable =
+    (treeNodePtr.p->m_bits & TreeNode::T_SCAN_REPEATABLE) != 0;
 
-  ScanFragReq* req = reinterpret_cast<ScanFragReq*>(signal->getDataPtrSend());
+  ScanIndexData& data = treeNodePtr.p->m_scanindex_data;
+  ndbassert(noOfFrags > 0);
+  ndbassert(data.m_frags_not_started >= noOfFrags);
+  ScanFragReq* const req =
+    reinterpret_cast<ScanFragReq*>(signal->getDataPtrSend());
+  const ScanFragReq * const org
+    = reinterpret_cast<ScanFragReq*>(data.m_scanFragReq);
   memcpy(req, org, sizeof(data.m_scanFragReq));
   // req->variableData[0] // set below
   req->variableData[1] = requestPtr.p->m_rootResultData;
   req->batch_size_bytes = bs_bytes;
   req->batch_size_rows = bs_rows;
 
-  Ptr<ScanFragHandle> fragPtr;
+  Uint32 requestsSent = 0;
   Local_ScanFragHandle_list list(m_scanfraghandle_pool, data.m_fragments);
-
-  Uint32 keyInfoPtrI = RNIL;
+  Ptr<ScanFragHandle> fragPtr;
   list.first(fragPtr);
-  if ((treeNodePtr.p->m_bits & prunemask) == 0)
+  Uint32 keyInfoPtrI = fragPtr.p->m_rangePtrI;
+  ndbrequire(prune || keyInfoPtrI != RNIL);
+  /**
+   * Iterate over the list of fragments until we have sent as many
+   * SCAN_FRAGREQs as we should.
+   */
+  while (requestsSent < noOfFrags)
   {
     jam();
-    keyInfoPtrI = fragPtr.p->m_rangePtrI;
-    ndbrequire(keyInfoPtrI != RNIL);
-  }
+    ndbassert(!fragPtr.isNull());
 
-  Uint32 batchRange = 0;
-  for (Uint32 i = 0; i < cnt && !fragPtr.isNull(); list.next(fragPtr))
-  {
-    jam();
+    if (fragPtr.p->m_state != ScanFragHandle::SFH_NOT_STARTED)
+    {
+      // Skip forward to the frags that we should send.
+      jam();
+      list.next(fragPtr);
+      continue;
+    }
+
+    const Uint32 ref = fragPtr.p->m_ref;
+
+    if (noOfFrags==1 && !prune &&
+        data.m_frags_not_started == data.m_fragCount &&
+        refToNode(ref) != getOwnNodeId() &&
+        list.hasNext(fragPtr))
+    {
+      /**
+       * If we are doing a scan with adaptive parallelism and start with
+       * parallelism=1 then it makes sense to fetch a batch from a fragment on
+       * the local data node. The reason for this is that if that fragment
+       * contains few rows, we may be able to read from several fragments in
+       * parallel. Then we minimize the total number of round trips (to remote
+       * data nodes) if we fetch the first fragment batch locally.
+       */
+      jam();
+      list.next(fragPtr);
+      continue;
+    }
 
     SectionHandle handle(this);
 
-    Uint32 ref = fragPtr.p->m_ref;
     Uint32 attrInfoPtrI = treeNodePtr.p->m_send.m_attrInfoPtrI;
 
     /**
@@ -5124,26 +5255,34 @@ Dbspj::scanIndex_send(Signal* signal,
     req->senderData = fragPtr.i;
     req->fragmentNoKeyLen = fragPtr.p->m_fragId;
 
-    if ((treeNodePtr.p->m_bits & prunemask))
+    if (prune)
     {
       jam();
       keyInfoPtrI = fragPtr.p->m_rangePtrI;
       if (keyInfoPtrI == RNIL)
       {
+        /**
+         * Since we use pruning, we can see that no parent rows would hash
+         * to this fragment.
+         */
         jam();
         fragPtr.p->m_state = ScanFragHandle::SFH_COMPLETE;
+        list.next(fragPtr);
         continue;
       }
-    }
-    if (release)
-    {
-      /**
-       * If we'll use sendSignal() and we need to send the attrInfo several
-       *   times, we need to copy them
-       */
-      Uint32 tmp = RNIL;
-      ndbrequire(dupSection(tmp, attrInfoPtrI)); // TODO handle error
-      attrInfoPtrI = tmp;
+
+      if (!repeatable)
+      {
+        /**
+         * If we'll use sendSignal() and we need to send the attrInfo several
+         * times, we need to copy them. (For repeatable or unpruned scans
+         * we use sendSignalNoRelease(), so then we do not need to copy.)
+         */
+        jam();
+        Uint32 tmp = RNIL;
+        ndbrequire(dupSection(tmp, attrInfoPtrI)); // TODO handle error
+        attrInfoPtrI = tmp;
+      }
     }
 
     req->variableData[0] = batchRange;
@@ -5152,16 +5291,14 @@ Dbspj::scanIndex_send(Signal* signal,
     handle.m_cnt = 2;
 
 #if defined DEBUG_SCAN_FRAGREQ
-    {
-      ndbout_c("SCAN_FRAGREQ to %x", ref);
-      printSCAN_FRAGREQ(stdout, signal->getDataPtrSend(),
-                        NDB_ARRAY_SIZE(treeNodePtr.p->m_scanfrag_data.m_scanFragReq),
-                        DBLQH);
-      printf("ATTRINFO: ");
-      print(handle.m_ptr[0], stdout);
-      printf("KEYINFO: ");
-      print(handle.m_ptr[1], stdout);
-    }
+    ndbout_c("SCAN_FRAGREQ to %x", ref);
+    printSCAN_FRAGREQ(stdout, signal->getDataPtrSend(),
+                      NDB_ARRAY_SIZE(treeNodePtr.p->m_scanfrag_data.m_scanFragReq),
+                      DBLQH);
+    printf("ATTRINFO: ");
+    print(handle.m_ptr[0], stdout);
+    printf("KEYINFO: ");
+    print(handle.m_ptr[1], stdout);
 #endif
 
     if (refToNode(ref) == getOwnNodeId())
@@ -5173,8 +5310,13 @@ Dbspj::scanIndex_send(Signal* signal,
       c_Counters.incr_counter(CI_REMOTE_RANGE_SCANS_SENT, 1);
     }
 
-    if (release)
+    if (prune && !repeatable)
     {
+      /**
+       * For a non-repeatable pruned scan, key info is unique for each
+       * fragment and therefore cannot be reused, so we release key info
+       * right away.
+       */
       jam();
       sendSignal(ref, GSN_SCAN_FRAGREQ, signal,
                  NDB_ARRAY_SIZE(data.m_scanFragReq), JBB, &handle);
@@ -5183,32 +5325,24 @@ Dbspj::scanIndex_send(Signal* signal,
     }
     else
     {
+      /**
+       * Reuse key info for multiple fragments and/or multiple repetitions
+       * of the scan.
+       */
       jam();
       sendSignalNoRelease(ref, GSN_SCAN_FRAGREQ, signal,
                           NDB_ARRAY_SIZE(data.m_scanFragReq), JBB, &handle);
     }
     handle.clear();
 
-    i++;
     fragPtr.p->m_state = ScanFragHandle::SFH_SCANNING; // running
     data.m_frags_outstanding++;
     batchRange += bs_rows;
-  }
+    requestsSent++;
+    list.next(fragPtr);
+  } // while (requestsSent < noOfFrags)
 
-  if (treeNodePtr.p->m_bits & TreeNode::T_SCAN_PARALLEL)
-  {
-    ndbrequire(data.m_frags_outstanding == 
-               data.m_fragCount - data.m_frags_complete);
-  }
-  else
-  {
-    ndbrequire(data.m_frags_outstanding == 1);
-  }
-
-  data.m_batch_chunks = 1;
-  requestPtr.p->m_cnt_active++;
-  requestPtr.p->m_outstanding++;
-  treeNodePtr.p->m_state = TreeNode::TN_ACTIVE;
+  data.m_frags_not_started -= requestsSent;
 }
 
 void
@@ -5282,6 +5416,10 @@ Dbspj::scanIndex_execSCAN_FRAGCONF(Signa
   }
 
   requestPtr.p->m_rows += rows;
+  data.m_totalRows += rows;
+  data.m_totalBytes += conf->total_len;
+  data.m_largestBatchRows = MAX(data.m_largestBatchRows, rows);
+  data.m_largestBatchBytes = MAX(data.m_largestBatchBytes, conf->total_len);
 
   if (!treeNodePtr.p->isLeaf())
   {
@@ -5302,7 +5440,9 @@ Dbspj::scanIndex_execSCAN_FRAGCONF(Signa
     ndbrequire(data.m_frags_complete < data.m_fragCount);
     data.m_frags_complete++;
 
-    if (data.m_frags_complete == data.m_fragCount)
+    if (data.m_frags_complete == data.m_fragCount ||
+        ((requestPtr.p->m_state & Request::RS_ABORTING) != 0 &&
+         data.m_fragCount == (data.m_frags_complete + data.m_frags_not_started)))
     {
       jam();
       ndbrequire(requestPtr.p->m_cnt_active);
@@ -5314,6 +5454,32 @@ Dbspj::scanIndex_execSCAN_FRAGCONF(Signa
 
   if (data.m_frags_outstanding == 0)
   {
+    const ScanFragReq * const org
+      = reinterpret_cast<const ScanFragReq*>(data.m_scanFragReq);
+
+    if (data.m_frags_complete == data.m_fragCount)
+    {
+      jam();
+      /**
+       * Calculate what would have been the optimal parallelism for the
+       * scan instance that we have just completed, and update
+       * 'parallelismStat' with this value. We then use this statistics to set
+       * the initial parallelism for the next instance of this operation.
+       */
+      double parallelism = data.m_fragCount;
+      if (data.m_totalRows > 0)
+      {
+        parallelism = MIN(parallelism,
+                          double(org->batch_size_rows) / data.m_totalRows);
+      }
+      if (data.m_totalBytes > 0)
+      {
+        parallelism = MIN(parallelism,
+                          double(org->batch_size_bytes) / data.m_totalBytes);
+      }
+      data.m_parallelismStat.update(parallelism);
+    }
+
     /**
      * Don't reportBatchComplete to children if we're aborting...
      */
@@ -5364,7 +5530,7 @@ Dbspj::scanIndex_execSCAN_FRAGREF(Signal
   ndbrequire(data.m_frags_outstanding > 0);
   data.m_frags_outstanding--;
 
-  if (data.m_frags_complete == data.m_fragCount)
+  if (data.m_fragCount == (data.m_frags_complete + data.m_frags_not_started))
   {
     jam();
     ndbrequire(requestPtr.p->m_cnt_active);
@@ -5390,21 +5556,78 @@ Dbspj::scanIndex_execSCAN_NEXTREQ(Signal
   jam();
 
   ScanIndexData& data = treeNodePtr.p->m_scanindex_data;
+  const ScanFragReq * org = (const ScanFragReq*)data.m_scanFragReq;
 
   data.m_rows_received = 0;
   data.m_rows_expecting = 0;
   ndbassert(data.m_frags_outstanding == 0);
 
   ndbrequire(data.m_frags_complete < data.m_fragCount);
-  Uint32 cnt = data.m_fragCount - data.m_frags_complete;
   if ((treeNodePtr.p->m_bits & TreeNode::T_SCAN_PARALLEL) == 0)
   {
     jam();
-    cnt = 1;
+    /**
+     * Since fetching few but large batches is more efficient, we
+     * set parallelism to the lowest value where we can still expect each
+     * batch to be full.
+     */
+    if (data.m_largestBatchRows < org->batch_size_rows/data.m_parallelism &&
+        data.m_largestBatchBytes < org->batch_size_bytes/data.m_parallelism)
+    {
+      jam();
+      data.m_parallelism = data.m_fragCount - data.m_frags_complete;
+      if (data.m_largestBatchRows > 0)
+      {
+        jam();
+        data.m_parallelism =
+          MIN(org->batch_size_rows / data.m_largestBatchRows,
+              data.m_parallelism);
+      }
+      if (data.m_largestBatchBytes > 0)
+      {
+        jam();
+        data.m_parallelism =
+          MIN(data.m_parallelism,
+              org->batch_size_bytes/data.m_largestBatchBytes);
+      }
+      if (data.m_frags_complete == 0 &&
+          data.m_frags_not_started % data.m_parallelism != 0)
+      {
+        jam();
+        /**
+         * Set parallelism such that we can expect to have similar
+         * parallelism in each batch. For example if there are 8 remaining
+         * fragments, then we should fetch 2 times 4 fragments rather than
+         * 7+1.
+         */
+        const Uint32 roundTrips =
+          1 + data.m_frags_not_started / data.m_parallelism;
+        data.m_parallelism = data.m_frags_not_started / roundTrips;
+      }
+    }
+    else
+    {
+      jam();
+      // We get full batches, so we should lower parallelism.
+      data.m_parallelism = MIN(data.m_fragCount - data.m_frags_complete,
+                               MAX(1, data.m_parallelism/2));
+    }
+    ndbassert(data.m_parallelism > 0);
+#ifdef DEBUG_SCAN_FRAGREQ
+    DEBUG("::scanIndex_execSCAN_NEXTREQ() Asking for new batches from " <<
+          data.m_parallelism <<
+          " fragments with " << org->batch_size_rows/data.m_parallelism <<
+          " rows and " << org->batch_size_bytes/data.m_parallelism <<
+          " bytes.");
+#endif
+  }
+  else
+  {
+    jam();
+    data.m_parallelism = data.m_fragCount - data.m_frags_complete;
   }
 
-  const ScanFragReq * org = (const ScanFragReq*)data.m_scanFragReq;
-  const Uint32 bs_rows = org->batch_size_rows/cnt;
+  const Uint32 bs_rows = org->batch_size_rows/data.m_parallelism;
   ndbassert(bs_rows > 0);
   ScanFragNextReq* req =
     reinterpret_cast<ScanFragNextReq*>(signal->getDataPtrSend());
@@ -5413,20 +5636,27 @@ Dbspj::scanIndex_execSCAN_NEXTREQ(Signal
   req->transId1 = requestPtr.p->m_transId[0];
   req->transId2 = requestPtr.p->m_transId[1];
   req->batch_size_rows = bs_rows;
-  req->batch_size_bytes = org->batch_size_bytes/cnt;
+  req->batch_size_bytes = org->batch_size_bytes/data.m_parallelism;
 
   Uint32 batchRange = 0;
   Ptr<ScanFragHandle> fragPtr;
+  Uint32 sentFragCount = 0;
+  {
+    /**
+     * First, ask for more data from fragments that are already started.
+     */
   Local_ScanFragHandle_list list(m_scanfraghandle_pool, data.m_fragments);
   list.first(fragPtr);
-  for (Uint32 i = 0; i < cnt && !fragPtr.isNull(); list.next(fragPtr))
+    while (sentFragCount < data.m_parallelism && !fragPtr.isNull())
   {
     jam();
+      ndbassert(fragPtr.p->m_state == ScanFragHandle::SFH_WAIT_NEXTREQ ||
+                fragPtr.p->m_state == ScanFragHandle::SFH_COMPLETE ||
+                fragPtr.p->m_state == ScanFragHandle::SFH_NOT_STARTED);
     if (fragPtr.p->m_state == ScanFragHandle::SFH_WAIT_NEXTREQ)
     {
       jam();
 
-      i++;
       data.m_frags_outstanding++;
       req->variableData[0] = batchRange;
       fragPtr.p->m_state = ScanFragHandle::SFH_SCANNING;
@@ -5434,6 +5664,7 @@ Dbspj::scanIndex_execSCAN_NEXTREQ(Signal
 
       DEBUG("scanIndex_execSCAN_NEXTREQ to: " << hex
             << treeNodePtr.p->m_send.m_ref
+              << ", m_node_no=" << treeNodePtr.p->m_node_no
             << ", senderData: " << req->senderData);
 
 #ifdef DEBUG_SCAN_FRAGREQ
@@ -5445,9 +5676,27 @@ Dbspj::scanIndex_execSCAN_NEXTREQ(Signal
       sendSignal(fragPtr.p->m_ref, GSN_SCAN_NEXTREQ, signal,
                  ScanFragNextReq::SignalLength + 1,
                  JBB);
+        sentFragCount++;
+      }
+      list.next(fragPtr);
     }
   }
 
+  if (sentFragCount < data.m_parallelism)
+  {
+    /**
+     * Then start new fragments until we reach data.m_parallelism.
+     */
+    jam();
+    ndbassert(data.m_frags_not_started != 0);
+    scanIndex_send(signal,
+                   requestPtr,
+                   treeNodePtr,
+                   data.m_parallelism - sentFragCount,
+                   org->batch_size_bytes/data.m_parallelism,
+                   bs_rows,
+                   batchRange);
+  }
   /**
    * cursor should not have been positioned here...
    *   unless we actually had something more to send.
@@ -5544,14 +5793,28 @@ Dbspj::scanIndex_abort(Signal* signal,
     }
   }
 
-  ndbrequire(cnt_waiting + cnt_scanning > 0);
   if (cnt_scanning == 0)
   {
-    /**
-     * If all were waiting...this should increase m_outstanding
-     */
-    jam();
-    requestPtr.p->m_outstanding++;
+    if (cnt_waiting > 0)
+    {
+      /**
+       * If all were waiting...this should increase m_outstanding
+       */
+      jam();
+      requestPtr.p->m_outstanding++;
+    }
+    else
+    {
+      /**
+       * All fragments are either complete or not yet started, so there is
+       * nothing to abort.
+       */
+      jam();
+      ndbassert(data.m_frags_not_started > 0);
+      ndbrequire(requestPtr.p->m_cnt_active);
+      requestPtr.p->m_cnt_active--;
+      treeNodePtr.p->m_state = TreeNode::TN_INACTIVE;
+    }
   }
 }
 
@@ -5603,6 +5866,8 @@ Dbspj::scanIndex_execNODE_FAILREP(Signal
       jam();
       ndbrequire(data.m_frags_complete < data.m_fragCount);
       data.m_frags_complete++;
+      ndbrequire(data.m_frags_not_started > 0);
+      data.m_frags_not_started--;
       // fall through
     case ScanFragHandle::SFH_COMPLETE:
       jam();
@@ -5637,8 +5902,8 @@ Dbspj::scanIndex_execNODE_FAILREP(Signal
     requestPtr.p->m_outstanding--;
   }
 
-  if (save1 != data.m_fragCount
-      && data.m_frags_complete == data.m_fragCount)
+  if (save1 != 0 &&
+      data.m_fragCount == (data.m_frags_complete + data.m_frags_not_started))
   {
     jam();
     ndbrequire(requestPtr.p->m_cnt_active);
@@ -5654,7 +5919,8 @@ Dbspj::scanIndex_release_rangekeys(Ptr<R
                                    Ptr<TreeNode> treeNodePtr)
 {
   jam();
-  DEBUG("scanIndex_release_rangekeys(), m_node_no: " << treeNodePtr.p->m_node_no);
+  DEBUG("scanIndex_release_rangekeys(), tree node " << treeNodePtr.i
+          << " m_node_no: " << treeNodePtr.p->m_node_no);
 
   ScanIndexData& data = treeNodePtr.p->m_scanindex_data;
   Local_ScanFragHandle_list list(m_scanfraghandle_pool, data.m_fragments);
@@ -7115,3 +7381,15 @@ void Dbspj::execDBINFO_SCANREQ(Signal *s
 
   ndbinfo_send_scan_conf(signal, req, rl);
 } // Dbspj::execDBINFO_SCANREQ(Signal *signal)
+
+void Dbspj::IncrementalStatistics::update(double sample)
+{
+  // Prevent wrap-around
+  if(m_noOfSamples < 0xffffffff)
+  {
+    m_noOfSamples++;
+    const double delta = sample - m_mean;
+    m_mean += delta/m_noOfSamples;
+    m_sumSquare +=  delta * (sample - m_mean);
+  }
+}

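
The update() body above is Welford's online algorithm: m_sumSquare accumulates
the sum of squared deviations from the running mean, so getStdDev() returns
sqrt(m_sumSquare / (n - 1)), the sample standard deviation. A standalone check
with assumed sample data, not part of the patch:

#include <cmath>
#include <cstdio>

int main()
{
  const double samples[] = { 2, 4, 4, 4, 5, 5, 7, 9 };
  double mean = 0.0, sumSquare = 0.0;
  unsigned n = 0;
  for (unsigned i = 0; i < sizeof(samples) / sizeof(samples[0]); i++)
  {
    // the same three steps as IncrementalStatistics::update()
    n++;
    const double delta = samples[i] - mean;
    mean += delta / n;
    sumSquare += delta * (samples[i] - mean);
  }
  // prints mean=5.000000 stddev=2.138090 (sample stddev = sqrt(32/7))
  printf("mean=%f stddev=%f\n", mean, sqrt(sumSquare / (n - 1)));
  return 0;
}
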
=== modified file 'storage/ndb/src/kernel/ndbd.cpp'
--- a/storage/ndb/src/kernel/ndbd.cpp	2011-01-30 23:13:49 +0000
+++ b/storage/ndb/src/kernel/ndbd.cpp	2011-08-30 12:00:48 +0000
@@ -297,69 +297,45 @@ get_multithreaded_config(EmulatorData& e
 {
   // multithreaded is compiled in ndbd/ndbmtd for now
   globalData.isNdbMt = SimulatedBlock::isMultiThreaded();
-  if (!globalData.isNdbMt) {
+  if (!globalData.isNdbMt)
+  {
     ndbout << "NDBMT: non-mt" << endl;
     return 0;
   }
 
-  ndb_mgm_configuration * conf = ed.theConfiguration->getClusterConfig();
-  if (conf == 0)
-  {
-    abort();
-  }
-
-  ndb_mgm_configuration_iterator * p =
-    ndb_mgm_create_configuration_iterator(conf, CFG_SECTION_NODE);
-  if (ndb_mgm_find(p, CFG_NODE_ID, globalData.ownId))
-  {
-    abort();
-  }
+  THRConfig & conf = ed.theConfiguration->m_thr_config;
 
-  Uint32 mtthreads = 0;
-  ndb_mgm_get_int_parameter(p, CFG_DB_MT_THREADS, &mtthreads);
-  ndbout << "NDBMT: MaxNoOfExecutionThreads=" << mtthreads << endl;
+  Uint32 threadcount = conf.getThreadCount();
+  ndbout << "NDBMT: MaxNoOfExecutionThreads=" << threadcount << endl;
 
   globalData.isNdbMtLqh = true;
 
   {
-    Uint32 classic = 0;
-    ndb_mgm_get_int_parameter(p, CFG_NDBMT_CLASSIC, &classic);
-    if (classic)
-      globalData.isNdbMtLqh = false;
-
-    const char* p = NdbEnv_GetEnv("NDB_MT_LQH", (char*)0, 0);
-    if (p != 0)
+    if (conf.getMtClassic())
     {
-      if (strstr(p, "NOPLEASE") != 0)
-        globalData.isNdbMtLqh = false;
-      else
-        globalData.isNdbMtLqh = true;
+      globalData.isNdbMtLqh = false;
     }
   }
 
   if (!globalData.isNdbMtLqh)
     return 0;
 
-  Uint32 threads = 0;
-  switch(mtthreads){
-  case 0:
-  case 1:
-  case 2:
-  case 3:
-    threads = 1; // TC + receiver + SUMA + LQH
-    break;
-  case 4:
-  case 5:
-  case 6:
-    threads = 2; // TC + receiver + SUMA + 2 * LQH
-    break;
-  default:
-    threads = 4; // TC + receiver + SUMA + 4 * LQH
-  }
-
-  ndb_mgm_get_int_parameter(p, CFG_NDBMT_LQH_THREADS, &threads);
+  Uint32 threads = conf.getThreadCount(THRConfig::T_LDM);
   Uint32 workers = threads;
-  ndb_mgm_get_int_parameter(p, CFG_NDBMT_LQH_WORKERS, &workers);
+  {
+    ndb_mgm_configuration * conf = ed.theConfiguration->getClusterConfig();
+    if (conf == 0)
+    {
+      abort();
+    }
+    ndb_mgm_configuration_iterator * p =
+      ndb_mgm_create_configuration_iterator(conf, CFG_SECTION_NODE);
+    if (ndb_mgm_find(p, CFG_NODE_ID, globalData.ownId))
+    {
+      abort();
+    }
+    ndb_mgm_get_int_parameter(p, CFG_NDBMT_LQH_WORKERS, &workers);
+  }
 
 #ifdef VM_TRACE
   // testing
@@ -368,9 +344,6 @@ get_multithreaded_config(EmulatorData& e
     p = NdbEnv_GetEnv("NDBMT_LQH_WORKERS", (char*)0, 0);
     if (p != 0)
       workers = atoi(p);
-    p = NdbEnv_GetEnv("NDBMT_LQH_THREADS", (char*)0, 0);
-    if (p != 0)
-      threads = atoi(p);
   }
 #endif
 
@@ -654,10 +627,11 @@ ndbd_run(bool foreground, int report_fd,
     // Ignore error
   }
 
+  theConfig->setupConfiguration();
+
   if (get_multithreaded_config(globalEmulatorData))
     ndbd_exit(-1);
 
-  theConfig->setupConfiguration();
   systemInfo(* theConfig, * theConfig->m_logLevel);
 
   NdbThread* pWatchdog = globalEmulatorData.theWatchDog->doStart();

=== modified file 'storage/ndb/src/kernel/vm/CMakeLists.txt'
--- a/storage/ndb/src/kernel/vm/CMakeLists.txt	2011-02-02 00:40:07 +0000
+++ b/storage/ndb/src/kernel/vm/CMakeLists.txt	2011-08-30 12:00:48 +0000
@@ -38,6 +38,7 @@ ADD_LIBRARY(ndbkernel STATIC
     Ndbinfo.cpp
     NdbinfoTables.cpp
     ArenaPool.cpp
+    mt_thr_config.cpp
 )
 
 ADD_LIBRARY(ndbsched STATIC

=== modified file 'storage/ndb/src/kernel/vm/Configuration.cpp'
--- a/storage/ndb/src/kernel/vm/Configuration.cpp	2011-07-04 13:37:56 +0000
+++ b/storage/ndb/src/kernel/vm/Configuration.cpp	2011-08-30 12:00:48 +0000
@@ -33,6 +33,7 @@
 #include <kernel_config_parameters.h>
 
 #include <util/ConfigValues.hpp>
+#include <NdbEnv.h>
 
 #include <ndbapi_limits.h>
 
@@ -392,7 +393,77 @@ Configuration::setupConfiguration(){
     t = globalEmulatorData.theWatchDog ->setCheckInterval(t);
     _timeBetweenWatchDogCheckInitial = t;
   }
-  
+
+  const char * thrconfigstring = NdbEnv_GetEnv("NDB_MT_THREAD_CONFIG",
+                                               (char*)0, 0);
+  if (thrconfigstring ||
+      iter.get(CFG_DB_MT_THREAD_CONFIG, &thrconfigstring) == 0)
+  {
+    int res = m_thr_config.do_parse(thrconfigstring);
+    if (res != 0)
+    {
+      ERROR_SET(fatal, NDBD_EXIT_INVALID_CONFIG,
+                "Invalid configuration fetched, invalid ThreadConfig",
+                m_thr_config.getErrorMessage());
+    }
+  }
+  else
+  {
+    const char * mask;
+    if (iter.get(CFG_DB_EXECUTE_LOCK_CPU, &mask) == 0)
+    {
+      int res = m_thr_config.setLockExecuteThreadToCPU(mask);
+      if (res < 0)
+      {
+        // Could not parse LockExecuteThreadToCPU mask
+        g_eventLogger->warning("Failed to parse 'LockExecuteThreadToCPU=%s' "
+                               "(error: %d), ignoring it!",
+                               mask, res);
+      }
+    }
+
+    Uint32 maintCPU = NO_LOCK_CPU;
+    iter.get(CFG_DB_MAINT_LOCK_CPU, &maintCPU);
+    if (maintCPU == 65535)
+      maintCPU = NO_LOCK_CPU; // Ignore old default(may come from old mgmd)
+    if (maintCPU != NO_LOCK_CPU)
+      m_thr_config.setLockMaintThreadsToCPU(maintCPU);
+
+    Uint32 mtthreads = 0;
+    iter.get(CFG_DB_MT_THREADS, &mtthreads);
+
+    Uint32 classic = 0;
+    iter.get(CFG_NDBMT_CLASSIC, &classic);
+    const char* p = NdbEnv_GetEnv("NDB_MT_LQH", (char*)0, 0);
+    if (p != 0)
+    {
+      if (strstr(p, "NOPLEASE") != 0)
+        classic = 1;
+    }
+
+    Uint32 lqhthreads = 0;
+    iter.get(CFG_NDBMT_LQH_THREADS, &lqhthreads);
+
+    int res = m_thr_config.do_parse(mtthreads, lqhthreads, classic);
+    if (res != 0)
+    {
+      ERROR_SET(fatal, NDBD_EXIT_INVALID_CONFIG,
+                "Invalid configuration fetched, invalid thread configuration",
+                m_thr_config.getErrorMessage());
+    }
+  }
+  if (thrconfigstring)
+  {
+    ndbout_c("ThreadConfig: input: %s parsed: %s",
+             thrconfigstring,
+             m_thr_config.getConfigString());
+  }
+  else
+  {
+    ndbout_c("ThreadConfig (old ndb_mgmd): parsed: %s",
+             m_thr_config.getConfigString());
+  }
+
   ConfigValues* cf = ConfigValuesFactory::extractCurrentSection(iter.m_config);
 
   if(m_clusterConfigIter)
@@ -930,13 +1001,16 @@ Configuration::setAllLockCPU(bool exec_t
   Uint32 i;
   for (i = 0; i < threadInfo.size(); i++)
   {
-    if (threadInfo[i].type != NotInUse)
+    if (threadInfo[i].type == NotInUse)
+      continue;
+
+    bool run = 
+      (exec_thread && threadInfo[i].type == MainThread) ||
+      (!exec_thread && threadInfo[i].type != MainThread);
+
+    if (run)
     {
-      if (setLockCPU(threadInfo[i].pThread,
-                     threadInfo[i].type,
-                     exec_thread,
-                     FALSE))
-        return;
+      setLockCPU(threadInfo[i].pThread, threadInfo[i].type);
     }
   }
 }
@@ -966,11 +1040,8 @@ Configuration::setRealtimeScheduler(NdbT
 
 int
 Configuration::setLockCPU(NdbThread * pThread,
-                          enum ThreadTypes type,
-                          bool exec_thread,
-                          bool init)
+                          enum ThreadTypes type)
 {
-  (void)init;
   Uint32 cpu_id;
   int tid = NdbThread_GetTid(pThread);
   if (tid == -1)
@@ -981,9 +1052,6 @@ Configuration::setLockCPU(NdbThread * pT
     We only set new lock CPU characteristics for the threads for which
     it has changed
   */
-  if ((exec_thread && type != MainThread) ||
-      (!exec_thread && type == MainThread))
-    return 0;
   if (type == MainThread)
     cpu_id = executeLockCPU();
   else
@@ -1029,7 +1097,7 @@ Configuration::addThread(struct NdbThrea
      * main threads are set in ThreadConfig::ipControlLoop
      * as it's handled differently with mt
      */
-    setLockCPU(pThread, type, (type == MainThread), TRUE);
+    setLockCPU(pThread, type);
   }
   return i;
 }

=== modified file 'storage/ndb/src/kernel/vm/Configuration.hpp'
--- a/storage/ndb/src/kernel/vm/Configuration.hpp	2011-07-04 13:37:56 +0000
+++ b/storage/ndb/src/kernel/vm/Configuration.hpp	2011-08-30 12:00:48 +0000
@@ -27,6 +27,7 @@
 #include <NdbThread.h>
 #include <util/SparseBitmask.hpp>
 #include <util/UtilBuffer.hpp>
+#include "mt_thr_config.hpp"
 
 enum ThreadTypes
 {
@@ -86,10 +87,7 @@ public:
 
   void setAllRealtimeScheduler();
   void setAllLockCPU(bool exec_thread);
-  int setLockCPU(NdbThread*,
-                 enum ThreadTypes type,
-                 bool exec_thread,
-                 bool init);
+  int setLockCPU(NdbThread*, enum ThreadTypes type);
   int setRealtimeScheduler(NdbThread*,
                            enum ThreadTypes type,
                            bool real_time,
@@ -127,6 +125,7 @@ public:
   ndb_mgm_configuration* getClusterConfig() const { return m_clusterConfig; }
   Uint32 get_config_generation() const; 
 
+  THRConfigApplier m_thr_config;
 private:
   friend class Cmvmi;
   friend class Qmgr;

=== modified file 'storage/ndb/src/kernel/vm/Makefile.am'
--- a/storage/ndb/src/kernel/vm/Makefile.am	2011-06-30 15:59:25 +0000
+++ b/storage/ndb/src/kernel/vm/Makefile.am	2011-08-30 12:00:48 +0000
@@ -39,7 +39,8 @@ libkernel_a_SOURCES = VMSignal.cpp \
                       SafeMutex.cpp \
                       Ndbinfo.cpp \
                       NdbinfoTables.cpp \
-                      ArenaPool.cpp
+                      ArenaPool.cpp \
+                      mt_thr_config.cpp
 
 libsched_a_SOURCES = TimeQueue.cpp \
                      ThreadConfig.cpp \
@@ -121,4 +122,10 @@ testSafeMutex_LDFLAGS = @ndb_bin_am_ldfl
   $(top_builddir)/strings/libmystringslt.la \
   @readline_link@ @TERMCAP_LIB@
 
+mt_thr_config_t_SOURCES = mt_thr_config.cpp
+mt_thr_config_t_CXXFLAGS = -DTEST_MT_THR_CONFIG
+mt_thr_config_t_LDADD = \
+	$(top_builddir)/storage/ndb/src/common/util/libgeneral.la \
+	$(top_builddir)/mysys/libmysyslt.la
 
+noinst_PROGRAMS = mt_thr_config-t

=== modified file 'storage/ndb/src/kernel/vm/SimulatedBlock.cpp'
--- a/storage/ndb/src/kernel/vm/SimulatedBlock.cpp	2011-06-30 15:59:25 +0000
+++ b/storage/ndb/src/kernel/vm/SimulatedBlock.cpp	2011-08-27 06:06:02 +0000
@@ -102,16 +102,7 @@ SimulatedBlock::SimulatedBlock(BlockNumb
     globalData.setBlock(blockNumber, mainBlock);
   } else {
     ndbrequire(mainBlock != 0);
-    if (mainBlock->theInstanceList == 0) {
-      mainBlock->theInstanceList = new SimulatedBlock* [MaxInstances];
-      ndbrequire(mainBlock->theInstanceList != 0);
-      Uint32 i;
-      for (i = 0; i < MaxInstances; i++)
-        mainBlock->theInstanceList[i] = 0;
-    }
-    ndbrequire(theInstance < MaxInstances);
-    ndbrequire(mainBlock->theInstanceList[theInstance] == 0);
-    mainBlock->theInstanceList[theInstance] = this;
+    mainBlock->addInstance(this, theInstance);
   }
   theMainInstance = mainBlock;
 
@@ -139,6 +130,23 @@ SimulatedBlock::SimulatedBlock(BlockNumb
 }
 
 void
+SimulatedBlock::addInstance(SimulatedBlock* b, Uint32 theInstance)
+{
+  ndbrequire(theMainInstance == this);
+  ndbrequire(number() == b->number());
+  if (theInstanceList == 0)
+  {
+    theInstanceList = new SimulatedBlock* [MaxInstances];
+    ndbrequire(theInstanceList != 0);
+    for (Uint32 i = 0; i < MaxInstances; i++)
+      theInstanceList[i] = 0;
+  }
+  ndbrequire(theInstance < MaxInstances);
+  ndbrequire(theInstanceList[theInstance] == 0);
+  theInstanceList[theInstance] = b;
+}
+
+void
 SimulatedBlock::initCommon()
 {
   Uint32 count = 10;

=== modified file 'storage/ndb/src/kernel/vm/SimulatedBlock.hpp'
--- a/storage/ndb/src/kernel/vm/SimulatedBlock.hpp	2011-05-17 23:29:55 +0000
+++ b/storage/ndb/src/kernel/vm/SimulatedBlock.hpp	2011-08-27 06:06:02 +0000
@@ -168,6 +168,7 @@ public:
       return theInstanceList[instanceNumber];
     return 0;
   }
+  void addInstance(SimulatedBlock* b, Uint32 theInstanceNo);
   virtual void loadWorkers() {}
 
   struct ThreadContext

=== modified file 'storage/ndb/src/kernel/vm/dummy_nonmt.cpp'
--- a/storage/ndb/src/kernel/vm/dummy_nonmt.cpp	2011-06-30 15:59:25 +0000
+++ b/storage/ndb/src/kernel/vm/dummy_nonmt.cpp	2011-08-27 06:06:02 +0000
@@ -43,6 +43,12 @@ add_extra_worker_thr_map(Uint32, Uint32)
   assert(false);
 }
 
+void
+finalize_thr_map()
+{
+  assert(false);
+}
+
 Uint32
 compute_jb_pages(struct EmulatorData*)
 {

=== modified file 'storage/ndb/src/kernel/vm/mt.cpp'
--- a/storage/ndb/src/kernel/vm/mt.cpp	2011-05-16 12:24:55 +0000
+++ b/storage/ndb/src/kernel/vm/mt.cpp	2011-08-30 10:00:48 +0000
@@ -2381,49 +2381,6 @@ check_queues_empty(thr_data *selfptr)
 }
 
 /*
- * Map instance number to real instance on this node.  Used in
- * sendlocal/sendprioa to find right thread and in execute_signals
- * to find right block instance.  SignalHeader is not modified.
- */
-
-static Uint8 g_map_instance[MAX_BLOCK_INSTANCES];
-
-static void
-map_instance_init()
-{
-  g_map_instance[0] = 0;
-  Uint32 ino;
-  for (ino = 1; ino < MAX_BLOCK_INSTANCES; ino++)
-  {
-    if (!globalData.isNdbMtLqh)
-    {
-      g_map_instance[ino] = 0;
-    }
-    else
-    {
-      require(num_lqh_workers != 0);
-      if (ino <= MAX_NDBMT_LQH_WORKERS)
-      {
-        g_map_instance[ino] = 1 + (ino - 1) % num_lqh_workers;
-      }
-      else
-      {
-        /* Extra workers are not mapped. */
-        g_map_instance[ino] = ino;
-      }
-    }
-  }
-}
-
-static inline Uint32
-map_instance(const SignalHeader *s)
-{
-  Uint32 ino = blockToInstance(s->theReceiversBlockNumber);
-  assert(ino < MAX_BLOCK_INSTANCES);
-  return g_map_instance[ino];
-}
-
-/*
  * Execute at most MAX_SIGNALS signals from one job queue, updating local read
  * state as appropriate.
  *
@@ -2489,7 +2446,7 @@ execute_signals(thr_data *selfptr, thr_j
       NDB_PREFETCH_READ (read_buffer->m_data + read_pos + 32);
     }
     Uint32 bno = blockToMain(s->theReceiversBlockNumber);
-    Uint32 ino = map_instance(s);
+    Uint32 ino = blockToInstance(s->theReceiversBlockNumber);
     SimulatedBlock* block = globalData.mt_getBlock(bno, ino);
     assert(block != 0);
 
@@ -2572,8 +2529,8 @@ run_job_buffers(thr_data *selfptr, Signa
 }
 
 struct thr_map_entry {
-  enum { NULL_THR_NO = 0xFFFF };
-  Uint32 thr_no;
+  enum { NULL_THR_NO = 0xFF };
+  Uint8 thr_no;
   thr_map_entry() : thr_no(NULL_THR_NO) {}
 };
 
@@ -2681,6 +2638,50 @@ add_extra_worker_thr_map(Uint32 block, U
   add_thr_map(block, instance, thr_no);
 }
 
+/**
+ * Create the duplicate entries needed so that a sender doesn't
+ *   need to know how many instances there actually are in this node.
+ *
+ * If there is only one instance, duplicate it for all slots;
+ * otherwise assume instance 0 is the proxy and duplicate the
+ * workers (modulo).
+ *
+ * NOTE: the extra pgman worker is instance 5
+ */
+void
+finalize_thr_map()
+{
+  for (Uint32 b = 0; b < NO_OF_BLOCKS; b++)
+  {
+    Uint32 bno = b + MIN_BLOCK_NO;
+    Uint32 cnt = 0;
+    while (cnt < MAX_BLOCK_INSTANCES &&
+           thr_map[b][cnt].thr_no != thr_map_entry::NULL_THR_NO)
+      cnt++;
+
+    if (cnt != MAX_BLOCK_INSTANCES)
+    {
+      SimulatedBlock * main = globalData.getBlock(bno, 0);
+      for (Uint32 i = cnt; i < MAX_BLOCK_INSTANCES; i++)
+      {
+        Uint32 dup = (cnt == 1) ? 0 : 1 + ((i - 1) % (cnt - 1));
+        if (thr_map[b][i].thr_no == thr_map_entry::NULL_THR_NO)
+        {
+          thr_map[b][i] = thr_map[b][dup];
+          main->addInstance(globalData.getBlock(bno, dup), i);
+        }
+        else
+        {
+          /**
+           * extra pgman instance
+           */
+          require(bno == PGMAN);
+        }
+      }
+    }
+  }
+}
+
 static void reportSignalStats(Uint32 self, Uint32 a_count, Uint32 a_size,
                               Uint32 b_count, Uint32 b_size)
 {
@@ -3120,7 +3121,7 @@ sendlocal(Uint32 self, const SignalHeade
           const Uint32 secPtr[3])
 {
   Uint32 block = blockToMain(s->theReceiversBlockNumber);
-  Uint32 instance = map_instance(s);
+  Uint32 instance = blockToInstance(s->theReceiversBlockNumber);
 
   /*
    * Max number of signals to put into job buffer before flushing the buffer
@@ -3156,7 +3157,7 @@ sendprioa(Uint32 self, const SignalHeade
           const Uint32 secPtr[3])
 {
   Uint32 block = blockToMain(s->theReceiversBlockNumber);
-  Uint32 instance = map_instance(s);
+  Uint32 instance = blockToInstance(s->theReceiversBlockNumber);
 
   Uint32 dst = block2ThreadId(block, instance);
   struct thr_repository* rep = &g_thr_repository;
@@ -3250,28 +3251,11 @@ sendprioa_STOP_FOR_CRASH(const struct th
   */
   static thr_job_buffer dummy_buffer;
 
-  /*
-   * Before we had three main threads with fixed block assignment.
-   * Now there is also worker instances (we send to LQH instance).
+  /**
+   * Pick any instance running in this thread
    */
-  Uint32 main = 0;
-  Uint32 instance = 0;
-  if (dst == 0)
-    main = NDBCNTR;
-  else if (dst == 1)
-    main = DBLQH;
-  else if (dst >= NUM_MAIN_THREADS && dst < NUM_MAIN_THREADS + num_lqh_threads)
-  {
-    main = DBLQH;
-    instance = dst - NUM_MAIN_THREADS + 1;
-  }
-  else if (dst == receiver_thread_no)
-    main = CMVMI;
-  else
-    require(false);
-  Uint32 bno = numberToBlock(main, instance);
-  require(block2ThreadId(main, instance) == dst);
   struct thr_data * dstptr = rep->m_thread + dst;
+  Uint32 bno = dstptr->m_instance_list[0];
 
   memset(&signalT.header, 0, sizeof(SignalHeader));
   signalT.header.theVerId_signalNumber   = GSN_STOP_FOR_CRASH;
@@ -3507,7 +3491,6 @@ ThreadConfig::init()
 
   ndbout << "NDBMT: num_threads=" << num_threads << endl;
 
-  ::map_instance_init();
   ::rep_init(&g_thr_repository, num_threads,
              globalEmulatorData.m_mem_manager);
 }
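
For illustration, the duplication performed by finalize_thr_map() maps
instance numbers as follows (values assumed: a block with cnt = 5
populated entries, i.e. proxy instance 0 plus workers 1-4):

    // dup(i) = 1 + ((i - 1) % (cnt - 1)) = 1 + ((i - 1) % 4)
    // i = 5 -> 1, i = 6 -> 2, i = 7 -> 3, i = 8 -> 4, i = 9 -> 1, ...
    // With cnt = 1 every remaining slot duplicates instance 0, so a
    // sender can use any instance number without knowing the real count.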

=== modified file 'storage/ndb/src/kernel/vm/mt.hpp'
--- a/storage/ndb/src/kernel/vm/mt.hpp	2011-06-30 15:59:25 +0000
+++ b/storage/ndb/src/kernel/vm/mt.hpp	2011-08-27 06:06:02 +0000
@@ -34,6 +34,7 @@ void add_thr_map(Uint32 block, Uint32 in
 void add_main_thr_map();
 void add_lqh_worker_thr_map(Uint32 block, Uint32 instance);
 void add_extra_worker_thr_map(Uint32 block, Uint32 instance);
+void finalize_thr_map();
 
 void sendlocal(Uint32 self, const struct SignalHeader *s,
                const Uint32 *data, const Uint32 secPtr[3]);
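
A sketch of the intended call order for the map-setup functions declared
above (block and instance numbers are illustrative, not taken from this push):

    add_main_thr_map();                 // fixed blocks in the main threads
    add_lqh_worker_thr_map(DBLQH, 1);   // one call per LQH worker instance
    add_extra_worker_thr_map(PGMAN, 5); // extra pgman worker (instance 5)
    finalize_thr_map();                 // fill remaining slots with duplicates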

=== added file 'storage/ndb/src/kernel/vm/mt_thr_config.cpp'
--- a/storage/ndb/src/kernel/vm/mt_thr_config.cpp	1970-01-01 00:00:00 +0000
+++ b/storage/ndb/src/kernel/vm/mt_thr_config.cpp	2011-08-30 14:13:15 +0000
@@ -0,0 +1,994 @@
+/*
+   Copyright (c) 2011, Oracle and/or its affiliates. All rights reserved.
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
+*/
+
+#include "mt_thr_config.hpp"
+#include <kernel/ndb_limits.h>
+#include "../../common/util/parse_mask.hpp"
+
+
+static const struct THRConfig::Entries m_entries[] =
+{
+  // name    type              min  max
+  { "main",  THRConfig::T_MAIN,  1, 1 },
+  { "ldm",   THRConfig::T_LDM,   1, MAX_NDBMT_LQH_THREADS },
+  { "recv",  THRConfig::T_RECV,  1, 1 },
+  { "rep",   THRConfig::T_REP,   1, 1 },
+  { "maint", THRConfig::T_MAINT, 1, 1 }
+};
+
+static const struct THRConfig::Param m_params[] =
+{
+  { "count",   THRConfig::Param::S_UNSIGNED },
+  { "cpubind", THRConfig::Param::S_BITMASK },
+  { "cpuset",  THRConfig::Param::S_BITMASK }
+};
+
+#define IX_COUNT    0
+#define IX_CPUBOUND 1
+#define IX_CPUSET   2
+
+static
+unsigned
+getMaxEntries(Uint32 type)
+{
+  for (Uint32 i = 0; i<NDB_ARRAY_SIZE(m_entries); i++)
+  {
+    if (m_entries[i].m_type == type)
+      return m_entries[i].m_max_cnt;
+  }
+  return 0;
+}
+
+static
+const char *
+getEntryName(Uint32 type)
+{
+  for (Uint32 i = 0; i<NDB_ARRAY_SIZE(m_entries); i++)
+  {
+    if (m_entries[i].m_type == type)
+      return m_entries[i].m_name;
+  }
+  return 0;
+}
+
+static
+Uint32
+getEntryType(const char * type)
+{
+  for (Uint32 i = 0; i<NDB_ARRAY_SIZE(m_entries); i++)
+  {
+    if (strcasecmp(type, m_entries[i].m_name) == 0)
+      return i;
+  }
+
+  return THRConfig::T_END;
+}
+
+THRConfig::THRConfig()
+{
+  m_classic = false;
+}
+
+THRConfig::~THRConfig()
+{
+}
+
+int
+THRConfig::setLockExecuteThreadToCPU(const char * mask)
+{
+  int res = parse_mask(mask, m_LockExecuteThreadToCPU);
+  if (res < 0)
+  {
+    m_err_msg.assfmt("failed to parse 'LockExecuteThreadToCPU=%s' "
+                     "(error: %d)",
+                     mask, res);
+    return -1;
+  }
+  return 0;
+}
+
+int
+THRConfig::setLockMaintThreadsToCPU(unsigned val)
+{
+  m_LockMaintThreadsToCPU.set(val);
+  return 0;
+}
+
+void
+THRConfig::add(T_Type t)
+{
+  T_Thread tmp;
+  tmp.m_type = t;
+  tmp.m_bind_type = T_Thread::B_UNBOUND;
+  tmp.m_no = m_threads[t].size();
+  m_threads[t].push_back(tmp);
+}
+
+int
+THRConfig::do_parse(unsigned MaxNoOfExecutionThreads,
+                    unsigned __ndbmt_lqh_threads,
+                    unsigned __ndbmt_classic)
+{
+  /**
+   * This is old ndbd.cpp : get_multithreaded_config
+   */
+  if (__ndbmt_classic)
+  {
+    m_classic = true;
+    add(T_LDM);
+    add(T_MAIN);
+    add(T_MAINT);
+    return 0;
+  }
+
+  Uint32 lqhthreads = 0;
+  switch(MaxNoOfExecutionThreads){
+  case 0:
+  case 1:
+  case 2:
+  case 3:
+    lqhthreads = 1; // TC + receiver + SUMA + LQH
+    break;
+  case 4:
+  case 5:
+  case 6:
+    lqhthreads = 2; // TC + receiver + SUMA + 2 * LQH
+    break;
+  default:
+    lqhthreads = 4; // TC + receiver + SUMA + 4 * LQH
+  }
+
+  if (__ndbmt_lqh_threads)
+  {
+    lqhthreads = __ndbmt_lqh_threads;
+  }
+
+  add(T_MAIN);
+  add(T_REP);
+  add(T_RECV);
+  add(T_MAINT);
+  for(Uint32 i = 0; i < lqhthreads; i++)
+  {
+    add(T_LDM);
+  }
+
+  return do_bindings() || do_validate();
+}
+
+int
+THRConfig::do_bindings()
+{
+  if (m_LockMaintThreadsToCPU.count() == 1)
+  {
+    m_threads[T_MAINT][0].m_bind_type = T_Thread::B_CPU_BOUND;
+    m_threads[T_MAINT][0].m_bind_no = m_LockMaintThreadsToCPU.getBitNo(0);
+  }
+  else if (m_LockMaintThreadsToCPU.count() > 1)
+  {
+    unsigned no = createCpuSet(m_LockMaintThreadsToCPU);
+    m_threads[T_MAINT][0].m_bind_type = T_Thread::B_CPUSET_BOUND;
+    m_threads[T_MAINT][0].m_bind_no = no;
+  }
+
+  /**
+   * Check that no cpu_sets overlap
+   */
+  for (unsigned i = 0; i<m_cpu_sets.size(); i++)
+  {
+    for (unsigned j = i + 1; j < m_cpu_sets.size(); j++)
+    {
+      if (m_cpu_sets[i].overlaps(m_cpu_sets[j]))
+      {
+        m_err_msg.assfmt("Overlapping cpuset's [ %s ] and [ %s ]",
+                         m_cpu_sets[i].str().c_str(),
+                         m_cpu_sets[j].str().c_str());
+        return -1;
+      }
+    }
+  }
+
+  /**
+   * Check that no cpubind overlaps with any cpuset
+   */
+  for (unsigned i = 0; i < NDB_ARRAY_SIZE(m_threads); i++)
+  {
+    for (unsigned j = 0; j < m_threads[i].size(); j++)
+    {
+      if (m_threads[i][j].m_bind_type == T_Thread::B_CPU_BOUND)
+      {
+        unsigned cpu = m_threads[i][j].m_bind_no;
+        for (unsigned k = 0; k<m_cpu_sets.size(); k++)
+        {
+          if (m_cpu_sets[k].get(cpu))
+          {
+            m_err_msg.assfmt("Overlapping cpubind %u with cpuset [ %s ]",
+                             cpu,
+                             m_cpu_sets[k].str().c_str());
+
+            return -1;
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Remove all CPUs that are already bound from the LockExecuteThreadToCPU mask
+   */
+  for (unsigned i = 0; i<m_cpu_sets.size(); i++)
+  {
+    for (unsigned j = 0; j < m_cpu_sets[i].count(); j++)
+    {
+      m_LockExecuteThreadToCPU.clear(m_cpu_sets[i].getBitNo(j));
+    }
+  }
+
+  unsigned cnt_unbound = 0;
+  for (unsigned i = 0; i < NDB_ARRAY_SIZE(m_threads); i++)
+  {
+    for (unsigned j = 0; j < m_threads[i].size(); j++)
+    {
+      if (m_threads[i][j].m_bind_type == T_Thread::B_CPU_BOUND)
+      {
+        unsigned cpu = m_threads[i][j].m_bind_no;
+        m_LockExecuteThreadToCPU.clear(cpu);
+      }
+      else if (m_threads[i][j].m_bind_type == T_Thread::B_UNBOUND)
+      {
+        cnt_unbound ++;
+      }
+    }
+  }
+
+  if (m_threads[T_MAINT][0].m_bind_type == T_Thread::B_UNBOUND)
+  {
+    /**
+     * don't count this one...
+     */
+    cnt_unbound--;
+  }
+
+  if (m_LockExecuteThreadToCPU.count())
+  {
+    /**
+     * This is old mt.cpp : setcpuaffinity
+     */
+    SparseBitmask& mask = m_LockExecuteThreadToCPU;
+    unsigned cnt = mask.count();
+    unsigned num_threads = cnt_unbound;
+    bool isMtLqh = !m_classic;
+
+    if (cnt < num_threads)
+    {
+      m_info_msg.assfmt("WARNING: Too few CPU's specified with "
+                        "LockExecuteThreadToCPU. Only %u specified "
+                        " but %u was needed, this may cause contention.\n",
+                        cnt, num_threads);
+    }
+
+    if (cnt >= num_threads)
+    {
+      m_info_msg.appfmt("Assigning each thread its own CPU\n");
+      unsigned no = 0;
+      for (unsigned i = 0; i < NDB_ARRAY_SIZE(m_threads); i++)
+      {
+        if (i == T_MAINT)
+          continue;
+        for (unsigned j = 0; j < m_threads[i].size(); j++)
+        {
+          if (m_threads[i][j].m_bind_type == T_Thread::B_UNBOUND)
+          {
+            m_threads[i][j].m_bind_type = T_Thread::B_CPU_BOUND;
+            m_threads[i][j].m_bind_no = mask.getBitNo(no);
+            no++;
+          }
+        }
+      }
+    }
+    else if (cnt == 1)
+    {
+      unsigned cpu = mask.getBitNo(0);
+      m_info_msg.appfmt("Assigning all threads to CPU %u\n", cpu);
+      for (unsigned i = 0; i < NDB_ARRAY_SIZE(m_threads); i++)
+      {
+        if (i == T_MAINT)
+          continue;
+        bind_unbound(m_threads[i], cpu);
+      }
+    }
+    else if (isMtLqh)
+    {
+      unsigned unbound_ldm = count_unbound(m_threads[T_LDM]);
+      if (cnt > unbound_ldm)
+      {
+        /**
+         * let each LQH have its own CPU and let the rest share...
+         */
+        m_info_msg.append("Assigning LQH threads to dedicated CPU(s) and "
+                          "other threads will share remaining\n");
+        unsigned cpu = mask.find(0);
+        for (unsigned i = 0; i < m_threads[T_LDM].size(); i++)
+        {
+          if (m_threads[T_LDM][i].m_bind_type == T_Thread::B_UNBOUND)
+          {
+            m_threads[T_LDM][i].m_bind_type = T_Thread::B_CPU_BOUND;
+            m_threads[T_LDM][i].m_bind_no = cpu;
+            mask.clear(cpu);
+            cpu = mask.find(cpu + 1);
+          }
+        }
+
+        cpu = mask.find(0);
+        bind_unbound(m_threads[T_MAIN], cpu);
+        bind_unbound(m_threads[T_REP], cpu);
+        if ((cpu = mask.find(cpu + 1)) == mask.NotFound)
+        {
+          cpu = mask.find(0);
+        }
+        bind_unbound(m_threads[T_RECV], cpu);
+      }
+      else
+      {
+        // put receiver, tc, backup/suma in 1 thread,
+        // and round robin LQH for rest
+        unsigned cpu = mask.find(0);
+        m_info_msg.appfmt("Assigning LQH threads round robin to CPU(s) and "
+                          "other threads will share CPU %u\n", cpu);
+        bind_unbound(m_threads[T_MAIN], cpu); // TC
+        bind_unbound(m_threads[T_REP], cpu);
+        bind_unbound(m_threads[T_RECV], cpu);
+        mask.clear(cpu);
+
+        cpu = mask.find(0);
+        for (unsigned i = 0; i < m_threads[T_LDM].size(); i++)
+        {
+          if (m_threads[T_LDM][i].m_bind_type == T_Thread::B_UNBOUND)
+          {
+            m_threads[T_LDM][i].m_bind_type = T_Thread::B_CPU_BOUND;
+            m_threads[T_LDM][i].m_bind_no = cpu;
+            if ((cpu = mask.find(cpu + 1)) == mask.NotFound)
+            {
+              cpu = mask.find(0);
+            }
+          }
+        }
+      }
+    }
+    else
+    {
+      unsigned cpu = mask.find(0);
+      m_info_msg.appfmt("Assigning LQH thread to CPU %u and "
+                        "other threads will share\n", cpu);
+      bind_unbound(m_threads[T_LDM], cpu);
+      cpu = mask.find(cpu + 1);
+      bind_unbound(m_threads[T_MAIN], cpu);
+      bind_unbound(m_threads[T_RECV], cpu);
+    }
+  }
+
+  return 0;
+}
+
+unsigned
+THRConfig::count_unbound(const Vector<T_Thread>& vec) const
+{
+  unsigned cnt = 0;
+  for (unsigned i = 0; i < vec.size(); i++)
+  {
+    if (vec[i].m_bind_type == T_Thread::B_UNBOUND)
+      cnt ++;
+  }
+  return cnt;
+}
+
+void
+THRConfig::bind_unbound(Vector<T_Thread>& vec, unsigned cpu)
+{
+  for (unsigned i = 0; i < vec.size(); i++)
+  {
+    if (vec[i].m_bind_type == T_Thread::B_UNBOUND)
+    {
+      vec[i].m_bind_type = T_Thread::B_CPU_BOUND;
+      vec[i].m_bind_no = cpu;
+    }
+  }
+}
+
+int
+THRConfig::do_validate()
+{
+  /**
+   * Check that there aren't too many of any thread type
+   */
+  for (unsigned i = 0; i< NDB_ARRAY_SIZE(m_threads); i++)
+  {
+    if (m_threads[i].size() > getMaxEntries(i))
+    {
+      m_err_msg.assfmt("Too many instances(%u) of %s max supported: %u",
+                       m_threads[i].size(),
+                       getEntryName(i),
+                       getMaxEntries(i));
+      return -1;
+    }
+  }
+
+  /**
+   * Number of LDM instances can be 1, 2 or 4
+   */
+  if (m_threads[T_LDM].size() == 3)
+  {
+    m_err_msg.assfmt("No of LDM-instances can be 1,2,4. Specified: %u",
+                     m_threads[T_LDM].size());
+    return -1;
+  }
+
+  return 0;
+}
+
+const char *
+THRConfig::getConfigString()
+{
+  m_cfg_string.clear();
+  const char * sep = "";
+  for (unsigned i = 0; i < NDB_ARRAY_SIZE(m_threads); i++)
+  {
+    if (m_threads[i].size())
+    {
+      const char * name = getEntryName(i);
+      if (i != T_MAINT)
+      {
+        for (unsigned j = 0; j < m_threads[i].size(); j++)
+        {
+          m_cfg_string.append(sep);
+          sep=",";
+          m_cfg_string.append(name);
+          if (m_threads[i][j].m_bind_type != T_Thread::B_UNBOUND)
+          {
+            m_cfg_string.append("={");
+            if (m_threads[i][j].m_bind_type == T_Thread::B_CPU_BOUND)
+            {
+              m_cfg_string.appfmt("cpubind=%u", m_threads[i][j].m_bind_no);
+            }
+            else if (m_threads[i][j].m_bind_type == T_Thread::B_CPUSET_BOUND)
+            {
+              m_cfg_string.appfmt("cpuset=%s",
+                                  m_cpu_sets[m_threads[i][j].m_bind_no].str().c_str());
+            }
+            m_cfg_string.append("}");
+          }
+        }
+      }
+      else
+      {
+        for (unsigned j = 0; j < m_threads[i].size(); j++)
+        {
+          if (m_threads[i][j].m_bind_type != T_Thread::B_UNBOUND)
+          {
+            m_cfg_string.append(sep);
+            sep=",";
+            m_cfg_string.append(name);
+            m_cfg_string.append("={");
+            if (m_threads[i][j].m_bind_type == T_Thread::B_CPU_BOUND)
+            {
+              m_cfg_string.appfmt("cpubind=%u", m_threads[i][j].m_bind_no);
+            }
+            else if (m_threads[i][j].m_bind_type == T_Thread::B_CPUSET_BOUND)
+            {
+              m_cfg_string.appfmt("cpuset=%s",
+                                  m_cpu_sets[m_threads[i][j].m_bind_no].str().c_str());
+            }
+            m_cfg_string.append("}");
+          }
+        }
+      }
+    }
+  }
+  return m_cfg_string.c_str();
+}
+
+Uint32
+THRConfig::getThreadCount() const
+{
+  // Note! not counting T_MAINT
+  Uint32 cnt = 0;
+  for (Uint32 i = 0; i < NDB_ARRAY_SIZE(m_threads); i++)
+  {
+    if (i != T_MAINT)
+    {
+      cnt += m_threads[i].size();
+    }
+  }
+  return cnt;
+}
+
+Uint32
+THRConfig::getThreadCount(T_Type type) const
+{
+  for (Uint32 i = 0; i < NDB_ARRAY_SIZE(m_threads); i++)
+  {
+    if (i == (Uint32)type)
+    {
+      return m_threads[i].size();
+    }
+  }
+  return 0;
+}
+
+const char *
+THRConfig::getErrorMessage() const
+{
+  return m_err_msg.c_str();
+}
+
+const char *
+THRConfig::getInfoMessage() const
+{
+  return m_info_msg.c_str();
+}
+
+static
+char *
+skipblank(char * str)
+{
+  while (isspace(* str))
+    str++;
+  return str;
+}
+
+Uint32
+THRConfig::find_type(char *& str)
+{
+  str = skipblank(str);
+
+  char * name = str;
+  if (* name == 0)
+  {
+    m_err_msg.assfmt("empty thread specification");
+    return 0;
+  }
+  char * end = name;
+  while(isalpha(* end))
+    end++;
+
+  char save = * end;
+  * end = 0;
+  Uint32 t = getEntryType(name);
+  if (t == T_END)
+  {
+    m_err_msg.assfmt("unknown thread type '%s'", name);
+  }
+  * end = save;
+  str = end;
+  return t;
+}
+
+struct ParamValue
+{
+  ParamValue() { found = false;}
+  bool found;
+  const char * string_val;
+  unsigned unsigned_val;
+  SparseBitmask mask_val;
+};
+
+static
+int
+parseUnsigned(char *& str, unsigned * dst)
+{
+  str = skipblank(str);
+  char * endptr = 0;
+  errno = 0;
+  long val = strtol(str, &endptr, 0);
+  if (errno == ERANGE)
+    return -1;
+  if (val < 0 || Int64(val) > 0xFFFFFFFF)
+    return -1;
+  if (endptr == str)
+    return -1;
+  str = endptr;
+  *dst = (unsigned)val;
+  return 0;
+}
+
+static
+int
+parseBitmask(char *& str, SparseBitmask * mask)
+{
+  str = skipblank(str);
+  size_t len = strspn(str, "0123456789-, ");
+  if (len == 0)
+    return -1;
+
+  while (isspace(str[len-1]))
+    len--;
+  if (str[len-1] == ',')
+    len--;
+  char save = str[len];
+  str[len] = 0;
+  int res = parse_mask(str, *mask);
+  str[len] = save;
+  str = str + len;
+  return res;
+}
+
+static
+int
+parseParams(char * str, ParamValue values[], BaseString& err)
+{
+  const char * const save = str;
+  while (* str)
+  {
+    str = skipblank(str);
+
+    unsigned idx = 0;
+    for (; idx < NDB_ARRAY_SIZE(m_params); idx++)
+    {
+      if (strncasecmp(str, m_params[idx].name, strlen(m_params[idx].name)) == 0)
+      {
+        str += strlen(m_params[idx].name);
+        break;
+      }
+    }
+
+    if (idx == NDB_ARRAY_SIZE(m_params))
+    {
+      err.assfmt("Unknown param near: '%s'", str);
+      return -1;
+    }
+
+    if (values[idx].found == true)
+    {
+      err.assfmt("Param '%s' found twice", m_params[idx].name);
+      return -1;
+    }
+
+    str = skipblank(str);
+    if (* str != '=')
+    {
+      err.assfmt("Missing '=' after %s in '%s'", m_params[idx].name, save);
+      return -1;
+    }
+    str++;
+    str = skipblank(str);
+
+    int res = 0;
+    switch(m_params[idx].type){
+    case THRConfig::Param::S_UNSIGNED:
+      res = parseUnsigned(str, &values[idx].unsigned_val);
+      break;
+    case THRConfig::Param::S_BITMASK:
+      res = parseBitmask(str, &values[idx].mask_val);
+      break;
+    default:
+      err.assfmt("Internal error, unknown type for param: '%s'",
+                 m_params[idx].name);
+      return -1;
+    }
+    if (res == -1)
+    {
+      err.assfmt("Unable to parse %s=%s", m_params[idx].name, str);
+      return -1;
+    }
+    values[idx].found = true;
+    str = skipblank(str);
+
+    if (* str == 0)
+      break;
+
+    if (* str != ',')
+    {
+      err.assfmt("Unable to parse near '%s'", str);
+      return -1;
+    }
+    str++;
+  }
+  return 0;
+}
+
+int
+THRConfig::find_spec(char *& str, T_Type type)
+{
+  str = skipblank(str);
+
+  switch(* str){
+  case ',':
+  case 0:
+    add(type);
+    return 0;
+  }
+
+  if (* str != '=')
+  {
+err:
+    int len = (int)strlen(str);
+    m_err_msg.assfmt("Invalid format near: '%.*s'",
+                     (len > 10) ? 10 : len, str);
+    return -1;
+  }
+
+  str++; // skip over =
+  str = skipblank(str);
+
+  if (* str != '{')
+  {
+    goto err;
+  }
+
+  str++;
+  char * start = str;
+
+  /**
+   * Find end
+   */
+  while (* str && (* str) != '}')
+    str++;
+
+  if (* str != '}')
+  {
+    goto err;
+  }
+
+  char * end = str;
+  char save = * end;
+  * end = 0;
+
+  ParamValue values[NDB_ARRAY_SIZE(m_params)];
+  values[IX_COUNT].unsigned_val = 1;
+  int res = parseParams(start, values, m_err_msg);
+  * end = save;
+
+  if (res != 0)
+  {
+    return -1;
+  }
+
+  if (values[IX_CPUBOUND].found && values[IX_CPUSET].found)
+  {
+    m_err_msg.assfmt("Both cpuset and cpubind specified!");
+    return -1;
+  }
+
+  unsigned cnt = values[IX_COUNT].unsigned_val;
+  const int index = m_threads[type].size();
+  for (unsigned i = 0; i < cnt; i++)
+  {
+    add(type);
+  }
+
+  assert(m_threads[type].size() == index + cnt);
+  if (values[IX_CPUSET].found)
+  {
+    SparseBitmask & mask = values[IX_CPUSET].mask_val;
+    unsigned no = createCpuSet(mask);
+    for (unsigned i = 0; i < cnt; i++)
+    {
+      m_threads[type][index+i].m_bind_type = T_Thread::B_CPUSET_BOUND;
+      m_threads[type][index+i].m_bind_no = no;
+    }
+  }
+  else if (values[IX_CPUBOUND].found)
+  {
+    SparseBitmask & mask = values[IX_CPUBOUND].mask_val;
+    if (mask.count() < cnt)
+    {
+      m_err_msg.assfmt("%s: trying to bind %u threads to %u cpus [%s]",
+                       getEntryName(type),
+                       cnt,
+                       mask.count(),
+                       mask.str().c_str());
+      return -1;
+    }
+    for (unsigned i = 0; i < cnt; i++)
+    {
+      m_threads[type][index+i].m_bind_type = T_Thread::B_CPU_BOUND;
+      m_threads[type][index+i].m_bind_no = mask.getBitNo(i % mask.count());
+    }
+  }
+
+  str++; // skip over }
+  return 0;
+}
+
+int
+THRConfig::find_next(char *& str)
+{
+  str = skipblank(str);
+
+  if (* str == 0)
+  {
+    return 0;
+  }
+  else if (* str == ',')
+  {
+    str++;
+    return 1;
+  }
+
+  int len = (int)strlen(str);
+  m_err_msg.assfmt("Invalid format near: '%.*s'",
+                   (len > 10) ? 10 : len, str);
+  return -1;
+}
+
+int
+THRConfig::do_parse(const char * ThreadConfig)
+{
+  BaseString str(ThreadConfig);
+  char * ptr = (char*)str.c_str();
+  while (* ptr)
+  {
+    Uint32 type = find_type(ptr);
+    if (type == T_END)
+      return -1;
+
+    if (find_spec(ptr, (T_Type)type) < 0)
+      return -1;
+
+    int ret = find_next(ptr);
+    if (ret < 0)
+      return ret;
+
+    if (ret == 0)
+      break;
+  }
+
+  for (Uint32 i = 0; i < T_END; i++)
+  {
+    while (m_threads[i].size() < m_entries[i].m_min_cnt)
+      add((T_Type)i);
+  }
+
+  return do_bindings() || do_validate();
+}
+
+unsigned
+THRConfig::createCpuSet(const SparseBitmask& mask)
+{
+  for (unsigned i = 0; i < m_cpu_sets.size(); i++)
+    if (m_cpu_sets[i].equal(mask))
+      return i;
+
+  m_cpu_sets.push_back(mask);
+  return m_cpu_sets.size() - 1;
+}
+
+template class Vector<SparseBitmask>;
+template class Vector<THRConfig::T_Thread>;
+
+#ifdef TEST_MT_THR_CONFIG
+
+#include <NdbTap.hpp>
+
+TAPTEST(mt_thr_config)
+{
+  {
+    THRConfig tmp;
+    OK(tmp.do_parse(8, 0, 0) == 0);
+  }
+
+  /**
+   * BASIC test
+   */
+  {
+    const char * ok[] =
+      {
+        "ldm,ldm",
+        "ldm={count=3},ldm",
+        "ldm={cpubind=1-2,5,count=3},ldm",
+        "ldm={ cpubind = 1- 2, 5 , count = 3 },ldm",
+        "ldm={count=3,cpubind=1-2,5 },  ldm",
+        "ldm={cpuset=1-3,count=3 },ldm",
+        "main,ldm={},ldm",
+        0
+      };
+
+    const char * fail [] =
+      {
+        "ldm,ldm,ldm",
+        "ldm={cpubind= 1 , cpuset=2 },ldm",
+        "ldm={count=4,cpubind=1-3},ldm",
+        "main,main,ldm,ldm",
+        "main={ keso=88, count=23},ldm,ldm",
+        "main={ cpuset=1-3 }, ldm={cpuset=3-4}",
+        "main={ cpuset=1-3 }, ldm={cpubind=2}",
+        0
+      };
+
+    for (Uint32 i = 0; ok[i]; i++)
+    {
+      THRConfig tmp;
+      int res = tmp.do_parse(ok[i]);
+      printf("do_parse(%s) => %s - %s\n", ok[i],
+             res == 0 ? "OK" : "FAIL",
+             res == 0 ? "" : tmp.getErrorMessage());
+      OK(res == 0);
+      {
+        BaseString out(tmp.getConfigString());
+        THRConfig check;
+        OK(check.do_parse(out.c_str()) == 0);
+        OK(strcmp(out.c_str(), check.getConfigString()) == 0);
+      }
+    }
+
+    for (Uint32 i = 0; fail[i]; i++)
+    {
+      THRConfig tmp;
+      int res = tmp.do_parse(fail[i]);
+      printf("do_parse(%s) => %s - %s\n", fail[i],
+             res == 0 ? "OK" : "FAIL",
+             res == 0 ? "" : tmp.getErrorMessage());
+      OK(res != 0);
+    }
+  }
+
+  {
+    /**
+     * Test interaction with LockExecuteThreadToCPU
+     */
+    const char * t[] =
+    {
+      /** LockExecuteThreadToCPU, threads, answer */
+      "1-8",
+      "ldm={count=4}",
+      "main={cpubind=1},ldm={cpubind=2},ldm={cpubind=3},ldm={cpubind=4},ldm={cpubind=5},recv={cpubind=6},rep={cpubind=7}",
+
+      "1-5",
+      "ldm={count=4}",
+      "main={cpubind=5},ldm={cpubind=1},ldm={cpubind=2},ldm={cpubind=3},ldm={cpubind=4},recv={cpubind=5},rep={cpubind=5}",
+
+      "1-3",
+      "ldm={count=4}",
+      "main={cpubind=1},ldm={cpubind=2},ldm={cpubind=3},ldm={cpubind=2},ldm={cpubind=3},recv={cpubind=1},rep={cpubind=1}",
+
+      "1-4",
+      "ldm={count=4}",
+      "main={cpubind=1},ldm={cpubind=2},ldm={cpubind=3},ldm={cpubind=4},ldm={cpubind=2},recv={cpubind=1},rep={cpubind=1}",
+
+      "1-8",
+      "ldm={count=4},maint={cpubind=8}",
+      "main={cpubind=1},ldm={cpubind=2},ldm={cpubind=3},ldm={cpubind=4},ldm={cpubind=5},recv={cpubind=6},rep={cpubind=7},maint={cpubind=8}",
+
+      "1-8",
+      "ldm={count=4,cpubind=1,4,5,6}",
+      "main={cpubind=2},ldm={cpubind=1},ldm={cpubind=4},ldm={cpubind=5},ldm={cpubind=6},recv={cpubind=3},rep={cpubind=7}",
+
+      // END
+      0
+    };
+
+    for (unsigned i = 0; t[i]; i+= 3)
+    {
+      THRConfig tmp;
+      tmp.setLockExecuteThreadToCPU(t[i+0]);
+      int res = tmp.do_parse(t[i+1]);
+      int ok = strcmp(tmp.getConfigString(), t[i+2]) == 0;
+      printf("mask: %s conf: %s => %s(%s) - %s - %s\n",
+             t[i+0],
+             t[i+1],
+             res == 0 ? "OK" : "FAIL",
+             res == 0 ? "" : tmp.getErrorMessage(),
+             tmp.getConfigString(),
+             ok == 1 ? "CORRECT" : "INCORRECT");
+      OK(res == 0);
+      OK(ok == 1);
+    }
+  }
+
+  return 1;
+}
+
+#endif
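
A minimal usage sketch of the parser above (error handling trimmed; the
ThreadConfig string and CPU mask are examples only):

    #include <stdio.h>
    #include "mt_thr_config.hpp"

    THRConfig cfg;
    cfg.setLockExecuteThreadToCPU("1-8");            // optional CPU mask
    if (cfg.do_parse("ldm={count=4},main,recv,rep") != 0)
      printf("error: %s\n", cfg.getErrorMessage());
    else
      printf("%s\n", cfg.getConfigString());         // canonical form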

=== added file 'storage/ndb/src/kernel/vm/mt_thr_config.hpp'
--- a/storage/ndb/src/kernel/vm/mt_thr_config.hpp	1970-01-01 00:00:00 +0000
+++ b/storage/ndb/src/kernel/vm/mt_thr_config.hpp	2011-08-30 12:00:48 +0000
@@ -0,0 +1,128 @@
+/*
+   Copyright (c) 2011, Oracle and/or its affiliates. All rights reserved.
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
+*/
+
+#ifndef THRConfig_H
+#define THRConfig_H
+
+struct NdbThread;
+#include <Vector.hpp>
+#include <SparseBitmask.hpp>
+#include <BaseString.hpp>
+
+/**
+ * This class contains the thread configuration.
+ *   It supports parsing the ThreadConfig parameter
+ *   and handles LockExecuteThreadToCPU etc.
+ *
+ * It is used by ndb_mgmd when verifying the configuration
+ *   and by ndbmtd.
+ *
+ * TAP tests are provided in mt_thr_config.cpp
+ */
+class THRConfig
+{
+public:
+  enum T_Type
+  {
+    T_MAIN  = 0, /* DIH/QMGR/TC/SPJ etc */
+    T_LDM   = 1, /* LQH/ACC/TUP/TUX etc */
+    T_RECV  = 2, /* CMVMI */
+    T_REP   = 3, /* SUMA */
+    T_MAINT = 4, /* FS, SocketServer etc */
+
+    T_END  = 5
+  };
+
+  THRConfig();
+  ~THRConfig();
+
+  // NOTE: needs to be called before do_parse
+  int setLockExecuteThreadToCPU(const char * val);
+  int setLockMaintThreadsToCPU(unsigned val);
+
+  int do_parse(const char * ThreadConfig);
+  int do_parse(unsigned MaxNoOfExecutionThreads,
+               unsigned __ndbmt_lqh_workers,
+               unsigned __ndbmt_classic);
+
+  const char * getConfigString();
+
+  const char * getErrorMessage() const;
+  const char * getInfoMessage() const;
+
+  Uint32 getThreadCount() const; // Don't count FS/MAINT thread
+  Uint32 getThreadCount(T_Type) const;
+  Uint32 getMtClassic() const { return m_classic; }
+private:
+  struct T_Thread
+  {
+    unsigned m_type;
+    unsigned m_no; // within type
+    enum BType { B_UNBOUND, B_CPU_BOUND, B_CPUSET_BOUND } m_bind_type;
+    unsigned m_bind_no; // cpu_no/cpuset_no
+  };
+  bool m_classic;
+  SparseBitmask m_LockExecuteThreadToCPU;
+  SparseBitmask m_LockMaintThreadsToCPU;
+  Vector<SparseBitmask> m_cpu_sets;
+  Vector<T_Thread> m_threads[T_END];
+
+  BaseString m_err_msg;
+  BaseString m_info_msg;
+  BaseString m_cfg_string;
+  BaseString m_print_string;
+
+  void add(T_Type);
+  Uint32 find_type(char *&);
+  int find_spec(char *&, T_Type);
+  int find_next(char *&);
+
+  unsigned createCpuSet(const SparseBitmask&);
+  int do_bindings();
+  int do_validate();
+
+  unsigned count_unbound(const Vector<T_Thread>& vec) const;
+  void bind_unbound(Vector<T_Thread> & vec, unsigned cpu);
+
+public:
+  struct Entries
+  {
+    const char * m_name;
+    unsigned m_type;
+    unsigned m_min_cnt;
+    unsigned m_max_cnt;
+  };
+
+  struct Param
+  {
+    const char * name;
+    enum { S_UNSIGNED, S_BITMASK } type;
+  };
+};
+
+/**
+ * This class is used by ndbmtd
+ *   when setting up threads (and locking)
+ */
+class THRConfigApplier : public THRConfig
+{
+public:
+  int create_cpusets();
+  int do_bind(unsigned t_type, unsigned no, NdbThread*);
+};
+
+#endif // THRConfig_H
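
The applier subclass is what ndbmtd is expected to use when pinning
threads; a hypothetical sequence using only the signatures declared above
(the implementations are not part of this push, cf. "not yet cpu-bindings"
in the commit message; pThread stands for some NdbThread*):

    THRConfigApplier applier;
    applier.do_parse("ldm={count=4,cpubind=1-4}");
    applier.create_cpusets();                       // materialize cpusets
    applier.do_bind(THRConfig::T_LDM, 0, pThread);  // bind LDM worker 0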

=== modified file 'storage/ndb/src/ndbapi/NdbQueryOperation.cpp'
--- a/storage/ndb/src/ndbapi/NdbQueryOperation.cpp	2011-08-18 11:47:22 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryOperation.cpp	2011-08-22 12:56:56 +0000
@@ -57,6 +57,7 @@ static const int Err_FunctionNotImplemen
 static const int Err_UnknownColumn = 4004;
 static const int Err_ReceiveTimedOut = 4008;
 static const int Err_NodeFailCausedAbort = 4028;
+static const int Err_ParameterError = 4118;
 static const int Err_SimpleDirtyReadFailed = 4119;
 static const int Err_WrongFieldLength = 4209;
 static const int Err_ReadTooMuch = 4257;
@@ -72,6 +73,21 @@ static const Uint16 tupleNotFound = 0xff
 /** Set to true to trace incoming signals.*/
 const bool traceSignals = false;
 
+enum
+{
+  /**
+   * Set NdbQueryOperationImpl::m_parallelism to this value to indicate that
+   * scan parallelism should be adaptive.
+   */
+  Parallelism_adaptive = 0xffff0000,
+
+  /**
+   * Set NdbQueryOperationImpl::m_parallelism to this value to indicate that
+   * all fragments should be scanned in parallel.
+   */
+  Parallelism_max = 0xffff0001
+};
+
 /**
  * A class for accessing the correlation data at the end of a tuple (for 
  * scan queries). These data have the following layout:
@@ -358,9 +374,6 @@ public:
   Uint32 getRowCount() const
   { return m_rowCount; }
 
-  char* getRow(Uint32 tupleNo) const
-  { return (m_buffer + (tupleNo*m_rowSize)); }
-
 private:
   /** No copying.*/
   NdbResultSet(const NdbResultSet&);
@@ -372,6 +385,9 @@ private:
   /** Used for checking if buffer overrun occurred. */
   Uint32* m_batchOverflowCheck;
 
+  /** Array of TupleCorrelations for all rows in m_buffer */
+  TupleCorrelation* m_correlations;
+
   Uint32 m_rowSize;
 
   /** The current #rows in 'm_buffer'.*/
@@ -415,8 +431,8 @@ public:
   const NdbReceiver& getReceiver() const
   { return m_receiver; }
 
-  const char* getCurrentRow() const
-  { return m_receiver.peek_row(); }
+  const char* getCurrentRow()
+  { return m_receiver.get_row(); }
 
   /**
    * Process an incomming tuple for this stream. Extract parent and own tuple 
@@ -653,6 +669,7 @@ void* NdbBulkAllocator::allocObjMem(Uint
 NdbResultSet::NdbResultSet() :
   m_buffer(NULL),
   m_batchOverflowCheck(NULL),
+  m_correlations(NULL),
   m_rowSize(0),
   m_rowCount(0)
 {}
@@ -672,6 +689,12 @@ NdbResultSet::init(NdbQueryImpl& query,
     m_batchOverflowCheck = 
       reinterpret_cast<Uint32*>(bufferAlloc.allocObjMem(sizeof(Uint32)));
     *m_batchOverflowCheck = 0xacbd1234;
+
+    if (query.getQueryDef().isScanQuery())
+    {
+      m_correlations = reinterpret_cast<TupleCorrelation*>
+                         (bufferAlloc.allocObjMem(maxRows*sizeof(TupleCorrelation)));
+    }
   }
 }
 
@@ -856,19 +879,17 @@ void
 NdbResultStream::execTRANSID_AI(const Uint32 *ptr, Uint32 len,
                                 TupleCorrelation correlation)
 {
-  m_receiver.execTRANSID_AI(ptr, len);
-  m_resultSets[m_recv].m_rowCount++;
-
+  NdbResultSet& receiveSet = m_resultSets[m_recv];
   if (isScanQuery())
   {
     /**
-     * Store TupleCorrelation as hidden value imm. after received row
-     * (NdbQueryOperationImpl::getRowSize() has reserved space for it)
+     * Store TupleCorrelation.
      */
-    Uint32* row_recv = reinterpret_cast<Uint32*>
-                         (m_receiver.m_record.m_row_recv);
-    row_recv[-1] = correlation.toUint32();
+    receiveSet.m_correlations[receiveSet.m_rowCount] = correlation;
   }
+
+  m_receiver.execTRANSID_AI(ptr, len);
+  receiveSet.m_rowCount++;
 } // NdbResultStream::execTRANSID_AI()
 
 /**
@@ -994,8 +1015,8 @@ NdbResultStream::prepareResultSet(Uint32
 
 /**
  * Fill m_tupleSet[] with correlation data between parent 
- * and child tuples. The 'TupleCorrelation' is stored as
- * and extra Uint32 after each row in the NdbResultSet
+ * and child tuples. The 'TupleCorrelation' is stored
+ * in an array of TupleCorrelations in each ResultSet
  * by execTRANSID_AI().
  *
  * NOTE: In order to reduce work done when holding the 
@@ -1024,12 +1045,9 @@ NdbResultStream::buildResultCorrelations
     /* Rebuild correlation & hashmap from 'readResult' */
     for (Uint32 tupleNo=0; tupleNo<readResult.m_rowCount; tupleNo++)
     {
-      const Uint32* row = (Uint32*)readResult.getRow(tupleNo+1);
-      const TupleCorrelation correlation(row[-1]);
-
-      const Uint16 tupleId  = correlation.getTupleId();
+      const Uint16 tupleId  = readResult.m_correlations[tupleNo].getTupleId();
       const Uint16 parentId = (m_parent!=NULL) 
-                                ? correlation.getParentTupleId()
+                                ? readResult.m_correlations[tupleNo].getParentTupleId()
                                 : tupleNotFound;
 
       m_tupleSet[tupleNo].m_skip     = false;
@@ -1490,6 +1508,14 @@ int NdbQueryOperation::setParallelism(Ui
   return m_impl.setParallelism(parallelism);
 }
 
+int NdbQueryOperation::setMaxParallelism(){
+  return m_impl.setMaxParallelism();
+}
+
+int NdbQueryOperation::setAdaptiveParallelism(){
+  return m_impl.setAdaptiveParallelism();
+}
+
 int NdbQueryOperation::setBatchSize(Uint32 batchSize){
   return m_impl.setBatchSize(batchSize);
 }
@@ -2623,16 +2649,17 @@ NdbQueryImpl::prepareSend()
   {
     /* For the first batch, we read from all fragments for both ordered 
      * and unordered scans.*/
-    if (getQueryOperation(0U).m_parallelism > 0)
+    if (getQueryOperation(0U).m_parallelism == Parallelism_max)
     {
       m_rootFragCount
-        = MIN(getRoot().getQueryOperationDef().getTable().getFragmentCount(),
-              getQueryOperation(0U).m_parallelism);
+        = getRoot().getQueryOperationDef().getTable().getFragmentCount();
     }
     else
     {
+      assert(getQueryOperation(0U).m_parallelism != Parallelism_adaptive);
       m_rootFragCount
-        = getRoot().getQueryOperationDef().getTable().getFragmentCount();
+        = MIN(getRoot().getQueryOperationDef().getTable().getFragmentCount(),
+              getQueryOperation(0U).m_parallelism);
     }
     Ndb* const ndb = m_transaction.getNdb();
 
@@ -2682,14 +2709,13 @@ NdbQueryImpl::prepareSend()
   for (Uint32 opNo = 0; opNo < getNoOfOperations(); opNo++)
   {
     const NdbQueryOperationImpl& op = getQueryOperation(opNo);
+    // Add space for m_correlations, m_buffer & m_batchOverflowCheck
+    totalBuffSize += (sizeof(TupleCorrelation) * op.getMaxBatchRows());
     totalBuffSize += (op.getRowSize() * op.getMaxBatchRows());
+    totalBuffSize += sizeof(Uint32); // Overflow check
   }
-  /**
-   * Add one word per ResultStream for buffer overrun check. We add a word
-   * rather than a byte to avoid possible alignment problems.
-   */
-  m_rowBufferAlloc.init(m_rootFragCount * 
-                       (totalBuffSize + (sizeof(Uint32) * getNoOfOperations())) );
+
+  m_rowBufferAlloc.init(m_rootFragCount * totalBuffSize);
   if (getQueryDef().isScanQuery())
   {
     Uint32 totalRows = 0;
@@ -3455,8 +3481,6 @@ NdbQueryImpl::OrderedFragSet::getCurrent
 { 
   if (m_ordering!=NdbQueryOptions::ScanOrdering_unordered)
   {
-    // Results should be ordered.
-    assert(verifySortOrder());
     /** 
      * Must have tuples for each (non-completed) fragment when doing ordered
      * scan.
@@ -3667,7 +3691,8 @@ NdbQueryOperationImpl::NdbQueryOperation
   m_ordering(NdbQueryOptions::ScanOrdering_unordered),
   m_interpretedCode(NULL),
   m_diskInUserProjection(false),
-  m_parallelism(0),
+  m_parallelism(def.getQueryOperationIx() == 0
+                ? Parallelism_max : Parallelism_adaptive),
   m_rowSize(0xffffffff)
 { 
   if (errno == ENOMEM)
@@ -4266,9 +4291,10 @@ NdbQueryOperationImpl
                                       m_ndbRecord,
                                       m_firstRecAttr,
                                       0, // Key size.
-                                      getRoot().m_parallelism > 0 ?
-                                      getRoot().m_parallelism :
-                                      m_queryImpl.getRootFragCount(),
+                                      getRoot().m_parallelism
+                                      == Parallelism_max ?
+                                      m_queryImpl.getRootFragCount() :
+                                      getRoot().m_parallelism,
                                       maxBatchRows,
                                       batchByteSize,
                                       firstBatchRows);
@@ -4454,7 +4480,12 @@ NdbQueryOperationImpl::prepareAttrInfo(U
                                       firstBatchRows);
     assert(batchRows==getMaxBatchRows());
     assert(batchRows==firstBatchRows);
-    requestInfo |= QN_ScanIndexParameters::SIP_PARALLEL; // FIXME: SPJ always assume. SIP_PARALLEL
+    assert(m_parallelism == Parallelism_max ||
+           m_parallelism == Parallelism_adaptive);
+    if (m_parallelism == Parallelism_max)
+    {
+      requestInfo |= QN_ScanIndexParameters::SIP_PARALLEL;
+    }
     param->requestInfo = requestInfo; 
     param->batchSize = ((Uint16)batchByteSize << 16) | (Uint16)firstBatchRows;
     param->resultData = getIdOfReceiver();
@@ -4786,7 +4817,7 @@ NdbQueryOperationImpl::execTRANSID_AI(co
   if (getQueryDef().isScanQuery())
   {
     const CorrelationData correlData(ptr, len);
-    const Uint32 receiverId = CorrelationData(ptr, len).getRootReceiverId();
+    const Uint32 receiverId = correlData.getRootReceiverId();
     
     /** receiverId holds the Id of the receiver of the corresponding stream
      * of the root operation. We can thus find the correct root fragment 
@@ -4958,7 +4989,7 @@ NdbQueryOperationImpl::setOrdering(NdbQu
     return -1;
   }
 
-  if (m_parallelism != 0)
+  if (m_parallelism != Parallelism_max)
   {
     getQuery().setErrorCode(QRY_SEQUENTIAL_SCAN_SORTED);
     return -1;
@@ -5053,10 +5084,40 @@ int NdbQueryOperationImpl::setParallelis
     getQuery().setErrorCode(Err_FunctionNotImplemented);
     return -1;
   }
+  else if (parallelism < 1 || parallelism > MAX_NDB_PARTITIONS)
+  {
+    getQuery().setErrorCode(Err_ParameterError);
+    return -1;
+  }
   m_parallelism = parallelism;
   return 0;
 }
 
+int NdbQueryOperationImpl::setMaxParallelism(){
+  if (!getQueryOperationDef().isScanOperation())
+  {
+    getQuery().setErrorCode(QRY_WRONG_OPERATION_TYPE);
+    return -1;
+  }
+  m_parallelism = Parallelism_max;
+  return 0;
+}
+
+int NdbQueryOperationImpl::setAdaptiveParallelism(){
+  if (!getQueryOperationDef().isScanOperation())
+  {
+    getQuery().setErrorCode(QRY_WRONG_OPERATION_TYPE);
+    return -1;
+  }
+  else if (getQueryOperationDef().getQueryOperationIx() == 0)
+  {
+    getQuery().setErrorCode(Err_FunctionNotImplemented);
+    return -1;
+  }
+  m_parallelism = Parallelism_adaptive;
+  return 0;
+}
+
 int NdbQueryOperationImpl::setBatchSize(Uint32 batchSize){
   if (!getQueryOperationDef().isScanOperation())
   {
@@ -5124,12 +5185,6 @@ Uint32 NdbQueryOperationImpl::getRowSize
   {
     m_rowSize = 
       NdbReceiver::ndbrecord_rowsize(m_ndbRecord, m_firstRecAttr, 0, false);
-
-    const bool withCorrelation = getRoot().getQueryDef().isScanQuery();
-    if (withCorrelation)
-    {
-      m_rowSize += TupleCorrelation::wordCount*sizeof(Uint32);
-    }
   }
   return m_rowSize;
 }
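
A usage sketch of the new parallelism hints (assuming the usual
NdbQuery::getQueryOperation() accessor; object names are illustrative):

    NdbQueryOperation* root  = query->getQueryOperation(0U);
    NdbQueryOperation* child = query->getQueryOperation(1U);
    root->setMaxParallelism();        // already the default for the root scan
    child->setAdaptiveParallelism();  // already the default for non-root scans
    if (root->setParallelism(4) != 0) // or pin to an explicit fragment count
    {
      // see query->getNdbError() for details
    }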

=== modified file 'storage/ndb/src/ndbapi/NdbQueryOperation.hpp'
--- a/storage/ndb/src/ndbapi/NdbQueryOperation.hpp	2011-06-20 13:25:48 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryOperation.hpp	2011-08-22 08:35:35 +0000
@@ -335,13 +335,29 @@ public:
   NdbQueryOptions::ScanOrdering getOrdering() const;
 
   /** 
-   * Set the number of fragments to be scanned in parallel for the root 
-   * operation of this query. This only applies to table scans and non-sorted
-   * scans of ordered indexes.
+   * Set the number of fragments to be scanned in parallel. This only applies
+   * to table scans and non-sorted scans of ordered indexes. This method is
+   * only implemented for the root scan operation.
    * @return 0 if ok, -1 in case of error (call getNdbError() for details.)
    */
   int setParallelism(Uint32 parallelism);
 
+  /**
+   * Set the number of fragments to be scanned in parallel to the maximum
+   * possible value. This is the default for the root scan operation.
+   * @return 0 if ok, -1 in case of error (call getNdbError() for details.)
+   */
+  int setMaxParallelism();
+
+  /**
+   * Let the system dynamically choose the number of fragments to scan in
+   * parallel. The system will try to choose a value that gives optimal
+   * performance. This is the default for all scans but the root scan. This
+   * method is only implemented for non-root scan operations.
+   * @return 0 if ok, -1 in case of error (call getNdbError() for details.)
+   */
+  int setAdaptiveParallelism();
+
   /** Set the batch size (max rows per batch) for this operation. This
    * only applies to scan operations, as lookup operations always will
    * have the same batch size as its parent operation, or 1 if it is the

=== modified file 'storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp'
--- a/storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp	2011-08-17 13:16:22 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp	2011-08-22 08:35:35 +0000
@@ -661,14 +661,30 @@ public:
   NdbQueryOptions::ScanOrdering getOrdering() const
   { return m_ordering; }
 
-   /** 
-   * Set the number of fragments to be scanned in parallel for the root 
-   * operation of this query. This only applies to table scans and non-sorted
-   * scans of ordered indexes.
+  /**
+   * Set the number of fragments to be scanned in parallel. This only applies
+   * to table scans and non-sorted scans of ordered indexes. This method is
+   * only implemented for the root scan operation.
    * @return 0 if ok, -1 in case of error (call getNdbError() for details.)
    */
   int setParallelism(Uint32 parallelism);
 
+  /**
+   * Set the number of fragments to be scanned in parallel to the maximum
+   * possible value. This is the default for the root scan operation.
+   * @return 0 if ok, -1 in case of error (call getNdbError() for details.)
+   */
+  int setMaxParallelism();
+
+  /**
+   * Let the system dynamically choose the number of fragments to scan in
+   * parallel. The system will try to choose a value that gives optimal
+   * performance. This is the default for all scans but the root scan. This
+   * method is only implemented for non-root scan operations.
+   * @return 0 if ok, -1 in case of error (call getNdbError() for details.)
+   */
+  int setAdaptiveParallelism();
+
   /** Set the batch size (max rows per batch) for this operation. This
    * only applies to scan operations, as lookup operations always will
    * have the same batch size as its parent operation, or 1 if it is the
