=== modified file 'storage/ndb/include/ndbapi/NdbIndexScanOperation.hpp'
--- a/storage/ndb/include/ndbapi/NdbIndexScanOperation.hpp	2008-08-07 06:24:52 +0000
+++ b/storage/ndb/include/ndbapi/NdbIndexScanOperation.hpp	2008-08-14 12:10:55 +0000
@@ -146,17 +146,24 @@
 #endif
   int setBound(Uint32 anAttrId, int type, const void* aValue);
 
-#ifndef DOXYGEN_SHOULD_SKIP_DEPRECATED
   /**
-   * This method is not required and is deprecated.
-   * To perform an Index Scan with multiple batched bounds, use the 
-   * NdbRecord scanIndex() API.
-   * For an old Api Index scan with a single set of bounds, this call 
-   * is not necessary.
-   * Range numbers greater than zero are considered an error.
+   * This method is called to separate sets of bounds (ranges) when 
+   * defining an Index Scan with multiple ranges
+   * It can only be used with scans defined using the SF_MultiRange
+   * scan flag.
+   * For NdbRecord, ranges are specified using the IndexBound structure
+   * and the setBound() API.
+   * If an index scan has more than one range then end_of_bound must be 
+   * called after every range, including the last.
+   * If the SF_ReadRangeNo flag is set then the range_no supplied when
+   * the range is defined will be associated with each row returned from
+   * that range.  This can be obtained by calling get_range_no().
+   * If SF_ReadRangeNo and SF_OrderBy flags are provided then range_no
+   * values must be strictly increasing (i.e. starting at zero and 
+   * getting larger by 1 for each range specified).  This is to ensure 
+   * that rows are returned in order.
    */
   int end_of_bound(Uint32 range_no= 0);
-#endif
 
   /**
    * Return range number for current row, as defined in the IndexBound
@@ -223,8 +230,6 @@
   NdbIndexScanOperation(Ndb* aNdb);
   virtual ~NdbIndexScanOperation();
   
-  void initScanBoundStorageOldApi();
-
   int processIndexScanDefs(LockMode lm,
                            Uint32 scan_flags,
                            Uint32 parallel,
@@ -240,15 +245,37 @@
   /* Structure used to collect information about an IndexBound
    * as it is provided by the old Api setBound() calls
    */
-  struct OldApiScanBoundInfo
+  struct OldApiBoundInfo
   {
     Uint32 highestKey;
     bool highestSoFarIsStrict;
     Uint32 keysPresentBitmap;
-    NdbRecAttr *keyRecAttr;
-  };
-
-  int setBoundHelperOldApi(OldApiScanBoundInfo& boundInfo,
+    char* key;
+  };
+
+  struct OldApiScanRangeDefinition
+  {
+    /* OldApiBoundInfo used during definition
+     * IndexBound used once bound defined
+     * Todo : For heavy old Api use, consider splitting
+     * to allow NdbRecAttr use without malloc
+     */
+    union {
+      struct {
+        OldApiBoundInfo lowBound;
+        OldApiBoundInfo highBound;
+      };
+
+      IndexBound ib;
+    };
+    /* Space for key bounds 
+     *   Low bound from offset 0
+     *   High bound from offset key_record->m_row_size
+     */
+    char space[1];
+  };
+
+  int setBoundHelperOldApi(OldApiBoundInfo& boundInfo,
                            Uint32 maxKeyRecordBytes,
                            Uint32 index_attrId,
                            Uint32 valueLen,
@@ -259,8 +286,9 @@
                            const void *aValue);
 
   int setBound(const NdbColumnImpl*, int type, const void* aValue);
-  int buildIndexBoundOldApi(IndexBound& ib);
-  void releaseIndexBoundOldApi();
+  int buildIndexBoundOldApi(int range_no);
+  const IndexBound* getIndexBoundFromRecAttr(NdbRecAttr* recAttr);
+  void releaseIndexBoundsOldApi();
   int insertBOUNDS(Uint32 * data, Uint32 sz);
   int ndbrecord_insert_bound(const NdbRecord *key_record,
                              Uint32 column_index,
@@ -294,12 +322,16 @@
   /* Most recently added IndexBound's range number */
   Uint32 m_previous_range_num;
   
-  /* Old Scan API bound information */
-  bool oldApiBoundDefined;
-  OldApiScanBoundInfo lowBound;
-  OldApiScanBoundInfo highBound;
-
-  
+  /* Old Scan API range information 
+   * List of RecAttr structures containing OldApiScanRangeDefinition
+   * structures
+   * currentRangeOldApi is range currently being defined (if any)
+   * Once defined (end_of_bound() / execute()) they are added to 
+   * the list between first/lastRangeOldApi
+   */
+  NdbRecAttr* firstRangeOldApi;
+  NdbRecAttr* lastRangeOldApi;
+  NdbRecAttr* currentRangeOldApi;
 
   friend struct Ndb_free_list_t<NdbIndexScanOperation>;
 };

=== modified file 'storage/ndb/ndbapi-examples/ndbapi_recattr_vs_record/Makefile'
--- a/storage/ndb/ndbapi-examples/ndbapi_recattr_vs_record/Makefile	2008-04-01 15:37:28 +0000
+++ b/storage/ndb/ndbapi-examples/ndbapi_recattr_vs_record/Makefile	2008-08-14 12:10:55 +0000
@@ -15,7 +15,7 @@
 SYS_LIB = 
 
 $(TARGET): $(OBJS)
-	$(CXX) $(CXXFLAGS) $(LFLAGS) $(LIB_DIR) $(OBJS) -lndbclient -lmysqlclient_r -lmysys -lmystrings -lz $(SYS_LIB) -o $(TARGET)
+	$(CXX) $(CXXFLAGS) $(LFLAGS) $(LIB_DIR) $(OBJS) -lndbclient -lmysqlclient_r -lz $(SYS_LIB) -o $(TARGET)
 
 $(OBJS): $(SRCS)
 	$(CXX) $(CFLAGS) -I$(INCLUDE_DIR)/include -I$(INCLUDE_DIR)/storage/ndb/include -I$(INCLUDE_DIR)/storage/ndb/include/ndbapi $(SRCS)

=== modified file 'storage/ndb/ndbapi-examples/ndbapi_recattr_vs_record/main.cpp'
--- a/storage/ndb/ndbapi-examples/ndbapi_recattr_vs_record/main.cpp	2008-04-01 15:37:28 +0000
+++ b/storage/ndb/ndbapi-examples/ndbapi_recattr_vs_record/main.cpp	2008-08-14 12:10:55 +0000
@@ -1030,6 +1030,7 @@
        */
       Uint32 scanFlags= 
         NdbScanOperation::SF_OrderBy |
+        NdbScanOperation::SF_MultiRange |
         NdbScanOperation::SF_ReadRangeNo;
 
       if (psop->readTuples(NdbOperation::LM_Read, 
@@ -1038,14 +1039,34 @@
                            (Uint32) 0) != 0)    // parallel
         APIERROR (myTransaction->getNdbError());
 
+      /* Add a bound
+       * Tuples where ATTR1 >=2 and < 4 
+       * 2,[3 deleted] 
+       */
       Uint32 low=2;
-      Uint32 high=7;
+      Uint32 high=4;
 
-      /* Return tuples where ATTR1 >=2 and < 7 */
       if (psop->setBound("ATTR1", NdbIndexScanOperation::BoundLE, (char*)&low))
         APIERROR(myTransaction->getNdbError());
       if (psop->setBound("ATTR1", NdbIndexScanOperation::BoundGT, (char*)&high))
         APIERROR(myTransaction->getNdbError());
+      
+      if (psop->end_of_bound(0))
+        APIERROR(psop->getNdbError());
+
+      /* Second bound
+       * Tuples where ATTR1 > 5 and <=9
+       * 6,7,8,9 
+       */
+      low=5;
+      high=9;
+      if (psop->setBound("ATTR1", NdbIndexScanOperation::BoundLT, (char*)&low))
+        APIERROR(myTransaction->getNdbError());
+      if (psop->setBound("ATTR1", NdbIndexScanOperation::BoundGE, (char*)&high))
+        APIERROR(myTransaction->getNdbError());
+
+      if (psop->end_of_bound(1))
+        APIERROR(psop->getNdbError());
 
       /* Read all columns */
       recAttrAttr1=psop->getValue("ATTR1");
@@ -1126,7 +1147,10 @@
 
   if(myTransaction->execute( NdbTransaction::NoCommit ) != 0)
     APIERROR(myTransaction->getNdbError());
-    
+  
+  if (myTransaction->getNdbError().code != 0)
+    APIERROR(myTransaction->getNdbError());
+
   switch (accessType)
   {
     case api_attr :

=== modified file 'storage/ndb/src/ndbapi/NdbScanOperation.cpp'
--- a/storage/ndb/src/ndbapi/NdbScanOperation.cpp	2008-08-13 20:28:10 +0000
+++ b/storage/ndb/src/ndbapi/NdbScanOperation.cpp	2008-08-14 12:22:38 +0000
@@ -1757,23 +1757,13 @@
     NdbIndexScanOperation *isop = 
       static_cast<NdbIndexScanOperation*>(this);
 
-    /* Prepare a single bound if necessary */
-    NdbIndexScanOperation::IndexBound ib;
-    NdbIndexScanOperation::IndexBound* ib_ptr= NULL;
-
-    switch (isop->buildIndexBoundOldApi(ib)) {
-    case 0:
-      /* Bound was specified */
-      ib_ptr= &ib;
-      break;
-    case 1:
-      /* No bound was specified */
-      ib_ptr= NULL; 
-      break;
-    default:
-      return -1;
+    if (isop->currentRangeOldApi != NULL)
+    {
+      /* Add current bound to bound list */
+      if (isop->buildIndexBoundOldApi(0) != 0)
+        return -1;
     }
-
+    
     /* If this is an ordered scan, then we need
      * the pk columns in the mask, otherwise we
      * don't
@@ -1787,11 +1777,25 @@
                                 m_currentTable->m_ndbrecord,
                                 m_savedLockModeOldApi,
                                 resultMask,
-                                ib_ptr,
+                                NULL, // All bounds added below
                                 &options,
                                 sizeof(ScanOptions));
 
-    isop->releaseIndexBoundOldApi();
+    /* Add any bounds that were specified */
+    if (isop->firstRangeOldApi != NULL)
+    {
+      NdbRecAttr* bound= isop->firstRangeOldApi;
+      while (bound != NULL)
+      {
+        if (isop->setBound( m_accessTable->m_ndbrecord,
+                            *isop->getIndexBoundFromRecAttr(bound) ) != 0)
+          return -1;
+        
+        bound= bound->next();
+      }
+    }
+
+    isop->releaseIndexBoundsOldApi();
   }
 
   /* Free any scan-owned ScanFilter generated InterpretedCode
@@ -2463,35 +2467,15 @@
 NdbIndexScanOperation::NdbIndexScanOperation(Ndb* aNdb)
   : NdbScanOperation(aNdb, NdbOperation::OrderedIndexScan)
 {
-  lowBound.keyRecAttr= highBound.keyRecAttr= NULL;
-  initScanBoundStorageOldApi();
+  firstRangeOldApi= NULL;
+  lastRangeOldApi= NULL;
+  currentRangeOldApi= NULL;
+
 }
 
 NdbIndexScanOperation::~NdbIndexScanOperation(){
 }
 
-/* This method initialises the old scan bound storage space.
- * It is called from the NdbIndexScanOperation constructor and
- * from NdbTransaction::scanIndex()
- */
-void
-NdbIndexScanOperation::initScanBoundStorageOldApi()
-{
-  assert(lowBound.keyRecAttr == NULL);
-  assert(highBound.keyRecAttr == NULL);
-
-  oldApiBoundDefined=false;
-  lowBound.highestKey= 0;
-  lowBound.highestSoFarIsStrict= false;
-  /* Need to modify old Api scan bound handling code
-   * if max attributes in key becomes > 32
-   */
-  assert(NDB_MAX_NO_OF_ATTRIBUTES_IN_KEY == 32);
-  lowBound.keysPresentBitmap= 0;
-
-  highBound= lowBound;
-}
-
 int
 NdbIndexScanOperation::setBound(const char* anAttrName, int type, 
                                 const void* aValue)
@@ -2529,7 +2513,7 @@
  * processing using the normal NdbRecord setBound interface.
  */
 int
-NdbIndexScanOperation::setBoundHelperOldApi(OldApiScanBoundInfo& boundInfo,
+NdbIndexScanOperation::setBoundHelperOldApi(OldApiBoundInfo& boundInfo,
                                             Uint32 maxKeyRecordBytes,
                                             Uint32 index_attrId,
                                             Uint32 valueLen,
@@ -2539,22 +2523,6 @@
                                             Uint32 nullbit_bit_in_byte,
                                             const void *aValue)
 {
-  /* Grab RecAttr if necessary */
-  if (boundInfo.keyRecAttr == NULL)
-  {
-    boundInfo.keyRecAttr= theNdb->getRecAttr();
-    if (boundInfo.keyRecAttr != NULL)
-    {
-      boundInfo.keyRecAttr->setup(maxKeyRecordBytes, NULL);
-    }
-    else
-    {
-      /* Memory allocation error */
-      setErrorCodeAbort(4000);
-      return -1;
-    }
-  }
-  
   Uint32 presentBitMask= (1 << (index_attrId & 0x1f));
 
   if ((boundInfo.keysPresentBitmap & presentBitMask) != 0)
@@ -2593,16 +2561,15 @@
 
   /* Copy data into correct part of RecAttr */
   assert(byteOffset + valueLen <= maxKeyRecordBytes);
-  char *startOfRecord= boundInfo.keyRecAttr->aRef();
 
-  memcpy(startOfRecord+byteOffset,
+  memcpy(boundInfo.key + byteOffset,
          aValue, 
          valueLen);
 
   /* Set Null bit */
   bool nullBit=(aValue == NULL);
 
-  startOfRecord[nullbit_byte_offset]|= 
+  boundInfo.key[nullbit_byte_offset]|= 
     (nullBit) << nullbit_bit_in_byte;
 
   return 0;
@@ -2620,12 +2587,6 @@
     setErrorCodeAbort(4318);    // Invalid attribute
     return -1;
   }
-  if (oldApiBoundDefined)
-  {
-    /* Only one scan bound allowed for non-NdbRecord setBound() API */
-    setErrorCodeAbort(4513);
-    return -1;
-  }
   if (theOperationType == OpenRangeScanRequest &&
       (0 <= type && type <= 4)) 
   {
@@ -2667,12 +2628,52 @@
     
     bool inclusive= ! ((type == BoundLT) || (type == BoundGT));
 
+    if (currentRangeOldApi == NULL)
+    {
+      /* Current bound is undefined, allocate space for definition */
+      NdbRecAttr* boundSpace= theNdb->getRecAttr();
+      if (boundSpace == NULL)
+      {
+        /* Memory allocation error */
+        setErrorCodeAbort(4000);
+        return -1;
+      }
+      if (boundSpace->setup(sizeof(OldApiScanRangeDefinition) + 
+                            (2 * maxKeyRecordBytes) - 1, NULL) != 0)
+      {
+        theNdb->releaseRecAttr(boundSpace);
+        /* Memory allocation error */
+        setErrorCodeAbort(4000);
+        return -1;
+      }
+      
+      /* Initialise bounds definition info */
+      OldApiScanRangeDefinition* boundsDef= 
+        (OldApiScanRangeDefinition*) boundSpace->aRef();
+
+      boundsDef->lowBound.highestKey = 0;
+      boundsDef->lowBound.highestSoFarIsStrict = false;
+      /* Should be STATIC_ASSERT */
+      assert(NDB_MAX_NO_OF_ATTRIBUTES_IN_KEY == 32);
+      boundsDef->lowBound.keysPresentBitmap = 0;
+      
+      boundsDef->highBound= boundsDef->lowBound;
+      boundsDef->lowBound.key= &boundsDef->space[ 0 ];
+      boundsDef->highBound.key= &boundsDef->space[ maxKeyRecordBytes ];
+      
+      currentRangeOldApi= boundSpace;
+    }
+
+    OldApiScanRangeDefinition* bounds=
+      (OldApiScanRangeDefinition*) currentRangeOldApi->aRef();
+
+
     /* Add to lower bound if required */
     if (type == BoundEQ ||
         type == BoundLE ||
         type == BoundLT )
     {
-      if (setBoundHelperOldApi(lowBound,
+      if (setBoundHelperOldApi(bounds->lowBound,
                                maxKeyRecordBytes,
                                tAttrInfo->m_attrId,
                                valueLen,
@@ -2689,7 +2690,7 @@
         type == BoundGE ||
         type == BoundGT)
     {
-      if (setBoundHelperOldApi(highBound,
+      if (setBoundHelperOldApi(bounds->highBound,
                                maxKeyRecordBytes,
                                tAttrInfo->m_attrId,
                                valueLen,
@@ -2719,28 +2720,32 @@
  * -1 == error
  */
 int
-NdbIndexScanOperation::buildIndexBoundOldApi(IndexBound& ib)
+NdbIndexScanOperation::buildIndexBoundOldApi(int range_no)
 {
-  int result= 1;
+  IndexBound ib;
+  OldApiScanRangeDefinition* boundDef=
+    (OldApiScanRangeDefinition*) currentRangeOldApi->aRef();
 
-  if (lowBound.highestKey != 0)
+  int result = 1;
+  
+  if (boundDef->lowBound.highestKey != 0)
   {
     /* Have a low bound 
      * Check that a contiguous set of keys are supplied.
      * Setup low part of IndexBound
      */
-    Uint32 expectedValue= (~(Uint32) 0) >> (32 - lowBound.highestKey);
+    Uint32 expectedValue= (~(Uint32) 0) >> (32 - boundDef->lowBound.highestKey);
     
-    if (lowBound.keysPresentBitmap != expectedValue)
+    if (boundDef->lowBound.keysPresentBitmap != expectedValue)
     {
       /* Invalid set of range scan bounds */
       setErrorCodeAbort(4259);
       return -1;
     }
 
-    ib.low_key= lowBound.keyRecAttr->aRef();
-    ib.low_key_count= lowBound.highestKey;
-    ib.low_inclusive= !lowBound.highestSoFarIsStrict;
+    ib.low_key= boundDef->lowBound.key;
+    ib.low_key_count= boundDef->lowBound.highestKey;
+    ib.low_inclusive= !boundDef->lowBound.highestSoFarIsStrict;
     result= 0;
   }
   else
@@ -2750,23 +2755,23 @@
     ib.low_inclusive= false;
   }
 
-  if (highBound.highestKey != 0)
+  if (boundDef->highBound.highestKey != 0)
   {
     /* Have a high bound 
      * Check that a contiguous set of keys are supplied.
      */
-    Uint32 expectedValue= (~(Uint32) 0) >> (32 - highBound.highestKey);
+    Uint32 expectedValue= (~(Uint32) 0) >> (32 - boundDef->highBound.highestKey);
     
-    if (highBound.keysPresentBitmap != expectedValue)
+    if (boundDef->highBound.keysPresentBitmap != expectedValue)
     {
       /* Invalid set of range scan bounds */
       setErrorCodeAbort(4259);
       return -1;
     }
 
-    ib.high_key= highBound.keyRecAttr->aRef();
-    ib.high_key_count= highBound.highestKey;
-    ib.high_inclusive= !highBound.highestSoFarIsStrict;
+    ib.high_key= boundDef->highBound.key;
+    ib.high_key_count= boundDef->highBound.highestKey;
+    ib.high_inclusive= !boundDef->highBound.highestSoFarIsStrict;
     result= 0;
   }
   else
@@ -2775,33 +2780,58 @@
     ib.high_key_count= 0;
     ib.high_inclusive= false;
   }
-
-  ib.range_no= 0;
+  
+  ib.range_no= range_no;
+
+  boundDef->ib= ib;
+
+  assert( currentRangeOldApi->next() == NULL );
+
+  if (lastRangeOldApi == NULL)
+  {
+    /* First bound */
+    assert( firstRangeOldApi == NULL );
+    firstRangeOldApi= lastRangeOldApi= currentRangeOldApi;
+  }
+  else 
+  {
+    /* Other bounds exist, add this to the end of the bounds list */
+    assert( firstRangeOldApi != NULL );
+    assert( lastRangeOldApi->next() == NULL );
+    lastRangeOldApi->next(currentRangeOldApi);
+    lastRangeOldApi= currentRangeOldApi;
+  }
+  
+  currentRangeOldApi= NULL;
 
   return result;
 }
 
+const NdbIndexScanOperation::IndexBound* 
+NdbIndexScanOperation::getIndexBoundFromRecAttr(NdbRecAttr* recAttr)
+{
+  return &((OldApiScanRangeDefinition*)recAttr->aRef())->ib;
+};
+
+
 /* Method called to release any resources allocated by the old 
  * Index Scan bound API
  */
 void
-NdbIndexScanOperation::releaseIndexBoundOldApi()
+NdbIndexScanOperation::releaseIndexBoundsOldApi()
 {
-  if (lowBound.keyRecAttr != NULL)
-  {
-    theNdb->releaseRecAttr(lowBound.keyRecAttr);
-    lowBound.keyRecAttr= NULL;
-  }
-  if (highBound.keyRecAttr != NULL)
-  {
-    theNdb->releaseRecAttr(highBound.keyRecAttr);
-    highBound.keyRecAttr= NULL;
-  }
-  
-  /* Re-initialise scan bound storage for the next
-   * use of this NdbIndexScanOperation object
-   */
-  initScanBoundStorageOldApi();
+  NdbRecAttr* bound= firstRangeOldApi;
+  while (bound != NULL)
+  {
+    NdbRecAttr* release= bound;
+    bound= bound->next();
+    theNdb->releaseRecAttr(release);
+  }
+
+  if (currentRangeOldApi != NULL)
+    theNdb->releaseRecAttr(currentRangeOldApi);
+
+  firstRangeOldApi= lastRangeOldApi= currentRangeOldApi= NULL;
 }
 
 
@@ -3422,7 +3452,7 @@
       reinterpret_cast<NdbIndexScanOperation*> (this);
 
     /* Release any Index Bound resources */
-    isop->releaseIndexBoundOldApi();
+    isop->releaseIndexBoundsOldApi();
   }
 
   /* Free any scan-owned ScanFilter generated InterpretedCode
@@ -3456,11 +3486,47 @@
   DBUG_ENTER("end_of_bound");
   DBUG_PRINT("info", ("Range number %u", no));
 
-  /* Multirange no longer supported from old scan API */
-  if (no > 0 || oldApiBoundDefined)
-    DBUG_RETURN(-1);
+  if (! (m_savedScanFlagsOldApi & SF_MultiRange))
+  {
+    setErrorCodeAbort(4509);
+    /* Non SF_MultiRange scan cannot have more than one bound */
+    return -1;
+  }
+
+  if (currentRangeOldApi == NULL)
+  {
+    setErrorCodeAbort(4259);
+    /* Invalid set of range scan bounds */
+    return -1;
+  }
+
+  /* If it's an ordered scan and we're reading range numbers
+   * back then check that range numbers are strictly 
+   * increasing
+   */
+  if ((m_savedScanFlagsOldApi & SF_OrderBy) &&
+      (m_savedScanFlagsOldApi & SF_ReadRangeNo))
+  {
+    Uint32 expectedNum= 0;
+    
+    if (lastRangeOldApi != NULL)
+    {
+      assert( firstRangeOldApi != NULL );
+      expectedNum = 
+        getIndexBoundFromRecAttr(lastRangeOldApi)->range_no + 1;
+    }
+    
+    if (no != expectedNum)
+    {
+      setErrorCodeAbort(4282);
+      /* range_no not strictly increasing in ordered multi-range index scan */
+      return -1;
+    }
+  }
   
-  oldApiBoundDefined= true;
+  if (buildIndexBoundOldApi(no) != 0)
+    return -1;
+      
   DBUG_RETURN(0);
 }
 

=== modified file 'storage/ndb/test/ndbapi/testOIBasic.cpp'
--- a/storage/ndb/test/ndbapi/testOIBasic.cpp	2008-07-23 11:36:53 +0000
+++ b/storage/ndb/test/ndbapi/testOIBasic.cpp	2008-08-14 12:22:38 +0000
@@ -51,9 +51,11 @@
   NdbDictionary::Object::FragmentType m_fragtype;
   const char* m_index;
   uint m_loop;
+  uint m_mrrmaxrng;
   bool m_msglock;
   bool m_nologging;
   bool m_noverify;
+  uint m_pctmrr;
   uint m_pctnull;
   uint m_rows;
   uint m_samples;
@@ -81,9 +83,11 @@
     m_fragtype(NdbDictionary::Object::FragUndefined),
     m_index(0),
     m_loop(1),
+    m_mrrmaxrng(10),
     m_msglock(true),
     m_nologging(false),
     m_noverify(false),
+    m_pctmrr(50),
     m_pctnull(10),
     m_rows(1000),
     m_samples(0),
@@ -123,8 +127,10 @@
     << "  -fragtype T   fragment type single/small/medium/large" << endl
     << "  -index xyz    only given index numbers (digits 0-9)" << endl
     << "  -loop N       loop count full suite 0=forever [" << d.m_loop << "]" << endl
+    << "  -mrrmaxrng N  max ranges to supply for MRR scan [" << d.m_mrrmaxrng << "]" << endl
     << "  -nologging    create tables in no-logging mode" << endl
     << "  -noverify     skip index verifications" << endl
+    << "  -pctmrr N     pct of index scans to use MRR [" << d.m_pctmrr << "]" << endl
     << "  -pctnull N    pct NULL values in nullable column [" << d.m_pctnull << "]" << endl
     << "  -rows N       rows per thread [" << d.m_rows << "]" << endl
     << "  -samples N    samples for some timings (0=all) [" << d.m_samples << "]" << endl
@@ -319,6 +325,7 @@
   bool m_tupscan;
   bool m_ordered;
   bool m_descending;
+  bool m_multiRange;
   // threads used by current test case
   uint m_usedthreads;
   Par(const Opt& opt) :
@@ -345,6 +352,7 @@
     m_tupscan(false),
     m_ordered(false),
     m_descending(false),
+    m_multiRange(false),
     m_usedthreads(0)
   {
     m_currcase[0] = 0;
@@ -536,7 +544,7 @@
     int xlen = (*cs->coll->strnxfrm)(cs, xbytes, m_xmul * size, bytes, size);
     // check we got something
     ok = false;
-    for (uint j = 0; j < xlen; j++) {
+    for (uint j = 0; j < (uint)xlen; j++) {
       if (xbytes[j] != 0) {
         ok = true;
         break;
@@ -1460,6 +1468,11 @@
     scan_flags |= NdbScanOperation::SF_OrderBy;
   if (par.m_descending)
     scan_flags |= NdbScanOperation::SF_Descending;
+  if (par.m_multiRange)
+  {
+    scan_flags |= NdbScanOperation::SF_MultiRange;
+    scan_flags |= NdbScanOperation::SF_ReadRangeNo;
+  }
   CHKCON(m_indexscanop->readTuples(par.m_lockmode, scan_flags, par.m_scanpar, par.m_scanbatch) == 0, *this);
   return 0;
 }
@@ -1602,7 +1615,6 @@
 static int
 invalidateindex(Par par)
 {
-  Con& con = par.con();
   const Tab& tab = par.tab();
   for (uint i = 0; i < tab.m_itabs; i++) {
     if (tab.m_itab[i] == 0)
@@ -1939,7 +1951,6 @@
 {
   const Col& col = m_col;
   const Chs* chs = col.m_chs;
-  CHARSET_INFO* cs = chs->m_cs;
   n = 0;
   uint len = 0;
   while (len < col.m_length) {
@@ -2013,7 +2024,6 @@
 {
   const Col& col = m_col;
   const Chs* chs = col.m_chs;
-  CHARSET_INFO* cs = chs->m_cs;
   n = 0;
   uint len = 0;
   while (len < col.m_length) {
@@ -2363,7 +2373,6 @@
 int
 Row::setval(Par par, const ITab& itab)
 {
-  Con& con = par.con();
   Rsq rsq(itab.m_icols);
   for (uint k = 0; k < itab.m_icols; k++) {
     uint k2 = rsq.next();
@@ -2634,6 +2643,7 @@
   void calc(Par par, uint i, uint colmask = ~0);
   uint count() const;
   const Row* getrow(uint i, bool dirty = false) const;
+  int setrow(uint i, const Row* src, bool force=false);
   // transaction
   void post(Par par, ExecType et);
   // operations
@@ -2830,6 +2840,20 @@
   return rowp;
 }
 
+int
+Set::setrow(uint i, const Row* src, bool force)
+{
+  assert(i < m_rows);
+  if (m_row[i] != 0)
+    if (!force)
+      return -1;
+  
+  Row* newRow= new Row(src->m_tab);
+  newRow->copy(*src, true);
+  return 0;
+}
+
+
 // transaction
 
 void
@@ -2946,7 +2970,6 @@
 int
 Set::selrow(Par par, const Row& keyrow)
 {
-  Con& con = par.con();
   const Tab& tab = par.tab();
   LL5("selrow " << tab.m_name << " keyrow " << keyrow);
   m_keyrow->copyval(keyrow, tab.m_pkmask);
@@ -2958,7 +2981,6 @@
 int
 Set::selrow(Par par, const ITab& itab, const Row& keyrow)
 {
-  Con& con = par.con();
   LL5("selrow " << itab.m_name << " keyrow " << keyrow);
   m_keyrow->copyval(keyrow, itab.m_keymask);
   CHK(m_keyrow->selrow(par, itab) == 0);
@@ -2969,7 +2991,6 @@
 int
 Set::setrow(Par par, uint i)
 {
-  Con& con = par.con();
   assert(m_row[i] != 0);
   CHK(m_row[i]->setrow(par) == 0);
   return 0;
@@ -3007,6 +3028,7 @@
 {
   const Tab& tab = m_tab;
   LL4("putval key=" << i << " row=" << n << " old=" << m_row[i]);
+  CHK( i<m_rows );
   if (m_row[i] != 0) {
     assert(force);
     delete m_row[i];
@@ -3026,8 +3048,11 @@
     val.copy(aRef);
     val.m_null = false;
   }
-  if (n != ~0)
+  if (n != (uint) ~0)
+  {
+    CHK(n < m_rows);
     m_rowkey[n] = i;
+  }
   return 0;
 }
 
@@ -3118,10 +3143,9 @@
 int
 Set::verifyorder(Par par, const ITab& itab, bool descending) const
 {
-  const Tab& tab = m_tab;
   for (uint n = 0; n < m_rows; n++) {
     uint i2 = m_rowkey[n];
-    if (i2 == ~0)
+    if (i2 == (uint) ~0)
       break;
     if (n == 0)
       continue;
@@ -3129,10 +3153,19 @@
     assert(m_row[i1] != 0 && m_row[i2] != 0);
     const Row& row1 = *m_row[i1];
     const Row& row2 = *m_row[i2];
+    bool ok;
     if (!descending)
-      CHK(row1.cmp(par, row2, itab) <= 0);
+      ok= (row1.cmp(par, row2, itab) <= 0);
     else
-      CHK(row1.cmp(par, row2, itab) >= 0);
+      ok= (row1.cmp(par, row2, itab) >= 0);
+
+    if (!ok)
+    {
+      LL1("verifyorder " << n << " failed");
+      LL1("row1 " << row1);
+      LL1("row2 " << row2);
+      CHK(false);
+    }
   }
   return 0;
 }
@@ -3265,7 +3298,6 @@
   reset();
   for (uint k = 0; k < itab.m_icols; k++) {
     const ICol& icol = *itab.m_icol[k];
-    const Col& col = icol.m_col;
     for (uint i = 0; i <= 1; i++) {
       if (m_bvals == 0 && urandom(100) == 0)
         return;
@@ -3819,7 +3851,6 @@
 {
   Con& con = par.con();
   const Tab& tab = par.tab();
-  const Set& set = par.set();
   LL3("scanfast " << tab.m_name);
   CHK(con.startTransaction() == 0);
   CHK(con.getNdbScanOperation(tab) == 0);
@@ -3905,12 +3936,138 @@
   return 0;
 }
 
+
+static int
+scanreadindexmrr(Par par, const ITab& itab, int numBsets)
+{
+  Con& con = par.con();
+  const Tab& tab = par.tab();
+  const Set& set = par.set();
+
+  /* Create space for different sets of bounds, expected results and
+   * results
+   * Calculate bounds and the sets of rows which would result
+   */
+  BSet** boundSets;
+  Set** expectedResults;
+  Set** actualResults;
+  uint* setSizes;
+
+  CHK((boundSets= (BSet**) malloc(numBsets * sizeof(BSet*))) != 0);
+  CHK((expectedResults= (Set**) malloc(numBsets * sizeof(Set*))) != 0);
+  CHK((actualResults= (Set**) malloc(numBsets * sizeof(Set*))) != 0);
+  CHK((setSizes= (uint*) malloc(numBsets * sizeof(uint))) != 0);
+
+  for (int n=0; n < numBsets; n++)
+  {
+    CHK((boundSets[n]= new BSet(tab, itab)) != NULL );
+    CHK((expectedResults[n]= new Set(tab, set.m_rows)) != NULL);
+    CHK((actualResults[n]= new Set(tab, set.m_rows)) != NULL);
+    setSizes[n]= 0;
+
+    Set& results= *expectedResults[n];
+    /* Calculate some scan bounds which are selective */
+    do {
+      results.reset();
+      calcscanbounds(par, itab, *boundSets[n], set, results);
+    } while ((*boundSets[n]).m_bvals == 0);
+  } 
+
+  /* Define scan with bounds */
+  LL3("scanreadindexmrr " << itab.m_name << " ranges= " << numBsets << " lockmode=" << par.m_lockmode << " ordered=" << par.m_ordered << " descending=" << par.m_descending << " verify=" << par.m_verify);
+  Set set2(tab, set.m_rows);
+  /* Multirange + Read range number for this scan */
+  par.m_multiRange= true;
+  CHK(con.startTransaction() == 0);
+  CHK(con.getNdbIndexScanOperation(itab, tab) == 0);
+  CHK(con.readIndexTuples(par) == 0);
+  /* Set the bounds */
+  for (int n=0; n < numBsets; n++)
+  {
+    CHK(boundSets[n]->setbnd(par) == 0);
+    int res= con.m_indexscanop->end_of_bound(n);
+    if (res != 0)
+    {
+      LL1("end_of_bound error : " << con.m_indexscanop->getNdbError().code);
+      CHK (false);
+    }
+  }
+  set2.getval(par);
+  CHK(con.executeScan() == 0);
+  int rows_received= 0;
+  while (1) {
+    int ret;
+    uint err = par.m_catcherr;
+    CHK((ret = con.nextScanResult(true, err)) == 0 || ret == 1);
+    if (ret == 1)
+      break;
+    if (err) {
+      LL1("scanreadindexmrr stop on " << con.errname(err));
+      break;
+    }
+    uint i = (uint)-1;
+    /* Put value into set2 temporarily */
+    CHK(set2.getkey(par, &i) == 0);
+    CHK(set2.putval(i, false, -1) == 0);
+    
+    /* Now move it to the correct set, based on the range no */
+    int rangeNum= con.m_indexscanop->get_range_no();
+    CHK(rangeNum < numBsets);
+    CHK(set2.m_row[i] != NULL);
+    /* Get rowNum based on what's in the set already (slow) */
+    CHK(setSizes[rangeNum] == actualResults[rangeNum]->count());
+    int rowNum= setSizes[rangeNum];
+    setSizes[rangeNum] ++;
+    CHK((uint) rowNum < set2.m_rows);
+    actualResults[rangeNum]->m_row[i]= set2.m_row[i];
+    actualResults[rangeNum]->m_rowkey[rowNum]= i;
+    set2.m_row[i]= 0;
+    LL4("range " << rangeNum << " key " << i << " row " << rowNum << " " << *set2.m_row[i]);
+    rows_received++;
+  }
+  con.closeTransaction();
+
+  /* Verify that each set has the expected rows, and optionally, that
+   * they're ordered
+   */
+  if (par.m_verify) 
+  {
+    LL4("Verifying " << numBsets << " sets, " << rows_received << " rows");
+    for (int n=0; n < numBsets; n++)
+    {
+      LL5("Set " << n << " of " << expectedResults[n]->count() << " rows");
+      CHK(expectedResults[n]->verify(par, *actualResults[n], false) == 0);
+      if (par.m_ordered)
+      {
+        LL5("Verifying ordering");
+        CHK(actualResults[n]->verifyorder(par, itab, par.m_descending) == 0);
+      }
+    }
+  }
+  
+  /* Cleanup */
+  for (int n=0; n < numBsets; n++)
+  {
+    boundSets[n]->reset();
+    delete boundSets[n];
+    delete expectedResults[n];
+    delete actualResults[n];
+  }
+
+  free(boundSets);
+  free(expectedResults);
+  free(actualResults);
+  free(setSizes);
+
+  LL3("scanreadindexmrr " << itab.m_name << " done rows=" << rows_received);
+  return 0;
+}
+
 static int
 scanreadindexfast(Par par, const ITab& itab, const BSet& bset, uint countcheck)
 {
   Con& con = par.con();
   const Tab& tab = par.tab();
-  const Set& set = par.set();
   LL3("scanfast " << itab.m_name << " " << bset);
   LL4(bset);
   CHK(con.startTransaction() == 0);
@@ -3987,7 +4144,13 @@
     if (itab.m_type == ITab::OrderedIndex) {
       BSet bset(tab, itab);
       CHK(scanreadfilter(par, itab, bset, true) == 0);
-      CHK(scanreadindex(par, itab, bset, true) == 0);
+      /* Single range or Multi range scan */
+      if (randompct(g_opt.m_pctmrr))
+        CHK(scanreadindexmrr(par, 
+                             itab, 
+                             1+urandom(g_opt.m_mrrmaxrng-1)) == 0);
+      else
+        CHK(scanreadindex(par, itab, bset, true) == 0);
     }
   }
   return 0;
@@ -5645,7 +5808,7 @@
   ndb_init();
   uint i;
   ndbout << g_progname;
-  for (i = 1; i < argc; i++)
+  for (i = 1; i < (uint) argc; i++)
     ndbout << " " << argv[i];
   ndbout << endl;
   ndbout_mutex = NdbMutex_Create();
@@ -5736,6 +5899,12 @@
         continue;
       }
     }
+    if (strcmp(arg, "-mrrmaxrng") == 0) {
+      if (++argv, --argc > 0) {
+        g_opt.m_mrrmaxrng = atoi(argv[0]);
+        continue;
+      }
+    }
     if (strcmp(arg, "-nologging") == 0) {
       g_opt.m_nologging = true;
       continue;
@@ -5744,6 +5913,12 @@
       g_opt.m_noverify = true;
       continue;
     }
+    if (strcmp(arg, "-pctmrr") ==0 ) {
+      if (++argc, --argc > 0) {
+        g_opt.m_pctmrr = atoi(argv[0]);
+        continue;
+      }
+    }
     if (strcmp(arg, "-pctnull") == 0) {
       if (++argv, --argc > 0) {
         g_opt.m_pctnull = atoi(argv[0]);
@@ -5836,7 +6011,7 @@
     delete g_ncc;
     g_ncc = 0;
   }
-ok:
+// ok
   return NDBT_ProgramExit(NDBT_OK);
 failed:
   return NDBT_ProgramExit(NDBT_FAILED);

=== modified file 'storage/ndb/test/src/HugoOperations.cpp'
--- a/storage/ndb/test/src/HugoOperations.cpp	2008-07-01 12:35:34 +0000
+++ b/storage/ndb/test/src/HugoOperations.cpp	2008-08-14 12:22:38 +0000
@@ -120,15 +120,10 @@
     if (equalForRow(pOp, r+recordNo) != 0)
       return NDBT_FAILED;
 
-    // TODO : 
-    // Multi-read range functionality is disabled for
-    // old style scans, so we can't use it here
-    // Longer term, Hugo* should be changed to use
-    // NdbRecord in this area.
-    //if(pIndexScanOp)
-    //  pIndexScanOp->end_of_bound(r);
+    if(pIndexScanOp)
+      pIndexScanOp->end_of_bound(r);
     
-    // if(r == 0 || pIndexScanOp == 0)
+    if(r == 0 || pIndexScanOp == 0)
     {
       // Define attributes to read  
       for(a = 0; a<tab.getNoOfColumns(); a++){


