List:Commits« Previous MessageNext Message »
From:Sergey Petrunia Date:February 2 2008 3:28pm
Subject:bk commit into 6.0 tree (sergefp:1.2798) WL#2771
View as plain text  
Below is the list of changes that have just been committed into a local
6.0 repository of sergefp.  When sergefp does a push these changes
will be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html

ChangeSet@stripped, 2008-02-02 18:28:08+03:00, sergefp@stripped +23 -0
  Kristian's patch: Merge NdbRecord part 1 with 6.0/BKA 
  - merged into recent 6.0, WL#2771 Batched Key Access tree.

  sql/ha_ndbcluster.cc@stripped, 2008-02-02 18:28:00+03:00, sergefp@stripped +306 -203
    Merge NdbRecord part 1 with 6.0/BKA

  sql/ha_ndbcluster.h@stripped, 2008-02-02 18:28:00+03:00, sergefp@stripped +7 -5
    Merge NdbRecord part 1 with 6.0/BKA

  sql/handler.cc@stripped, 2008-02-02 18:28:00+03:00, sergefp@stripped +1 -2
    Merge NdbRecord part 1 with 6.0/BKA

  sql/sql_select.cc@stripped, 2008-02-02 18:28:00+03:00, sergefp@stripped +3 -3
    Merge NdbRecord part 1 with 6.0/BKA

  storage/ndb/include/ndbapi/NdbBlob.hpp@stripped, 2008-02-02 18:28:00+03:00, sergefp@stripped +3 -2
    Merge NdbRecord part 1 with 6.0/BKA

  storage/ndb/include/ndbapi/NdbIndexScanOperation.hpp@stripped, 2008-02-02 18:28:00+03:00, sergefp@stripped +30 -1
    Merge NdbRecord part 1 with 6.0/BKA

  storage/ndb/include/ndbapi/NdbOperation.hpp@stripped, 2008-02-02 18:28:00+03:00, sergefp@stripped +146 -8
    Merge NdbRecord part 1 with 6.0/BKA

  storage/ndb/include/ndbapi/NdbScanOperation.hpp@stripped, 2008-02-02 18:28:00+03:00, sergefp@stripped +90 -58
    Merge NdbRecord part 1 with 6.0/BKA

  storage/ndb/include/ndbapi/NdbTransaction.hpp@stripped, 2008-02-02 18:28:00+03:00, sergefp@stripped +226 -120
    Merge NdbRecord part 1 with 6.0/BKA

  storage/ndb/ndbapi-examples/ndbapi_blob_ndbrecord/main.cpp@stripped, 2008-02-02 18:28:01+03:00, sergefp@stripped +8 -8
    Merge NdbRecord part 1 with 6.0/BKA

  storage/ndb/ndbapi-examples/ndbapi_s_i_ndbrecord/main.cpp@stripped, 2008-02-02 18:28:01+03:00, sergefp@stripped +21 -12
    Merge NdbRecord part 1 with 6.0/BKA

  storage/ndb/src/ndbapi/NdbBlob.cpp@stripped, 2008-02-02 18:28:01+03:00, sergefp@stripped +221 -54
    Merge NdbRecord part 1 with 6.0/BKA

  storage/ndb/src/ndbapi/NdbOperation.cpp@stripped, 2008-02-02 18:28:01+03:00, sergefp@stripped +53 -7
    Merge NdbRecord part 1 with 6.0/BKA

  storage/ndb/src/ndbapi/NdbOperationDefine.cpp@stripped, 2008-02-02 18:28:01+03:00, sergefp@stripped +253 -15
    Merge NdbRecord part 1 with 6.0/BKA

  storage/ndb/src/ndbapi/NdbOperationExec.cpp@stripped, 2008-02-02 18:28:01+03:00, sergefp@stripped +133 -34
    Merge NdbRecord part 1 with 6.0/BKA

  storage/ndb/src/ndbapi/NdbReceiver.cpp@stripped, 2008-02-02 18:28:01+03:00, sergefp@stripped +143 -126
    Merge NdbRecord part 1 with 6.0/BKA

  storage/ndb/src/ndbapi/NdbScanFilter.cpp@stripped, 2008-02-02 18:28:01+03:00, sergefp@stripped +10 -0
    Merge NdbRecord part 1 with 6.0/BKA

  storage/ndb/src/ndbapi/NdbScanOperation.cpp@stripped, 2008-02-02 18:28:01+03:00, sergefp@stripped +114 -82
    Merge NdbRecord part 1 with 6.0/BKA

  storage/ndb/src/ndbapi/NdbTransaction.cpp@stripped, 2008-02-02 18:28:01+03:00, sergefp@stripped +528 -231
    Merge NdbRecord part 1 with 6.0/BKA

  storage/ndb/src/ndbapi/ndberror.c@stripped, 2008-02-02 18:28:01+03:00, sergefp@stripped +11 -1
    Merge NdbRecord part 1 with 6.0/BKA

  storage/ndb/test/ndbapi/flexBench.cpp@stripped, 2008-02-02 18:28:01+03:00, sergefp@stripped +3 -2
    Merge NdbRecord part 1 with 6.0/BKA

  storage/ndb/test/ndbapi/testBlobs.cpp@stripped, 2008-02-02 18:28:01+03:00, sergefp@stripped +604 -280
    Merge NdbRecord part 1 with 6.0/BKA

  storage/ndb/test/ndbapi/testScan.cpp@stripped, 2008-02-02 18:28:01+03:00, sergefp@stripped +0 -94
    Merge NdbRecord part 1 with 6.0/BKA

diff -Nrup a/sql/ha_ndbcluster.cc b/sql/ha_ndbcluster.cc
--- a/sql/ha_ndbcluster.cc	2008-02-02 06:03:10 +03:00
+++ b/sql/ha_ndbcluster.cc	2008-02-02 18:28:00 +03:00
@@ -925,7 +925,7 @@ int g_get_ndb_blobs_value(NdbBlob *ndb_b
   unpack_record().
 */
 int
-ha_ndbcluster::get_blob_values(NdbOperation *ndb_op, uchar *dst_record,
+ha_ndbcluster::get_blob_values(const NdbOperation *ndb_op, uchar *dst_record,
                                const MY_BITMAP *bitmap)
 {
   uint i;
@@ -961,7 +961,7 @@ ha_ndbcluster::get_blob_values(NdbOperat
 }
 
 int
-ha_ndbcluster::set_blob_values(NdbOperation *ndb_op, my_ptrdiff_t row_offset,
+ha_ndbcluster::set_blob_values(const NdbOperation *ndb_op, my_ptrdiff_t row_offset,
                                const MY_BITMAP *bitmap, uint *set_count)
 {
   uint field_no;
@@ -2121,7 +2121,7 @@ int ha_ndbcluster::pk_read(const uchar *
                            uint32 part_id)
 {
   NdbConnection *trans= m_thd_ndb->trans;
-  NdbOperation *op;
+  const NdbOperation *op;
   uchar *row;
   int res;
   DBUG_ENTER("pk_read");
@@ -2140,11 +2140,13 @@ int ha_ndbcluster::pk_read(const uchar *
 
   NdbOperation::LockMode lm=
     (NdbOperation::LockMode)get_ndb_lock_type(m_lock.type, table->read_set);
-  if (!(op= pk_unique_index_read_key(table->s->primary_key, key, row, lm)))
+  
+  if (!(op= pk_unique_index_read_key(table->s->primary_key, key, row, lm,
+                                     (m_user_defined_partitioning?
+                                      &part_id:
+                                      NULL))))
     ERR_RETURN(trans->getNdbError());
   
-  if (m_user_defined_partitioning)
-    op->setPartitionId(part_id);
 
   if ((res = execute_no_commit_ie(this,trans,FALSE)) != 0 ||
       op->getNdbError().code) 
@@ -2176,15 +2178,27 @@ int ha_ndbcluster::ndb_pk_update_row(THD
   NdbTransaction *trans= m_thd_ndb->trans;
   int read_needed= !bitmap_is_set_all(table->read_set);
   int error;
-  NdbOperation *op;
+  const NdbOperation *op;
   DBUG_ENTER("ndb_pk_update_row");
 
+  NdbOperation::OperationOptions *poptions = NULL;
+  NdbOperation::OperationOptions options;
+  options.optionsPresent=0;
+
   DBUG_PRINT("info", ("primary key update or partition change, "
                       "doing read+delete+insert"));
   // Get all old fields, since we optimize away fields not in query
 
   const NdbRecord *key_rec;
   const uchar *key_row;
+
+  if (m_user_defined_partitioning)
+  {
+    options.optionsPresent |= NdbOperation::OperationOptions::OO_PARTITION_ID;
+    options.partitionId=old_part_id;
+    poptions=&options;
+  }
+
   if (table_share->primary_key != MAX_KEY)
   {
     key_rec= m_index[table->s->primary_key].ndb_unique_record_row;
@@ -2215,7 +2229,8 @@ int ha_ndbcluster::ndb_pk_update_row(THD
       (NdbOperation::LockMode)get_ndb_lock_type(m_lock.type, &m_bitmap);
     if (!(op= trans->readTuple(key_rec, (const char *)key_row,
                                m_ndb_record, (char *)new_data,
-                               lm, (const unsigned char *)(m_bitmap.bitmap))))
+                             lm, (const unsigned char *)(m_bitmap.bitmap),
+                             poptions, sizeof(NdbOperation::OperationOptions))))
       ERR_RETURN(trans->getNdbError());
   }
 
@@ -2228,9 +2243,6 @@ int ha_ndbcluster::ndb_pk_update_row(THD
       ERR_RETURN(op->getNdbError());
   }
 
-  if (m_user_defined_partitioning)
-    op->setPartitionId(old_part_id);
-
   if (execute_no_commit(this, trans, FALSE) != 0)
   {
     table->status= STATUS_NOT_FOUND;
@@ -2282,10 +2294,10 @@ delete_tuple:
 }
 
 /*
- * Check that all operations between first and last all
- * have gotten the errcode
- * If checking for HA_ERR_KEY_NOT_FOUND then update m_dupkey
- * for all succeeding operations
+  Check that all operations between first and last all
+  have gotten the errcode
+  If checking for HA_ERR_KEY_NOT_FOUND then update m_dupkey
+  for all succeeding operations
  */
 bool ha_ndbcluster::check_all_operations_for_error(NdbTransaction *trans,
                                                    const NdbOperation *first,
@@ -2380,8 +2392,11 @@ int ha_ndbcluster::peek_indexed_rows(con
                                      NDB_WRITE_OP write_op)
 {
   NdbTransaction *trans= m_thd_ndb->trans;
-  NdbOperation *op;
+  const NdbOperation *op;
   const NdbOperation *first, *last;
+  NdbOperation::OperationOptions options;
+  NdbOperation::OperationOptions *poptions=NULL;
+  options.optionsPresent = 0;
   uint i;
   int res;
   DBUG_ENTER("peek_indexed_rows");
@@ -2396,11 +2411,6 @@ int ha_ndbcluster::peek_indexed_rows(con
      */
     const NdbRecord *key_rec=
       m_index[table->s->primary_key].ndb_unique_record_row;
-    if (!(op= trans->readTuple(key_rec, (const char *)record,
-                               key_rec, dummy_row, lm, empty_mask)))
-      ERR_RETURN(trans->getNdbError());
-    
-    first= op;
 
     if (m_user_defined_partitioning)
     {
@@ -2415,8 +2425,18 @@ int ha_ndbcluster::peek_indexed_rows(con
         m_part_info->err_value= func_value;
         DBUG_RETURN(error);
       }
-      op->setPartitionId(part_id);
-    }
+      options.optionsPresent |= NdbOperation::OperationOptions::OO_PARTITION_ID;
+      options.partitionId=part_id;
+      poptions=&options;
+    }    
+
+    if (!(op= trans->readTuple(key_rec, (const char *)record,
+                               key_rec, dummy_row, lm, empty_mask,
+                               poptions, 
+                               sizeof(NdbOperation::OperationOptions))))
+      ERR_RETURN(trans->getNdbError());
+    
+    first= op;
   }
   /*
    * Fetch any rows with colliding unique indexes
@@ -2444,7 +2464,7 @@ int ha_ndbcluster::peek_indexed_rows(con
         continue;
       }
 
-      NdbOperation *iop;
+      const NdbOperation *iop;
       const NdbRecord *key_rec= m_index[i].ndb_unique_record_row;
       if (!(iop= trans->readTuple(key_rec, (const char *)record,
                                   key_rec, dummy_row,
@@ -2486,7 +2506,7 @@ int ha_ndbcluster::unique_index_read(con
                                      uint key_len, uchar *buf)
 {
   NdbTransaction *trans= m_thd_ndb->trans;
-  NdbOperation *op;
+  const NdbOperation *op;
   uchar *row;
   DBUG_ENTER("ha_ndbcluster::unique_index_read");
   DBUG_PRINT("enter", ("key_len: %u, index: %u", key_len, active_index));
@@ -2503,7 +2523,7 @@ int ha_ndbcluster::unique_index_read(con
 
   NdbOperation::LockMode lm=
     (NdbOperation::LockMode)get_ndb_lock_type(m_lock.type, table->read_set);
-  if (!(op= pk_unique_index_read_key(active_index, key, row, lm)))
+  if (!(op= pk_unique_index_read_key(active_index, key, row, lm, NULL)))
     ERR_RETURN(trans->getNdbError());
   
   if (execute_no_commit_ie(this,trans,FALSE) != 0 ||
@@ -2543,7 +2563,7 @@ ha_ndbcluster::scan_handle_lock_tuple(Nd
       LOCK WITH SHARE MODE) and row was not explictly unlocked 
       with unlock_row() call
     */
-    NdbOperation *op;
+    const NdbOperation *op;
     // Lock row
     DBUG_PRINT("info", ("Keeping lock on scanned row"));
       
@@ -2588,7 +2608,7 @@ inline int ha_ndbcluster::fetch_next(Ndb
         DBUG_RETURN(ndb_err(trans));
     }
     
-    if ((local_check= cursor->nextResult(_m_next_row,
+    if ((local_check= cursor->nextResult(&_m_next_row,
                                          contact_ndb,
                                          m_force_send)) == 0)
     {
@@ -2683,14 +2703,19 @@ inline int ha_ndbcluster::next_result(uc
   Do a primary key or unique key index read operation.
   The key value is taken from a buffer in mysqld key format.
 */
-NdbOperation *
+const NdbOperation *
 ha_ndbcluster::pk_unique_index_read_key(uint idx, const uchar *key, uchar *buf,
-                                        NdbOperation::LockMode lm)
+                                        NdbOperation::LockMode lm,
+                                        Uint32 *ppartition_id)
 {
-  NdbOperation *op;
+  const NdbOperation *op;
   const NdbRecord *ndb_record= m_ndb_record;
   uchar *mask= (uchar *)(table->read_set->bitmap);
   const NdbRecord *key_rec;
+  NdbOperation::OperationOptions options;
+  NdbOperation::OperationOptions *poptions = NULL;
+  options.optionsPresent=0;
+  
   if (idx != MAX_KEY)
     key_rec= m_index[idx].ndb_unique_record_key;
   else
@@ -2713,9 +2738,18 @@ ha_ndbcluster::pk_unique_index_read_key(
       if (m_user_defined_partitioning)
         ndb_record= m_ndb_record_fragment;
     }
+
+    if (ppartition_id !=NULL)
+    {
+      options.optionsPresent |= NdbOperation::OperationOptions::OO_PARTITION_ID;
+      options.partitionId=*ppartition_id;
+      poptions=&options;
+    }
   }
   op= m_thd_ndb->trans->readTuple(key_rec, (const char *)key,
-                                ndb_record, (char *)buf, lm, mask);
+                                  ndb_record, (char *)buf, lm, mask,
+                                  poptions,
+                                  sizeof(NdbOperation::OperationOptions));
 
   if (uses_blob_value(table->read_set) &&
       get_blob_values(op, buf, table->read_set) != 0)
@@ -2977,28 +3011,9 @@ compute_index_bounds(NdbIndexScanOperati
   }
 }
 
-struct ordered_index_scan_data {
-  const KEY *key_info;
-  const key_range *start_key;
-  const key_range *end_key;
-};
-
-/* Callback to set up scan bounds for ordered_index_scan(). */
-static int
-ordered_index_scan_callback(void *arg, Uint32 i,
-                            NdbIndexScanOperation::IndexBound & bound)
-{
-  struct ordered_index_scan_data *data= (struct ordered_index_scan_data *)arg;
-  compute_index_bounds(bound, data->key_info, data->start_key, data->end_key);
-  bound.range_no= 0;
-  return 0;                                     // Success
-}
-
-
 /*
-  Start ordered index scan in NDB
-*/
-
+   Start ordered index scan in NDB
+ */
 int ha_ndbcluster::ordered_index_scan(const key_range *start_key,
                                       const key_range *end_key,
                                       bool sorted, bool descending,
@@ -3006,7 +3021,6 @@ int ha_ndbcluster::ordered_index_scan(co
 {  
   NdbTransaction *trans= m_thd_ndb->trans;
   NdbIndexScanOperation *op;
-  struct ordered_index_scan_data data;
   uchar *mask;
   int error;
 
@@ -3032,40 +3046,58 @@ int ha_ndbcluster::ordered_index_scan(co
 
   NdbOperation::LockMode lm=
     (NdbOperation::LockMode)get_ndb_lock_type(m_lock.type, table->read_set);
-  Uint32 scan_flags= 0;
+
+  NdbTransaction::ScanOptions options;
+  options.optionsPresent=NdbTransaction::ScanOptions::SO_SCANFLAGS;
+  options.scan_flags=0;
+
   if (lm == NdbOperation::LM_Read)
-    scan_flags|= NdbScanOperation::SF_KeyInfo;
+    options.scan_flags|= NdbScanOperation::SF_KeyInfo;
   if (sorted)
-    scan_flags|= NdbScanOperation::SF_OrderBy;
+    options.scan_flags|= NdbScanOperation::SF_OrderBy;
   if (descending)
-    scan_flags|= NdbScanOperation::SF_Descending;
+    options.scan_flags|= NdbScanOperation::SF_Descending;
   const NdbRecord *key_rec= m_index[active_index].ndb_record_key;
   const NdbRecord *row_rec= m_index[active_index].ndb_record_row;
-  Uint32 num_bounds= (start_key != NULL || end_key != NULL);
-  data.key_info= table->key_info + active_index;
-  if (!descending) {
-    data.start_key= start_key;
-    data.end_key= end_key;
+
+  NdbIndexScanOperation::IndexBound bound;
+  NdbIndexScanOperation::IndexBound *pbound = NULL;
+
+  if (start_key != NULL || end_key != NULL)
+  {
+    /* 
+       Compute bounds info, reversing range boundaries
+       if descending
+     */
+    compute_index_bounds(bound, 
+                         table->key_info + active_index,
+                         (descending?
+                          end_key : start_key),
+                         (descending?
+                          start_key : end_key));
+    bound.range_no = 0;
+    pbound = &bound;
   }
-  else
+
+  /* Partition pruning */
+  if (m_use_partition_pruning && part_spec != NULL &&
+      part_spec->start_part == part_spec->end_part)
   {
-    data.start_key= end_key;
-    data.end_key= start_key;
+    options.partitionId = part_spec->start_part;
+    options.optionsPresent |= NdbTransaction::ScanOptions::SO_PARTITION_ID;
   }
 
-  if (!(op= trans->scanIndex(key_rec, ordered_index_scan_callback, &data,
-                             num_bounds, row_rec, lm,
+  if (!(op= trans->scanIndex(key_rec, row_rec, lm,
                              mask,
-                             scan_flags, parallelism, 0)))
+                             pbound,
+                             &options,
+                             sizeof(NdbTransaction::ScanOptions))))
     ERR_RETURN(trans->getNdbError());
 
   if (uses_blob_value(table->read_set) &&
       get_blob_values(op, NULL, table->read_set) != 0)
     ERR_RETURN(op->getNdbError());
 
-  if (m_use_partition_pruning && part_spec != NULL &&
-      part_spec->start_part == part_spec->end_part)
-    op->setPartitionId(part_spec->start_part);
   m_active_cursor= op;
 
   if (m_cond && m_cond->generate_scan_filter(op))
@@ -3121,6 +3153,15 @@ int ha_ndbcluster::full_table_scan(const
 
   if (table_share->primary_key == MAX_KEY || m_user_defined_partitioning)
     mask= copy_column_set(table->read_set);
+
+  NdbOperation::LockMode lm=
+    (NdbOperation::LockMode)get_ndb_lock_type(m_lock.type, table->read_set);
+  NdbTransaction::ScanOptions options;
+  options.optionsPresent = (NdbTransaction::ScanOptions::SO_SCANFLAGS |
+                            NdbTransaction::ScanOptions::SO_PARALLEL);
+  options.scan_flags = guess_scan_flags(lm, m_table, table->read_set);
+  options.parallel = parallelism;
+
   if (m_use_partition_pruning)
   {
     part_spec.start_part= 0;
@@ -3136,6 +3177,16 @@ int ha_ndbcluster::full_table_scan(const
     {
       DBUG_RETURN(HA_ERR_END_OF_FILE);
     }
+
+    /*
+      If partition pruning has found exactly one partition in set
+      we can optimize scan to run towards that partition only.
+    */
+    if (part_spec.start_part == part_spec.end_part)
+    {
+      options.optionsPresent|= NdbTransaction::ScanOptions::SO_PARTITION_ID;
+      options.partitionId= part_spec.start_part;
+    }
   }
   if (table_share->primary_key == MAX_KEY)
   {
@@ -3149,10 +3200,8 @@ int ha_ndbcluster::full_table_scan(const
     }
   }
 
-  NdbOperation::LockMode lm=
-    (NdbOperation::LockMode)get_ndb_lock_type(m_lock.type, table->read_set);
-  int flags= guess_scan_flags(lm, m_table, table->read_set);
-  if (!(op= trans->scanTable(ndb_record, lm, mask, flags, parallelism)))
+  if (!(op= trans->scanTable(ndb_record, lm, mask,
+                             &options, sizeof(NdbTransaction::ScanOptions))))
     ERR_RETURN(trans->getNdbError());
   m_active_cursor= op;
 
@@ -3160,23 +3209,6 @@ int ha_ndbcluster::full_table_scan(const
       get_blob_values(op, NULL, table->read_set) != 0)
     ERR_RETURN(op->getNdbError());
 
-  if (m_use_partition_pruning)
-  {
-    /*
-      If partition pruning has found exactly one partition in set
-      we can optimize scan to run towards that partition only.
-    */
-    if (part_spec.start_part == part_spec.end_part)
-    {
-      /*
-        Only one partition is required to scan, if sorted is required we
-        don't need it any more since output from one ordered partitioned
-        index is always sorted.
-      */
-      m_active_cursor->setPartitionId(part_spec.start_part);
-    }
-  }
-
   if (!key_info)
   {
     if (m_cond && m_cond->generate_scan_filter(op))
@@ -3227,7 +3259,8 @@ ha_ndbcluster::set_auto_inc(THD *thd, Fi
 }
 
 inline void
-ha_ndbcluster::eventSetAnyValue(THD *thd, NdbOperation *op)
+ha_ndbcluster::eventSetAnyValue(THD *thd, 
+                                NdbOperation::OperationOptions *options)
 {
   if (unlikely(m_slow_path))
   {
@@ -3239,9 +3272,15 @@ ha_ndbcluster::eventSetAnyValue(THD *thd
     */
     Thd_ndb *thd_ndb= get_thd_ndb(thd);
     if (thd->slave_thread)
-      op->setAnyValue(thd->server_id);
+    {
+      options->optionsPresent |= NdbOperation::OperationOptions::OO_ANYVALUE;
+      options->anyValue=thd->server_id;
+    }
     else if (thd_ndb->trans_options & TNTO_NO_LOGGING)
-      op->setAnyValue(NDB_ANYVALUE_FOR_NOLOGGING);
+    {
+      options->optionsPresent |= NdbOperation::OperationOptions::OO_ANYVALUE;
+      options->anyValue=NDB_ANYVALUE_FOR_NOLOGGING;
+    }
   }
 }
 
@@ -3260,7 +3299,7 @@ int ha_ndbcluster::ndb_write_row(uchar *
 {
   bool has_auto_increment;
   NdbTransaction *trans= m_thd_ndb->trans;
-  NdbOperation *op;
+  const NdbOperation *op;
   THD *thd= table->in_use;
   Thd_ndb *thd_ndb= get_thd_ndb(thd);
   uint32 part_id;
@@ -3395,6 +3434,23 @@ int ha_ndbcluster::ndb_write_row(uchar *
     table->timestamp_field->set_time();
 
   /*
+     Setup OperationOptions
+   */
+  NdbOperation::OperationOptions options;
+  NdbOperation::OperationOptions *poptions = NULL;
+  options.optionsPresent=0;
+  
+  eventSetAnyValue(thd, &options); 
+
+  if (m_user_defined_partitioning)
+  {
+    options.optionsPresent |= NdbOperation::OperationOptions::OO_PARTITION_ID;
+    options.partitionId=part_id;
+  }
+  if (options.optionsPresent != 0)
+    poptions=&options;
+
+  /*
     We do not use the table->write_set here.
     The reason is that for REPLACE INTO t(a), the write_set is passed with
     only column 'a' enabled.
@@ -3442,22 +3498,22 @@ int ha_ndbcluster::ndb_write_row(uchar *
       key_row= row;
     }
     op= trans->writeTuple(key_rec, (const char *)key_row,
-                          m_ndb_record, (char *)row, mask);
+                          m_ndb_record, (char *)row, mask,
+                          poptions,
+                          sizeof(NdbOperation::OperationOptions));
   }
   else
   {
     /* Using insert, we write all user visible columns */
     user_cols_written_bitmap= NULL;
-    op= trans->insertTuple(m_ndb_record, (char *)row);
+    op= trans->insertTuple(m_ndb_record, (char *)row,
+                           NULL, // No mask
+                           poptions,
+                           sizeof(NdbOperation::OperationOptions));
   }
   if (!(op))
     ERR_RETURN(trans->getNdbError());
 
-  eventSetAnyValue(thd, op);
-
-  if (m_user_defined_partitioning)
-    op->setPartitionId(part_id);
-
   uint blob_count= 0;
   if (table_share->blob_fields > 0)
   {
@@ -3547,7 +3603,7 @@ int ha_ndbcluster::update_row(const ucha
   Thd_ndb *thd_ndb= get_thd_ndb(thd);
   NdbTransaction *trans= m_thd_ndb->trans;
   NdbScanOperation* cursor= m_active_cursor;
-  NdbOperation *op;
+  const NdbOperation *op;
   uint32 old_part_id= 0, new_part_id= 0;
   int error;
   longlong func_value;
@@ -3649,14 +3705,23 @@ int ha_ndbcluster::update_row(const ucha
       row= new_data;
   }
 
+  NdbOperation::OperationOptions *poptions = NULL;
+  NdbOperation::OperationOptions options;
+  options.optionsPresent=0;
+
   if (m_user_defined_partitioning)
   {
     if (func_value >= INT_MAX32)
       func_value= INT_MAX32;
     set_partition_function_value(row, (uint32)func_value);
     request_partition_function_value(mask);
-  }
 
+    options.optionsPresent |= NdbOperation::OperationOptions::OO_PARTITION_ID;
+    options.partitionId = new_part_id;
+  }
+  
+  eventSetAnyValue(thd, &options);
+  
   if (cursor)
   {
     /*
@@ -3668,8 +3733,14 @@ int ha_ndbcluster::update_row(const ucha
     */
     DBUG_PRINT("info", ("Calling updateTuple on cursor, write_set=0x%x",
                         table->write_set->bitmap[0]));
+
+    if (options.optionsPresent != 0)
+      poptions = &options;
+
     if (!(op= cursor->updateCurrentTuple(trans, m_ndb_record,
-                                         (const char*)row, mask)))
+                                         (const char*)row, mask,
+                                         poptions,
+                                         sizeof(NdbOperation::OperationOptions))))
       ERR_RETURN(trans->getNdbError());
 
     m_lock_tuple= FALSE;
@@ -3706,14 +3777,14 @@ int ha_ndbcluster::update_row(const ucha
         DBUG_RETURN(ER_OUTOFMEMORY);
     }
 
+    if (options.optionsPresent !=0)
+      poptions= &options;
+
     if (!(op= trans->updateTuple(key_rec, (const char *)key_row,
                                  m_ndb_record, (const char*)row, mask)))
       ERR_RETURN(trans->getNdbError());  
   }
 
-  if (m_user_defined_partitioning)
-    op->setPartitionId(new_part_id);
-
   uint blob_count= 0;
   if (uses_blob_value(table->write_set))
   {
@@ -3722,8 +3793,6 @@ int ha_ndbcluster::update_row(const ucha
       ERR_RETURN(op->getNdbError());
   }
 
-  eventSetAnyValue(thd, op);
-
   m_rows_changed++;
 
   if (need_execute)
@@ -3756,7 +3825,7 @@ int ha_ndbcluster::ndb_delete_row(const 
   Thd_ndb *thd_ndb= get_thd_ndb(thd);
   NdbTransaction *trans= m_thd_ndb->trans;
   NdbScanOperation* cursor= m_active_cursor;
-  NdbOperation *op;
+  const NdbOperation *op;
   uint32 part_id;
   int error;
   DBUG_ENTER("ndb_delete_row");
@@ -3771,6 +3840,21 @@ int ha_ndbcluster::ndb_delete_row(const 
     DBUG_RETURN(error);
   }
 
+  NdbOperation::OperationOptions options;
+  NdbOperation::OperationOptions *poptions = NULL;
+  options.optionsPresent=0;
+
+  if (m_user_defined_partitioning)
+  {
+    options.optionsPresent |= NdbOperation::OperationOptions::OO_PARTITION_ID;
+    options.partitionId = part_id;
+  }
+
+  eventSetAnyValue(thd, &options);
+
+  if (options.optionsPresent != 0)
+    poptions = &options;
+
   if (cursor)
   {
     /*
@@ -3781,18 +3865,17 @@ int ha_ndbcluster::ndb_delete_row(const 
       the active record in cursor
     */
     DBUG_PRINT("info", ("Calling deleteTuple on cursor"));
-    if ((op= cursor->deleteCurrentTuple(trans, m_ndb_record)) == 0)
+    if ((op = cursor->deleteCurrentTuple(trans, m_ndb_record,
+                                         NULL, // result_row
+                                         NULL, // result_mask
+                                         poptions, 
+                                         sizeof(NdbOperation::OperationOptions))) == 0)
       ERR_RETURN(trans->getNdbError());     
     m_lock_tuple= FALSE;
     thd_ndb->m_unsent_bytes+= 12;
 
-    if (m_user_defined_partitioning)
-      op->setPartitionId(part_id);
-
     no_uncommitted_rows_update(-1);
 
-    eventSetAnyValue(thd, op);
-
     if (!(primary_key_update || m_delete_cannot_batch))
       // If deleting from cursor, NoCommit will be handled in next_result
       DBUG_RETURN(0);
@@ -3849,16 +3932,16 @@ int ha_ndbcluster::ndb_delete_row(const 
     else
       need_execute= TRUE;
 
-    if (!(op=trans->deleteTuple(key_rec, (const char *)key_row)))
+    if (!(op=trans->deleteTuple(key_rec, (const char *)key_row,
+                                NULL, // result_rec
+                                NULL, // result_row
+                                NULL, // result_mask
+                                poptions,
+                                sizeof(NdbOperation::OperationOptions))))
       ERR_RETURN(trans->getNdbError());
-    
-    if (m_user_defined_partitioning)
-      op->setPartitionId(part_id);
 
     no_uncommitted_rows_update(-1);
 
-    eventSetAnyValue(thd, op);
-
     if (!need_execute)
       DBUG_RETURN(0);
   }
@@ -9145,9 +9228,15 @@ ndb_get_table_statistics(ha_ndbcluster* 
       goto retry;
     }
 
+    NdbTransaction::ScanOptions options;
+    options.optionsPresent=NdbTransaction::ScanOptions::SO_BATCH;
     /* Set batch_size=1, as we need only one row per fragment. */
+    options.batch=1;
+
     if ((pOp= pTrans->scanTable(record, NdbOperation::LM_CommittedRead,
-                                NULL, 0, 0, 1)) == NULL)
+                                NULL, 
+                                &options,
+                                sizeof(NdbTransaction::ScanOptions))) == NULL)
     {
       error= pTrans->getNdbError();
       goto retry;
@@ -9168,7 +9257,7 @@ ndb_get_table_statistics(ha_ndbcluster* 
       goto retry;
     }
     
-    while ((check= pOp->nextResult(row, TRUE, TRUE)) == 0)
+    while ((check= pOp->nextResult(&row, TRUE, TRUE)) == 0)
     {
       /* NDB API ensures proper alignment of rows to make the cast valid. */
       const ndb_table_statistics_row *stat=
@@ -9303,6 +9392,7 @@ ha_ndbcluster::release_completed_operati
   }
   if (!force_release)
   {
+/* ToDo: remove need for this special case (and NDB_QUERY_MULTI_READ_RANGE... and force_release?). */
     if (m_thd_ndb->query_state & NDB_QUERY_MULTI_READ_RANGE)
     {
       /* We are batching reads and have not consumed all fetched
@@ -9505,7 +9595,8 @@ uint16 &mrr_persistent_flag_storage2(ha_
 /*** psergey: MRR helper hacks end ****/
 
 
-
+#if 0
+psergey-merge: not needed:
 /* Callback to set up scan bounds for read multi range. */
 static int
 read_multi_bounds_callback(void *arg, Uint32 i,
@@ -9531,7 +9622,7 @@ read_multi_bounds_callback(void *arg, Ui
 
   return 0;                                     // Success
 }
-
+#endif
 
 
 int ha_ndbcluster::multi_range_read_init(RANGE_SEQ_IF *seq_funcs, 
@@ -9618,7 +9709,7 @@ int ha_ndbcluster::multi_range_start_ret
   int range_res;
   KEY* key_info= table->key_info + active_index;
   ulong reclength= table_share->reclength;
-  NdbOperation* op;
+  const NdbOperation* op;
   NDB_INDEX_TYPE cur_index_type= get_index_type(active_index);
   DBUG_ENTER("multi_range_start_retrievals");
   
@@ -9646,6 +9737,7 @@ int ha_ndbcluster::multi_range_start_ret
 
   DBUG_ASSERT(cur_index_type != UNDEFINED_INDEX);
 
+  m_multi_cursor= 0;
   const NdbOperation* lastOp= m_thd_ndb->trans->getLastDefinedOperation();
   NdbOperation::LockMode lm= 
     (NdbOperation::LockMode)get_ndb_lock_type(m_lock.type, table->read_set);
@@ -9654,22 +9746,20 @@ int ha_ndbcluster::multi_range_start_ret
   uint num_scan_ranges= 0;
   int range_no= -1;
   int mrr_range_no= starting_range;
-  struct read_multi_callback_data data;
   // ToDo: proper interface to save/retrieve point to iterate from
   
   void *saved_start_pos;
   uint saved_start_no; 
   mrr_funcs.save_current_pos(mrr_iter, &saved_start_no, &saved_start_pos);
 
-  data.h= this;
-  data.first_range= starting_range;
   /*
     psergey trying to fix the case where this function is invoked and when
     there are actually no ranges in the sequence
   */
   bool no_ranges= TRUE; 
 
-  while ((row_buf+reclength <= end_of_buffer) && !(range_res= mrr_funcs.next(mrr_iter, &mrr_cur_range)))
+  while ((row_buf+reclength <= end_of_buffer) && 
+         !(range_res= mrr_funcs.next(mrr_iter, &mrr_cur_range)))
   // ToDo: is this really correct, will not mrr_cur_range be offset on reentering
   {
     no_ranges= FALSE;
@@ -9709,9 +9799,79 @@ int ha_ndbcluster::multi_range_start_ret
       */
       if (range_no > NdbIndexScanOperation::MaxRangeNo)
         break; // ToDo: is this really correct, will not mrr_cur_range be offset on reentering
+      /* Create the scan operation for the first scan range. */
+      if (!m_multi_cursor)
+      {
+        /* Do a multi-range index scan for ranges not done by primary/unique key. */
+        NdbTransaction::ScanOptions options;
+        uchar *mask;
+
+        options.optionsPresent=
+          NdbTransaction::ScanOptions::SO_SCANFLAGS |
+          NdbTransaction::ScanOptions::SO_PARALLEL;
+
+        options.scan_flags= 
+          NdbScanOperation::SF_ReadRangeNo |
+          NdbScanOperation::SF_MultiRange;
+
+        if (lm == NdbOperation::LM_Read)
+          options.scan_flags|= NdbScanOperation::SF_KeyInfo;
+        if (mrr_is_output_sorted)
+          options.scan_flags|= NdbScanOperation::SF_OrderBy;
+
+        options.parallel=parallelism;
+
+        if (m_user_defined_partitioning || table_share->primary_key == MAX_KEY)
+        {
+          mask= copy_column_set(table->read_set);
+          if (table_share->primary_key == MAX_KEY)
+            request_hidden_key(mask);
+        }
+        else
+          mask= (uchar *)(table->read_set->bitmap);
+
+        /* Define scan */
+        NdbIndexScanOperation *scanOp= m_thd_ndb->trans->scanIndex
+          (m_index[active_index].ndb_record_key,
+           m_index[active_index].ndb_record_row, 
+           lm,
+           mask,
+           NULL, /* All bounds specified below */
+           &options,
+           sizeof(NdbTransaction::ScanOptions));
+
+        if (!scanOp)
+          ERR_RETURN(m_thd_ndb->trans->getNdbError());
+        m_multi_cursor= scanOp;
+
+        /*
+          We do not get_blob_values() here, as when using blobs we always
+          fallback to non-batched multi range read (see if statement at
+          top of this function).
+        */
+
+        if (m_cond && m_cond->generate_scan_filter(scanOp))
+          ERR_RETURN(scanOp->getNdbError());
+
+        /* We set m_next_row=0 to say that no row was fetched from the scan yet. */
+        m_next_row= 0;
+      }
 
       /* Include this range in the ordered index scan. */
       mrr_persistent_flag_storage2(this, mrr_iter, mrr_range_no)&= ~(uint)UNIQUE_RANGE;
+      //psergey-todo: need this anymore? ^^ 
+      NdbIndexScanOperation::IndexBound bound;
+      compute_index_bounds(bound, key_info,
+                           &mrr_cur_range.start_key, &mrr_cur_range.end_key);
+      bound.range_no= mrr_range_no;
+
+      if (m_multi_cursor->setBound(m_index[active_index].ndb_record_key,
+                                   bound))
+      {
+        ERR_RETURN(m_thd_ndb->trans->getNdbError());
+      }
+
+
       num_scan_ranges++;
     }
     else
@@ -9727,82 +9887,25 @@ int ha_ndbcluster::multi_range_start_ret
 
       mrr_persistent_flag_storage2(this, mrr_iter, mrr_range_no)|= UNIQUE_RANGE;
 
-      if (!(op= pk_unique_index_read_key(active_index,
-                                         mrr_cur_range.start_key.key,
-                                         row_buf, lm)))
-        ERR_RETURN(m_thd_ndb->trans->getNdbError());
+      Uint32 partitionId;
+      Uint32* ppartitionId = NULL;
 
       if (m_user_defined_partitioning &&
           (cur_index_type == PRIMARY_KEY_ORDERED_INDEX ||
            cur_index_type == PRIMARY_KEY_INDEX))
-        op->setPartitionId(part_spec.start_part);
-
-      row_buf+= reclength;
-    }
-  }
-
-  if (num_scan_ranges > 0)
-  {
-    /* 
-      save state of iterator 
-      (psergey's note: why do that, wouldn't the callback function call
-       next() just the right number of times to position the iterator exactly
-       at the end??)
-    */
-    void *saved_end_pos;
-    uint saved_end_no;
-    mrr_funcs.save_current_pos(mrr_iter, &saved_end_no, &saved_end_pos);
-    mrr_funcs.restore_pos(mrr_iter, saved_start_no, saved_start_pos);
-
-    /* Do a multi-range index scan for ranges not done by primary/unique key. */
-    uchar *mask;
+      {
+        partitionId=part_spec.start_part;
+        ppartitionId=&partitionId;
+      }
 
-    data.key_info= key_info;
-    data.range= data.first_range;
-    data.mrr_iter= mrr_iter;
-    data.mrr_funcs= &mrr_funcs;
-    data.first_range++;
-
-    Uint32 flags= NdbScanOperation::SF_ReadRangeNo;
-    if (lm == NdbOperation::LM_Read)
-      flags|= NdbScanOperation::SF_KeyInfo;
-    if (mrr_is_output_sorted)
-      flags|= NdbScanOperation::SF_OrderBy;
+      if (!(op= pk_unique_index_read_key(active_index,
+                                         mrr_cur_range.start_key.key,
+                                         row_buf, lm,
+                                         ppartitionId)))
+        ERR_RETURN(m_thd_ndb->trans->getNdbError());
 
-    if (m_user_defined_partitioning || table_share->primary_key == MAX_KEY)
-    {
-      mask= copy_column_set(table->read_set);
-      if (table_share->primary_key == MAX_KEY)
-        request_hidden_key(mask);
+      row_buf+= reclength;
     }
-    else
-      mask= (uchar *)(table->read_set->bitmap);
-
-    NdbIndexScanOperation *scanOp= m_thd_ndb->trans->scanIndex
-      (m_index[active_index].ndb_record_key, read_multi_bounds_callback,
-       &data, num_scan_ranges, m_index[active_index].ndb_record_row, lm,
-       mask, flags, parallelism, 0);
-    if (!scanOp)
-      ERR_RETURN(m_thd_ndb->trans->getNdbError());
-    m_multi_cursor= scanOp;
-
-    /*
-      We do not get_blob_values() here, as when using blobs we always
-      fallback to non-batched multi range read (see if statement at
-      top of this function).
-    */
-
-    if (m_cond && m_cond->generate_scan_filter(scanOp))
-      ERR_RETURN(scanOp->getNdbError());
-
-    /* We set m_next_row=0 to say that no row was fetched from the scan yet. */
-    m_next_row= 0;
-
-    mrr_funcs.restore_pos(mrr_iter, saved_end_no, saved_end_pos);
-  }
-  else
-  {
-    m_multi_cursor= 0;
   }
 
   /**
diff -Nrup a/sql/ha_ndbcluster.h b/sql/ha_ndbcluster.h
--- a/sql/ha_ndbcluster.h	2008-02-02 05:40:35 +03:00
+++ b/sql/ha_ndbcluster.h	2008-02-02 18:28:00 +03:00
@@ -615,16 +615,18 @@ private:
   void clear_extended_column_set(uchar *mask);
   uchar *copy_column_set(MY_BITMAP *bitmap);
 
-  int get_blob_values(NdbOperation *ndb_op, uchar *dst_record,
+  int get_blob_values(const NdbOperation *ndb_op, uchar *dst_record,
                       const MY_BITMAP *bitmap);
-  int set_blob_values(NdbOperation *ndb_op, my_ptrdiff_t row_offset,
+  int set_blob_values(const NdbOperation *ndb_op, my_ptrdiff_t row_offset,
                       const MY_BITMAP *bitmap, uint *set_count);
   friend int g_get_ndb_blobs_value(NdbBlob *ndb_blob, void *arg);
-  void eventSetAnyValue(THD *thd, NdbOperation *op);
+  void eventSetAnyValue(THD *thd, NdbOperation::OperationOptions *options);
   bool check_index_fields_in_write_set(uint keyno);
 
-  NdbOperation *pk_unique_index_read_key(uint idx, const uchar *key, uchar *buf,
-                                         NdbOperation::LockMode lm);
+  const NdbOperation *pk_unique_index_read_key(uint idx, 
+                                               const uchar *key, uchar *buf,
+                                               NdbOperation::LockMode lm,
+                                               Uint32 *ppartition_id);
   int read_multi_range_fetch_next();
   
   int set_bounds(NdbIndexScanOperation*, uint inx, bool rir,
diff -Nrup a/sql/handler.cc b/sql/handler.cc
--- a/sql/handler.cc	2008-02-02 06:03:10 +03:00
+++ b/sql/handler.cc	2008-02-02 18:28:00 +03:00
@@ -3533,8 +3533,7 @@ int DsMrr_impl::dsmrr_init(handler *h, K
   DBUG_ASSERT(!semi_join || is_mrr_assoc);
 
   if (is_mrr_assoc)
-    h->ha_statistic_increment(&SSV::ha_multi_range_read_init_count);
- 
+    status_var_increment(table->in_use->status_var.ha_multi_range_read_init_count);
 
   rowids_buf_end= buf->buffer_end;
   elem_size= h->ref_length + (int)is_mrr_assoc * sizeof(void*);
diff -Nrup a/sql/sql_select.cc b/sql/sql_select.cc
--- a/sql/sql_select.cc	2008-02-02 05:40:37 +03:00
+++ b/sql/sql_select.cc	2008-02-02 18:28:00 +03:00
@@ -224,7 +224,7 @@ void select_describe(JOIN *join, bool ne
 			    bool distinct, const char *message=NullS);
 static Item *remove_additional_cond(Item* conds);
 static void add_group_and_distinct_keys(JOIN *join, JOIN_TAB *join_tab);
-static bool test_if_ref(Item_field *left_item,Item *right_item);
+static bool test_if_ref(COND *root_cond, Item_field *left_item,Item *right_item);
 
 /*
   This is used to mark equalities that were made from i-th IN-equality.
@@ -832,8 +832,8 @@ void JOIN::remove_subq_pushed_predicates
       ((Item_func *)this->conds)->functype() == Item_func::EQ_FUNC &&
       ((Item_func *)conds)->arguments()[0]->type() == Item::REF_ITEM &&
       ((Item_func *)conds)->arguments()[1]->type() == Item::FIELD_ITEM &&
-      test_if_ref ((Item_field *)((Item_func *)conds)->arguments()[1],
-                   ((Item_func *)conds)->arguments()[0]))
+      test_if_ref(conds, (Item_field *)((Item_func *)conds)->arguments()[1],
+                  ((Item_func *)conds)->arguments()[0]))
   {
     *where= 0;
     return;
diff -Nrup a/storage/ndb/include/ndbapi/NdbBlob.hpp b/storage/ndb/include/ndbapi/NdbBlob.hpp
--- a/storage/ndb/include/ndbapi/NdbBlob.hpp	2007-05-09 12:45:19 +04:00
+++ b/storage/ndb/include/ndbapi/NdbBlob.hpp	2008-02-02 18:28:00 +03:00
@@ -80,7 +80,8 @@ class NdbEventOperationImpl;
  *
  * Usage notes for different operation types:
  *
- * - insertTuple must use setValue if blob attribute is non-nullable
+ * - insertTuple must be followed by a setValue() call for every non
+ *   nullable blob in the row.
  *
  * - readTuple or scan readTuples with lock mode LM_CommittedRead is
  *   automatically upgraded to lock mode LM_Read if any blob attributes
@@ -417,7 +418,7 @@ private:
                      Buf& packedBuf, Buf& unpackedBuf);
   Uint32 getHeadInlineSize() { return theHeadSize + theInlineSize; }
   void prepareSetHeadInlineValue();
-  void getBlobHeadData(const char * & data, Uint32 & byteSize);
+  void getNullOrEmptyBlobHeadDataPtr(const char * & data, Uint32 & byteSize);
   // getters and setters
   void packBlobHead();
   void unpackBlobHead();
diff -Nrup a/storage/ndb/include/ndbapi/NdbIndexScanOperation.hpp b/storage/ndb/include/ndbapi/NdbIndexScanOperation.hpp
--- a/storage/ndb/include/ndbapi/NdbIndexScanOperation.hpp	2007-05-02 20:24:08 +04:00
+++ b/storage/ndb/include/ndbapi/NdbIndexScanOperation.hpp	2008-02-02 18:28:00 +03:00
@@ -76,7 +76,7 @@ public:
     return readTuples(lock_mode, scan_flags, parallel, batch);
   }
 #endif
-
+  
   /**
    * Type of ordered index key bound.  The values (0-4) will not change
    * and can be used explicitly (e.g. they could be computed).
@@ -191,6 +191,29 @@ public:
     Uint32 range_no;
   };
 
+
+  /**
+   * Add a bound to an NdbRecord defined Index scan
+   * 
+   * This method is called to add a bound to an IndexScan operation
+   * which has been defined with a call to NdbTransaction::scanIndex().
+   * To add extra bounds, the index scan operation must have been
+   * defined with the the SF_MultiRange flag set.
+   *
+   * Where multiple numbered ranges are defined with multiple calls to 
+   * setBound, and the scan is ordered, the range number for
+   * each bound must be larger than the range number for the 
+   * previously defined bound.
+   * 
+   * @param key_record NdbRecord structure for the key the bound is 
+   *        to be added to
+   * @param bound The bound to add
+   * @return 0 for Success, other for Failure.
+   */
+  int setBound(const NdbRecord *key_record,
+               const IndexBound& bound);
+
+
   /**
    * Is current scan sorted
    */
@@ -230,6 +253,12 @@ private:
   Uint32 m_sort_columns;
   Uint32 m_this_bound_start;
   Uint32 * m_first_bound_word;
+
+  /* Number of IndexBounds for this scan (NdbRecord only) */
+  Uint32 m_num_bounds;
+  /* Most recently added IndexBound's range number */
+  Uint32 m_previous_range_num;
+
 
   friend struct Ndb_free_list_t<NdbIndexScanOperation>;
 };
diff -Nrup a/storage/ndb/include/ndbapi/NdbOperation.hpp b/storage/ndb/include/ndbapi/NdbOperation.hpp
--- a/storage/ndb/include/ndbapi/NdbOperation.hpp	2007-10-23 17:57:26 +04:00
+++ b/storage/ndb/include/ndbapi/NdbOperation.hpp	2008-02-02 18:28:00 +03:00
@@ -451,6 +451,8 @@ public:
    */
   virtual NdbBlob* getBlobHandle(const char* anAttrName);
   virtual NdbBlob* getBlobHandle(Uint32 anAttrId);
+  virtual NdbBlob* getBlobHandle(const char* anAttrName) const;
+  virtual NdbBlob* getBlobHandle(Uint32 anAttrId) const;
  
 #ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
   /** @} *********************************************************************/
@@ -834,6 +836,125 @@ public:
   void setPartitionId(Uint32 id);
   Uint32 getPartitionId() const;
 #endif
+
+  /* Specification of an extra value to get
+   * as part of an NdbRecord operation.
+   * Inputs : 
+   *  To specify an extra value to read, the
+   *  caller must provide a column, and a 
+   *  (optionally NULL) appStorage pointer.
+   * Outputs : 
+   *  After the operation is defined, the 
+   *  recAttr member will contain a pointer
+   *  to the NdbRecAttr object for receiving
+   *  the data.
+   * 
+   * appStorage pointer
+   *  If the appStorage pointer is null, then
+   *  the received value will be stored in
+   *  memory managed by the NdbRecAttr object.
+   *
+   *  If the appStorage pointer is non-null then 
+   *  the received value will be stored at the 
+   *  location pointed to (and will still be 
+   *  accessable via the NdbRecAttr object).  
+   *  It is the caller's responsibility to 
+   *  ensure that :
+   *    - appStorage points to sufficient space 
+   *      to store any returned data.
+   *    - Memory pointed to by appStorage is not
+   *      reused/freed until after the execute()
+   *      call returns.
+   *
+   * Limitation : Blob reads cannot be specified 
+   * using GetValueSpec.
+   */
+  struct GetValueSpec
+  {
+    const NdbDictionary::Column *column;
+    void *appStorage;
+    NdbRecAttr *recAttr;
+  };
+  
+  /* Specification of an extra value to set
+   * as part of an NdbRecord operation.
+   * The value ptr must point to the value
+   * to set, or NULL if the attribute is to
+   * be set to NULL.
+   * The pointed to value is copied when the 
+   * operation is defined and need not remain
+   * in place until execution time.
+   *
+   * Limitation : Blobs cannot be set using 
+   * SetValueSpec.
+   */
+  struct SetValueSpec
+  {
+    const NdbDictionary::Column *column;
+    const void * value;
+  };
+
+  /*
+   * OperationOptions
+   *  These are options passed to the NdbRecord primary key and scan 
+   *  takeover operation methods defined in the NdbTransaction and 
+   *  NdbScanOperation classes.
+   *  
+   *  Each option type is marked as present by setting the corresponding
+   *  bit in the optionsPresent field.  Only the option types marked in the
+   *  optionsPresent structure need have sensible data.
+   *  All data is copied out of the OperationOptions structure (and any
+   *  subtended structures) at operation definition time.
+   *  If no options are required, then NULL may be passed as the 
+   *  OperationOptions pointer.
+   *
+   *  Most methods take a supplementary sizeOfOptions parameter.  This
+   *  is optional, and is intended to allow the interface implementation
+   *  to remain backwards compatible with older un-recompiled clients 
+   *  that may pass an older (smaller) version of the OperationOptions 
+   *  structure.  This effect is achieved by passing
+   *  sizeof(OperationOptions) into this parameter.
+   */
+  struct OperationOptions
+  {
+    /*
+     * Which options are present.  See below for option details
+     */
+    Uint64 optionsPresent;
+    enum Flags { OO_ABORTOPTION  = 0x01,
+                 OO_GETVALUE     = 0x02, 
+                 OO_SETVALUE     = 0x04, 
+                 OO_PARTITION_ID = 0x08, 
+                 OO_INTERPRETED  = 0x10,
+                 OO_ANYVALUE     = 0x20 };
+
+    /* An operation-specific abort option.
+     * Only necessary if the default abortoption behaviour
+     * is not satisfactory 
+     */
+    AbortOption abortOption;
+
+    /* Extra column values to be read */
+    GetValueSpec *extraGetValues;
+    Uint32        numExtraGetValues;
+    
+    /* Extra column values to be set  */
+    const SetValueSpec *extraSetValues;
+    Uint32              numExtraSetValues;
+
+    /* Specific partition to execute this operation on */
+    Uint32 partitionId;
+
+    /* Interpreted code to be executed in this operation
+     * Only supported for update operations currently 
+     */
+    const NdbInterpretedCode *interpretedCode;
+
+    /* anyValue to be used for this operation */
+    Uint32 anyValue;
+  };
+
+
 protected:
   int handle_distribution_key(const NdbColumnImpl*, const Uint64 *, Uint32 len);
 protected:
@@ -952,6 +1073,15 @@ protected:
 					      	// the operations object.      
   void		    setStartIndicator();
 
+  /* Utility method to 'add' operation options to an NdbOperation
+   *
+   * @return 0 for success.  NDBAPI to set error otherwise.
+   */
+  static int        handleOperationOptions (const OperationType type,
+                                            const OperationOptions *opts,
+                                            const Uint32 sizeOfOptions,
+                                            NdbOperation *op);
+
 /******************************************************************************
  * The methods below is the execution part of the NdbOperation
  * class. This is where the NDB signals are sent and received. The
@@ -969,14 +1099,16 @@ protected:
     
   int	 prepareSendInterpreted();            // Help routine to prepare*
 
-  int    prepareSendNdbRecord(Uint32 aTC_ConnectPtr, Uint64 aTransId,
-                              AbortOption ao);
+  // Method which prepares signals at operation definition time.
+  int    buildSignalsNdbRecord(Uint32 aTC_ConnectPtr, Uint64 aTransId);
 
-  /* Helper routines for prepareSendNdbRecord(). */
+  // Method which does final preparations at execute time.
+  int    prepareSendNdbRecord(AbortOption ao);
+
+  /* Helper routines for buildSignalsNdbRecord(). */
   Uint32 fillTcKeyReqHdr(TcKeyReq *tcKeyReq,
                          Uint32 connectPtr,
-                         Uint64 transId,
-                         AbortOption ao);
+                         Uint64 transId);
   int    allocKeyInfo(Uint32 connectPtr, Uint64 transId,
                       Uint32 **dstPtr, Uint32 *remain);
   int    allocAttrInfo(Uint32 connectPtr, Uint64 transId,
@@ -1015,6 +1147,7 @@ protected:
   NdbRecAttr* getValue_NdbRecord(const NdbColumnImpl* tAttrInfo, char* aValue);
   int setValue(const NdbColumnImpl* anAttrObject, const char* aValue);
   NdbBlob* getBlobHandle(NdbTransaction* aCon, const NdbColumnImpl* anAttrObject);
+  NdbBlob* getBlobHandle(NdbTransaction* aCon, const NdbColumnImpl* anAttrObject) const;
   int incValue(const NdbColumnImpl* anAttrObject, Uint32 aValue);
   int incValue(const NdbColumnImpl* anAttrObject, Uint64 aValue);
   int subValue(const NdbColumnImpl* anAttrObject, Uint32 aValue);
@@ -1028,7 +1161,8 @@ protected:
                             const NdbColumnImpl *column,
                             NdbBlob * & lastPtr);
   int getBlobHandlesNdbRecord(NdbTransaction* aCon);
-  int getBlobHandlesDelete(NdbTransaction* aCon);  
+  int getBlobHandlesNdbRecordDelete(NdbTransaction* aCon);
+
   // Handle ATTRINFO signals   
   int insertATTRINFO(Uint32 aData);
   int insertATTRINFOloop(const Uint32* aDataPtr, Uint32 aLength);
@@ -1038,8 +1172,8 @@ protected:
 		    Uint32 aKeyLenInByte);
   void reorderKEYINFO();
   
-  virtual void setErrorCode(int aErrorCode);
-  virtual void setErrorCodeAbort(int aErrorCode);
+  virtual void setErrorCode(int aErrorCode) const;
+  virtual void setErrorCodeAbort(int aErrorCode) const;
 
   int	      incCheck(const NdbColumnImpl* anAttrObject);
   int	      initial_interpreterCheck();
@@ -1189,6 +1323,10 @@ protected:
   Uint32 m_read_mask[(NDB_MAX_ATTRIBUTES_IN_TABLE+31)>>5];
   /* Interpreted program for NdbRecord operations. */
   const NdbInterpretedCode *m_interpreted_code;
+
+  /* Ptr to supplied SetValueSpec for NdbRecord */
+  const SetValueSpec *m_extraSetValues;
+  Uint32 m_numExtraSetValues;
 
   Uint32 m_any_value;                           // Valid if m_use_any_value!=0
 
diff -Nrup a/storage/ndb/include/ndbapi/NdbScanOperation.hpp b/storage/ndb/include/ndbapi/NdbScanOperation.hpp
--- a/storage/ndb/include/ndbapi/NdbScanOperation.hpp	2007-05-08 17:32:06 +04:00
+++ b/storage/ndb/include/ndbapi/NdbScanOperation.hpp	2008-02-02 18:28:00 +03:00
@@ -177,15 +177,21 @@ public:
   int nextResult(bool fetchAllowed = true, bool forceSend = false);
 
   /*
-    NdbRecord version of nextResult.
-    This sets a pointer to the next row in out_row (if returning 0). This
-    pointer is valid (only) until the next call to nextResult() with
-    fetchAllowed==true.
-    The NdbRecord object defining the row format was specified in the
-    NdbTransaction::scanTable (or scanIndex) call.
-  */
-  int nextResult(const char * & out_row,
-                 bool fetchAllowed = true, bool forceSend = false);
+   * NdbRecord version of nextResult.
+   * 
+   * When 0 is returned, this method updates out_row_ptr to point 
+   * to the next result row.  The location pointed to is valid 
+   * (only) until the next call to nextResult() with
+   * fetchAllowed==true.
+   * The NdbRecord object defining the row format was specified in the
+   * NdbTransaction::scanTable (or scanIndex) call.
+   * Note that this variant of nextResult has three parameters, and
+   * all must be supplied to avoid invoking the two-parameter
+   * variant.
+   */
+  int nextResult(const char ** out_row_ptr,
+                 bool fetchAllowed, 
+                 bool forceSend);
 
   /**
    * Close scan
@@ -236,43 +242,52 @@ public:
   int deleteCurrentTuple(NdbTransaction* takeOverTransaction);
   
   /*
-    NdbRecord versions of scan lock take-over operations.
-
-    Note that calling NdbRecord scan lock take-over on an NdbRecAttr-style
-    scan is not valid, nor is calling NdbRecAttr-style scan lock take-over
-    on an NdbRecord-style scan.
-  */
+   * NdbRecord versions of scan lock take-over operations.
+   *
+   * Note that calling NdbRecord scan lock take-over on an NdbRecAttr-style
+   * scan is not valid, nor is calling NdbRecAttr-style scan lock take-over
+   * on an NdbRecord-style scan.
+   */
 
   /*
-    Take over the lock without changing the row.
-    Optionally also read from the row (call with default value NULL for row
-    to not read any attributes.).
-    The NdbRecord * is required even when not reading any attributes.
-  */
-  NdbOperation *lockCurrentTuple(NdbTransaction *takeOverTrans,
-                                 const NdbRecord *record,
-                                 char *row= 0,
-                                 const unsigned char *mask= 0);
+   * Take over the lock without changing the row.
+   * Optionally also read from the row (call with default value NULL for 
+   * result_row to not read any attributes.).
+   * The NdbRecord * is required even when not reading any attributes.
+   * Supported OperationOptions : OO_ABORTOPTION, OO_GETVALUE, OO_ANYVALUE
+   */
+  const NdbOperation *lockCurrentTuple(NdbTransaction *takeOverTrans,
+                                       const NdbRecord *result_rec,
+                                       char *result_row= 0,
+                                       const unsigned char *result_mask= 0,
+                                       const NdbOperation::OperationOptions *opts = 0,
+                                       Uint32 sizeOfOptions = 0);
 
   /*
-    Update the current tuple, NdbRecord version.
-    Values to update with are contained in the passed-in row.
-  */
-  NdbOperation *updateCurrentTuple(NdbTransaction *takeOverTrans,
-                                   const NdbRecord *record,
-                                   const char *row,
-                                   const unsigned char *mask= 0);
-
-  /* Delete the current tuple. */
-  NdbOperation *deleteCurrentTuple(NdbTransaction *takeOverTrans,
-                                   const NdbRecord *record);
+   * Update the current tuple, NdbRecord version.
+   * Values to update with are contained in the passed-in row.
+   * Supported OperationOptions : OO_ABORTOPTION, OO_SETVALUE, 
+   *                              OO_INTERPRETED, OO_ANYVALUE
+   */
+  const NdbOperation *updateCurrentTuple(NdbTransaction *takeOverTrans,
+                                         const NdbRecord *attr_rec,
+                                         const char *attr_row,
+                                         const unsigned char *mask= 0,
+                                         const NdbOperation::OperationOptions *opts = 0,
+                                         Uint32 sizeOfOptions = 0);
+
+  /* Delete the current tuple. NdbRecord version.
+   * The tuple can be read before being deleted.  Specify the columns to read
+   * and the result storage as usual with result_rec, result_row and result_mask.
+   * Supported OperationOptions : OO_ABORTOPTION, OO_GETVALUE, OO_ANYVALUE
+   */
+  const NdbOperation *deleteCurrentTuple(NdbTransaction *takeOverTrans,
+                                         const NdbRecord *result_rec,
+                                         char *result_row = 0,
+                                         const unsigned char *result_mask = 0,
+                                         const NdbOperation::OperationOptions *opts = 0,
+                                         Uint32 sizeOfOptions = 0);
 
-  /**
-   * Restart scan with exactly the same
-   *   getValues and search conditions
-   */
-  int restart(bool forceSend = false);
-  
 protected:
   NdbScanOperation(Ndb* aNdb,
                    NdbOperation::Type aType = NdbOperation::TableScan);
@@ -299,7 +314,11 @@ protected:
 
   virtual void setErrorCode(int aErrorCode);
   virtual void setErrorCodeAbort(int aErrorCode);
-
+  
+  /* This is the transaction which defined this scan
+   *   The transaction(connection) used for the scan is
+   *   pointed to by NdbOperation::theNdbCon
+   */
   NdbTransaction *m_transConnection;
 
   // Scan related variables
@@ -380,7 +399,9 @@ protected:
                                         NdbTransaction* pTrans,
                                         const NdbRecord *record,
                                         char *row,
-                                        const unsigned char *mask);
+                                        const unsigned char *mask,
+                                        const NdbOperation::OperationOptions *opts,
+                                        Uint32 sizeOfOptions);
   bool m_ordered;
   bool m_descending;
   Uint32 m_read_range_no;
@@ -444,29 +465,34 @@ NdbScanOperation::deleteCurrentTuple(Ndb
 }
 
 inline
-NdbOperation *
+const NdbOperation *
 NdbScanOperation::lockCurrentTuple(NdbTransaction *takeOverTrans,
-                                   const NdbRecord *record,
-                                   char *row,
-                                   const unsigned char *mask)
+                                   const NdbRecord *result_rec,
+                                   char *result_row,
+                                   const unsigned char *result_mask,
+                                   const NdbOperation::OperationOptions *opts,
+                                   Uint32 sizeOfOptions)
 {
   unsigned char empty_mask[NDB_MAX_ATTRIBUTES_IN_TABLE>>3];
   /* Default is to not read any attributes, just take over the lock. */
-  if (!row)
+  if (!result_row)
   {
     bzero(empty_mask, sizeof(empty_mask));
-    mask= &empty_mask[0];
+    result_mask= &empty_mask[0];
   }
   return takeOverScanOpNdbRecord(NdbOperation::ReadRequest, takeOverTrans,
-                                 record, row, mask);
+                                 result_rec, result_row, 
+                                 result_mask, opts, sizeOfOptions);
 }
 
 inline
-NdbOperation *
+const NdbOperation *
 NdbScanOperation::updateCurrentTuple(NdbTransaction *takeOverTrans,
-                                     const NdbRecord *record,
-                                     const char *row,
-                                     const unsigned char *mask)
+                                     const NdbRecord *attr_rec,
+                                     const char *attr_row,
+                                     const unsigned char *mask,
+                                     const NdbOperation::OperationOptions *opts,
+                                     Uint32 sizeOfOptions)
 {
   /*
     We share the code implementing lockCurrentTuple() and updateCurrentTuple().
@@ -475,16 +501,22 @@ NdbScanOperation::updateCurrentTuple(Ndb
     the row since we pass type 'UpdateRequest'.
    */
   return takeOverScanOpNdbRecord(NdbOperation::UpdateRequest, takeOverTrans,
-                                 record, (char *)row, mask);
+                                 attr_rec, (char *)attr_row, mask,
+                                 opts, sizeOfOptions);
 }
 
 inline
-NdbOperation *
+const NdbOperation *
 NdbScanOperation::deleteCurrentTuple(NdbTransaction *takeOverTrans,
-                                     const NdbRecord *record)
+                                     const NdbRecord *result_rec,
+                                     char *result_row,
+                                     const unsigned char *result_mask,
+                                     const NdbOperation::OperationOptions *opts,
+                                     Uint32 sizeOfOptions)
 {
   return takeOverScanOpNdbRecord(NdbOperation::DeleteRequest, takeOverTrans,
-                                 record, 0, 0);
+                                 result_rec, result_row, result_mask,
+                                 opts, sizeOfOptions);
 }
 
 #endif
diff -Nrup a/storage/ndb/include/ndbapi/NdbTransaction.hpp b/storage/ndb/include/ndbapi/NdbTransaction.hpp
--- a/storage/ndb/include/ndbapi/NdbTransaction.hpp	2007-10-12 11:12:25 +04:00
+++ b/storage/ndb/include/ndbapi/NdbTransaction.hpp	2008-02-02 18:28:00 +03:00
@@ -599,9 +599,22 @@ public:
    * on the returned NdbOperation object.
    *
    * @return The NdbOperation causing the latest error.
+   * @deprecated Use the const NdbOperation returning variant.
    */
   NdbOperation*	getNdbErrorOperation();
 
+  /**
+   * Get the latest NdbOperation which had an error. 
+   * This method is used on the NdbTransaction object to find the
+   * NdbOperation causing an error.  
+   * To find more information about the
+   * actual error, use method NdbOperation::getNdbError()
+   * on the returned NdbOperation object.
+   *
+   * @return The NdbOperation causing the latest error.
+   */
+  const NdbOperation* getNdbErrorOperation() const;
+
   /** 
    * Get the method number where the latest error occured.
    * 
@@ -653,134 +666,214 @@ public:
 #endif
 
   /*
-    NdbRecord primary key and unique key operations.
-
-    If the key_rec passed in is for a table, the operation will be a primary
-    key operation. If it is for an index, it will be a unique key operation
-    using that index.
-
-    The key_row passed in defines the primary or unique key of the affected
-    tuple, and must remain valid until execute() is called. The key_rec must
-    include all columns of the key.
-
-    The mask, if != NULL, defines a subset of attributes to read, update, or
-    insert. Only if (mask[attrId >> 3] & (1<<(attrId & 7))) is set is the
-    column affected. The mask is copied by the methods, so need not remain
-    valid after the call returns.
-
-    For unique index operations, the attr_rec must refer to the underlying
-    table of the index.
-  */
-
-  NdbOperation *readTuple(const NdbRecord *key_rec, const char *key_row,
-                          const NdbRecord *result_rec, char *result_row,
-                          NdbOperation::LockMode lock_mode= NdbOperation::LM_Read,
-                          const unsigned char *result_mask= 0);
-  NdbOperation *insertTuple(const NdbRecord *rec, const char *row,
-                            const unsigned char *mask= 0);
-  NdbOperation *updateTuple(const NdbRecord *key_rec, const char *key_row,
-                            const NdbRecord *attr_rec, const char *attr_row,
-                            const unsigned char *mask= 0,
-                            const Uint32 *setPartitionId = 0,
-                            const void *getSetValue = 0,
-                            const NdbInterpretedCode *interpreted_code = 0);
-  NdbOperation *writeTuple(const NdbRecord *key_rec, const char *key_row,
-                           const NdbRecord *attr_rec, const char *attr_row,
-                           const unsigned char *mask= 0);
-  NdbOperation *deleteTuple(const NdbRecord *key_rec, const char *key_row);
+   * NdbRecord primary key and unique key operations.
+   *
+   * If the key_rec passed in is for a table, the operation will be a primary
+   * key operation. If it is for an index, it will be a unique key operation
+   * using that index.
+   *
+   * The key_row passed in defines the primary or unique key of the affected
+   * tuple, and must remain valid until execute() is called. The key_rec must
+   * include all columns of the key.
+   *
+   * The mask, if != NULL, defines a subset of attributes to read, update, or
+   * insert. Only if (mask[attrId >> 3] & (1<<(attrId & 7))) is set is the
+   * column affected. The mask is copied by the methods, so need not remain
+   * valid after the call returns.
+   *
+   * For unique index operations, the attr_rec must refer to the underlying
+   * table of the index.
+   *
+   * OperationOptions can be used to give finer-grained control of operation
+   * definition.  An OperationOptions structure is passed with flags
+   * indicating which operation definition options are present.  Not all
+   * operation types support all operation options.  See the definition of
+   * the OperationOptions structure for more information on individual options.
+   *
+   *   Operation type        Supported OperationOptions flags
+   *   --------------        --------------------------------
+   *   readTuple             OO_ABORTOPTION, OO_GETVALUE,
+   *                         OO_PARTITION_ID, OO_ANYVALUE
+   *   insertTuple           OO_ABORTOPTION, OO_SETVALUE, 
+   *                         OO_PARTITION_ID, OO_ANYVALUE
+   *   updateTuple           OO_ABORTOPTION, OO_SETVALUE,
+   *                         OO_PARTITION_ID, OO_INTERPRETED,
+   *                         OO_ANYVALUE
+   *   writeTuple            OO_ABORTOPTION, OO_SETVALUE,
+   *                         OO_PARTITION_ID, OO_INTERPRETED,
+   *                         OO_ANYVALUE
+   *   deleteTuple           OO_ABORTOPTION, OO_GETVALUE,
+   *                         OO_PARTITION_ID, OO_ANYVALUE
+   *
+   * The sizeOfOptions optional parameter is used to allow this interface
+   * to be backwards compatible with previous definitions of the OperationOptions
+   * structure.  If an unusual size is detected by the interface implementation, 
+   * it can use this to determine how to interpret the passed OperationOptions 
+   * structure.  To enable this functionality, the caller should pass 
+   * sizeof(NdbOperation::OperationOptions) for this argument.
+   */
+  const NdbOperation *readTuple(const NdbRecord *key_rec, const char *key_row,
+                                const NdbRecord *result_rec, char *result_row,
+                                NdbOperation::LockMode lock_mode= NdbOperation::LM_Read,
+                                const unsigned char *result_mask= 0,
+                                const NdbOperation::OperationOptions *opts = 0,
+                                Uint32 sizeOfOptions = 0);
+  const NdbOperation *insertTuple(const NdbRecord *rec, const char *row,
+                                  const unsigned char *mask= 0,
+                                  const NdbOperation::OperationOptions *opts = 0,
+                                  Uint32 sizeOfOptions = 0);
+  const NdbOperation *updateTuple(const NdbRecord *key_rec, const char *key_row,
+                                  const NdbRecord *attr_rec, const char *attr_row,
+                                  const unsigned char *mask= 0,
+                                  const NdbOperation::OperationOptions *opts = 0,
+                                  Uint32 sizeOfOptions = 0);
+  const NdbOperation *writeTuple(const NdbRecord *key_rec, const char *key_row,
+                                 const NdbRecord *attr_rec, const char *attr_row,
+                                 const unsigned char *mask= 0,
+                                 const NdbOperation::OperationOptions *opts = 0,
+                                 Uint32 sizeOfOptions = 0);
+  const NdbOperation *deleteTuple(const NdbRecord *key_rec, const char *key_row,
+                                  const NdbRecord *result_rec = 0, char *result_row = 0,
+                                  const unsigned char *result_mask = 0,
+                                  const NdbOperation::OperationOptions *opts = 0,
+                                  Uint32 sizeOfOptions = 0);
 
   /*
-    Scan a table, using NdbRecord to read out column data.
-
-    The result_record pointer must remain valid until after the call to
-    execute().
-
-    The result_mask pointer is optional, if present only columns for
-    which the corresponding bit (by attribute id order) in result_mask
-    is set will be retrieved in the scan. The result_mask is copied
-    internally, so in contrast to result_record need not be valid at
-    execute().
-
-    The parallel argument is the desired parallelism, or 0 for maximum
-    parallelism (receiving rows from all fragments in parallel).
-  */
-  NdbScanOperation *
-  scanTable(const NdbRecord *result_record,
-            NdbOperation::LockMode lock_mode= NdbOperation::LM_Read,
-            const unsigned char *result_mask= 0,
-            Uint32 scan_flags= 0,
-            Uint32 parallel= 0,
-            Uint32 batch= 0);
+   * ScanOptions
+   *  These are options passed to the NdbRecord based scanTable and 
+   *  scanIndex methods.
+   *  Each option type is marked as present by setting the corresponding
+   *  bit in the optionsPresent field.  Only the option types marked 
+   *  in the optionsPresent field need have sensible data.
+   *  All data is copied out of the ScanOptions structure (and any
+   *  subtended structures) at operation definition time.
+   *  If no options are required, then NULL may be passed as the 
+   *  ScanOptions pointer.
+   *
+   *  Most methods take a supplementary sizeOfOptions parameter.  This
+   *  is optional, and is intended to allow the interface implementation
+   *  to remain backwards compatible with older un-recompiled clients 
+   *  that may pass an older (smaller) version of the ScanOptions 
+   *  structure.  This effect is achieved by passing
+   *  sizeof(NdbTransaction::ScanOptions) into this parameter.
+   */
+  struct ScanOptions
+  {
+    /* Which options are present - see below for possibilities */
+    Uint64 optionsPresent;
+
+    enum Type { SO_SCANFLAGS    = 0x01,
+                SO_PARALLEL     = 0x02,
+                SO_BATCH        = 0x04,
+                SO_GETVALUE     = 0x08,
+                SO_PARTITION_ID = 0x10,
+                SO_INTERPRETED  = 0x20,
+                SO_CUSTOMDATA   = 0x40 };
+
+    /* Flags controlling scan behaviour
+     * See NdbScanOperation::ScanFlag for details
+     */
+    Uint32 scan_flags;
+
+    /* Desired scan parallelism.
+     * Default == 0 == Maximum parallelism
+     */
+    Uint32 parallel;
+
+    /* Desired scan batchsize in rows 
+     * for NDBD -> API transfers
+     * Default == 0 == Automatically chosen size
+     */
+    Uint32 batch;
+    
+    /* Extra values to be read for each row meeting
+     * scan criteria
+     */
+    NdbOperation::GetValueSpec *extraGetValues;
+    Uint32                     numExtraGetValues;
 
-//private:
-  /*
-    Do an index range scan (optionally ordered) of a table.
+    /* Specific partition to limit this scan to */
+    Uint32 partitionId;
 
-    The key_record describes the index to be scanned. It must be a key record
-    for the index, ie. it must specify (at least) all the key columns of the
-    index. And it must be created from the index to be scanned (not from the
-    underlying table).
-
-    The result_record describes the rows to be returned from the scan. For an
-    ordered index scan, result_record must be a key record for the index to
-    be scanned, that is it must include at least all of the column in the
-    index (the reason is that the index key is needed for merge sorting the
-    scans returned from each fragment).
-
-    The call uses a callback function as a flexible way of specifying multiple
-    range bounds. The callback will be called once for each bound to define
-    lower and upper key value etc.
-
-    The callback received a private callback_data void *, and the index of the
-    bound (0 .. num_key_bounds). However, it is guaranteed that it will be
-    called in ordered sequence, so it is permissible to ignore the passed
-    bound_index and just return the values for the next bound (for example
-    if data is kept in a linked list).
-
-    Note that for multi-range, the IndexBound::low_key and IndexBound::high_key
-    pointers must be unique, ie. it is not permissible to re-use the same row
-    buffer for several different range bounds within a single scan. It is
-    however permissible to use the same row pointer as low_key and high_key (to
-    specify an equals bound), and it is also permissible to re-use the rows
-    after the scanIndex() method returns (ie. they need not remain valid until
-    ececute() time, like the NdbRecord pointers do).
+    /* Interpreted code to execute as part of the scan */
+    const NdbInterpretedCode *interpretedCode;
 
-    The callback can return 0 to denote success, and -1 to denote error (the
-    latter causing the creation of the NdbIndexScanOperation to fail).
+    /* CustomData ptr to associate with the scan operation */
+    void * customData;
+  };
 
-    This multi-range method is only for use in mysqld code.
-  */
-  NdbIndexScanOperation *
-  scanIndex(const NdbRecord *key_record,
-            int (*get_bound_callback)(void *callback_data,
-                                      Uint32 bound_index,
-                                      NdbIndexScanOperation::IndexBound & bound),
-            void *callback_data,
-            Uint32 num_key_bounds,
-            const NdbRecord *result_record,
+  /**
+   * Scan a table, using NdbRecord to read out column data.
+   *
+   * The result_record pointer must remain valid until after the call to
+   * execute().
+   *
+   * The result_mask pointer is optional, if present only columns for
+   * which the corresponding bit (by attribute id order) in result_mask
+   * is set will be retrieved in the scan. The result_mask is copied
+   * internally, so in contrast to result_record need not be valid at
+   * execute().
+   * 
+   * A ScanOptions structure can be passed, specifying extra options.  See
+   * the definition of the ScanOptions structure for more information.
+   *
+   * To enable backwards compatability of this interface, a sizeOfOptions
+   * parameter can be passed.  This parameter indicates the size of the
+   * ScanOptions structure at the time the client was compiled, and enables
+   * detection of the use of an old ScanOptions structure.  If this functionality
+   * is not desired, it can be left set to zero.
+   */
+  NdbScanOperation *
+  scanTable(const NdbRecord *result_record,
             NdbOperation::LockMode lock_mode= NdbOperation::LM_Read,
             const unsigned char *result_mask= 0,
-            Uint32 scan_flags= 0,
-            Uint32 parallel= 0,
-            Uint32 batch= 0);
+            const ScanOptions *options = 0,
+            Uint32 sizeOfOptions = 0);
 
-public:
-
-  /* A convenience wrapper for simpler specification of a single bound. */
+  /**
+   * Do an index range scan (optionally ordered) of a table.
+   *
+   * The key_record describes the index to be scanned. It must be a key record
+   * for the index, ie. it must specify (at least) all the key columns of the
+   * index. And it must be created from the index to be scanned (not from the
+   * underlying table).
+   *
+   * The result_record describes the rows to be returned from the scan. For an
+   * ordered index scan, result_record must be a key record for the index to
+   * be scanned, that is it must include at least all of the column in the
+   * index (the reason is that the full index key is needed for merge sorting 
+   * the scans returned from each fragment).
+   *
+   * Both the key_record and result_record NdbRecord structures must stay
+   * in-place until the scan operation is closed.
+   *
+   * A single IndexBound can either be specified in this call or in a separate
+   * call to NdbIndexScanOperation::setBound().  To perform a multi range read, 
+   * the scan_flags in the ScanOptions structure must include SF_MULTIRANGE.  
+   * Additional bounds can be added using calls to 
+   * NdbIndexScanOperation::setBound().
+   * 
+   * To specify an equals bound, use the same row pointer for the low_key and
+   * high_key.
+   *
+   * A ScanOptions structure can be passed, specifying extra options.  See
+   * the definition of the ScanOptions structure for more information.
+   *
+   * To enable backwards compatability of this interface, a sizeOfOptions
+   * parameter can be passed.  This parameter indicates the size of the
+   * ScanOptions structure at the time the client was compiled, and enables
+   * detection of the use of an old ScanOptions structure.  If this functionality
+   * is not desired, it can be left set to zero.
+   * 
+   */
   NdbIndexScanOperation *
   scanIndex(const NdbRecord *key_record,
-            const char *low_key,
-            Uint32 low_key_count,
-            bool low_inclusive,
-            const char * high_key,
-            Uint32 high_key_count,
-            bool high_inclusive,
             const NdbRecord *result_record,
-            NdbOperation::LockMode lock_mode= NdbOperation::LM_Read,
-            const unsigned char *result_mask= 0,
-            Uint32 scan_flags= 0,
-            Uint32 parallel= 0,
-            Uint32 batch= 0);
+            NdbOperation::LockMode lock_mode = NdbOperation::LM_Read,
+            const unsigned char *result_mask = 0,
+            const NdbIndexScanOperation::IndexBound *bound = 0,
+            const ScanOptions *options = 0,
+            Uint32 sizeOfOptions = 0);
 
 private:						
   /**
@@ -903,14 +996,23 @@ private:						
   
   NdbOperation *setupRecordOp(NdbOperation::OperationType type,
                               NdbOperation::LockMode lock_mode,
+                              NdbOperation::AbortOption default_ao,
                               const NdbRecord *key_record,
                               const char *key_row,
                               const NdbRecord *attribute_record,
                               const char *attribute_row,
                               const unsigned char *mask,
-                              const Uint32 *setPartitionId = 0,
-                              const void *getSetValue = 0,
-                              const NdbInterpretedCode *interpreted_code = 0);
+                              const NdbOperation::OperationOptions *opts,
+                              Uint32 sizeOfOptions);
+
+  /* Helper for scanTable */
+  int handleScanOptions(NdbScanOperation *op,
+                        const ScanOptions *options);
+
+  /* Adding IndexBound to an NdbRecord-defined IndexScanOperation */
+  int addIndexScanBound(NdbIndexScanOperation *sop,
+                        const NdbRecord *key_record,
+                        const NdbIndexScanOperation::IndexBound& bound);
 
   void		handleExecuteCompletion();
   
@@ -974,7 +1076,11 @@ private:						
   } theCompletionStatus;	  // The Completion status of the transaction
   CommitStatusType theCommitStatus;			// The commit status of the transaction
   Uint32	theMagicNumber;				// Magic Number to verify correct object
-
+                                                        // Current meanings :
+                                                        //   0x00FE11DC : NdbTransaction not in use
+                                                        //   0x37412619 : NdbTransaction in use
+                                                        //   0x00FE11DF : NdbTransaction for scan operation
+                                                        //                scan definition not yet complete
   Uint32	thePriority;				// Transaction Priority
 
   enum ReturnType {  ReturnSuccess,  ReturnFailure };
diff -Nrup a/storage/ndb/ndbapi-examples/ndbapi_blob_ndbrecord/main.cpp b/storage/ndb/ndbapi-examples/ndbapi_blob_ndbrecord/main.cpp
--- a/storage/ndb/ndbapi-examples/ndbapi_blob_ndbrecord/main.cpp	2007-07-04 10:48:21 +04:00
+++ b/storage/ndb/ndbapi-examples/ndbapi_blob_ndbrecord/main.cpp	2008-02-02 18:28:01 +03:00
@@ -24,7 +24,7 @@
   read/write methods.
  */
 
-
+#include <ndb_global.h>
 #include <mysql.h>
 #include <mysqld_error.h>
 #include <NdbApi.hpp>
@@ -201,7 +201,7 @@ int populate(Ndb *myNdb)
 
   Uint32 id= 1;
   memcpy(&row[0], &id, 4);
-  NdbOperation *myNdbOperation= myTrans->insertTuple(full_record, row);
+  const NdbOperation *myNdbOperation= myTrans->insertTuple(full_record, row);
   if (myNdbOperation == NULL)
     APIERROR(myTrans->getNdbError());
   NdbBlob *myBlobHandle= myNdbOperation->getBlobHandle("my_text");
@@ -231,7 +231,7 @@ int update_key(Ndb *myNdb)
 
   Uint32 id= 1;
   memcpy(&row[0], &id, 4);
-  NdbOperation *myNdbOperation=
+  const NdbOperation *myNdbOperation=
     myTrans->updateTuple(key_record, row, blob_record, row);
   if (myNdbOperation == NULL)
     APIERROR(myTrans->getNdbError());
@@ -323,7 +323,7 @@ int update_scan(Ndb *myNdb)
   int res;
   for (;;)
   {
-    res= myScanOp->nextResult(out_row, true);
+    res= myScanOp->nextResult(&out_row, true, false);
     if (res==1)
       break;                                    // Scan done.
     else if (res)
@@ -337,7 +337,7 @@ int update_scan(Ndb *myNdb)
     for (Uint64 j= 0; j < length; j++)
       buffer[j]= tolower(buffer[j]);
 
-    NdbOperation *myUpdateOp=
+    const NdbOperation *myUpdateOp=
       myScanOp->updateCurrentTuple(myTrans, blob_record, row);
     if (myUpdateOp == NULL)
       APIERROR(myTrans->getNdbError());
@@ -388,7 +388,7 @@ int fetch_key(Ndb *myNdb)
 
   Uint32 id= 1;
   memcpy(&key_row[0], &id, 4);
-  NdbOperation *myNdbOperation=
+  const NdbOperation *myNdbOperation=
     myTrans->readTuple(key_record, key_row, blob_record, out_row);
   if (myNdbOperation == NULL)
     APIERROR(myTrans->getNdbError());
@@ -429,7 +429,7 @@ int update2_key(Ndb *myNdb)
 
   Uint32 id= 1;
   memcpy(&row[0], &id, 4);
-  NdbOperation *myNdbOperation=
+  const NdbOperation *myNdbOperation=
     myTrans->updateTuple(key_record, row, blob_record, row);
   if (myNdbOperation == NULL)
     APIERROR(myTrans->getNdbError());
@@ -460,7 +460,7 @@ int delete_key(Ndb *myNdb)
 
   Uint32 id= 1;
   memcpy(&row[0], &id, 4);
-  NdbOperation *myNdbOperation= myTrans->deleteTuple(key_record, row);
+  const NdbOperation *myNdbOperation= myTrans->deleteTuple(key_record, row);
   if (myNdbOperation == NULL)
     APIERROR(myTrans->getNdbError());
 
diff -Nrup a/storage/ndb/ndbapi-examples/ndbapi_s_i_ndbrecord/main.cpp b/storage/ndb/ndbapi-examples/ndbapi_s_i_ndbrecord/main.cpp
--- a/storage/ndb/ndbapi-examples/ndbapi_s_i_ndbrecord/main.cpp	2007-07-04 10:42:14 +04:00
+++ b/storage/ndb/ndbapi-examples/ndbapi_s_i_ndbrecord/main.cpp	2008-02-02 18:28:01 +03:00
@@ -195,7 +195,7 @@ int main(int argc, char** argv)
     memcpy(&row[1][0], &value, 4);
     memcpy(&row[1][4], &value, 4);
 
-    NdbOperation *myOperation=
+    const NdbOperation *myOperation=
       myTransaction->insertTuple(attr_record, &row[0][0]);
     if (myOperation == NULL)
       APIERROR(myTransaction->getNdbError());
@@ -220,21 +220,30 @@ int main(int argc, char** argv)
     if (myTransaction == NULL)
       APIERROR(myNdb->getNdbError());
 
+    /* Demonstrate the posibility to use OperationOptions for 
+     * the odd extra read. 
+     */
+    Uint32 frag;
+    NdbOperation::GetValueSpec getSpec[1];
+    getSpec[0].column=NdbDictionary::Column::FRAGMENT;
+    getSpec[0].appStorage=&frag;
+
+    NdbOperation::OperationOptions options;
+    options.optionsPresent |= NdbOperation::OperationOptions::OO_GETVALUE;
+    options.extraGetValues = &getSpec[0];
+    options.numExtraGetValues = 1;
+
     memcpy(&row[0][4], &i, 4);
     unsigned char mask[1]= { 0x01 };            // Only read ATTR1
-    NdbOperation *myOperation=
+    const NdbOperation *myOperation=
       myTransaction->readTuple(key_record, &row[0][0],
                                attr_record, &row[1][0],
-                               NdbOperation::LM_Read, mask);
+                               NdbOperation::LM_Read, mask,
+                               &options, 
+                               sizeof(NdbOperation::OperationOptions));
     if (myOperation == NULL)
       APIERROR(myTransaction->getNdbError());
 
-    /* Demonstrate the posibility to use getValue() for the odd extra read. */
-    Uint32 frag;
-    if (myOperation->getValue(NdbDictionary::Column::FRAGMENT,
-                              (char *)(&frag)) == 0)
-      APIERROR(myOperation->getNdbError());
-
     if (myTransaction->execute( NdbTransaction::Commit,
                                 NdbOperation::AbortOnError ) != -1)
     {
@@ -258,7 +267,7 @@ int main(int argc, char** argv)
     int value= i+10;
     memcpy(&row[1][4], &value, 4);
     unsigned char mask[1]= { 0x02 };            // Only update ATTR2
-    NdbOperation *myOperation=
+    const NdbOperation *myOperation=
       myTransaction->updateTuple(key_record, &row[0][0],
                                  attr_record, &row[1][0], mask);
     if (myOperation == NULL)
@@ -280,7 +289,7 @@ int main(int argc, char** argv)
 
     int value= 3;
     memcpy(&row[0][4], &value, 4);
-    NdbOperation *myOperation=
+    const NdbOperation *myOperation=
       myTransaction->deleteTuple(key_record, &row[0][0]);
     if (myOperation == NULL)
       APIERROR(myTransaction->getNdbError());
@@ -303,7 +312,7 @@ int main(int argc, char** argv)
         APIERROR(myNdb->getNdbError());
 
       memcpy(&row[0][0], &i, 4);
-      NdbOperation *myOperation=
+      const NdbOperation *myOperation=
         myTransaction->readTuple(pk_record, &row[0][0],
                                  attr_record, &row[1][0]);
       if (myOperation == NULL)
diff -Nrup a/storage/ndb/src/ndbapi/NdbBlob.cpp b/storage/ndb/src/ndbapi/NdbBlob.cpp
--- a/storage/ndb/src/ndbapi/NdbBlob.cpp	2007-10-15 12:08:57 +04:00
+++ b/storage/ndb/src/ndbapi/NdbBlob.cpp	2008-02-02 18:28:01 +03:00
@@ -679,23 +679,63 @@ NdbBlob::copyKeyFromRow(const NdbRecord 
   DBUG_RETURN(0);
 }
 
+/* 
+ * This method is used to get data ptr and length values for the
+ * header for an 'empty' Blob.  This is a blob with length zero,
+ * or a NULL BLOB.
+ * This header is used to build signals for an insert or write 
+ * operation before the correct blob header information is known.  
+ * Once the blob header information is known, another operation will 
+ * set the header information correctly.
+ */
 void
-NdbBlob::getBlobHeadData(const char * & data, Uint32 & byteSize)
+NdbBlob::getNullOrEmptyBlobHeadDataPtr(const char * & data, 
+                                       Uint32 & byteSize)
 {
-  prepareSetHeadInlineValue();
-  if (theNullFlag)
+  /* Only for use when preparing signals before a blob value has been set
+   * e.g. NdbRecord
+   */
+  assert(theState==Prepared);
+  assert(theLength==0);
+  assert(theSetBuf==NULL);
+  assert(theGetSetBytes==0);
+  assert(thePos==0);
+  assert(theHeadInlineBuf.data!=NULL);
+
+  DBUG_PRINT("info", ("getNullOrEmptyBlobHeadDataPtr.  Nullable : %d",
+                      theColumn->m_nullable));
+
+  if (theColumn->m_nullable)
   {
-    data= NULL;
-    byteSize= 0;
+    /* Null Blob */
+    data = NULL;
+    byteSize = 0;
+    return;
   }
+
+  /* Set up the buffer ptr to appear to be pointing to some data */
+  theSetBuf=(char*) 1; // Extremely nasty way of being non-null
+                       // If it's ever de-reffed, should show up 
+  
+  /* Pack header etc. */
+  prepareSetHeadInlineValue();
+  
+  data=theHeadInlineBuf.data;
+
+  /* Calculate size */
+  if (unlikely(theBlobVersion == NDB_BLOB_V1))
+    byteSize = theHeadInlineBuf.size;
   else
-  {
-    data= theHeadInlineBuf.data;
-    if (unlikely(theBlobVersion == NDB_BLOB_V1))
-      byteSize = theHeadInlineBuf.size;
-    else
-      byteSize = theHead.varsize + 2;
-  }
+    byteSize = theHead.varsize + 2;
+
+  /* Reset affected members */
+  theSetBuf=NULL;
+  memset(&theHead, 0, sizeof(theHead));
+
+  /* This column is not null anymore - record the fact so that
+   * a setNull() call will modify state
+   */
+  theNullFlag=false;
 }
 
 
@@ -969,8 +1009,14 @@ int
 NdbBlob::getHeadInlineValue(NdbOperation* anOp)
 {
   DBUG_ENTER("NdbBlob::getHeadInlineValue");
+
+  /* Get values using implementation of getValue to avoid NdbRecord
+   * specific checks
+   */
   theHeadInlineRecAttr = anOp->getValue_impl(theColumn, theHeadInlineBuf.data);
-  thePartitionIdRecAttr = anOp->getValue(NdbDictionary::Column::FRAGMENT);
+  thePartitionIdRecAttr = 
+    anOp->getValue_impl(&NdbColumnImpl::getImpl(*NdbDictionary::Column::FRAGMENT));
+  
   if (theHeadInlineRecAttr == NULL ||
       thePartitionIdRecAttr == NULL) {
     setErrorCode(anOp);
@@ -1108,9 +1154,11 @@ NdbBlob::setValue(const void* data, Uint
       theLength = 0;
     }
     /*
-      In NdbRecAttr case, we set the value of the blob head here.
-      In NdbRecord case, this is done in NdbOperation::prepareSendNdbRecord().
-    */
+     * In NdbRecAttr case, we set the value of the blob head here with
+     * an extra setValue()
+     * In NdbRecord case, this is done by adding a separate operation in 
+     * preExecute() as we cannot modify the head-table NdbOperation.
+     */
     if (!theNdbRecordFlag)
     {
       if (setHeadInlineValue(theNdbOp) == -1)
@@ -1247,6 +1295,11 @@ NdbBlob::truncate(Uint64 length)
       Uint32 off = getPartOffset(length);
       if (off != 0) {
         assert(off < thePartSize);
+        /* Ensure all previous writes to this blob are flushed so
+         * that we can read their updates
+         */
+        if (executePendingBlobWrites() == -1)
+          DBUG_RETURN(-1);
         Uint16 len = 0;
         if (readPart(thePartBuf.data, part1, len) == -1)
           DBUG_RETURN(-1);
@@ -2278,30 +2331,74 @@ NdbBlob::prepareColumn()
 }
 
 /*
- * Before execute of prepared operation.  May add new operations before
- * this one.  May ask that this operation and all before it (a "batch")
- * is executed immediately in no-commit mode.  In this case remaining
- * prepared operations are saved in a separate list.  They are added
- * back after postExecute.
+ * Before execute of prepared operation.  
+ * 
+ * This method adds any extra operations required to perform the
+ * requested Blob operations.
+ * This can include : 
+ *   Extra read operations added before the 'main table' operation
+ *     Read Blob head + inline bytes
+ *     Read original table key via access index
+ *   Extra operations added after the 'main table' operation
+ *     Update Blob head + inline bytes
+ *     Insert Blob parts
+ * 
+ * Generally, operations are performed in preExecute() if possible,
+ * and postExecute if not.
+ *
+ * If this method sets the batch parameter to true, then 
+ *  - any remaining Blobs in the current user defined operation
+ *    will have their preExecute() method called.
+ *  - all operations up to the last one added will be executed with
+ *    NoCommit BEFORE the next user-defined operation is executed.
+ *  - NdbBlob::postExecute() will be called for all Blobs in the
+ *    executed batch.
+ *  - Processing will continue with the next user-defined operation
+ *    (if any)
+ * This control flow can be seen in NdbTransaction::execute().
  */
 int
-NdbBlob::preExecute(NdbTransaction::ExecType anExecType, bool& batch)
+NdbBlob::preExecute(NdbTransaction::ExecType anExecType, 
+                    bool& batch)
 {
   DBUG_ENTER("NdbBlob::preExecute");
   DBUG_PRINT("info", ("this=%p op=%p con=%p", this, theNdbOp, theNdbCon));
+  DBUG_PRINT("info", ("optype=%d theGetSetBytes=%d theSetFlag=%d", 
+                      theNdbOp->theOperationType,
+                      theGetSetBytes,
+                      theSetFlag));
   if (theState == Invalid)
     DBUG_RETURN(-1);
   assert(theState == Prepared);
   // handle different operation types
   assert(isKeyOp());
+
+  /* Check that a non-nullable blob handle has had a value set 
+   * before proceeding 
+   */
+  if (!theColumn->m_nullable && 
+      (isInsertOp() || isWriteOp()) &&
+      !theSetFlag)
+  {
+    /* Illegal null attribute */
+    setErrorCode(839);
+    DBUG_RETURN(-1);
+  }
+
   if (isReadOp()) {
     if (theGetFlag && theGetSetBytes > theInlineSize) {
-      // need blob head before proceeding
+      /* Need blob head before proceeding
+       * Not safe to do a speculative read of parts, as we do not
+       * yet hold a lock on the blob head+inline
+       */
       batch = true;
     }
   }
-  if (isInsertOp()) {
-    if (theSetFlag && theGetSetBytes > theInlineSize) {
+  if (isInsertOp() && theSetFlag) {
+    /* Add operations to insert parts and update the
+     * Blob head+inline in the main tables
+     */
+    if (theGetSetBytes > theInlineSize) {
       // add ops to write rest of a setValue
       assert(theSetBuf != NULL);
       const char* buf = theSetBuf + theInlineSize;
@@ -2309,26 +2406,28 @@ NdbBlob::preExecute(NdbTransaction::Exec
       assert(thePos == theInlineSize);
       if (writeDataPrivate(buf, bytes) == -1)
         DBUG_RETURN(-1);
-      if (theHeadInlineUpdateFlag) {
-          // add an operation to update head+inline
-          NdbOperation* tOp = theNdbCon->getNdbOperation(theTable);
-          if (tOp == NULL ||
-              tOp->updateTuple() == -1 ||
-              setTableKeyValue(tOp) == -1 ||
-              setHeadInlineValue(tOp) == -1) {
-            setErrorCode(NdbBlobImpl::ErrAbort);
-            DBUG_RETURN(-1);
-          }
-          if (thePartitionId != noPartitionId()) {
-            tOp->setPartitionId(thePartitionId);
-          }
-          DBUG_PRINT("info", ("added op to update head+inline"));
+    }
+    
+    if (theHeadInlineUpdateFlag)
+    {
+      NdbOperation* tOp = theNdbCon->getNdbOperation(theTable);
+      if (tOp == NULL ||
+          tOp->updateTuple() == -1 ||
+          setTableKeyValue(tOp) == -1 ||
+          setHeadInlineValue(tOp) == -1) {
+        setErrorCode(NdbBlobImpl::ErrAbort);
+        DBUG_RETURN(-1);
       }
+      if (thePartitionId != noPartitionId()) {
+        tOp->setPartitionId(thePartitionId);
+      }
+      DBUG_PRINT("info", ("Insert : added op to update head+inline"));
     }
   }
+
   if (isTableOp()) {
     if (isUpdateOp() || isWriteOp() || isDeleteOp()) {
-      // add operation before this one to read head+inline
+      // add operation before main table op to read head+inline
       NdbOperation* tOp = theNdbCon->getNdbOperation(theTable, theNdbOp);
       /*
        * If main op is from take over scan lock, the added read is done
@@ -2355,11 +2454,24 @@ NdbBlob::preExecute(NdbTransaction::Exec
         tOp->setPartitionId(thePartitionId);
       }
       if (isWriteOp()) {
+        /* There may be no data currently, so ignore tuple not found etc. */
         tOp->m_abortOption = NdbOperation::AO_IgnoreError;
         tOp->m_noErrorPropagation = true;
       }
       theHeadInlineReadOp = tOp;
+      // TODO : Could reuse this op for fetching other blob heads in 
+      //        the request?
+      //        Add their getHeadInlineValue() calls to this, rather
+      //        than having separate ops?  (Similar to Index read below)
       // execute immediately
+      // TODO : Why can't we continue with pre-execute of other user ops?
+      //        Rationales that occur:
+      //          - We're trying to keep user's op order consistent - 
+      //            1 op completes before another starts.  
+      //            - They probably shouldn't rely on this
+      //            - Maybe it makes failure more atomic w.r.t. separate
+      //              operations on Blobs
+      //          - Or perhaps error handling is easier?
       batch = true;
       DBUG_PRINT("info", ("added op before to read head+inline"));
     }
@@ -2389,8 +2501,8 @@ NdbBlob::preExecute(NdbTransaction::Exec
           DBUG_RETURN(-1);
         }
       }
+      DBUG_PRINT("info", ("Index op : added op before to read table key"));
     }
-    DBUG_PRINT("info", ("added op before to read table key"));
     if (isUpdateOp() || isDeleteOp()) {
       // add op before this one to read head+inline via index
       NdbIndexOperation* tOp = theNdbCon->getNdbIndexOperation(theAccessTable->m_index, theTable, theNdbOp);
@@ -2401,12 +2513,10 @@ NdbBlob::preExecute(NdbTransaction::Exec
         setErrorCode(tOp);
         DBUG_RETURN(-1);
       }
-      if (isWriteOp()) {
-        tOp->m_abortOption = NdbOperation::AO_IgnoreError;
-        tOp->m_noErrorPropagation = true;
-      }
       theHeadInlineReadOp = tOp;
       // execute immediately
+      // TODO : Why execute immediately?  We could continue with other blobs
+      // etc. here
       batch = true;
       DBUG_PRINT("info", ("added index op before to read head+inline"));
     }
@@ -2420,6 +2530,7 @@ NdbBlob::preExecute(NdbTransaction::Exec
       // write head+inline now
       theNullFlag = true;
       theLength = 0;
+      /* Copy data into the headinline buffer */
       if (theSetBuf != NULL) {
         Uint32 n = theGetSetBytes;
         if (n > theInlineSize)
@@ -2429,15 +2540,44 @@ NdbBlob::preExecute(NdbTransaction::Exec
           DBUG_RETURN(-1);
       }
       /*
-        For NdbRecAttr case, we need to set the value of the blob head here.
-        For NdbRecord case, it is done in NdbOperation::prepareSendNdbRecord().
-      */
+       * We set the value of the blob head and inline data here if possible.
+       * Note that the length is being set to max theInlineSize.  This will
+       * be written with the correct length later if necessary.
+       */
       if (!theNdbRecordFlag)
       {
         if (setHeadInlineValue(theNdbOp) == -1)
           DBUG_RETURN(-1);
       }
-      // the read op before us may overwrite
+      else
+      {
+        /* For table based NdbRecord writes we can set the head+inline 
+         * bytes here.  For index based writes, we need to wait until 
+         * after the execute for the table key data to be available.
+         * TODO : Is it worth doing this at all?
+         */
+        if (isTableOp())
+        {
+          /* NdbRecord - add an update operation after the main op */
+          NdbOperation* tOp = 
+            theNdbCon->getNdbOperation(theTable);
+          if (tOp == NULL ||
+              tOp->updateTuple() == -1 ||
+              setTableKeyValue(tOp) == -1 ||
+              setHeadInlineValue(tOp) == -1) {
+            setErrorCode(NdbBlobImpl::ErrAbort);
+            DBUG_RETURN(-1);
+          }
+          if (thePartitionId != noPartitionId()) {
+            tOp->setPartitionId(thePartitionId);
+          }
+          DBUG_PRINT("info", ("NdbRecord table write : added op to update head+inline"));
+        }
+      }
+      /* Save the contents of the head inline buf for postExecute
+       * It may get overwritten by the read operation injected
+       * above
+       */
       theHeadInlineCopyBuf.copyfrom(theHeadInlineBuf);
     }
   }
@@ -2450,10 +2590,34 @@ NdbBlob::preExecute(NdbTransaction::Exec
 }
 
 /*
- * After execute, for any operation.  If already Active, this routine
- * has been done previously.  Operations which requested a no-commit
- * batch can add new operations after this one.  They are added before
- * any remaining prepared operations.
+ * After execute, for each Blob in an operation.  If already Active, 
+ * this routine has been done previously and is not rerun.  
+ * Operations which requested a no-commit batch can add new operations 
+ * after this one.  They are added before any remaining prepared user 
+ * operations (See NdbTransaction::execute())
+ *
+ * This method has the following duties : 
+ *  - Operation specific duties : 
+ *    - Index based ops : Store main table key retrieved in preExecute
+ *    - Read ops : Store read head+inline and read parts (inline execute)
+ *    - Update ops : Store read head+inline and update parts (inline execute)
+ *    - Table based write : Either store read head+inline and delete then 
+ *                          insert parts and head+inline (inline execute) OR
+ *                          Perform deletePartsUnknown() to avoid lockless
+ *                          race with another transaction, then update head
+ *                          and insert parts (inline execute)
+ *    - Index based write : Always perform deletePartsUnknown based on 
+ *                          fetched main table key then update head+inline
+ *                          and insert parts (inline execute)
+ *                          Rationale: Couldn't read head+inline safely as
+ *                          Index ops don't support IgnoreError so could
+ *                          cause Txn fail for write()?
+ *    - Delete op : Store read head+inline info and use to delete parts
+ *                  (inline execute)
+ *  - Change Blob handle state to Active
+ *  - Execute user's activeHook function if set
+ *  - Add an operation to update the Blob's head+inline bytes if
+ *    necesary 
  */
 int
 NdbBlob::postExecute(NdbTransaction::ExecType anExecType)
@@ -2579,6 +2743,7 @@ NdbBlob::postExecute(NdbTransaction::Exe
     if (invokeActiveHook() == -1)
       DBUG_RETURN(-1);
   }
+  /* Cope with any changes to the head */
   if (anExecType == NdbTransaction::NoCommit && theHeadInlineUpdateFlag) {
     NdbOperation* tOp = theNdbCon->getNdbOperation(theTable);
     if (tOp == NULL ||
@@ -2599,7 +2764,8 @@ NdbBlob::postExecute(NdbTransaction::Exe
 
 /*
  * Before commit of completed operation.  For write add operation to
- * update head+inline.
+ * update head+inline if necessary.  This code is the same as the
+ * last part of postExecute()
  */
 int
 NdbBlob::preCommit()
@@ -2801,3 +2967,4 @@ NdbBlob::blobsNextBlob()
 {
   return theNext;
 }
+
diff -Nrup a/storage/ndb/src/ndbapi/NdbOperation.cpp b/storage/ndb/src/ndbapi/NdbOperation.cpp
--- a/storage/ndb/src/ndbapi/NdbOperation.cpp	2007-10-12 11:12:25 +04:00
+++ b/storage/ndb/src/ndbapi/NdbOperation.cpp	2008-02-02 18:28:01 +03:00
@@ -97,11 +97,15 @@ NdbOperation::~NdbOperation( )
  *                 on connection set an error status.
  *****************************************************************************/
 void
-NdbOperation::setErrorCode(int anErrorCode)
+NdbOperation::setErrorCode(int anErrorCode) const
 {
-  theError.code = anErrorCode;
+  /* Setting an error is considered to be a const 
+     operation, hence the nasty cast here */
+  NdbOperation *pnonConstThis=const_cast<NdbOperation *>(this);
+
+  pnonConstThis->theError.code = anErrorCode;
   theNdbCon->theErrorLine = theErrorLine;
-  theNdbCon->theErrorOperation = this;
+  theNdbCon->theErrorOperation = pnonConstThis;
   if (!(m_abortOption == AO_IgnoreError && m_noErrorPropagation))
     theNdbCon->setOperationErrorCode(anErrorCode);
 }
@@ -113,11 +117,15 @@ NdbOperation::setErrorCode(int anErrorCo
  *                 an error status.
  *****************************************************************************/
 void
-NdbOperation::setErrorCodeAbort(int anErrorCode)
+NdbOperation::setErrorCodeAbort(int anErrorCode) const
 {
-  theError.code = anErrorCode;
+  /* Setting an error is considered to be a const 
+     operation, hence the nasty cast here */
+  NdbOperation *pnonConstThis=const_cast<NdbOperation *>(this);
+
+  pnonConstThis->theError.code = anErrorCode;
   theNdbCon->theErrorLine = theErrorLine;
-  theNdbCon->theErrorOperation = this;
+  theNdbCon->theErrorOperation = pnonConstThis;
   // ignore m_noErrorPropagation
   theNdbCon->setOperationErrorCodeAbort(anErrorCode);
 }
@@ -169,6 +177,8 @@ NdbOperation::init(const NdbTableImpl* t
   m_noErrorPropagation = false;
   m_no_disk_flag = 1;
   m_interpreted_code = NULL;
+  m_extraSetValues = NULL;
+  m_numExtraSetValues = 0;
   m_use_any_value = 0;
 
   tSignal = theNdb->getSignal();
@@ -292,7 +302,12 @@ NdbOperation::getValue(Uint32 anAttrId, 
 NdbRecAttr*
 NdbOperation::getValue(const NdbDictionary::Column* col, char* aValue)
 {
-  return getValue_impl(&NdbColumnImpl::getImpl(*col), aValue);
+  if (theStatus != UseNdbRecord)
+    return getValue_impl(&NdbColumnImpl::getImpl(*col), aValue);
+  
+  setErrorCodeAbort(4508);
+  /* GetValue not allowed for NdbRecord defined operation */
+  return NULL;
 }
 
 int
@@ -349,6 +364,37 @@ NdbOperation::getBlobHandle(Uint32 anAtt
     return getBlobHandle(theNdbCon, col);
   }
 }
+
+NdbBlob*
+NdbOperation::getBlobHandle(const char* anAttrName) const
+{
+  const NdbColumnImpl* col = m_currentTable->getColumn(anAttrName);
+  if (col == NULL)
+  {
+    setErrorCode(4004);
+    return NULL;
+  }
+  else
+  {
+    return getBlobHandle(theNdbCon, col);
+  }
+}
+
+NdbBlob*
+NdbOperation::getBlobHandle(Uint32 anAttrId) const
+{
+  const NdbColumnImpl* col = m_currentTable->getColumn(anAttrId);
+  if (col == NULL)
+  {
+    setErrorCode(4004);
+    return NULL;
+  }
+  else
+  {
+    return getBlobHandle(theNdbCon, col);
+  }
+}
+
 
 int
 NdbOperation::incValue(const char* anAttrName, Uint32 aValue)
diff -Nrup a/storage/ndb/src/ndbapi/NdbOperationDefine.cpp b/storage/ndb/src/ndbapi/NdbOperationDefine.cpp
--- a/storage/ndb/src/ndbapi/NdbOperationDefine.cpp	2007-12-11 18:07:48 +03:00
+++ b/storage/ndb/src/ndbapi/NdbOperationDefine.cpp	2008-02-02 18:28:01 +03:00
@@ -441,6 +441,9 @@ NdbRecAttr*
 NdbOperation::getValue_NdbRecord(const NdbColumnImpl* tAttrInfo, char* aValue)
 {
   NdbRecAttr* tRecAttr;
+
+  m_no_disk_flag &= (tAttrInfo->m_storageType == NDB_STORAGETYPE_DISK ? 0:1);
+
   /*
     For getValue with NdbRecord operations, we just allocate the NdbRecAttr,
     the signal data will be constructed later.
@@ -665,7 +668,9 @@ NdbOperation::setAnyValue(Uint32 any_val
   return -1;
 }
 
-
+/* Non-const variant of getBlobHandle - can return existing blob
+ * handles, or create new ones for non-NdbRecord operations 
+ */
 NdbBlob*
 NdbOperation::getBlobHandle(NdbTransaction* aCon, const NdbColumnImpl* tAttrInfo)
 {
@@ -679,10 +684,10 @@ NdbOperation::getBlobHandle(NdbTransacti
   }
 
   /*
-    For NdbRecord operation, we only fetch existing blob handles here,
-    creation must be done by requesting the blob in the NdbRecord and
-    mask when creating the operation.
-  */
+   * For NdbRecord operation, we only fetch existing blob handles here,
+   * creation must be done by requesting the blob in the NdbRecord and
+   * mask when creating the operation.
+   */
   if (m_attribute_record)
   {
     setErrorCodeAbort(4288);
@@ -705,6 +710,25 @@ NdbOperation::getBlobHandle(NdbTransacti
   return tBlob;
 }
 
+/* const variant of getBlobHandle - only returns existing blob handles */
+NdbBlob*
+NdbOperation::getBlobHandle(NdbTransaction* aCon, const NdbColumnImpl* tAttrInfo) const
+{
+  NdbBlob* tBlob = theBlobList;
+  while (tBlob != NULL) {
+    if (tBlob->theColumn == tAttrInfo)
+      return tBlob;
+    tBlob = tBlob->theNext;
+  }
+
+  /*
+    Const method - cannot create a new BLOB handle, NdbRecord
+    or NdbRecAttr
+  */
+  setErrorCodeAbort(4288);
+  return NULL;
+}
+
 /*
   This is used to set up a blob handle for an NdbRecord operation.
 
@@ -765,12 +789,12 @@ NdbOperation::linkInBlobHandle(NdbTransa
 }
 
 /*
-  Setup blob handles for an NdbRecord operation.
-
-  Create blob handles for all requested blob columns.
-
-  For read request, store the pointers to blob handles in the row.
-*/
+ * Setup blob handles for an NdbRecord operation.
+ *
+ * Create blob handles for all requested blob columns.
+ *
+ * For read request, store the pointers to blob handles in the row.
+ */
 int
 NdbOperation::getBlobHandlesNdbRecord(NdbTransaction* aCon)
 {
@@ -797,9 +821,9 @@ NdbOperation::getBlobHandlesNdbRecord(Nd
     if (theOperationType == ReadRequest || theOperationType == ReadExclusive)
     {
       /*
-        For read request, it is safe to cast away const-ness for the
-        m_attribute_row.
-      */
+       * For read request, it is safe to cast away const-ness for the
+       * m_attribute_row.
+       */
       memcpy((char *)&m_attribute_row[col->offset], &bh, sizeof(bh));
     }
   }
@@ -812,7 +836,7 @@ NdbOperation::getBlobHandlesNdbRecord(Nd
   so that we can be sure to delete all blob parts for the row.
 */
 int
-NdbOperation::getBlobHandlesDelete(NdbTransaction* aCon)
+NdbOperation::getBlobHandlesNdbRecordDelete(NdbTransaction* aCon)
 {
   NdbBlob *lastBlob= NULL;
 
@@ -998,4 +1022,218 @@ NdbOperation::setAbortOption(AbortOption
     default:
       return -1;
   }
+}
+
+
+/*
+ * handleOperationOptions
+ * static member for setting operation options
+ * Called when defining operations, from NdbTransaction and
+ * NdbScanOperation
+ */
+int
+NdbOperation::handleOperationOptions (const OperationType type,
+                                      const OperationOptions *opts,
+                                      const Uint32 sizeOfOptions,
+                                      NdbOperation *op)
+{
+  /* Check options size for versioning... */
+  if (unlikely((sizeOfOptions != 0) && 
+               (sizeOfOptions != sizeof(OperationOptions))))
+  {
+    // Handle different sized OperationOptions
+    // Probably smaller is old version, larger is new version.
+    
+    // No other versions currently supported
+    // Invalid or unsupported OperationOptions structure
+    return 4297;
+  }
+
+  bool isScanTakeoverOp = (op->m_key_record == NULL); 
+  
+  if (opts->optionsPresent & OperationOptions::OO_ABORTOPTION)
+  {
+    /* User defined operation abortoption : Allowed for 
+     * any operation 
+     */
+    switch (opts->abortOption)
+    {
+    case AO_IgnoreError:
+    case AbortOnError:
+    {
+      op->m_abortOption=opts->abortOption;
+      break;
+    }
+    default:
+      // Non-specific abortoption
+      // Invalid AbortOption
+      return 4296;
+    }
+  } 
+
+  if ((opts->optionsPresent & OperationOptions::OO_GETVALUE) &&
+      (opts->numExtraGetValues > 0))
+  {
+    if (opts->extraGetValues == NULL)
+    {
+      // Incorrect combination of OperationOptions optionsPresent, 
+      // extraGet/SetValues ptr and numExtraGet/SetValues
+      return 4512;
+    }
+
+    // Only certain operation types allow extra GetValues
+    // Update could be made to support it in future
+    if (type == ReadRequest ||
+        type == ReadExclusive ||
+        type == DeleteRequest)
+    {
+      // Could be readTuple(), or lockCurrentTuple().
+      // We perform old-school NdbRecAttr reads on
+      // these values.
+      for (unsigned int i=0; i < opts->numExtraGetValues; i++)
+      {
+        GetValueSpec *pvalSpec 
+          = &(opts->extraGetValues[i]);
+
+        pvalSpec->recAttr=NULL;
+
+        if (pvalSpec->column == NULL)
+        {
+          // Column is NULL in Get/SetValueSpec structure
+          return 4295;
+        }
+
+        NdbRecAttr *pra=
+          op->getValue_NdbRecord(&NdbColumnImpl::getImpl(*pvalSpec->column),
+                                 (char *) pvalSpec->appStorage);
+        
+        if (pra == NULL)
+        {
+          return -1;
+        }
+
+        pvalSpec->recAttr = pra;
+      }
+    }
+    else
+    {
+      // Bad operation type for GetValue
+      switch (type)
+      {
+      case WriteRequest : 
+      case UpdateRequest :
+      {
+        return 4502;
+        // GetValue not allowed in Update operation
+      }
+      case InsertRequest :
+      {
+        return 4503;
+        // GetValue not allowed in Insert operation
+      }
+      default :
+        return 4118;
+        // Parameter error in API call
+      }
+    }
+  }
+
+  if ((opts->optionsPresent & OperationOptions::OO_SETVALUE) &&
+      (opts->numExtraSetValues > 0))
+  {
+    if (opts->extraSetValues == NULL)
+    {
+      // Incorrect combination of OperationOptions optionsPresent, 
+      // extraGet/SetValues ptr and numExtraGet/SetValues
+      return 4512;
+    }
+
+    if ((type == InsertRequest) ||
+        (type == UpdateRequest) ||
+        (type == WriteRequest))
+    {
+      /* Could be insert/update/writeTuple() or 
+       * updateCurrentTuple()
+       */
+      // Validate SetValuesSpec
+      for (Uint32 i=0; i< opts->numExtraSetValues; i++)
+      {
+        const NdbDictionary::Column *pcol=opts->extraSetValues[i].column;
+        const void *pvalue=opts->extraSetValues[i].value;
+
+        if (pcol == NULL)
+        {
+          // Column is NULL in Get/SetValueSpec structure
+          return 4295;
+        }
+
+        if (pcol->getPrimaryKey())
+        {
+          // For insert, NdbRecord needed the full PK before we got here
+          // So if we get a setValue on a PK column here, it's a problem
+          // Set value on tuple key attribute is not allowed
+          return 4202;
+        }
+
+        if (pvalue == NULL)
+        {
+          if (!pcol->getNullable())
+          {
+            // Trying to set a NOT NULL attribute to NULL
+            return 4203;
+          }
+        }
+          
+        NdbDictionary::Column::Type colType=pcol->getType();
+          
+        if ((colType == NdbDictionary::Column::Blob) ||
+            (colType == NdbDictionary::Column::Text))
+        {
+          // Invalid usage of blob attribute
+          return 4264;
+        }          
+      }
+
+      // Store details of extra set values for later
+      op->m_extraSetValues = opts->extraSetValues;
+      op->m_numExtraSetValues = opts->numExtraSetValues;
+    }
+    else
+    {
+      // Set value and Read/Delete etc is incompatible
+      return 4204;
+    }
+  }
+
+  if (opts->optionsPresent & OperationOptions::OO_PARTITION_ID)
+  {
+    /* Should not have any blobs defined at this stage */
+    assert(op->theBlobList == NULL);
+
+    /* Not allowed for scan takeover ops */
+    if (unlikely(isScanTakeoverOp))
+    {
+      return 4510;
+      /* User-specified partition id not allowed for scan 
+       * takeover operation 
+       */
+    }
+    op->theDistributionKey=opts->partitionId;
+    op->theDistrKeyIndicator_= 1;       
+  }
+    
+  if (opts->optionsPresent & OperationOptions::OO_INTERPRETED)
+  {
+    /* Todo : Improve checks on operation type here */
+    op->m_interpreted_code = opts->interpretedCode;
+  }
+
+  if (opts->optionsPresent & OperationOptions::OO_ANYVALUE)
+  {
+    /* Any operation can have an ANYVALUE set */
+    op->m_any_value = opts->anyValue;
+    op->m_use_any_value = 1;
+  }
+
+  return 0;
 }
diff -Nrup a/storage/ndb/src/ndbapi/NdbOperationExec.cpp b/storage/ndb/src/ndbapi/NdbOperationExec.cpp
--- a/storage/ndb/src/ndbapi/NdbOperationExec.cpp	2007-12-10 09:07:27 +03:00
+++ b/storage/ndb/src/ndbapi/NdbOperationExec.cpp	2008-02-02 18:28:01 +03:00
@@ -606,23 +606,26 @@ NdbOperation::prepareSendInterpreted()
 
 
 /*
-  Prepares TCKEYREQ and (if needed) KEYINFO and ATTRINFO signals, for
-  operations using NdbRecord.
+ Prepares TCKEYREQ and (if needed) KEYINFO and ATTRINFO signals for
+ operations using the NdbRecord API
+ Executed when the operation is defined.
+ @returns 0 for success
 */
 int
-NdbOperation::prepareSendNdbRecord(Uint32 aTC_ConnectPtr, Uint64 aTransId,
-                                   AbortOption ao)
+NdbOperation::buildSignalsNdbRecord(Uint32 aTC_ConnectPtr, Uint64 aTransId)
 {
   char buf[256];
   Uint32 *keyInfoPtr, *attrInfoPtr;
   Uint32 remain;
   int res;
   Uint32 no_disk_flag;
-  Uint32 interpreted_code_end;
-  Uint32 *update_len_addr;
+  Uint32 interpreted_code_end=0;
+  Uint32 *update_len_addr=NULL;
 
   assert(theStatus==UseNdbRecord);
-  /* Not yet support for NdbRecord with interpreted operations. */
+  /* Interpreted operations not supported with NdbRecord
+   * use NdbInterpretedCode instead
+   */
   assert(!theInterpretIndicator);
 
   const NdbRecord *key_rec= m_key_record;
@@ -631,7 +634,7 @@ NdbOperation::prepareSendNdbRecord(Uint3
   const char *updRow;
 
   TcKeyReq *tcKeyReq= CAST_PTR(TcKeyReq, theTCREQ->getDataPtrSend());
-  Uint32 hdrSize= fillTcKeyReqHdr(tcKeyReq, aTC_ConnectPtr, aTransId, ao);
+  Uint32 hdrSize= fillTcKeyReqHdr(tcKeyReq, aTC_ConnectPtr, aTransId);
   keyInfoPtr= theTCREQ->getDataPtrSend() + hdrSize;
   remain= TcKeyReq::MaxKeyInfo;
 
@@ -639,6 +642,7 @@ NdbOperation::prepareSendNdbRecord(Uint3
   if (!key_rec)
   {
     /* This means that key_row contains the KEYINFO20 data. */
+    /* i.e. lock takeover */
     tcKeyReq->tableId= attr_rec->tableId;
     tcKeyReq->tableSchemaVersion= attr_rec->tableVersion;
     res= insertKEYINFO_NdbRecord(aTC_ConnectPtr, aTransId, key_row,
@@ -667,7 +671,7 @@ NdbOperation::prepareSendNdbRecord(Uint3
         return -1;
       }
 
-      Uint32 length;
+      Uint32 length=0;
 
       bool len_ok;
       const char *src;
@@ -790,21 +794,35 @@ NdbOperation::prepareSendNdbRecord(Uint3
         }
         else
         {
-          /*
-            Blob column.
-            For insert and write, we need to set the value of the blob head
-            (cannot leave it unset in case the blob is non-nullable).
-            For update, do nothing, as another operation will be injected to
-            update the blob head.
-          */
           NdbBlob *bh= currentBlob;
           currentBlob= currentBlob->theNext;
+
+          /* 
+           * Blob column
+           * We cannot prepare signals to update the Blob yet, as the 
+           * user has not had a chance to specify the data to write yet.
+           *
+           * Writes to the blob head, inline data and parts are handled
+           * by separate operations, injected before and after this one 
+           * as part of the blob handling code in NdbTransaction::execute().
+           * However, for Insert and Write to non-nullable columns, we must 
+           * write some BLOB data here in case the BLOB is non-nullable.  
+           * For this purpose, we write data of zero length
+           * For nullable columns, we write null data.  This is necessary
+           * as it is valid for users to never call setValue() for nullable
+           * blobs.
+           */
           if (tOpType == UpdateRequest)
-            continue;
+            continue; // Do nothing in this operation
 
-          bh->getBlobHeadData(data, length);
+          /* 
+           * Blob call that sets up a data pointer for blob header data
+           * for an 'empty' blob - length zero or null depending on
+           * Blob's 'nullability'
+           */
+          bh->getNullOrEmptyBlobHeadDataPtr(data, length);
         }
-      }
+      } // if Blob or Bitfield
 
       res= insertATTRINFOHdr_NdbRecord(aTC_ConnectPtr, aTransId,
                                        attrId, length,
@@ -819,9 +837,67 @@ NdbOperation::prepareSendNdbRecord(Uint3
         if(res)
           return res;
       }
-    }
+    } // for noOfColumns
+
+    /* Now handle any extra setValues passed in */
+    if (m_extraSetValues != NULL)
+    {
+      for (Uint32 i=0; i<m_numExtraSetValues; i++)
+      {
+        const NdbDictionary::Column *extraCol=m_extraSetValues[i].column;
+        const void * pvalue=m_extraSetValues[i].value;
+
+        if (extraCol->getStorageType( )== NDB_STORAGETYPE_DISK)
+          no_disk_flag=0;
+
+        Uint32 length;
+        
+        if (pvalue==NULL)
+          length=0;
+        else
+        { 
+          length=extraCol->getSizeInBytes();          
+          if (extraCol->getArrayType() != NdbDictionary::Column::ArrayTypeFixed)
+          {
+            Uint32 lengthInfoBytes;
+            if (!NdbSqlUtil::get_var_length((Uint32) extraCol->getType(),
+                                            pvalue,
+                                            length,
+                                            lengthInfoBytes,
+                                            length))
+            {
+              // Length parameter in equal/setValue is incorrect
+              setErrorCodeAbort(4209);
+              return -1;
+            }
+          }
+        }       
+        
+        // Add ATTRINFO
+        res= insertATTRINFOHdr_NdbRecord(aTC_ConnectPtr, aTransId,
+                                         extraCol->getAttrId(), length,
+                                         &attrInfoPtr, &remain);
+        if(res)
+          return res;
+
+        if(length>0)
+        {
+          res=insertATTRINFOData_NdbRecord(aTC_ConnectPtr, aTransId,
+                                           (char*)pvalue, length,
+                                           &attrInfoPtr, &remain);
+          if(res)
+            return res;
+        }
+      } // for numExtraSetValues
+    }// if m_extraSetValues!=null
+
+    /* Don't need these any more */
+    m_extraSetValues = NULL;
+    m_numExtraSetValues = 0;
   }
-  else if (tOpType == ReadRequest || tOpType == ReadExclusive)
+  else if (tOpType == ReadRequest || tOpType == ReadExclusive ||
+           (tOpType == DeleteRequest && 
+            m_attribute_row != NULL)) // Read as part of delete
   {
     Uint32 column_count= 0;
     for (Uint32 i= 0; i<attr_rec->noOfColumns; i++)
@@ -836,7 +912,9 @@ NdbOperation::prepareSendNdbRecord(Uint3
                             m_read_mask, attrId))
         continue;
 
-      /* Blob reads are handled with a getValue() in NdbBlob.cpp. */
+      /* Blob head reads were defined as extra GetValues, 
+       * processed below, not here.
+       */
       if (unlikely(col->flags & NdbRecord::IsBlob))
         continue;
 
@@ -845,8 +923,6 @@ NdbOperation::prepareSendNdbRecord(Uint3
 
       /*
         Read the column.
-        For blobs, we actually read the blob head, and treat it as a special
-        case in the receiver.
       */
       res= insertATTRINFOHdr_NdbRecord(aTC_ConnectPtr, aTransId,
                                        attrId, 0,
@@ -857,7 +933,10 @@ NdbOperation::prepareSendNdbRecord(Uint3
     }
     theReceiver.m_record.m_column_count= column_count;
 
-    /* Handle any additional getValue(). */
+    /* Handle any additional getValue(). 
+     * Note : This includes extra getValue()s to read Blob
+     * header + inline data
+     */
     const NdbRecAttr *ra= theReceiver.theFirstRecAttr;
     while (ra)
     {
@@ -913,6 +992,31 @@ assert(update_word_length > 0);
                             theTotalCurrAI_Len < TcKeyReq::MaxAttrInfo ?
                                 theTotalCurrAI_Len : TcKeyReq::MaxAttrInfo);
 
+  return 0;
+
+}
+
+/*
+  Do final signal preparation before sending.
+*/
+int
+NdbOperation::prepareSendNdbRecord(AbortOption ao)
+{
+  // There are a number of flags in the TCKEYREQ header that
+  // we have to set at this point...they are not correctly 
+  // defined before the call to execute().
+  TcKeyReq *tcKeyReq=CAST_PTR(TcKeyReq, theTCREQ->getDataPtrSend());
+  
+  Uint8 abortOption= (ao == DefaultAbortOption) ?
+    (Uint8) m_abortOption : (Uint8) ao;
+  
+  m_abortOption= theSimpleIndicator && theOperationType==ReadRequest ?
+    (Uint8) AO_IgnoreError : (Uint8) abortOption;
+
+  TcKeyReq::setAbortOption(tcKeyReq->requestInfo, m_abortOption);
+  TcKeyReq::setCommitFlag(tcKeyReq->requestInfo, theCommitIndicator);
+  TcKeyReq::setStartFlag(tcKeyReq->requestInfo, theStartIndicator);
+
   theStatus= WaitResponse;
   theReceiver.prepareSend();
 
@@ -928,8 +1032,7 @@ assert(update_word_length > 0);
 Uint32
 NdbOperation::fillTcKeyReqHdr(TcKeyReq *tcKeyReq,
                               Uint32 connectPtr,
-                              Uint64 transId,
-                              AbortOption ao)
+                              Uint64 transId)
 {
   Uint32 hdrLen;
   UintR *hdrPtr;
@@ -944,17 +1047,13 @@ NdbOperation::fillTcKeyReqHdr(TcKeyReq *
 
   UintR reqInfo= 0;
   TcKeyReq::setSimpleFlag(reqInfo, theSimpleIndicator);
-  TcKeyReq::setCommitFlag(reqInfo, theCommitIndicator);
-  TcKeyReq::setStartFlag(reqInfo, theStartIndicator);
+  // CommitFlag set later in prepareSendNdbRecord()
+  // StartFlag set later in prepareSendNdbRecord()
   TcKeyReq::setInterpretedFlag(reqInfo, (m_interpreted_code != NULL));
   /* We will setNoDiskFlag() later when we have checked all columns. */
   TcKeyReq::setDirtyFlag(reqInfo, theDirtyIndicator);
   TcKeyReq::setOperationType(reqInfo, theOperationType);
-  Uint8 abortOption= (ao == DefaultAbortOption) ?
-    (Uint8) m_abortOption : (Uint8) ao;
-  m_abortOption= theSimpleIndicator && theOperationType==ReadRequest ?
-    (Uint8) AO_IgnoreError : (Uint8) abortOption;
-  TcKeyReq::setAbortOption(reqInfo, m_abortOption);
+  // AbortOption set later in prepareSendNdbRecord()
   TcKeyReq::setDistributionKeyFlag(reqInfo, theDistrKeyIndicator_);
   TcKeyReq::setScanIndFlag(reqInfo, theScanInfo & 1);
   /* We will setAIInTcKeyReq() and setKeyLength() later. */
diff -Nrup a/storage/ndb/src/ndbapi/NdbReceiver.cpp b/storage/ndb/src/ndbapi/NdbReceiver.cpp
--- a/storage/ndb/src/ndbapi/NdbReceiver.cpp	2007-10-15 13:26:08 +04:00
+++ b/storage/ndb/src/ndbapi/NdbReceiver.cpp	2008-02-02 18:28:01 +03:00
@@ -507,22 +507,41 @@ NdbReceiver::getScanAttrData(const char 
 int
 NdbReceiver::execTRANSID_AI(const Uint32* aDataPtr, Uint32 aLength)
 {
-  if (m_using_ndb_record)
+  /*
+   * NdbRecord and NdbRecAttr row result handling are merged here
+   *   First any NdbRecord attributes are extracted
+   *   Then any NdbRecAttr attributes are extracted
+   *   NdbRecord scans with extra NdbRecAttr getValue() attrs
+   *   are handled separately in the NdbRecord code
+   * Scenarios : 
+   *   NdbRecord only PK read result
+   *   NdbRecAttr only PK read result
+   *   Mixed PK read results
+   *   NdbRecord only scan read result
+   *   NdbRecAttr only scan read result
+   *   Mixed scan read results
+   */
+  Uint32 exp= m_expected_result_length;
+  Uint32 tmp= m_received_result_length + aLength;
+  Uint32 origLength=aLength;
+  const NdbRecord *rec= (m_using_ndb_record? m_record.m_ndb_record : NULL);
+    // BEWARE : *rec may be invalid in RecAttr only case
+  NdbRecAttr* currRecAttr = theCurrentRecAttr;
+  Uint32 rec_pos= 0;
+  Uint32 save_pos= 0;
+  Uint32 column_count= 0;
+
+  bool ndbrecord_part_done=!m_using_ndb_record;
+
+  while (aLength > 0)
   {
-    Uint32 exp= m_expected_result_length;
-    Uint32 tmp= m_received_result_length + aLength;
-    const NdbRecord *rec= m_record.m_ndb_record;
-    Uint32 rec_pos= 0;
-    Uint32 save_pos= 0;
-    Uint32 column_count= 0;
+    AttributeHeader ah(* aDataPtr++);
+    const Uint32 attrId= ah.getAttributeId();
+    Uint32 attrSize= ah.getByteSize();
+    aLength--;
 
-    while (aLength > 0)
+    if (!ndbrecord_part_done)
     {
-      AttributeHeader ah(* aDataPtr++);
-      const Uint32 attrId= ah.getAttributeId();
-      Uint32 attrSize= ah.getByteSize();
-      aLength--;
-
       /* Special case for RANGE_NO, which is stored just after the row. */
       if (attrId==AttributeHeader::RANGE_NO)
       {
@@ -561,143 +580,141 @@ NdbReceiver::execTRANSID_AI(const Uint32
             memcpy(m_record.m_row + m_record.m_row_offset - save_pos,
                    aDataPtr, attrSize);
           }
+
+          Uint32 sizeInWords= (attrSize+3)>>2;
+          aDataPtr+= sizeInWords;
+          aLength-= sizeInWords;
+          continue;
         }
         else
         {
-          /* Handle extra attributes requested with getValue(). */
           assert(theCurrentRecAttr != NULL);
           assert(theCurrentRecAttr->attrId() == attrId);
-          bool res= theCurrentRecAttr->receive_data(aDataPtr, attrSize);
-          assert(res);
-          theCurrentRecAttr= theCurrentRecAttr->next();
+          /* Handle extra attributes requested with getValue(). */
+          /* This implies that we've finished with the NdbRecord part
+             of the read, so move onto NdbRecAttr */
+          ndbrecord_part_done=true;
         }
-        Uint32 sizeInWords= (attrSize+3)>>2;
-        aDataPtr+= sizeInWords;
-        aLength-= sizeInWords;
-        continue;
       }
-      column_count++;
+      else
+      {
+        column_count++;
 
-      const NdbRecord::Attr *col= &rec->columns[rec_pos];
+        const NdbRecord::Attr *col= &rec->columns[rec_pos];
 
-      /* We should never get back an attribute not originally requested. */
-      assert(rec_pos < rec->noOfColumns && col->attrId == attrId);
+        /* We should never get back an attribute not originally requested. */
+        assert(rec_pos < rec->noOfColumns && col->attrId == attrId);
 
-      /* Blobs heads are read with getValue(), not using NdbRecord. */
-      assert((col->flags & NdbRecord::IsBlob) == 0);
-      /*
-        The fast path is for a plain offset/length column (not
-        mysqld-format bit field).
-      */
-      if (likely(!(col->flags & NdbRecord::IsMysqldBitfield)))
-      {
-        if (attrSize == 0)
-        {
-          setRecToNULL(col, m_record.m_row);
-        }
-        else
+        /* Blobs heads are read with getValue(), not using NdbRecord. */
+        assert((col->flags & NdbRecord::IsBlob) == 0);
+        /*
+          The fast path is for a plain offset/length column (not
+          mysqld-format bit field).
+        */
+        if (likely(!(col->flags & NdbRecord::IsMysqldBitfield)))
         {
-          assert(attrSize <= col->maxSize);
-          Uint32 sizeInWords= (attrSize+3)>>2;
-          /* Not sure how to deal with this, shouldn't happen. */
-          if (unlikely(sizeInWords > aLength))
+          if (attrSize == 0)
           {
-            sizeInWords= aLength;
-            attrSize= 4*aLength;
+            setRecToNULL(col, m_record.m_row);
+          }
+          else
+          {
+            assert(attrSize <= col->maxSize);
+            Uint32 sizeInWords= (attrSize+3)>>2;
+            /* Not sure how to deal with this, shouldn't happen. */
+            if (unlikely(sizeInWords > aLength))
+            {
+              sizeInWords= aLength;
+              attrSize= 4*aLength;
+            }
+
+            assignToRec(col, m_record.m_row, aDataPtr, attrSize);
+            aDataPtr+= sizeInWords;
+            aLength-= sizeInWords;
           }
-
-          assignToRec(col, m_record.m_row, aDataPtr, attrSize);
-          aDataPtr+= sizeInWords;
-          aLength-= sizeInWords;
-        }
-      }
-      else
-      {
-        /* Mysqld format bitfield. */
-        if (attrSize == 0)
-        {
-          setRecToNULL(col, m_record.m_row);
         }
         else
         {
-          assert(attrSize == col->maxSize);
-          Uint32 sizeInWords= (attrSize+3)>>2;
-          if (col->flags & NdbRecord::IsNullable)
-            m_record.m_row[col->nullbit_byte_offset]&=
-              ~(1 << col->nullbit_bit_in_byte);
-          col->put_mysqld_bitfield(m_record.m_row, (const char *)aDataPtr);
-          aDataPtr+= sizeInWords;
-          aLength-= sizeInWords;
+          /* Mysqld format bitfield. */
+          if (attrSize == 0)
+          {
+            setRecToNULL(col, m_record.m_row);
+          }
+          else
+          {
+            assert(attrSize == col->maxSize);
+            Uint32 sizeInWords= (attrSize+3)>>2;
+            if (col->flags & NdbRecord::IsNullable)
+              m_record.m_row[col->nullbit_byte_offset]&=
+                ~(1 << col->nullbit_bit_in_byte);
+            col->put_mysqld_bitfield(m_record.m_row, (const char *)aDataPtr);
+            aDataPtr+= sizeInWords;
+            aLength-= sizeInWords;
+          }
         }
+        rec_pos++;
       }
-      rec_pos++;
-    }
-
-    m_received_result_length = tmp;
-    m_record.m_row+= m_record.m_row_offset;
-
-    return (tmp == exp || (exp > TcKeyConf::DirtyReadBit) ? 1 : 0);
-  }
-
-  /* The old way, using getValue() and NdbRecAttr. */
-  NdbRecAttr* currRecAttr = theCurrentRecAttr;
-  
-  for (Uint32 used = 0; used < aLength ; used++){
-    AttributeHeader ah(* aDataPtr++);
-    const Uint32 tAttrId = ah.getAttributeId();
-    const Uint32 tAttrSize = ah.getByteSize();
+    } // / if (!ndbrecord_part_done)
     
-    if (tAttrId == AttributeHeader::READ_PACKED)
+    // Any NdbRecAttr parts to work on?
+    if (ndbrecord_part_done)
     {
-      NdbRecAttr* tmp = currRecAttr;
-      Uint32 len = receive_packed(&tmp, tAttrSize>>2, aDataPtr, aLength); 
-      aDataPtr += len;
-      used += len;
-      currRecAttr = tmp;
-      continue;
-    }
-    
-    /**
-     * Set all results to NULL if  not found...
-     */
-    while(currRecAttr && currRecAttr->attrId() != tAttrId){
-      currRecAttr = currRecAttr->next();
-    }
-    
-    if(currRecAttr && currRecAttr->receive_data(aDataPtr, tAttrSize)){
-      Uint32 add= (tAttrSize + 3) >> 2;
-      used += add;
-      aDataPtr += add;
-      currRecAttr = currRecAttr->next();
-    } else {
-      /*
-        This should not happen: we got back an attribute for which we have no
-        stored NdbRecAttr recording that we requested said attribute (or we got
-        back attributes in the wrong order).
-        So dump some info for debugging, and abort.
-      */
-      ndbout_c("this=%p: tAttrId: %d currRecAttr: %p theCurrentRecAttr: %p "
-               "tAttrSize: %d %d", this,
-	       tAttrId, currRecAttr, theCurrentRecAttr, tAttrSize,
-               currRecAttr ? currRecAttr->get_size_in_bytes() : 0);
-      currRecAttr = theCurrentRecAttr;
-      while(currRecAttr != 0){
-	ndbout_c("%d ", currRecAttr->attrId());
-	currRecAttr = currRecAttr->next();
+      // We've processed the NdbRecord part of the TRANSID_AI, if
+      // any.  There are signal words left, so they must be
+      // RecAttr data
+      //
+      if (attrId == AttributeHeader::READ_PACKED)
+      {
+        assert(!m_using_ndb_record);
+        NdbRecAttr* tmp = currRecAttr;
+        Uint32 len = receive_packed(&tmp, attrSize>>2, aDataPtr, origLength); 
+        aDataPtr += len;
+        aLength -= len;
+        currRecAttr = tmp;
+        continue;
+      }
+      /**
+       * Skip over missing attributes
+       */
+      while(currRecAttr && currRecAttr->attrId() != attrId){
+            currRecAttr = currRecAttr->next();
       }
-      abort();
-      return -1;
-    }
-  }
+
+      if(currRecAttr && currRecAttr->receive_data(aDataPtr, attrSize))
+      {
+        Uint32 add= (attrSize + 3) >> 2;
+        aLength -= add;
+        aDataPtr += add;
+        currRecAttr = currRecAttr->next();
+      } else {
+        /*
+          This should not happen: we got back an attribute for which we have no
+          stored NdbRecAttr recording that we requested said attribute (or we got
+          back attributes in the wrong order).
+          So dump some info for debugging, and abort.
+        */
+        ndbout_c("this=%p: attrId: %d currRecAttr: %p theCurrentRecAttr: %p "
+                 "attrSize: %d %d", this,
+	         attrId, currRecAttr, theCurrentRecAttr, attrSize,
+                 currRecAttr ? currRecAttr->get_size_in_bytes() : 0);
+        currRecAttr = theCurrentRecAttr;
+        while(currRecAttr != 0){
+	  ndbout_c("%d ", currRecAttr->attrId());
+	  currRecAttr = currRecAttr->next();
+        }
+        abort();
+        return -1;
+      } // if (currRecAttr...)      
+    } // /if (ndbrecord_part_done)
+  } // / while (aLength > 0)
 
   theCurrentRecAttr = currRecAttr;
-  
-  /**
-   * Update m_received_result_length
-   */
-  Uint32 exp = m_expected_result_length; 
-  Uint32 tmp = m_received_result_length + aLength;
+
   m_received_result_length = tmp;
+
+  if (m_using_ndb_record) {
+    m_record.m_row+= m_record.m_row_offset;
+  }
 
   return (tmp == exp || (exp > TcKeyConf::DirtyReadBit) ? 1 : 0);
 }
diff -Nrup a/storage/ndb/src/ndbapi/NdbScanFilter.cpp b/storage/ndb/src/ndbapi/NdbScanFilter.cpp
--- a/storage/ndb/src/ndbapi/NdbScanFilter.cpp	2007-10-14 19:51:08 +04:00
+++ b/storage/ndb/src/ndbapi/NdbScanFilter.cpp	2008-02-02 18:28:01 +03:00
@@ -120,6 +120,16 @@ NdbScanFilter::~NdbScanFilter(){
 int
 NdbScanFilter::begin(Group group){
 
+  // TODO : REMOVE when ScanFilter fixed for NdbRecord //
+  if (m_impl.m_operation->theStatus == NdbOperation::UseNdbRecord)
+  {
+    // Don't currently support ScanFilter definition for
+    // NdbRecord - will be fixed
+    m_impl.m_operation->setErrorCodeAbort(4231); // Temporary error
+    /* Illegal state when calling interpreter routine */
+    return -1;
+  }
+  // /TODO hack
   if (m_impl.m_stack2.push_back(m_impl.m_negative))
   {
     m_impl.m_operation->setErrorCodeAbort(4000);
diff -Nrup a/storage/ndb/src/ndbapi/NdbScanOperation.cpp b/storage/ndb/src/ndbapi/NdbScanOperation.cpp
--- a/storage/ndb/src/ndbapi/NdbScanOperation.cpp	2007-10-30 14:09:45 +03:00
+++ b/storage/ndb/src/ndbapi/NdbScanOperation.cpp	2008-02-02 18:28:01 +03:00
@@ -399,19 +399,18 @@ NdbScanOperation::executeCursor(int node
   TransporterFacade* tp = theNdb->theImpl->m_transporter_facade;
   Guard guard(tp->theMutexPtr);
 
-  Uint32 magic = tCon->theMagicNumber;
   Uint32 seq = tCon->theNodeSequence;
 
   if (tp->get_node_alive(nodeId) &&
       (tp->getNodeSequence(nodeId) == seq)) {
 
+    tCon->theMagicNumber = 0x37412619;
+
     /**
-     * Only call prepareSendScan first time (incase of restarts)
-     *   - check with theMagicNumber
+     * Call prepareSendScan() for old style scans only
      */
-    tCon->theMagicNumber = 0x37412619;
-    if(magic != 0x37412619 && 
-       prepareSendScan(tCon->theTCConPtr, tCon->theTransactionId) == -1)
+    if(theStatus != UseNdbRecord &&
+       (prepareSendScan(tCon->theTCConPtr, tCon->theTransactionId) == -1))
       return -1;
     
     
@@ -468,7 +467,7 @@ int NdbScanOperation::nextResult(bool fe
 
 /* nextResult() for NdbRecord operation. */
 int
-NdbScanOperation::nextResult(const char * & out_row,
+NdbScanOperation::nextResult(const char ** out_row_ptr,
                              bool fetchAllowed, bool forceSend)
 {
   int res;
@@ -479,7 +478,7 @@ NdbScanOperation::nextResult(const char 
     return -1;
   }
 
-  if ((res = nextResultNdbRecord(out_row, fetchAllowed, forceSend)) == 0) {
+  if ((res = nextResultNdbRecord(*out_row_ptr, fetchAllowed, forceSend)) == 0) {
     NdbBlob* tBlob= theBlobList;
     NdbRecAttr *getvalue_recattr= theReceiver.theFirstRecAttr;
     if (unlikely(((UintPtr)tBlob | (UintPtr)getvalue_recattr) != 0))
@@ -883,7 +882,6 @@ NdbScanOperation::send_next_scan(Uint32 
 int 
 NdbScanOperation::prepareSend(Uint32  TC_ConnectPtr, Uint64  TransactionId)
 {
-  printf("NdbScanOperation::prepareSend\n");
   abort();
   return 0;
 }
@@ -891,7 +889,6 @@ NdbScanOperation::prepareSend(Uint32  TC
 int 
 NdbScanOperation::doSend(int ProcessorId)
 {
-  printf("NdbScanOperation::doSend\n");
   return 0;
 }
 
@@ -1011,7 +1008,7 @@ int NdbScanOperation::prepareSendScan(Ui
 
   theErrorLine = 0;
 
-  // In preapareSendInterpreted we set the sizes (word 4-8) in the
+  // In prepareSendInterpreted we set the sizes (word 4-8) in the
   // first ATTRINFO signal.
   if (prepareSendInterpreted() == -1)
     return -1;
@@ -1097,6 +1094,7 @@ int NdbScanOperation::prepareSendScan(Ui
   }
   else
   {
+    // Todo : remove when removing old scan implementation
     for(Uint32 i = 0; i<theParallelism; i++){
       if (m_receivers[i]->do_get_value(&theReceiver, batch_size, 
                                        key_size, 
@@ -1106,6 +1104,7 @@ int NdbScanOperation::prepareSendScan(Ui
       }
     }
   }
+
   return 0;
 }
 
@@ -1127,7 +1126,7 @@ NdbScanOperation::calcGetValueSize()
 }
 
 /*****************************************************************************
-int doSend()
+int doSendScan()
 
 Return Value:   Return >0 : send was succesful, returns number of signals sent
                 Return -1: In all other case.   
@@ -1154,9 +1153,14 @@ NdbScanOperation::doSendScan(int aProces
   Uint32 aTC_ConnectPtr = theNdbCon->theTCConPtr;
   Uint64 transId = theNdbCon->theTransactionId;
   
-  // Update the "attribute info length in words" in SCAN_TABREQ before 
-  // sending it. This could not be done in openScan because 
-  // we created the ATTRINFO signals after the SCAN_TABREQ signal.
+  /**
+   * Update the "attribute info length in words" in SCAN_TABREQ before 
+   * sending it. This could not be done in openScan because 
+   * we created the ATTRINFO signals after the SCAN_TABREQ signal.
+   * With NdbRecord defined scans, the length can be modified after
+   * the operation is defined, so the total length is still 
+   * calculated here.
+   */
   ScanTabReq * const req = CAST_PTR(ScanTabReq, tSignal->getDataPtrSend());
   if (unlikely(theTotalCurrAI_Len > ScanTabReq::MaxTotalAttrInfo)) {
     setErrorCode(4257);
@@ -1382,7 +1386,9 @@ NdbScanOperation::takeOverScanOpNdbRecor
                                           NdbTransaction* pTrans,
                                           const NdbRecord *record,
                                           char *row,
-                                          const unsigned char *mask)
+                                          const unsigned char *mask,
+                                          const NdbOperation::OperationOptions *opts,
+                                          Uint32 sizeOfOptions)
 {
   int res;
 
@@ -1440,42 +1446,78 @@ NdbScanOperation::takeOverScanOpNdbRecor
 
   op->m_attribute_row= row;
   record->copyMask(op->m_read_mask, mask);
+
+  if (opType == ReadRequest)
+  {
+    op->theLockMode= theLockMode;
+    /*
+     * Apart from taking over the row lock, we also support reading again,
+     * though typical usage will probably use an empty mask to read nothing.
+     */
+    op->theReceiver.getValues(record, row);
+  }
+  else if (opType == DeleteRequest && row != NULL)
+  {
+    /* Delete with a 'pre-read' - prepare the Receiver */
+    op->theReceiver.getValues(record, row);
+  }
+
+
+  /* Handle any OperationOptions */
+  if (opts != NULL)
+  {
+    /* Delegate to static method in NdbOperation */
+    Uint32 result = NdbOperation::handleOperationOptions (opType,
+                                                          opts,
+                                                          sizeOfOptions,
+                                                          op);
+    if (result != 0)
+    {
+      setErrorCodeAbort(result);
+      return NULL;
+    }
+  }
+
+
+  /* Setup Blob handles... */
   switch (opType)
   {
-    case ReadRequest:
-      op->theLockMode= theLockMode;
-      /*
-        Apart from taking over the row lock, we also support reading again,
-        though typical usage will probably use an empty mask to read nothing.
-      */
-      op->theReceiver.getValues(record, row);
+  case ReadRequest:
+  case UpdateRequest:
+    if (unlikely(record->flags & NdbRecord::RecHasBlob))
+    {
+      if (op->getBlobHandlesNdbRecord(pTrans) == -1)
+        return NULL;
+    }
+    
+    break;
 
-      if (unlikely(record->flags & NdbRecord::RecHasBlob))
-      {
-        if (op->getBlobHandlesNdbRecord(pTrans) == -1)
-          return NULL;
-      }
+  case DeleteRequest:
+    /* Create blob handles if required, to properly delete all blob parts
+     * Also, blob handles are written into the result record if 
+     * a pre-delete read was requested
+     */
+    if (unlikely(record->flags & NdbRecord::RecTableHasBlob))
+    {
+      if (op->getBlobHandlesNdbRecordDelete(pTrans) == -1)
+        return NULL;
+    }
+    break;
+  default:
+    assert(false);
+    return NULL;
+  }
 
-      break;
-    case UpdateRequest:
-      if (unlikely(record->flags & NdbRecord::RecHasBlob))
-      {
-        if (op->getBlobHandlesNdbRecord(pTrans) == -1)
-          return NULL;
-      }
+  /* Now prepare the signals to be sent...
+   */
+  int returnCode=op->buildSignalsNdbRecord(pTrans->theTCConPtr, 
+                                           pTrans->theTransactionId);
 
-      break;
-    case DeleteRequest:
-      /* Create blob handles if any, to properly delete all blob parts. */
-      if (unlikely(record->flags & NdbRecord::RecTableHasBlob))
-      {
-        if (op->getBlobHandlesDelete(pTrans) == -1)
-          return NULL;
-      }
-      break;
-    default:
-      assert(false);
-      return NULL;
+  if (returnCode)
+  {
+    // buildSignalsNdbRecord should have set the error status
+    // So we can return NULL
+    return NULL;
   }
 
   return op;
@@ -1694,6 +1736,22 @@ NdbIndexScanOperation::setBound(const Nd
 }
 
 int
+NdbIndexScanOperation::setBound(const NdbRecord* key_record,
+                                const IndexBound& bound)
+{
+  /* Need to call addIndexScanBound on the transaction that
+   * started this scan, not the transaction for this scan
+   * This is accessed via m_transConnection.
+   */
+  return m_transConnection->addIndexScanBound(this, key_record, bound);
+}
+
+/**
+ * insertBOUNDS
+ * Helper for ndbrecord_insert_bound, copying data into the
+ * signal train and linking in new signals as required.
+ */
+int
 NdbIndexScanOperation::insertBOUNDS(Uint32 * data, Uint32 sz){
   Uint32 len;
   Uint32 remaining = KeyInfo::DataLength - theTotalNrOfKeyWordInSignal;
@@ -1703,6 +1761,7 @@ NdbIndexScanOperation::insertBOUNDS(Uint
     memcpy(dst, data, 4 * len);
     
     if(sz >= remaining){
+      /* Need to spill data into another signal */
       NdbApiSignal* tCurr = theLastKEYINFO;
       tCurr->setLength(KeyInfo::MaxSignalLength);
       NdbApiSignal* tSignal = tCurr->next();
@@ -1710,6 +1769,7 @@ NdbIndexScanOperation::insertBOUNDS(Uint
 	;
       else if((tSignal = theNdb->getSignal()) != 0)
       {
+        /* Link new signal into train and set type */
 	tCurr->next(tSignal);
 	tSignal->setSignal(GSN_KEYINFO);
       } else {
@@ -1733,6 +1793,8 @@ error:
   return -1;
 }
 
+
+
 int
 NdbIndexScanOperation::ndbrecord_insert_bound(const NdbRecord *key_record,
                                               Uint32 column_index,
@@ -1798,6 +1860,8 @@ NdbIndexScanOperation::ndbrecord_insert_
       memcpy(tempData+2, aValue, len);
       insertBOUNDS(tempData, 2+sizeInWords);
     } else {
+      /* No alignment or zeroing required, just
+       * need to spill into another signal */
       Uint32 buf[2] = { bound_type, ahValue };
       insertBOUNDS(buf, 2);
       insertBOUNDS((Uint32*)aValue, sizeInWords);
@@ -1882,7 +1946,9 @@ NdbIndexScanOperation::readTuples(LockMo
   }
   m_this_bound_start = 0;
   m_first_bound_word = theKEYINFOptr;
-  
+  m_num_bounds = 0;
+  m_previous_range_num = 0;
+
   return res;
 }
 
@@ -2507,40 +2573,6 @@ NdbScanOperation::reset_receivers(Uint32
   m_current_api_receiver = 0;
   m_sent_receivers_count = 0;
   m_conf_receivers_count = 0;
-}
-
-int
-NdbScanOperation::restart(bool forceSend)
-{
-  
-  TransporterFacade* tp = theNdb->theImpl->m_transporter_facade;
-  /*
-    The PollGuard has an implicit call of unlock_and_signal through the
-    ~PollGuard method. This method is called implicitly by the compiler
-    in all places where the object is out of context due to a return,
-    break, continue or simply end of statement block
-  */
-  PollGuard poll_guard(tp, &theNdb->theImpl->theWaiter,
-                       theNdb->theNdbBlockNumber);
-  Uint32 nodeId = theNdbCon->theDBnode;
-  
-  {
-    int res;
-    if((res= close_impl(tp, forceSend, &poll_guard)))
-    {
-      return res;
-    }
-  }
-  
-  /**
-   * Reset receivers
-   */
-  reset_receivers(theParallelism, m_ordered);
-  
-  theError.code = 0;
-  if (doSendScan(nodeId) == -1)
-    return -1;
-  return 0;
 }
 
 int
diff -Nrup a/storage/ndb/src/ndbapi/NdbTransaction.cpp b/storage/ndb/src/ndbapi/NdbTransaction.cpp
--- a/storage/ndb/src/ndbapi/NdbTransaction.cpp	2007-12-10 09:07:27 +03:00
+++ b/storage/ndb/src/ndbapi/NdbTransaction.cpp	2008-02-02 18:28:01 +03:00
@@ -289,6 +289,22 @@ NdbTransaction::execute(ExecType aTypeOf
    * - blob error does not terminate execution
    * - blob error sets error on operation
    * - if error on operation skip blob calls
+   *
+   * In the call to preExecute(), each operation involving blobs can 
+   * add (and execute) extra operations before (reads) and after 
+   * (writes) the operation on the main row.
+   * In the call to postExecute(), each blob can add extra read and
+   * write operations to be executed immediately
+   * It is assumed that all operations added in preExecute() are
+   * defined 'before' operations added in postExecute().
+   * To facilitate this, the transaction's list of operations is 
+   * pre-emptively split when a Blob operation is encountered.
+   * preExecute can add operations before and after the operation being
+   * processed, and if no batch execute is required, the list is rejoined.
+   * If batch execute is required, then execute() is performed, and then
+   * the postExecute() actions (which can add operations) are called before
+   * the list is rejoined.  See NdbBlob::preExecute() and 
+   * NdbBlob::postExecute() for more info.
    */
 
   ExecType tExecType;
@@ -298,12 +314,26 @@ NdbTransaction::execute(ExecType aTypeOf
 
   int ret = 0;
   do {
+    NdbOperation* firstSavedOp= NULL;
+    NdbOperation* lastSavedOp= NULL;
+
     tExecType = aTypeOfExec;
     tPrepOp = theFirstOpInList;
     while (tPrepOp != NULL) {
       if (tPrepOp->theError.code == 0) {
         bool batch = false;
         NdbBlob* tBlob = tPrepOp->theBlobList;
+        if (tBlob !=NULL) {
+          /* We split the operation list just after this
+           * operation, in case it adds extra ops
+           */
+          firstSavedOp = tPrepOp->next(); // Could be NULL
+          lastSavedOp = theLastOpInList;
+          DBUG_PRINT("info", ("Splitting ops list between %p and %p",
+                              firstSavedOp, lastSavedOp));
+          tPrepOp->next(NULL);
+          theLastOpInList= tPrepOp;
+        }
         while (tBlob != NULL) {
           if (tBlob->preExecute(tExecType, batch) == -1)
 	  {
@@ -314,24 +344,30 @@ NdbTransaction::execute(ExecType aTypeOf
           tBlob = tBlob->theNext;
         }
         if (batch) {
-          // blob asked to execute all up to here now
+          // blob asked to execute all up to lastOpInBatch now
           tExecType = NoCommit;
           break;
         }
+        else {
+          /* No batching yet - rejoin the current and
+           * saved operation lists
+           */
+          DBUG_PRINT("info", ("Rejoining ops list after preExecute between %p and %p",
+                              theLastOpInList,
+                              firstSavedOp));
+          if (firstSavedOp != NULL && lastSavedOp != NULL) {
+            if (theFirstOpInList == NULL)
+              theFirstOpInList = firstSavedOp;
+            else
+              theLastOpInList->next(firstSavedOp);
+            theLastOpInList = lastSavedOp;
+          }
+          firstSavedOp= lastSavedOp= NULL;
+        }
       }
       tPrepOp = tPrepOp->next();
     }
 
-    // save rest of prepared ops if batch
-    NdbOperation* tRestOp= 0;
-    NdbOperation* tLastOp= 0;
-    if (tPrepOp != NULL) {
-      tRestOp = tPrepOp->next();
-      tPrepOp->next(NULL);
-      tLastOp = theLastOpInList;
-      theLastOpInList = tPrepOp;
-    }
-
     if (tExecType == Commit) {
       NdbOperation* tOp = theCompletedFirstOp;
       while (tOp != NULL) {
@@ -370,10 +406,10 @@ NdbTransaction::execute(ExecType aTypeOf
     {
       if(savedError.code==0)
 	savedError= theError;
-      
       DBUG_RETURN(-1);
     }
-    
+
+
 #ifdef ndb_api_crash_on_complex_blob_abort
     assert(theFirstOpInList == NULL && theLastOpInList == NULL);
 #else
@@ -400,13 +436,16 @@ NdbTransaction::execute(ExecType aTypeOf
       }
     }
 
-    // add saved prepared ops if batch
-    if (tPrepOp != NULL && tRestOp != NULL) {
+    // Restore any saved prepared ops if we batched
+    if (firstSavedOp != NULL && lastSavedOp != NULL) {
+      DBUG_PRINT("info", ("Rejoining ops list after postExecute between %p and %p",
+                          theLastOpInList,
+                          firstSavedOp));
       if (theFirstOpInList == NULL)
-        theFirstOpInList = tRestOp;
+        theFirstOpInList = firstSavedOp;
       else
-        theLastOpInList->next(tRestOp);
-      theLastOpInList = tLastOp;
+        theLastOpInList->next(firstSavedOp);
+      theLastOpInList = lastSavedOp;
     }
     assert(theFirstOpInList == NULL || tExecType == NoCommit);
   } while (theFirstOpInList != NULL || tExecType != aTypeOfExec);
@@ -672,8 +711,7 @@ NdbTransaction::executeAsynchPrepare(Ndb
     NdbOperation* tNextOp = tOp->next();
 
     if (tOp->Status() == NdbOperation::UseNdbRecord)
-      tReturnCode= tOp->prepareSendNdbRecord(theTCConPtr, theTransactionId,
-                                             abortOption);
+      tReturnCode = tOp->prepareSendNdbRecord(abortOption);
     else
       tReturnCode= tOp->prepareSend(theTCConPtr, theTransactionId, abortOption);
 
@@ -2164,6 +2202,14 @@ NdbTransaction::getNdbErrorOperation()
   return theErrorOperation;
 }//NdbTransaction::getNdbErrorOperation()
 
+
+const NdbOperation*
+NdbTransaction::getNdbErrorOperation() const
+{
+  return theErrorOperation;
+}//NdbTransaction::getNdbErrorOperation()
+
+
 const NdbOperation * 
 NdbTransaction::getNextCompletedOperation(const NdbOperation * current) const {
   if(current == 0)
@@ -2174,16 +2220,17 @@ NdbTransaction::getNextCompletedOperatio
 NdbOperation *
 NdbTransaction::setupRecordOp(NdbOperation::OperationType type,
                               NdbOperation::LockMode lock_mode,
+                              NdbOperation::AbortOption default_ao,
                               const NdbRecord *key_record,
                               const char *key_row,
                               const NdbRecord *attribute_record,
                               const char *attribute_row,
                               const unsigned char *mask,
-                              const Uint32 *setPartitionId,
-                              const void *getSetValue,
-                              const NdbInterpretedCode *interpreted_code)
+                              const NdbOperation::OperationOptions *opts,
+                              Uint32 sizeOfOptions)
 {
   NdbOperation *op;
+
   /*
     We are actually passing the table object for the index here, not the table
     object of the underlying table. But we only need it to keep the existing
@@ -2217,29 +2264,72 @@ NdbTransaction::setupRecordOp(NdbOperati
   op->m_attribute_record= attribute_record;
   op->m_attribute_row= attribute_row;
   attribute_record->copyMask(op->m_read_mask, mask);
-
-  assert(getSetValue == NULL);                  // ToDo
-  if (setPartitionId != NULL)
+  op->m_abortOption=default_ao;
+  
+  /*
+   * Handle options
+   */
+  if (opts != NULL)
   {
-    op->theDistributionKey= *setPartitionId;
-    op->theDistrKeyIndicator_= 1;
+    /* Delegate to static method in NdbOperation */
+    Uint32 result = NdbOperation::handleOperationOptions (type,
+                                                          opts,
+                                                          sizeOfOptions,
+                                                          op);
+    if (result !=0)
+    {
+      setOperationErrorCodeAbort(result);
+      return NULL;
+    }
   }
-  op->m_interpreted_code= interpreted_code;
 
-  if (unlikely(attribute_record->flags & NdbRecord::RecHasBlob))
+  /* Handle delete + blobs */
+  if (type == NdbOperation::DeleteRequest)
+  {
+    /* Blobs reads not allowed in the delete result record */
+    if (unlikely(attribute_record->flags & NdbRecord::RecHasBlob))
+    {
+      setOperationErrorCodeAbort(4511);
+      /* Blobs not allowed in NdbRecord delete result record */
+      return NULL;
+    }
+
+    /* Need to link in all the Blob handles for delete */
+    if (op->getBlobHandlesNdbRecordDelete(this) == -1)
+      return NULL;
+  }
+  else if (unlikely(attribute_record->flags & NdbRecord::RecHasBlob))
   {
+    /* Create blob handles for non-delete operations */
     if (op->getBlobHandlesNdbRecord(this) == -1)
       return NULL;
   }
 
+  /*
+   * Now prepare the signals to be sent...
+   *
+   */
+  int returnCode=op->buildSignalsNdbRecord(theTCConPtr, theTransactionId);
+
+  if (returnCode)
+  {
+    // buildSignalsNdbRecord should have set the error status
+    // So we can return NULL
+    return NULL;
+  }
+
   return op;
 }
 
-NdbOperation *
+
+
+const NdbOperation *
 NdbTransaction::readTuple(const NdbRecord *key_rec, const char *key_row,
                           const NdbRecord *result_rec, char *result_row,
                           NdbOperation::LockMode lock_mode,
-                          const unsigned char *result_mask)
+                          const unsigned char *result_mask,
+                          const NdbOperation::OperationOptions *opts,
+                          Uint32 sizeOfOptions)
 {
   /* Check that the NdbRecord specifies the full primary key. */
   if (!(key_rec->flags & NdbRecord::RecHasAllKeys))
@@ -2256,12 +2346,15 @@ NdbTransaction::readTuple(const NdbRecor
   NdbOperation::OperationType opType=
     (lock_mode == NdbOperation::LM_Exclusive ?
        NdbOperation::ReadExclusive : NdbOperation::ReadRequest);
-  NdbOperation *op= setupRecordOp(opType, lock_mode, key_rec, key_row,
-                                  result_rec, result_row, result_mask);
+  NdbOperation *op= setupRecordOp(opType, lock_mode, 
+                                  NdbOperation::AO_IgnoreError,
+                                  key_rec, key_row, 
+                                  result_rec, result_row, result_mask,
+                                  opts,
+                                  sizeOfOptions);
   if (!op)
     return NULL;
 
-  op->m_abortOption= AO_IgnoreError;
   if (op->theLockMode == NdbOperation::LM_CommittedRead)
   {
     op->theDirtyIndicator= 1;
@@ -2284,9 +2377,11 @@ NdbTransaction::readTuple(const NdbRecor
   return op;
 }
 
-NdbOperation *
+const NdbOperation *
 NdbTransaction::insertTuple(const NdbRecord *rec, const char *row,
-                            const unsigned char *mask)
+                            const unsigned char *mask,
+                            const NdbOperation::OperationOptions *opts,
+                            Uint32 sizeOfOptions)
 {
   /* Check that the NdbRecord specifies the full primary key. */
   if (!(rec->flags & NdbRecord::RecHasAllKeys))
@@ -2296,24 +2391,26 @@ NdbTransaction::insertTuple(const NdbRec
   }
 
   NdbOperation *op= setupRecordOp(NdbOperation::InsertRequest,
-                                  NdbOperation::LM_Exclusive, rec, row,
-                                  rec, row, mask);
+                                  NdbOperation::LM_Exclusive, 
+                                  NdbOperation::AbortOnError, 
+                                  rec, row, 
+                                  rec, row, mask, 
+                                  opts,
+                                  sizeOfOptions);
   if (!op)
     return NULL;
 
-  op->m_abortOption = AbortOnError;
   theSimpleState= 0;
 
   return op;
 }
 
-NdbOperation *
+const NdbOperation *
 NdbTransaction::updateTuple(const NdbRecord *key_rec, const char *key_row,
                             const NdbRecord *attr_rec, const char *attr_row,
                             const unsigned char *mask,
-                            const Uint32 *setPartitionId,
-                            const void *getSetValue,
-                            const NdbInterpretedCode *interpreted_code)
+                            const NdbOperation::OperationOptions *opts,
+                            Uint32 sizeOfOptions)
 {
   /* Check that the NdbRecord specifies the full primary key. */
   if (!(key_rec->flags & NdbRecord::RecHasAllKeys))
@@ -2323,20 +2420,28 @@ NdbTransaction::updateTuple(const NdbRec
   }
 
   NdbOperation *op= setupRecordOp(NdbOperation::UpdateRequest,
-                                  NdbOperation::LM_Exclusive, key_rec, key_row,
-                                  attr_rec, attr_row, mask, setPartitionId,
-                                  getSetValue, interpreted_code);
+                                  NdbOperation::LM_Exclusive, 
+                                  NdbOperation::AbortOnError, 
+                                  key_rec, key_row,
+                                  attr_rec, attr_row, mask, 
+                                  opts,
+                                  sizeOfOptions);
   if(!op)
     return op;
 
-  op->m_abortOption = AbortOnError;
   theSimpleState= 0;
 
   return op;
 }
 
-NdbOperation *
-NdbTransaction::deleteTuple(const NdbRecord *key_rec, const char *key_row)
+const NdbOperation *
+NdbTransaction::deleteTuple(const NdbRecord *key_rec, 
+                            const char *key_row,
+                            const NdbRecord *result_rec,
+                            char *result_row,
+                            const unsigned char *result_mask,
+                            const NdbOperation::OperationOptions* opts,
+                            Uint32 sizeOfOptions)
 {
   /* Check that the NdbRecord specifies the full primary key. */
   if (!(key_rec->flags & NdbRecord::RecHasAllKeys))
@@ -2345,29 +2450,38 @@ NdbTransaction::deleteTuple(const NdbRec
     return NULL;
   }
 
+  bool readBeforeDelete=(result_rec && result_row != NULL);
+
+  /* If no data to read, pass in key_rec as attr_rec*/
+  const NdbRecord* result_record=readBeforeDelete?result_rec:key_rec;
+
   NdbOperation *op= setupRecordOp(NdbOperation::DeleteRequest,
-                                  NdbOperation::LM_Exclusive, key_rec, key_row,
-                                  key_rec, NULL, NULL);
+                                  NdbOperation::LM_Exclusive, 
+                                  NdbOperation::AbortOnError, 
+                                  key_rec, key_row,
+                                  result_record, result_row, result_mask, 
+                                  opts,
+                                  sizeOfOptions);
   if(!op)
     return op;
 
-  op->m_abortOption = AbortOnError;
   theSimpleState= 0;
 
-  /* Create blob handles if any, to properly delete all blob parts. */
-  if (unlikely(key_rec->flags & NdbRecord::RecTableHasBlob))
+  if (readBeforeDelete)
   {
-    if (op->getBlobHandlesDelete(this) == -1)
-      return NULL;
+    /* Setup the record/row for receiving the results. */
+    op->theReceiver.getValues(result_rec, result_row);
   }
 
   return op;
 }
 
-NdbOperation *
+const NdbOperation *
 NdbTransaction::writeTuple(const NdbRecord *key_rec, const char *key_row,
                            const NdbRecord *attr_rec, const char *attr_row,
-                           const unsigned char *mask)
+                           const unsigned char *mask,
+                           const NdbOperation::OperationOptions *opts,
+                           Uint32 sizeOfOptions)
 {
   /* Check that the NdbRecord specifies the full primary key. */
   if (!(key_rec->flags & NdbRecord::RecHasAllKeys))
@@ -2377,24 +2491,98 @@ NdbTransaction::writeTuple(const NdbReco
   }
 
   NdbOperation *op= setupRecordOp(NdbOperation::WriteRequest,
-                                  NdbOperation::LM_Exclusive, key_rec, key_row,
-                                  attr_rec, attr_row, mask);
+                                  NdbOperation::LM_Exclusive, 
+                                  NdbOperation::AbortOnError, 
+                                  key_rec, key_row,
+                                  attr_rec, attr_row, mask, 
+                                  opts,
+                                  sizeOfOptions);
   if(!op)
     return op;
 
-  op->m_abortOption = AbortOnError;
   theSimpleState= 0;
 
   return op;
 }
 
+/* Helper for scanTable and scanIndex */
+/* Note that op may be an NdbIndexScanOperation */
+int
+NdbTransaction::handleScanOptions(NdbScanOperation *op,
+                                  const ScanOptions *options)
+{
+  /* Options size has already been checked.
+   * scan_flags, parallel and batch have been handled
+   * already.
+   */
+  if ((options->optionsPresent & ScanOptions::SO_GETVALUE) &&
+      (options->numExtraGetValues > 0))
+  {
+    if (options->extraGetValues == NULL)
+    {
+      setOperationErrorCodeAbort(4299);
+      /* Incorrect combination of ScanOption flags, 
+       * extraGetValues ptr and numExtraGetValues */
+      return -1;
+    }
+
+    /* Add extra getValue()s */
+    for (unsigned int i=0; i < options->numExtraGetValues; i++)
+    {
+      NdbOperation::GetValueSpec *pvalSpec = &(options->extraGetValues[i]);
+
+      pvalSpec->recAttr=NULL;
+
+      if (pvalSpec->column == NULL)
+      {
+        setOperationErrorCodeAbort(4295);
+        // Column is NULL in Get/SetValueSpec structure
+        return -1;
+      }
+
+      /* Call internal NdbRecord specific getValue() method
+       * Same method handles table scans and index scans
+       */
+      NdbRecAttr *pra=
+        op->getValue_NdbRecord_scan(&NdbColumnImpl::getImpl(*pvalSpec->column),
+                                    (char *) pvalSpec->appStorage);
+        
+      if (pra == NULL)
+      {
+        return -1;
+      }
+      
+      pvalSpec->recAttr = pra;
+    }
+  }
+
+  if (options->optionsPresent & ScanOptions::SO_PARTITION_ID)
+  {
+    /* Should not have any blobs defined at this stage */
+    assert(op->theBlobList == NULL);
+    op->theDistributionKey = options->partitionId;
+    op->theDistrKeyIndicator_ = 1;
+    DBUG_PRINT("info", ("NdbOperation::setPartitionId: %u",
+                        op->theDistributionKey));
+  }
+
+  if (options->optionsPresent & ScanOptions::SO_INTERPRETED)
+  {
+    /* TODO : Implement or remove */
+    setOperationErrorCodeAbort(4298);
+    /* Invalid or unsupported ScanOptions structure */
+    return -1;
+  }
+
+  return 0;
+}
+
 NdbScanOperation *
 NdbTransaction::scanTable(const NdbRecord *result_record,
                           NdbOperation::LockMode lock_mode,
                           const unsigned char *result_mask,
-                          Uint32 scan_flags,
-                          Uint32 parallel,
-                          Uint32 batch)
+                          const ScanOptions *options,
+                          Uint32 sizeOfOptions)
 {
   /*
     Normal scan operations are created as NdbIndexScanOperations.
@@ -2403,10 +2591,14 @@ NdbTransaction::scanTable(const NdbRecor
   */
   NdbIndexScanOperation *op_idx;
   NdbScanOperation *op;
+  NdbTransaction *scanTransaction = NULL;
   NdbBlob *lastBlob;
   Uint32 column_count;
   int res;
   bool haveBlob= false;
+  Uint32 scan_flags = 0;
+  Uint32 parallel = 0;
+  Uint32 batch = 0;
 
   op_idx= getNdbScanOperation(result_record->table);
   if (op_idx==NULL)
@@ -2416,6 +2608,33 @@ NdbTransaction::scanTable(const NdbRecor
   }
   op= op_idx;
 
+  if (options != NULL)
+  {
+    /* Check options size for versioning... */
+    if (unlikely((sizeOfOptions !=0) &&
+                 (sizeOfOptions != sizeof(ScanOptions))))
+    {
+      /* Handle different sized ScanOptions
+       * Probably smaller is old version, larger is new version
+       */
+      
+      /* No other versions supported currently */
+      setOperationErrorCodeAbort(4298);
+      /* Invalid or unsupported ScanOptions structure */
+      goto giveup_err;
+    }
+    
+    /* Process some initial ScanOptions - most are 
+     * handled later
+     */
+    if (options->optionsPresent & ScanOptions::SO_SCANFLAGS)
+      scan_flags = options->scan_flags;
+    if (options->optionsPresent & ScanOptions::SO_PARALLEL)
+      parallel = options->parallel;
+    if (options->optionsPresent & ScanOptions::SO_BATCH)
+      batch = options->batch;
+  }
+
   res= op->readTuples(lock_mode, scan_flags, parallel, batch);
   if (res==-1)
     goto giveup_err;
@@ -2466,12 +2685,30 @@ NdbTransaction::scanTable(const NdbRecor
   op->theStatus= NdbOperation::UseNdbRecord;
   op->m_attribute_record= result_record;
 
+  /* Handle scan options */
+  if (options != NULL)
+  {
+    if (handleScanOptions(op, options) != 0)
+      goto giveup_err;
+  }
+
   if (unlikely(haveBlob))
   {
     if (op->getBlobHandlesNdbRecord(this) == -1)
       goto giveup_err;
   }
 
+  /* Scan is now fully defined, so let's start preparing
+   * signals.
+   */
+  /* Get ptr to scan's transaction object */
+  scanTransaction=op->theNdbCon;
+
+  if (op->prepareSendScan(scanTransaction->theTCConPtr, 
+                          scanTransaction->theTransactionId) == -1)
+    /* Error code should be set */
+    goto giveup_err;
+  
   return op_idx;
 
  giveup_err:
@@ -2563,30 +2800,191 @@ set_distribution_key_from_range(NdbIndex
 #endif
 }
 
+
+/** 
+ * addIndexScanBound
+ *
+ * This method is called from NdbTransaction::scanIndex() and
+ * NdbIndexScanOperation::setBound().  It adds a bound to an
+ * NdbRecord defined operation
+ */
+int 
+NdbTransaction::addIndexScanBound(NdbIndexScanOperation *sop,
+                                  const NdbRecord *key_record,
+                                  const NdbIndexScanOperation::IndexBound& bound)
+{
+  /*
+    Set up index range bounds, write into keyinfo.
+    
+    ToDo: We need to compare each attribute lower with upper bound, and use an
+    x=a condition instead of a<=x<=b as appropriate.
+    ToDo : Compare ptrs, or compare actual values?
+    
+    ToDo: We also need to send distribution key, but _only_ if distribution
+    key is scanned equal to the same value for all ranges (see BUG#25821).
+  */
+
+  if (unlikely((sop->theStatus != NdbOperation::UseNdbRecord)))
+  {
+    setOperationErrorCodeAbort(4284);
+    /* Cannot mix NdbRecAttr and NdbRecord operations */
+    return -1;
+  }
+
+  if (unlikely(key_record == NULL))
+  {
+    setOperationErrorCodeAbort(4285);
+    /* NULL NdbRecord pointer */
+    return -1;
+  }
+
+  sop->m_num_bounds++;
+
+  if (unlikely((sop->m_num_bounds > 1) &&
+               (sop->m_multi_range == 0)))
+  {
+    /* > 1 IndexBound, but not MRR */
+    setOperationErrorCodeAbort(4509);
+    /* Non SF_MultiRange scan cannot have more than one bound */
+    return -1;
+  }
+
+  Uint32 j;
+  Uint32 key_count, common_key_count;
+  Uint32 range_no;
+  Uint32 bound_head;
+
+  range_no= bound.range_no;
+  if (unlikely(range_no >= (1 << 12))) /* TODO : Use constant ! */
+  {
+    setOperationErrorCodeAbort(4286);
+    return -1;
+  }
+
+  /* Check valid ordering of supplied range numbers */
+  if ( sop->m_read_range_no && sop->m_ordered )
+  {
+    if (unlikely((sop->m_num_bounds > 1) &&
+                 (range_no <= sop->m_previous_range_num))) 
+    {
+      setOperationErrorCodeAbort(4282);
+      /* range_no not strictly increasing in ordered multi-range index scan */
+      return -1;
+    }
+    
+    sop->m_previous_range_num= range_no;
+  }
+
+  key_count= bound.low_key_count;
+  common_key_count= key_count;
+  if (key_count < bound.high_key_count)
+    key_count= bound.high_key_count;
+  else
+    common_key_count= bound.high_key_count;
+
+  if (unlikely(key_count > key_record->key_index_length))
+  {
+    /* Too many keys specified for key bound. */
+    setOperationErrorCodeAbort(4281);
+    return -1;
+  }
+
+  for (j= 0; j<key_count; j++)
+  {
+    Uint32 bound_type;
+    /* If key is part of lower bound */
+    if (bound.low_key && j<bound.low_key_count)
+    {
+      /* Inclusive if defined, or matching rows can include this value */
+      bound_type= bound.low_inclusive  || j+1 < bound.low_key_count ?
+        NdbIndexScanOperation::BoundLE : NdbIndexScanOperation::BoundLT;
+      sop->ndbrecord_insert_bound(key_record, key_record->key_indexes[j],
+                                  bound.low_key, bound_type);
+    }
+    /* If key is part of upper bound */
+    if (bound.high_key && j<bound.high_key_count)
+    {
+      /* Inclusive if defined, or matching rows can include this value */
+      bound_type= bound.high_inclusive  || j+1 < bound.high_key_count ?
+        NdbIndexScanOperation::BoundGE : NdbIndexScanOperation::BoundGT;
+      sop->ndbrecord_insert_bound(key_record, key_record->key_indexes[j],
+                                  bound.high_key, bound_type);
+    }
+  }
+
+  /* Set the length of this bound
+   * Length = bound end - bound start
+   * Pack into Uint32 with range no and bound type as described 
+   * in KeyInfo.hpp
+   */
+  bound_head= *sop->m_first_bound_word;
+  bound_head|=
+    (sop->theTupKeyLen - sop->m_this_bound_start) << 16 | (range_no << 4);
+  *sop->m_first_bound_word= bound_head;
+  sop->m_first_bound_word= sop->theKEYINFOptr + sop->theTotalNrOfKeyWordInSignal;
+  sop->m_this_bound_start= sop->theTupKeyLen;
+
+  /*
+    ToDo: This does not work, as it does not properly handle collations.
+    Ie. scanning a='aAa' and a='AaA' should use the same distribution key
+    for non-binary collations.
+    Part of WL#3682 should provide the facilities to fix this.
+    Long-term, it should be made possible in the protocol to specify a
+    (potentially different) bitmap of fragments to scan for _each_ individual
+    range in multi-range scan.
+  */
+  // TODO : Re-enable with a) code to avoid CHAR types b) key equality
+  //        check via ptr equals rather than actual comparison.
+#if 0
+  /*
+    Now check if the range bounds a single distribution key. If so, we need
+    scan only a single fragment.
+    
+    ToDo: we do not attempt to identify the case where we have multiple
+    ranges, but they all bound the same single distribution key. It seems
+    not really worth the effort to optimise this case, better to fix the
+    multi-range protocol so that the distribution key could be specified
+    individually for each of the multiple ranges.
+  */
+  if (num_key_bounds==1)
+  {
+    Uint32 distkey_min= key_record->m_min_distkey_prefix_length;
+    if (distkey_min > 0 &&
+        common_key_count >= distkey_min &&
+        bound.low_key &&
+        bound.high_key &&
+        0==compare_index_row_prefix(key_record,
+                                    bound.low_key,
+                                    bound.high_key,
+                                    distkey_min))
+      set_distribution_key_from_range(op, key_record, bound.low_key, distkey_min);
+  } 
+#endif
+  return 0;
+} // ::addIndexScanBound();
+
+
 NdbIndexScanOperation *
-NdbTransaction::scanIndex(
-            const NdbRecord *key_record,
-            int (*get_key_bound_callback)(void *callback_data,
-                                          Uint32 bound_index,
-                                          NdbIndexScanOperation::IndexBound & bound),
-            void *callback_data,
-            Uint32 num_key_bounds,
-            const NdbRecord *result_record,
-            NdbOperation::LockMode lock_mode,
-            const unsigned char *result_mask,
-            Uint32 scan_flags,
-            Uint32 parallel,
-            Uint32 batch)
+NdbTransaction::scanIndex(const NdbRecord *key_record,
+                          const NdbRecord *result_record,
+                                NdbOperation::LockMode lock_mode,
+                          const unsigned char *result_mask,
+                          const NdbIndexScanOperation::IndexBound *bound,
+                          const ScanOptions *options,
+                          Uint32 sizeOfOptions)
 {
   NdbIndexScanOperation *op;
   const NdbTableImpl *index_table_impl; // The index schema object
   const NdbTableImpl *table_impl;       // The table schema object
   NdbBlob *lastBlob;
   int res;
-  Uint32 i,j;
-  Uint32 previous_range_no;
+  Uint32 i;
   Uint32 column_count;
   bool haveBlob= false;
+  Uint32 scan_flags = 0;
+  Uint32 parallel = 0;
+  Uint32 batch = 0;
+  NdbTransaction *scanTransaction = NULL;
 
   if (!(key_record->flags & NdbRecord::RecHasAllKeys))
   {
@@ -2619,6 +3017,33 @@ NdbTransaction::scanIndex(
   op->m_type= NdbOperation::OrderedIndexScan;
   op->m_currentTable= table_impl;
 
+  if (options != NULL)
+  {
+    /* Check options size for versioning... */
+    if (unlikely((sizeOfOptions !=0) &&
+                 (sizeOfOptions != sizeof(ScanOptions))))
+    {
+      /* Handle different sized ScanOptions
+       * Probably smaller is old version, larger is new version
+       */
+      
+      /* No other versions supported currently */
+      setOperationErrorCodeAbort(4298);
+      /* Invalid or unsupported ScanOptions structure */
+      goto giveup_err;
+    }
+    
+    /* Process some initial ScanOptions here
+     * The rest will be handled later
+     */
+    if (options->optionsPresent & ScanOptions::SO_SCANFLAGS)
+      scan_flags = options->scan_flags;
+    if (options->optionsPresent & ScanOptions::SO_PARALLEL)
+      parallel = options->parallel;
+    if (options->optionsPresent & ScanOptions::SO_BATCH)
+      batch = options->batch;
+  }
+
   op->m_attribute_record= result_record; // Mark using NdbRecord for readTuples
   res= op->readTuples(lock_mode, scan_flags, parallel, batch);
   if (res==-1)
@@ -2674,122 +3099,18 @@ NdbTransaction::scanIndex(
   op->theInitialReadSize= op->theTotalCurrAI_Len - 5;
 
   /*
-    Set up index range bounds, write into keyinfo.
-
-    ToDo: We need to compare each attribute lower with upper bound, and use an
-    x=a condition instead of a<=x<=b as appropriate.
-
-    ToDo: We also need to send distribution key, but _only_ if distribution
-    key is scanned equal to the same value for all ranges (see BUG#25821).
-  */
-
-  for (i= 0; i<num_key_bounds; i++)
+   * Set up first key bound, if present
+   * Extra bounds (MRR) can be added later
+   */
+  if (bound != NULL)
   {
-    NdbIndexScanOperation::IndexBound bound;
-    int res;
-    Uint32 key_count, common_key_count;
-    Uint32 range_no;
-    Uint32 bound_head;
-
-    res= get_key_bound_callback(callback_data, i, bound);
-    if (res==-1)
-    {
-      setOperationErrorCodeAbort(4293);
-      goto giveup_err;
-    }
-    range_no= bound.range_no;
-    if (range_no >= (1 << 12))
-    {
-      setOperationErrorCodeAbort(4286);
-      goto giveup_err;
-    }
-
-    if ( (scan_flags & NdbScanOperation::SF_ReadRangeNo) &&
-         (scan_flags & NdbScanOperation::SF_OrderBy) )
-    {
-      if (i>0 && previous_range_no >= range_no)
-      {
-        setOperationErrorCodeAbort(4282);
-        goto giveup_err;
-      }
-      previous_range_no= range_no;
-    }
-
-    key_count= bound.low_key_count;
-    common_key_count= key_count;
-    if (key_count < bound.high_key_count)
-      key_count= bound.high_key_count;
-    else
-      common_key_count= bound.high_key_count;
+    addIndexScanBound(op, key_record, *bound);
+  }
 
-    if (key_count > key_record->key_index_length)
-    {
-      /* Too many keys specified for key bound. */
-      setOperationErrorCodeAbort(4281);
+  if (options != NULL)
+  {
+    if (handleScanOptions(op, options) != 0)
       goto giveup_err;
-    }
-
-    for (j= 0; j<key_count; j++)
-    {
-      Uint32 bound_type;
-      if (bound.low_key && j<bound.low_key_count)
-      {
-        bound_type= bound.low_inclusive  || j+1 < bound.low_key_count ?
-            NdbIndexScanOperation::BoundLE : NdbIndexScanOperation::BoundLT;
-        op->ndbrecord_insert_bound(key_record, key_record->key_indexes[j],
-                                   bound.low_key, bound_type);
-      }
-      if (bound.high_key && j<bound.high_key_count)
-      {
-        bound_type= bound.high_inclusive  || j+1 < bound.high_key_count ?
-            NdbIndexScanOperation::BoundGE : NdbIndexScanOperation::BoundGT;
-        op->ndbrecord_insert_bound(key_record, key_record->key_indexes[j],
-                                   bound.high_key, bound_type);
-      }
-    }
-
-    /* Set the length of this bound. */
-    bound_head= *op->m_first_bound_word;
-    bound_head|=
-      (op->theTupKeyLen - op->m_this_bound_start) << 16 | (range_no << 4);
-    *op->m_first_bound_word= bound_head;
-    op->m_first_bound_word= op->theKEYINFOptr + op->theTotalNrOfKeyWordInSignal;
-    op->m_this_bound_start= op->theTupKeyLen;
-
-    /*
-      ToDo: This does not work, as it does not properly handle collations.
-      Ie. scanning a='aAa' and a='AaA' should use the same distribution key
-      for non-binary collations.
-      Part of WL#3682 should provide the facilities to fix this.
-      Long-term, it should be made possible in the protocol to specify a
-      (potentially different) bitmap of fragments to scan for _each_ individual
-      range in multi-range scan.
-    */
-#if 0
-    /*
-      Now check if the range bounds a single distribution key. If so, we need
-      scan only a single fragment.
-
-      ToDo: we do not attempt to identify the case where we have multiple
-      ranges, but they all bound the same single distribution key. It seems
-      not really worth the effort to optimise this case, better to fix the
-      multi-range protocol so that the distribution key could be specified
-      individually for each of the multiple ranges.
-    */
-    if (num_key_bounds==1)
-    {
-      Uint32 distkey_min= key_record->m_min_distkey_prefix_length;
-      if (distkey_min > 0 &&
-          common_key_count >= distkey_min &&
-          bound.low_key &&
-          bound.high_key &&
-          0==compare_index_row_prefix(key_record,
-                                      bound.low_key,
-                                      bound.high_key,
-                                      distkey_min))
-        set_distribution_key_from_range(op, key_record, bound.low_key, distkey_min);
-    }
-#endif
   }
 
   if (unlikely(haveBlob))
@@ -2798,50 +3119,26 @@ NdbTransaction::scanIndex(
       goto giveup_err;
   }
 
+  /* Scan is now mostly defined, so let's start preparing
+   * the signals and the receiver.
+   * Extra signals can be linked in when :
+   *  - Bounds are added
+   */
+  /* Get ptr to scan's transaction object */
+  scanTransaction=op->theNdbCon;
+
+  if (op->prepareSendScan(scanTransaction->theTCConPtr, 
+                          scanTransaction->theTransactionId) == -1)
+    /* Error code should be set */
+    goto giveup_err;
+
   return op;
 
  giveup_err:
   releaseScanOperation(&m_theFirstScanOperation, &m_theLastScanOperation, op);
   return NULL;
-}
+} // ::scanIndex();
 
-static int scanIndexCallback(void *callback_data,
-                             Uint32 bound_index,
-                             NdbIndexScanOperation::IndexBound & bound)
-{
-  const NdbIndexScanOperation::IndexBound *s=
-    (NdbIndexScanOperation::IndexBound *)callback_data;
-  bound= *s;
-  return 0;
-}
-
-NdbIndexScanOperation *
-NdbTransaction::scanIndex(const NdbRecord *key_record,
-                          const char *low_key,
-                          Uint32 low_key_count,
-                          bool low_inclusive,
-                          const char * high_key,
-                          Uint32 high_key_count,
-                          bool high_inclusive,
-                          const NdbRecord *result_record,
-                          NdbOperation::LockMode lock_mode,
-                          const unsigned char *result_mask,
-                          Uint32 scan_flags,
-                          Uint32 parallel,
-                          Uint32 batch)
-{
-  NdbIndexScanOperation::IndexBound s;
-  s.low_key= low_key;
-  s.low_key_count= low_key_count;
-  s.low_inclusive= low_inclusive;
-  s.high_key= high_key;
-  s.high_key_count= high_key_count;
-  s.high_inclusive= high_inclusive;
-  s.range_no= 0;
-  return scanIndex(key_record, scanIndexCallback, &s, 1,
-                   result_record, lock_mode, result_mask,
-                   scan_flags, parallel, batch);
-}
 
 
 #ifdef VM_TRACE
diff -Nrup a/storage/ndb/src/ndbapi/ndberror.c b/storage/ndb/src/ndbapi/ndberror.c
--- a/storage/ndb/src/ndbapi/ndberror.c	2007-12-10 09:07:27 +03:00
+++ b/storage/ndb/src/ndbapi/ndberror.c	2008-02-02 18:28:01 +03:00
@@ -552,6 +552,11 @@ ErrorBundle ErrorCodes[] = {
   { 4505, DMEC, AE, "NULL value not allowed in primary key search" },
   { 4506, DMEC, AE, "Missing getValue/setValue when calling execute" },
   { 4507, DMEC, AE, "Missing operation request when calling execute" },
+  { 4508, DMEC, AE, "GetValue not allowed for NdbRecord defined operation" },
+  { 4509, DMEC, AE, "Non SF_MultiRange scan cannot have more than one bound" },
+  { 4510, DMEC, AE, "User specified partition id not allowed for scan takeover operation" },
+  { 4511, DMEC, AE, "Blobs not allowed in NdbRecord delete result record" },
+  { 4512, DMEC, AE, "Incorrect combination of OperationOptions optionsPresent, extraGet/SetValues ptr and numExtraGet/SetValues" },
 
   { 4200, DMEC, AE, "Status Error when defining an operation" },
   { 4201, DMEC, AE, "Variable Arrays not yet supported" },
@@ -633,7 +638,7 @@ ErrorBundle ErrorCodes[] = {
   { 4281, DMEC, AE, "Too many keys specified for key bound in scanIndex" },
   { 4282, DMEC, AE, "range_no not strictly increasing in ordered multi-range index scan" },
   { 4283, DMEC, AE, "key_record in index scan is not an index ndbrecord" },
-  { 4284, DMEC, AE, "Cannot mix NdbRecAttr and NdbRecord operations" },
+  { 4284, DMEC, AE, "Cannot mix NdbRecAttr and NdbRecord methods in one operation" },
   { 4285, DMEC, AE, "NULL NdbRecord pointer" },
   { 4286, DMEC, AE, "Invalid range_no (must be < 4096)" },
   { 4287, DMEC, AE, "The key_record and attribute_record in primary key operation do not belong to the same table" },
@@ -644,6 +649,11 @@ ErrorBundle ErrorCodes[] = {
   { 4292, DMEC, AE, "NdbRecord for tuple access is not an index key NdbRecord" },
   { 4293, DMEC, AE, "Error returned from application scanIndex() callback" },
   { 4294, DMEC, AE, "Scan filter is too large, discarded" },
+  { 4295, DMEC, AE, "Column is NULL in Get/SetValueSpec structure" },
+  { 4296, DMEC, AE, "Invalid AbortOption" },
+  { 4297, DMEC, AE, "Invalid or unsupported OperationOptions structure" },
+  { 4298, DMEC, AE, "Invalid or unsupported ScanOptions structure" },
+  { 4299, DMEC, AE, "Incorrect combination of ScanOption flags, extraGetValues ptr and numExtraGetValues" },
 
   { NO_CONTACT_WITH_PROCESS, DMEC, AE,
     "No contact with the process (dead ?)."},
diff -Nrup a/storage/ndb/test/ndbapi/flexBench.cpp b/storage/ndb/test/ndbapi/flexBench.cpp
--- a/storage/ndb/test/ndbapi/flexBench.cpp	2007-02-20 12:34:37 +03:00
+++ b/storage/ndb/test/ndbapi/flexBench.cpp	2008-02-02 18:28:01 +03:00
@@ -588,7 +588,8 @@ static void* flexBenchThread(void* pArg)
   unsigned int      threadNo, threadBase;
   Ndb*              pNdb = NULL ;
   NdbConnection     *pTrans = NULL ;
-  NdbOperation**    pOps = NULL ;
+  const NdbOperation**    
+                    pOps = NULL ;
   StartType         tType ;
   StartType         tSaveType ;
   NdbRecAttr*       tTmp = NULL ;
@@ -620,7 +621,7 @@ static void* flexBenchThread(void* pArg)
 
   attrValue = (int*)malloc(nReadBuffSize) ;
   attrRefValue = (int*)malloc(nRefBuffSize) ;
-  pOps = (NdbOperation**)malloc(tNoOfTables*sizeof(NdbOperation*)) ;
+  pOps = (const NdbOperation**)malloc(tNoOfTables*sizeof(NdbOperation*)) ;
   pNdb = new Ndb(g_cluster_connection, "TEST_DB" );
   pRec= (NdbRecord **)calloc(tNoOfTables*3, sizeof(*pRec));
   pAttrSet= (unsigned char **)calloc(tNoOfTables, sizeof(*pAttrSet));
diff -Nrup a/storage/ndb/test/ndbapi/testBlobs.cpp b/storage/ndb/test/ndbapi/testBlobs.cpp
--- a/storage/ndb/test/ndbapi/testBlobs.cpp	2007-05-09 12:42:55 +04:00
+++ b/storage/ndb/test/ndbapi/testBlobs.cpp	2008-02-02 18:28:01 +03:00
@@ -138,6 +138,9 @@ printusage()
     << "  -pk2cs      PK2 charset or collation [" << d.m_pk2chr.m_cs << "]" << endl
     << "  -pk2part    partition primary table by PK2" << endl
     << "  -oneblob    only 1 blob attribute [default 2]" << endl
+    << "api styles for test/skip.  Don't apply to performance test" << endl
+    << "  a           NdbRecAttr(old) interface" << endl
+    << "  b           NdbRecord interface" << endl
     << "test cases for test/skip" << endl
     << "  k           primary key ops" << endl
     << "  i           hash index ops" << endl
@@ -153,7 +156,8 @@ printusage()
     << "  0           getValue / setValue" << endl
     << "  1           setActiveHook" << endl
     << "  2           readData / writeData" << endl
-    << "example: -test kn0 (need all 3 parts)" << endl
+    << "example: -test akn0 (need all 4 parts)" << endl
+    << "example: -test abkisrunwd012 (Everything except performance tests" << endl
     << "bug tests" << endl
     << "  -bug 4088   ndb api hang with mixed ops on index table" << endl
     << "  -bug 27018  middle partial part write clobbers rest of part" << endl
@@ -178,6 +182,7 @@ static Ndb* g_ndb = 0;
 static NdbDictionary::Dictionary* g_dic = 0;
 static NdbConnection* g_con = 0;
 static NdbOperation* g_opr = 0;
+static const NdbOperation* g_const_opr = 0;
 static NdbIndexOperation* g_opx = 0;
 static NdbScanOperation* g_ops = 0;
 static NdbBlob* g_bh1 = 0;
@@ -214,6 +219,9 @@ printerror(int line, const char* msg)
     if (g_opr != 0 && g_opr->getNdbError().code != 0) {
       ndbout << "opr: table=" << g_opr->getTableName() << " " << g_opr->getNdbError() << endl;
     }
+    if (g_const_opr != 0 && g_const_opr->getNdbError().code !=0) {
+      ndbout << "const_opr: table=" << g_const_opr->getTableName() << " " << g_const_opr->getNdbError() << endl;
+    }
     if (g_opx != 0 && g_opx->getNdbError().code != 0) {
       ndbout << "opx: table=" << g_opx->getTableName() << " " << g_opx->getNdbError() << endl;
     }
@@ -222,8 +230,8 @@ printerror(int line, const char* msg)
     }
     NdbOperation* ope = g_con->getNdbErrorOperation();
     if (ope != 0 && ope->getNdbError().code != 0) {
-      if (ope != g_opr && ope != g_opx && ope != g_ops)
-        ndbout << "ope: table=" << ope->getTableName() << " " << ope->getNdbError() << endl;
+      if (ope != g_opr && ope != g_const_opr && ope != g_opx && ope != g_ops)
+        ndbout << "ope: ptr=" << ope << " table=" << ope->getTableName() << " type= "<< ope->getType() << " " << ope->getNdbError() << endl;
     }
   }
   if (g_bh1 != 0 && g_bh1->getNdbError().code != 0) {
@@ -632,7 +640,17 @@ calcTups(bool keys, bool keepsize = fals
 }
 
 // blob handle ops
+// const version for NdbRecord defined operations
+static int
+getBlobHandles(const NdbOperation* opr)
+{
+  CHK((g_bh1 = opr->getBlobHandle("BL1")) != 0);
+  if (! g_opt.m_oneblob)
+    CHK((g_bh2 = opr->getBlobHandle("BL2")) != 0);
+  return 0;
+}
 
+// non-const version for NdbRecAttr defined operations
 static int
 getBlobHandles(NdbOperation* opr)
 {
@@ -642,6 +660,7 @@ getBlobHandles(NdbOperation* opr)
   return 0;
 }
 
+
 static int
 getBlobHandles(NdbScanOperation* ops)
 {
@@ -716,6 +735,29 @@ getBlobValue(const Tup& tup)
   return 0;
 }
 
+/* 
+ * presetBH1
+ * This method controls how BL1 is pre-set (using setValue()) for 
+ * inserts and writes that later use writeData to set the correct 
+ * value.
+ * Sometimes it is set to length zero, other times to the value
+ * for some other row in the dataset.  This tests that the writeData()
+ * functionality correctly overwrites values written in the 
+ * prepare phase.
+ */
+static int presetBH1(int rowNumber)
+{
+  unsigned int variant = urandom(2);
+  DBG("presetBH1 - Variant=" << variant);
+  if (variant==0) 
+    CHK(g_bh1->setValue("", 0) == 0);
+  else
+  {
+    CHK(setBlobValue(g_tups[(rowNumber+1) % g_opt.m_rows]) == 0); // Pre-set to something else
+  };
+  return 0;
+}
+
 static int
 verifyBlobValue(NdbBlob* h, const Bval& v)
 {
@@ -766,6 +808,7 @@ writeBlobData(NdbBlob* h, const Bval& v)
     CHK(h->truncate(v.m_len) == 0 || h->getNdbError().code == error_code);
     if (error_code)
       return 0;
+    CHK(h->setPos(0) == 0); // Reset write pointer in case there was a previous write.
     unsigned n = 0;
     do {
       unsigned m = g_opt.m_full ? v.m_len : urandom(v.m_len + 1);
@@ -1098,33 +1141,60 @@ static const char* stylename[3] = {
   "style=readData/writeData"
 };
 
+// Blob API variants
+static const char* apiName[2] = {
+  "api=NdbRecAttr",
+  "api=NdbRecord"
+};
+
+static const char apiSymbol[2] = {
+  'a',  // RecAttr
+  'b'   // NdbRecord
+};
+
+static const int API_RECATTR=0;
+static const int API_NDBRECORD=1;
+
 // pk ops
 
 static int
-insertPk(int style)
+insertPk(int style, int api)
 {
-  DBG("--- insertPk " << stylename[style] << " ---");
+  DBG("--- insertPk " << stylename[style] << " " << apiName[api] << " ---");
   unsigned n = 0;
   CHK((g_con = g_ndb->startTransaction()) != 0);
   for (unsigned k = 0; k < g_opt.m_rows; k++) {
     Tup& tup = g_tups[k];
     DBG("insertPk pk1=" << hex << tup.m_pk1);
-    memcpy(&tup.m_row[g_pk1_offset], &tup.m_pk1, sizeof(tup.m_pk1));
-    if (g_opt.m_pk2chr.m_len != 0) {
-      memcpy(&tup.m_row[g_pk2_offset], tup.m_pk2, g_opt.m_pk2chr.m_totlen);
-      memcpy(&tup.m_row[g_pk3_offset], &tup.m_pk3, sizeof(tup.m_pk3));
+    if (api == API_RECATTR)
+    {
+      CHK((g_opr = g_con->getNdbOperation(g_opt.m_tname)) != 0);
+      CHK(g_opr->insertTuple() ==0);
+      CHK(g_opr->equal("PK1", tup.m_pk1) == 0);
+      if (g_opt.m_pk2chr.m_len != 0)
+      {
+        CHK(g_opr->equal("PK2", tup.m_pk2) == 0);
+        CHK(g_opr->equal("PK3", tup.m_pk3) == 0);
+      }
+      CHK(getBlobHandles(g_opr) == 0);
+    }
+    else
+    {
+      memcpy(&tup.m_row[g_pk1_offset], &tup.m_pk1, sizeof(tup.m_pk1));
+      if (g_opt.m_pk2chr.m_len != 0) {
+        memcpy(&tup.m_row[g_pk2_offset], tup.m_pk2, g_opt.m_pk2chr.m_totlen);
+        memcpy(&tup.m_row[g_pk3_offset], &tup.m_pk3, sizeof(tup.m_pk3));
+      }
+      CHK((g_const_opr = g_con->insertTuple(g_full_record, tup.m_row)) != 0);
+      CHK(getBlobHandles(g_const_opr) == 0);
     }
-    CHK((g_opr = g_con->insertTuple(g_full_record, tup.m_row)) != 0);
-    CHK(getBlobHandles(g_opr) == 0);
     if (style == 0) {
       CHK(setBlobValue(tup) == 0);
     } else if (style == 1) {
-      // non-nullable must be set
-      CHK(g_bh1->setValue("", 0) == 0);
+      CHK(presetBH1(k) == 0);
       CHK(setBlobWriteHook(tup) == 0);
     } else {
-      // non-nullable must be set
-      CHK(g_bh1->setValue("", 0) == 0);
+      CHK(presetBH1(k) == 0);
       CHK(g_con->execute(NoCommit) == 0);
       CHK(writeBlobData(tup) == 0);
     }
@@ -1134,6 +1204,7 @@ insertPk(int style)
       CHK((g_con = g_ndb->startTransaction()) != 0);
       n = 0;
     }
+    g_const_opr = 0;
     g_opr = 0;
     tup.m_exists = true;
   }
@@ -1147,26 +1218,44 @@ insertPk(int style)
 }
 
 static int
-readPk(int style)
+readPk(int style, int api)
 {
-  DBG("--- readPk " << stylename[style] << " ---");
+  DBG("--- readPk " << stylename[style] <<" " << apiName[api] << " ---");
   for (unsigned k = 0; k < g_opt.m_rows; k++) {
     Tup& tup = g_tups[k];
     DBG("readPk pk1=" << hex << tup.m_pk1);
     CHK((g_con = g_ndb->startTransaction()) != 0);
-    memcpy(&tup.m_key_row[g_pk1_offset], &tup.m_pk1, sizeof(tup.m_pk1));
-    if (g_opt.m_pk2chr.m_len != 0) {
-      memcpy(&tup.m_key_row[g_pk2_offset], tup.pk2(), g_opt.m_pk2chr.m_totlen);
-      memcpy(&tup.m_key_row[g_pk3_offset], &tup.m_pk3, sizeof(tup.m_pk3));
+    if (api == API_RECATTR)
+    {
+      CHK((g_opr = g_con->getNdbOperation(g_opt.m_tname)) != 0);
+      if (urandom(2) == 0)
+        CHK(g_opr->readTuple() == 0);
+      else
+        CHK(g_opr->readTuple(NdbOperation::LM_CommittedRead) == 0);
+      CHK(g_opr->equal("PK1", tup.m_pk1) == 0);
+      if (g_opt.m_pk2chr.m_len != 0)
+      {
+        CHK(g_opr->equal("PK2", tup.m_pk2) == 0);
+        CHK(g_opr->equal("PK3", tup.m_pk3) == 0);
+      }
+      CHK(getBlobHandles(g_opr) == 0);
     }
-    if (urandom(2) == 0)
-      CHK((g_opr = g_con->readTuple(g_key_record, tup.m_key_row,
-                                    g_blob_record, tup.m_row)) != 0);
     else
-      CHK((g_opr = g_con->readTuple(g_key_record, tup.m_key_row,
-                                    g_blob_record, tup.m_row,
-                                    NdbOperation::LM_CommittedRead)) != 0);
-    CHK(getBlobHandles(g_opr) == 0);
+    { // NdbRecord
+      memcpy(&tup.m_key_row[g_pk1_offset], &tup.m_pk1, sizeof(tup.m_pk1));
+      if (g_opt.m_pk2chr.m_len != 0) {
+        memcpy(&tup.m_key_row[g_pk2_offset], tup.pk2(), g_opt.m_pk2chr.m_totlen);
+        memcpy(&tup.m_key_row[g_pk3_offset], &tup.m_pk3, sizeof(tup.m_pk3));
+      }
+      if (urandom(2) == 0)
+        CHK((g_const_opr = g_con->readTuple(g_key_record, tup.m_key_row,
+                                            g_blob_record, tup.m_row)) != 0);
+      else
+        CHK((g_const_opr = g_con->readTuple(g_key_record, tup.m_key_row,
+                                            g_blob_record, tup.m_row,
+                                            NdbOperation::LM_CommittedRead)) != 0);
+      CHK(getBlobHandles(g_const_opr) == 0);
+    }
     if (style == 0) {
       CHK(getBlobValue(tup) == 0);
     } else if (style == 1) {
@@ -1177,21 +1266,23 @@ readPk(int style)
     }
     CHK(g_con->execute(Commit) == 0);
     // verify lock mode upgrade
-    CHK(g_opr->getLockMode() == NdbOperation::LM_Read);
+    CHK((g_opr?g_opr:g_const_opr)->getLockMode() == NdbOperation::LM_Read);
+
     if (style == 0 || style == 1) {
       CHK(verifyBlobValue(tup) == 0);
     }
     g_ndb->closeTransaction(g_con);
     g_opr = 0;
+    g_const_opr = 0;
     g_con = 0;
   }
   return 0;
 }
 
 static int
-updatePk(int style)
+updatePk(int style, int api)
 {
-  DBG("--- updatePk " << stylename[style] << " ---");
+  DBG("--- updatePk " << stylename[style] << " " << apiName[api] << " ---");
   for (unsigned k = 0; k < g_opt.m_rows; k++) {
     Tup& tup = g_tups[k];
     DBG("updatePk pk1=" << hex << tup.m_pk1);
@@ -1199,26 +1290,51 @@ updatePk(int style)
       int mode = urandom(3);
       int error_code = mode == 0 ? 0 : 4275;
       CHK((g_con = g_ndb->startTransaction()) != 0);
-      memcpy(&tup.m_key_row[g_pk1_offset], &tup.m_pk1, sizeof(tup.m_pk1));
-      if (g_opt.m_pk2chr.m_len != 0) {
-        memcpy(&tup.m_key_row[g_pk2_offset], tup.pk2(), g_opt.m_pk2chr.m_totlen);
-        memcpy(&tup.m_key_row[g_pk3_offset], &tup.m_pk3, sizeof(tup.m_pk3));
+      if (api == API_RECATTR)
+      {
+        CHK((g_opr = g_con->getNdbOperation(g_opt.m_tname)) != 0);
+        if (mode == 0) {
+          DBG("using updateTuple");
+          CHK(g_opr->updateTuple() == 0);
+        } else if (mode == 1) {
+          DBG("using readTuple exclusive");
+          CHK(g_opr->readTuple(NdbOperation::LM_Exclusive) == 0);
+        } else {
+          DBG("using readTuple - will fail and retry");
+          CHK(g_opr->readTuple() == 0);
+        }
+        CHK(g_opr->equal("PK1", tup.m_pk1) == 0);
+        if (g_opt.m_pk2chr.m_len != 0)
+        {
+          CHK(g_opr->equal("PK2", tup.m_pk2) == 0);
+          CHK(g_opr->equal("PK3", tup.m_pk3) == 0);
+        }
+        CHK(getBlobHandles(g_opr) == 0);
       }
-      if (mode == 0) {
-        DBG("using updateTuple");
-        CHK((g_opr= g_con->updateTuple(g_key_record, tup.m_key_row,
-                                       g_blob_record, tup.m_row)) != 0);
-      } else if (mode == 1) {
-        DBG("using readTuple exclusive");
-        CHK((g_opr= g_con->readTuple(g_key_record, tup.m_key_row,
-                                     g_blob_record, tup.m_row,
-                                     NdbOperation::LM_Exclusive)) != 0);
-      } else {
-        DBG("using readTuple - will fail and retry");
-        CHK((g_opr= g_con->readTuple(g_key_record, tup.m_key_row,
-                                     g_blob_record, tup.m_row)) != 0);
+      else
+      {
+        memcpy(&tup.m_key_row[g_pk1_offset], &tup.m_pk1, sizeof(tup.m_pk1));
+        if (g_opt.m_pk2chr.m_len != 0) {
+          memcpy(&tup.m_key_row[g_pk2_offset], tup.pk2(), g_opt.m_pk2chr.m_totlen);
+          memcpy(&tup.m_key_row[g_pk3_offset], &tup.m_pk3, sizeof(tup.m_pk3));
+        }
+        if (mode == 0) {
+          DBG("using updateTuple");
+          CHK((g_const_opr= g_con->updateTuple(g_key_record, tup.m_key_row,
+                                               g_blob_record, tup.m_row)) != 0);
+        } else if (mode == 1) {
+          DBG("using readTuple exclusive");
+          CHK((g_const_opr= g_con->readTuple(g_key_record, tup.m_key_row,
+                                             g_blob_record, tup.m_row,
+                                             NdbOperation::LM_Exclusive)) != 0);
+        } else {
+          DBG("using readTuple - will fail and retry");
+          CHK((g_const_opr= g_con->readTuple(g_key_record, tup.m_key_row,
+                                             g_blob_record, tup.m_row)) != 0);
+        }
+        CHK(getBlobHandles(g_const_opr) == 0);
       }
-      CHK(getBlobHandles(g_opr) == 0);
+
       if (style == 0) {
         CHK(setBlobValue(tup, error_code) == 0);
       } else if (style == 1) {
@@ -1234,6 +1350,7 @@ updatePk(int style)
       }
       g_ndb->closeTransaction(g_con);
     }
+    g_const_opr = 0;
     g_opr = 0;
     g_con = 0;
     tup.m_exists = true;
@@ -1242,38 +1359,52 @@ updatePk(int style)
 }
 
 static int
-writePk(int style)
+writePk(int style, int api)
 {
-  DBG("--- writePk " << stylename[style] << " ---");
+  DBG("--- writePk " << stylename[style] << " " << apiName[api] << " ---");
   for (unsigned k = 0; k < g_opt.m_rows; k++) {
     Tup& tup = g_tups[k];
     DBG("writePk pk1=" << hex << tup.m_pk1);
     CHK((g_con = g_ndb->startTransaction()) != 0);
-    memcpy(&tup.m_key_row[g_pk1_offset], &tup.m_pk1, sizeof(tup.m_pk1));
-    memcpy(&tup.m_row[g_pk1_offset], &tup.m_pk1, sizeof(tup.m_pk1));
-    if (g_opt.m_pk2chr.m_len != 0) {
-      memcpy(&tup.m_key_row[g_pk2_offset], tup.pk2(), g_opt.m_pk2chr.m_totlen);
-      memcpy(&tup.m_row[g_pk2_offset], tup.pk2(), g_opt.m_pk2chr.m_totlen);
-      memcpy(&tup.m_key_row[g_pk3_offset], &tup.m_pk3, sizeof(tup.m_pk3));
-      memcpy(&tup.m_row[g_pk3_offset], &tup.m_pk3, sizeof(tup.m_pk3));
+    if (api == API_RECATTR)
+    {
+      CHK((g_opr = g_con->getNdbOperation(g_opt.m_tname)) != 0);
+      CHK(g_opr->writeTuple() == 0);
+      CHK(g_opr->equal("PK1", tup.m_pk1) == 0);
+      if (g_opt.m_pk2chr.m_len != 0)
+      {
+        CHK(g_opr->equal("PK2", tup.m_pk2) == 0);
+        CHK(g_opr->equal("PK3", tup.m_pk3) == 0);
+      }
+      CHK(getBlobHandles(g_opr) == 0);
+    }
+    else
+    {
+      memcpy(&tup.m_key_row[g_pk1_offset], &tup.m_pk1, sizeof(tup.m_pk1));
+      memcpy(&tup.m_row[g_pk1_offset], &tup.m_pk1, sizeof(tup.m_pk1));
+      if (g_opt.m_pk2chr.m_len != 0) {
+        memcpy(&tup.m_key_row[g_pk2_offset], tup.pk2(), g_opt.m_pk2chr.m_totlen);
+        memcpy(&tup.m_row[g_pk2_offset], tup.pk2(), g_opt.m_pk2chr.m_totlen);
+        memcpy(&tup.m_key_row[g_pk3_offset], &tup.m_pk3, sizeof(tup.m_pk3));
+        memcpy(&tup.m_row[g_pk3_offset], &tup.m_pk3, sizeof(tup.m_pk3));
+      }
+      CHK((g_const_opr= g_con->writeTuple(g_key_record, tup.m_key_row,
+                                          g_full_record, tup.m_row)) != 0);
+      CHK(getBlobHandles(g_const_opr) == 0);
     }
-    CHK((g_opr= g_con->writeTuple(g_key_record, tup.m_key_row,
-                                  g_full_record, tup.m_row)) != 0);
-    CHK(getBlobHandles(g_opr) == 0);
     if (style == 0) {
       CHK(setBlobValue(tup) == 0);
     } else if (style == 1) {
-      // non-nullable must be set
-      CHK(g_bh1->setValue("", 0) == 0);
+      CHK(presetBH1(k) == 0);
       CHK(setBlobWriteHook(tup) == 0);
     } else {
-      // non-nullable must be set
-      CHK(g_bh1->setValue("", 0) == 0);
+      CHK(presetBH1(k) == 0);
       CHK(g_con->execute(NoCommit) == 0);
       CHK(writeBlobData(tup) == 0);
     }
     CHK(g_con->execute(Commit) == 0);
     g_ndb->closeTransaction(g_con);
+    g_const_opr = 0;
     g_opr = 0;
     g_con = 0;
     tup.m_exists = true;
@@ -1282,26 +1413,41 @@ writePk(int style)
 }
 
 static int
-deletePk()
+deletePk(int api)
 {
-  DBG("--- deletePk ---");
+  DBG("--- deletePk " << apiName[api] << " ---");
   unsigned n = 0;
   CHK((g_con = g_ndb->startTransaction()) != 0);
   for (unsigned k = 0; k < g_opt.m_rows; k++) {
     Tup& tup = g_tups[k];
     DBG("deletePk pk1=" << hex << tup.m_pk1);
-    memcpy(&tup.m_key_row[g_pk1_offset], &tup.m_pk1, sizeof(tup.m_pk1));
-    if (g_opt.m_pk2chr.m_len != 0) {
-      memcpy(&tup.m_key_row[g_pk2_offset], tup.pk2(), g_opt.m_pk2chr.m_totlen);
-      memcpy(&tup.m_key_row[g_pk3_offset], &tup.m_pk3, sizeof(tup.m_pk3));
+    if (api == API_RECATTR)
+    {
+      CHK((g_opr = g_con->getNdbOperation(g_opt.m_tname)) != 0);
+      CHK(g_opr->deleteTuple() == 0);
+      CHK(g_opr->equal("PK1", tup.m_pk1) == 0);
+      if (g_opt.m_pk2chr.m_len != 0)
+      {
+        CHK(g_opr->equal("PK2", tup.m_pk2) == 0);
+        CHK(g_opr->equal("PK3", tup.m_pk3) == 0);
+      }
+    }
+    else
+    {
+      memcpy(&tup.m_key_row[g_pk1_offset], &tup.m_pk1, sizeof(tup.m_pk1));
+      if (g_opt.m_pk2chr.m_len != 0) {
+        memcpy(&tup.m_key_row[g_pk2_offset], tup.pk2(), g_opt.m_pk2chr.m_totlen);
+        memcpy(&tup.m_key_row[g_pk3_offset], &tup.m_pk3, sizeof(tup.m_pk3));
+      }
+      CHK((g_const_opr= g_con->deleteTuple(g_key_record, tup.m_key_row)) != 0);
     }
-    CHK((g_opr= g_con->deleteTuple(g_key_record, tup.m_key_row)) != 0);
     if (++n == g_opt.m_batch) {
       CHK(g_con->execute(Commit) == 0);
       g_ndb->closeTransaction(g_con);
       CHK((g_con = g_ndb->startTransaction()) != 0);
       n = 0;
     }
+    g_const_opr = 0;
     g_opr = 0;
     tup.m_exists = false;
   }
@@ -1361,23 +1507,38 @@ deleteNoPk()
 // hash index ops
 
 static int
-readIdx(int style)
+readIdx(int style, int api)
 {
-  DBG("--- readIdx " << stylename[style] << " ---");
+  DBG("--- readIdx " << stylename[style] << " " << apiName[api] << " ---");
   for (unsigned k = 0; k < g_opt.m_rows; k++) {
     Tup& tup = g_tups[k];
     DBG("readIdx pk1=" << hex << tup.m_pk1);
     CHK((g_con = g_ndb->startTransaction()) != 0);
-    memcpy(&tup.m_key_row[g_pk2_offset], tup.pk2(), g_opt.m_pk2chr.m_totlen);
-    memcpy(&tup.m_key_row[g_pk3_offset], &tup.m_pk3, sizeof(tup.m_pk3));
-    if (urandom(2) == 0)
-      CHK((g_opr= g_con->readTuple(g_idx_record, tup.m_key_row,
-                                   g_blob_record, tup.m_row)) != 0);
+    if (api == API_RECATTR)
+    {
+      CHK((g_opx = g_con->getNdbIndexOperation(g_opt.m_x1name, g_opt.m_tname)) != 0);
+      if (urandom(2) == 0)
+        CHK(g_opx->readTuple() == 0);
+      else
+        CHK(g_opx->readTuple(NdbOperation::LM_CommittedRead) == 0);
+      CHK(g_opx->equal("PK2", tup.m_pk2) == 0);
+      CHK(g_opx->equal("PK3", tup.m_pk3) == 0);
+      CHK(getBlobHandles(g_opx) == 0);
+    }
     else
-      CHK((g_opr= g_con->readTuple(g_idx_record, tup.m_key_row,
-                                   g_blob_record, tup.m_row,
-                                   NdbOperation::LM_CommittedRead)) != 0);
-    CHK(getBlobHandles(g_opr) == 0);
+    {
+      memcpy(&tup.m_key_row[g_pk2_offset], tup.pk2(), g_opt.m_pk2chr.m_totlen);
+      memcpy(&tup.m_key_row[g_pk3_offset], &tup.m_pk3, sizeof(tup.m_pk3));
+      if (urandom(2) == 0)
+        CHK((g_const_opr= g_con->readTuple(g_idx_record, tup.m_key_row,
+                                           g_blob_record, tup.m_row)) != 0);
+      else
+        CHK((g_const_opr= g_con->readTuple(g_idx_record, tup.m_key_row,
+                                           g_blob_record, tup.m_row,
+                                           NdbOperation::LM_CommittedRead)) != 0);
+      CHK(getBlobHandles(g_const_opr) == 0);
+    }
+
     if (style == 0) {
       CHK(getBlobValue(tup) == 0);
     } else if (style == 1) {
@@ -1388,31 +1549,43 @@ readIdx(int style)
     }
     CHK(g_con->execute(Commit) == 0);
     // verify lock mode upgrade (already done by NdbIndexOperation)
-    CHK(g_opr->getLockMode() == NdbOperation::LM_Read);
+    CHK((g_opx?g_opx:g_const_opr)->getLockMode() == NdbOperation::LM_Read);
     if (style == 0 || style == 1) {
       CHK(verifyBlobValue(tup) == 0);
     }
     g_ndb->closeTransaction(g_con);
-    g_opr = 0;
+    g_const_opr = 0;
+    g_opx = 0;
     g_con = 0;
   }
   return 0;
 }
 
 static int
-updateIdx(int style)
+updateIdx(int style, int api)
 {
-  DBG("--- updateIdx " << stylename[style] << " ---");
+  DBG("--- updateIdx " << stylename[style] << " " << apiName[api] << " ---");
   for (unsigned k = 0; k < g_opt.m_rows; k++) {
     Tup& tup = g_tups[k];
     DBG("updateIdx pk1=" << hex << tup.m_pk1);
     // skip 4275 testing
     CHK((g_con = g_ndb->startTransaction()) != 0);
-    memcpy(&tup.m_key_row[g_pk2_offset], tup.pk2(), g_opt.m_pk2chr.m_totlen);
-    memcpy(&tup.m_key_row[g_pk3_offset], &tup.m_pk3, sizeof(tup.m_pk3));
-    CHK((g_opr= g_con->updateTuple(g_idx_record, tup.m_key_row,
-                                   g_blob_record, tup.m_row)) != 0);
-    CHK(getBlobHandles(g_opr) == 0);
+    if (api == API_RECATTR)
+    {
+      CHK((g_opx = g_con->getNdbIndexOperation(g_opt.m_x1name, g_opt.m_tname)) != 0);
+      CHK(g_opx->updateTuple() == 0);
+      CHK(g_opx->equal("PK2", tup.m_pk2) == 0);
+      CHK(g_opx->equal("PK3", tup.m_pk3) == 0);
+      CHK(getBlobHandles(g_opx) == 0);
+    }
+    else
+    {
+      memcpy(&tup.m_key_row[g_pk2_offset], tup.pk2(), g_opt.m_pk2chr.m_totlen);
+      memcpy(&tup.m_key_row[g_pk3_offset], &tup.m_pk3, sizeof(tup.m_pk3));
+      CHK((g_const_opr= g_con->updateTuple(g_idx_record, tup.m_key_row,
+                                           g_blob_record, tup.m_row)) != 0);
+      CHK(getBlobHandles(g_const_opr) == 0);
+    }
     if (style == 0) {
       CHK(setBlobValue(tup) == 0);
     } else if (style == 1) {
@@ -1423,7 +1596,8 @@ updateIdx(int style)
     }
     CHK(g_con->execute(Commit) == 0);
     g_ndb->closeTransaction(g_con);
-    g_opr = 0;
+    g_const_opr = 0;
+    g_opx = 0;
     g_con = 0;
     tup.m_exists = true;
   }
@@ -1431,21 +1605,32 @@ updateIdx(int style)
 }
 
 static int
-writeIdx(int style)
+writeIdx(int style, int api)
 {
-  DBG("--- writeIdx " << stylename[style] << " ---");
+  DBG("--- writeIdx " << stylename[style] << " " << apiName[api] << " ---");
   for (unsigned k = 0; k < g_opt.m_rows; k++) {
     Tup& tup = g_tups[k];
     DBG("writeIdx pk1=" << hex << tup.m_pk1);
     CHK((g_con = g_ndb->startTransaction()) != 0);
-    memcpy(&tup.m_key_row[g_pk2_offset], tup.pk2(), g_opt.m_pk2chr.m_totlen);
-    memcpy(&tup.m_key_row[g_pk3_offset], &tup.m_pk3, sizeof(tup.m_pk3));
-    memcpy(&tup.m_row[g_pk1_offset], &tup.m_pk1, sizeof(tup.m_pk1));
-    memcpy(&tup.m_row[g_pk2_offset], tup.pk2(), g_opt.m_pk2chr.m_totlen);
-    memcpy(&tup.m_row[g_pk3_offset], &tup.m_pk3, sizeof(tup.m_pk3));
-    CHK((g_opr= g_con->writeTuple(g_idx_record, tup.m_key_row,
-                                  g_full_record, tup.m_row)) != 0);
-    CHK(getBlobHandles(g_opr) == 0);
+    if (api == API_RECATTR)
+    {
+      CHK((g_opx = g_con->getNdbIndexOperation(g_opt.m_x1name, g_opt.m_tname)) != 0);
+      CHK(g_opx->writeTuple() == 0);
+      CHK(g_opx->equal("PK2", tup.m_pk2) == 0);
+      CHK(g_opx->equal("PK3", tup.m_pk3) == 0);
+      CHK(getBlobHandles(g_opx) == 0);
+    }
+    else
+    {
+      memcpy(&tup.m_key_row[g_pk2_offset], tup.pk2(), g_opt.m_pk2chr.m_totlen);
+      memcpy(&tup.m_key_row[g_pk3_offset], &tup.m_pk3, sizeof(tup.m_pk3));
+      memcpy(&tup.m_row[g_pk1_offset], &tup.m_pk1, sizeof(tup.m_pk1));
+      memcpy(&tup.m_row[g_pk2_offset], tup.pk2(), g_opt.m_pk2chr.m_totlen);
+      memcpy(&tup.m_row[g_pk3_offset], &tup.m_pk3, sizeof(tup.m_pk3));
+      CHK((g_const_opr= g_con->writeTuple(g_idx_record, tup.m_key_row,
+                                          g_full_record, tup.m_row)) != 0);
+      CHK(getBlobHandles(g_const_opr) == 0);
+    }
     if (style == 0) {
       CHK(setBlobValue(tup) == 0);
     } else if (style == 1) {
@@ -1460,7 +1645,8 @@ writeIdx(int style)
     }
     CHK(g_con->execute(Commit) == 0);
     g_ndb->closeTransaction(g_con);
-    g_opr = 0;
+    g_const_opr = 0;
+    g_opx = 0;
     g_con = 0;
     tup.m_exists = true;
   }
@@ -1468,24 +1654,35 @@ writeIdx(int style)
 }
 
 static int
-deleteIdx()
+deleteIdx(int api)
 {
-  DBG("--- deleteIdx ---");
+  DBG("--- deleteIdx " << apiName[api] << " ---");
   unsigned n = 0;
   CHK((g_con = g_ndb->startTransaction()) != 0);
   for (unsigned k = 0; k < g_opt.m_rows; k++) {
     Tup& tup = g_tups[k];
     DBG("deleteIdx pk1=" << hex << tup.m_pk1);
-    memcpy(&tup.m_key_row[g_pk2_offset], tup.pk2(), g_opt.m_pk2chr.m_totlen);
-    memcpy(&tup.m_key_row[g_pk3_offset], &tup.m_pk3, sizeof(tup.m_pk3));
-    CHK((g_opr= g_con->deleteTuple(g_idx_record, tup.m_key_row)) != 0);
+    if (api == API_RECATTR)
+    {
+      CHK((g_opx = g_con->getNdbIndexOperation(g_opt.m_x1name, g_opt.m_tname)) != 0);
+      CHK(g_opx->deleteTuple() == 0);
+      CHK(g_opx->equal("PK2", tup.m_pk2) == 0);
+      CHK(g_opx->equal("PK3", tup.m_pk3) == 0);
+    }
+    else
+    {
+      memcpy(&tup.m_key_row[g_pk2_offset], tup.pk2(), g_opt.m_pk2chr.m_totlen);
+      memcpy(&tup.m_key_row[g_pk3_offset], &tup.m_pk3, sizeof(tup.m_pk3));
+      CHK((g_const_opr= g_con->deleteTuple(g_idx_record, tup.m_key_row)) != 0);
+    }
     if (++n == g_opt.m_batch) {
       CHK(g_con->execute(Commit) == 0);
       g_ndb->closeTransaction(g_con);
       CHK((g_con = g_ndb->startTransaction()) != 0);
       n = 0;
     }
-    g_opr = 0;
+    g_const_opr = 0;
+    g_opx = 0;
     tup.m_exists = false;
   }
   if (n != 0) {
@@ -1498,27 +1695,50 @@ deleteIdx()
 // scan ops table and index
 
 static int
-readScan(int style, bool idx)
+readScan(int style, int api, bool idx)
 {
-  DBG("--- " << "readScan" << (idx ? "Idx" : "") << " " << stylename[style] << " ---");
+  DBG("--- " << "readScan" << (idx ? "Idx" : "") << " " << stylename[style] << " " << apiName[api] << " ---");
   Tup tup;
   tup.alloc();  // allocate buffers
   CHK((g_con = g_ndb->startTransaction()) != 0);
-  if (urandom(2) == 0)
-    if (! idx)
-      CHK((g_ops= g_con->scanTable(g_full_record,
-                                   NdbOperation::LM_Read)) != 0);
-  else 
-      CHK((g_ops= g_con->scanIndex(g_ord_record, NULL, NULL, 0, g_full_record,
-                                   NdbOperation::LM_Read)) != 0);
+  if (api == API_RECATTR)
+  {
+    if (! idx) {
+      CHK((g_ops = g_con->getNdbScanOperation(g_opt.m_tname)) != 0);
+    } else {
+      CHK((g_ops = g_con->getNdbIndexScanOperation(g_opt.m_x2name, g_opt.m_tname)) != 0);
+    }
+    if (urandom(2) == 0)
+      CHK(g_ops->readTuples(NdbOperation::LM_Read) == 0);
+    else
+      CHK(g_ops->readTuples(NdbOperation::LM_CommittedRead) == 0);
+    CHK(g_ops->getValue("PK1", (char*)&tup.m_pk1) != 0);
+    if (g_opt.m_pk2chr.m_len != 0)
+    {
+      CHK(g_ops->getValue("PK2", tup.m_pk2) != 0);
+      CHK(g_ops->getValue("PK3", (char *) &tup.m_pk3) != 0);
+    }
+    CHK(getBlobHandles(g_ops) == 0);   
+  }
   else
-    if (! idx)
-      CHK((g_ops= g_con->scanTable(g_full_record,
-                                   NdbOperation::LM_CommittedRead)) != 0);
+  {
+    if (urandom(2) == 0)
+      if (! idx)
+        CHK((g_ops= g_con->scanTable(g_full_record,
+                                     NdbOperation::LM_Read)) != 0);
+      else 
+        CHK((g_ops= g_con->scanIndex(g_ord_record, g_full_record,
+                                     NdbOperation::LM_Read)) != 0);
     else
-      CHK((g_ops= g_con->scanIndex(g_ord_record, NULL, NULL, 0, g_full_record,
-                                   NdbOperation::LM_CommittedRead)) != 0);
-  CHK(getBlobHandles(g_ops) == 0);
+      if (! idx)
+        CHK((g_ops= g_con->scanTable(g_full_record,
+                                     NdbOperation::LM_CommittedRead)) != 0);
+      else
+        CHK((g_ops= g_con->scanIndex(g_ord_record, g_full_record,
+                                     NdbOperation::LM_CommittedRead)) != 0);
+    CHK(getBlobHandles(g_ops) == 0);
+  }
+
   if (style == 0) {
     CHK(getBlobValue(tup) == 0);
   } else if (style == 1) {
@@ -1529,17 +1749,30 @@ readScan(int style, bool idx)
   CHK(g_ops->getLockMode() == NdbOperation::LM_Read);
   unsigned rows = 0;
   while (1) {
-    const char *out_row= NULL;
     int ret;
 
-    CHK((ret = g_ops->nextResult(out_row, true)) == 0 || ret == 1);
-    if (ret == 1)
-      break;
-    memcpy(&tup.m_pk1, &out_row[g_pk1_offset], sizeof(tup.m_pk1));
-    if (g_opt.m_pk2chr.m_len != 0)
+    if (api == API_RECATTR)
     {
-      memcpy(tup.m_pk2, &out_row[g_pk2_offset], g_opt.m_pk2chr.m_totlen);
-      memcpy(&tup.m_pk3, &out_row[g_pk3_offset], sizeof(tup.m_pk3));
+      tup.m_pk1 = (Uint32)-1;
+      memset(tup.m_pk2, 'x', g_opt.m_pk2chr.m_len);
+      tup.m_pk3 = -1;
+      CHK((ret = g_ops->nextResult(true)) == 0 || ret == 1);
+      if (ret == 1)
+        break;
+    }
+    else
+    {
+      const char *out_row= NULL;
+
+      CHK((ret = g_ops->nextResult(&out_row, true, false)) == 0 || ret == 1);
+      if (ret == 1)
+        break;
+      memcpy(&tup.m_pk1, &out_row[g_pk1_offset], sizeof(tup.m_pk1));
+      if (g_opt.m_pk2chr.m_len != 0)
+      {
+        memcpy(tup.m_pk2, &out_row[g_pk2_offset], g_opt.m_pk2chr.m_totlen);
+        memcpy(&tup.m_pk3, &out_row[g_pk3_offset], sizeof(tup.m_pk3));
+      }
     }
 
     DBG("readScan" << (idx ? "Idx" : "") << " pk1=" << hex << tup.m_pk1);
@@ -1564,31 +1797,62 @@ readScan(int style, bool idx)
 }
 
 static int
-updateScan(int style, bool idx)
+updateScan(int style, int api, bool idx)
 {
-  DBG("--- " << "updateScan" << (idx ? "Idx" : "") << " " << stylename[style] << " ---");
+  DBG("--- " << "updateScan" << (idx ? "Idx" : "") << " " << stylename[style] << " " << apiName[api] << " ---");
   Tup tup;
   tup.alloc();  // allocate buffers
   CHK((g_con = g_ndb->startTransaction()) != 0);
-  if (! idx)
-    CHK((g_ops= g_con->scanTable(g_key_record,
-                                 NdbOperation::LM_Exclusive)) != 0);
+  if (api == API_RECATTR)
+  {
+    if (! idx) {
+      CHK((g_ops = g_con->getNdbScanOperation(g_opt.m_tname)) != 0);
+    } else {
+      CHK((g_ops = g_con->getNdbIndexScanOperation(g_opt.m_x2name, g_opt.m_tname)) != 0);
+    }
+    CHK(g_ops->readTuples(NdbOperation::LM_Exclusive) == 0);
+    CHK(g_ops->getValue("PK1", (char*)&tup.m_pk1) != 0);
+    if (g_opt.m_pk2chr.m_len != 0)
+    {
+      CHK(g_ops->getValue("PK2", tup.m_pk2) != 0);
+      CHK(g_ops->getValue("PK3", (char *) &tup.m_pk3) != 0);
+    }
+  }
   else
-    CHK((g_ops= g_con->scanIndex(g_ord_record, NULL, NULL, 0, g_key_record,
-                                 NdbOperation::LM_Exclusive)) != 0);
+  {
+    if (! idx)
+      CHK((g_ops= g_con->scanTable(g_key_record,
+                                   NdbOperation::LM_Exclusive)) != 0);
+    else
+      CHK((g_ops= g_con->scanIndex(g_ord_record, g_key_record,
+                                   NdbOperation::LM_Exclusive)) != 0);
+  }
   CHK(g_con->execute(NoCommit) == 0);
   unsigned rows = 0;
   while (1) {
     const char *out_row= NULL;
     int ret;
 
-    CHK((ret = g_ops->nextResult(out_row, true)) == 0 || ret == 1);
-    if (ret == 1)
-      break;
-    memcpy(&tup.m_pk1, &out_row[g_pk1_offset], sizeof(tup.m_pk1));
-    if (g_opt.m_pk2chr.m_len != 0) {
-      memcpy(tup.m_pk2, &out_row[g_pk2_offset], g_opt.m_pk2chr.m_totlen);
-      memcpy(&tup.m_pk3, &out_row[g_pk3_offset], sizeof(tup.m_pk3));
+    if (api == API_RECATTR)
+    {
+      tup.m_pk1 = (Uint32)-1;
+      memset(tup.m_pk2, 'x', g_opt.m_pk2chr.m_totlen);
+      tup.m_pk3 = -1;
+
+      CHK((ret = g_ops->nextResult(true)) == 0 || ret == 1);
+      if (ret == 1)
+        break;
+    }
+    else
+    {
+      CHK((ret = g_ops->nextResult(&out_row, true, false)) == 0 || ret == 1);
+      if (ret == 1)
+        break;
+      memcpy(&tup.m_pk1, &out_row[g_pk1_offset], sizeof(tup.m_pk1));
+      if (g_opt.m_pk2chr.m_len != 0) {
+        memcpy(tup.m_pk2, &out_row[g_pk2_offset], g_opt.m_pk2chr.m_totlen);
+        memcpy(&tup.m_pk3, &out_row[g_pk3_offset], sizeof(tup.m_pk3));
+      }    
     }
 
     DBG("updateScan" << (idx ? "Idx" : "") << " pk1=" << hex << tup.m_pk1);
@@ -1598,8 +1862,16 @@ updateScan(int style, bool idx)
     calcBval(g_tups[k], false);
     tup.copyfrom(g_tups[k]);
     // cannot do 4275 testing, scan op error code controls execution
-    CHK((g_opr = g_ops->updateCurrentTuple(g_con, g_blob_record, tup.m_row)) != 0);
-    CHK(getBlobHandles(g_opr) == 0);
+    if (api == API_RECATTR)
+    {
+      CHK((g_opr = g_ops->updateCurrentTuple()) != 0);
+      CHK(getBlobHandles(g_opr) == 0);
+    }
+    else
+    {
+      CHK((g_const_opr = g_ops->updateCurrentTuple(g_con, g_blob_record, tup.m_row)) != 0);
+      CHK(getBlobHandles(g_const_opr) == 0);
+    }
     if (style == 0) {
       CHK(setBlobValue(tup) == 0);
     } else if (style == 1) {
@@ -1609,6 +1881,7 @@ updateScan(int style, bool idx)
       CHK(writeBlobData(tup) == 0);
     }
     CHK(g_con->execute(NoCommit) == 0);
+    g_const_opr = 0;
     g_opr = 0;
     rows++;
   }
@@ -1621,32 +1894,64 @@ updateScan(int style, bool idx)
 }
 
 static int
-deleteScan(bool idx)
+deleteScan(int api, bool idx)
 {
-  DBG("--- " << "deleteScan" << (idx ? "Idx" : "") << " ---");
+  DBG("--- " << "deleteScan" << (idx ? "Idx" : "") << apiName[api] << " ---");
   Tup tup;
   CHK((g_con = g_ndb->startTransaction()) != 0);
-  if (! idx)
-    CHK((g_ops= g_con->scanTable(g_key_record,
-                                 NdbOperation::LM_Exclusive)) != 0);
+  
+  if (api == API_RECATTR)
+  {
+    if (! idx) {
+      CHK((g_ops = g_con->getNdbScanOperation(g_opt.m_tname)) != 0);
+    } else {
+      CHK((g_ops = g_con->getNdbIndexScanOperation(g_opt.m_x2name, g_opt.m_tname)) != 0);
+    }
+    CHK(g_ops->readTuples(NdbOperation::LM_Exclusive) == 0);
+    CHK(g_ops->getValue("PK1", (char*)&tup.m_pk1) != 0);
+    if (g_opt.m_pk2chr.m_len != 0)
+    {
+      CHK(g_ops->getValue("PK2", tup.m_pk2) != 0);
+      CHK(g_ops->getValue("PK3", (char *) &tup.m_pk3) != 0);
+    }
+  }
   else
-    CHK((g_ops= g_con->scanIndex(g_ord_record, NULL, NULL, 0, g_key_record,
-                                 NdbOperation::LM_Exclusive)) != 0);
+  {
+    if (! idx)
+      CHK((g_ops= g_con->scanTable(g_key_record,
+                                   NdbOperation::LM_Exclusive)) != 0);
+    else
+      CHK((g_ops= g_con->scanIndex(g_ord_record, g_key_record,
+                                   NdbOperation::LM_Exclusive)) != 0);
+  }
   CHK(g_con->execute(NoCommit) == 0);
   unsigned rows = 0;
   unsigned n = 0;
   while (1) {
-    const char *out_row= NULL;
     int ret;
 
-    CHK((ret = g_ops->nextResult(out_row, true)) == 0 || ret == 1);
-    if (ret == 1)
-      break;
-    memcpy(&tup.m_pk1, &out_row[g_pk1_offset], sizeof(tup.m_pk1));
-    if (g_opt.m_pk2chr.m_len != 0)
+    if (api == API_RECATTR)
     {
-      memcpy(tup.m_pk2, &out_row[g_pk2_offset], g_opt.m_pk2chr.m_totlen);
-      memcpy(&tup.m_pk3, &out_row[g_pk3_offset], sizeof(tup.m_pk3));
+      tup.m_pk1 = (Uint32)-1;
+      memset(tup.m_pk2, 'x', g_opt.m_pk2chr.m_len);
+      tup.m_pk3 = -1;
+      CHK((ret = g_ops->nextResult(true)) == 0 || ret == 1);
+      if (ret == 1)
+        break;
+    }
+    else
+    {
+      const char *out_row= NULL;
+
+      CHK((ret = g_ops->nextResult(&out_row, true, false)) == 0 || ret == 1);
+      if (ret == 1)
+        break;
+      memcpy(&tup.m_pk1, &out_row[g_pk1_offset], sizeof(tup.m_pk1));
+      if (g_opt.m_pk2chr.m_len != 0)
+      {
+        memcpy(tup.m_pk2, &out_row[g_pk2_offset], g_opt.m_pk2chr.m_totlen);
+        memcpy(&tup.m_pk3, &out_row[g_pk3_offset], sizeof(tup.m_pk3));
+      }
     }
 
     while (1) {
@@ -1654,18 +1959,28 @@ deleteScan(bool idx)
       Uint32 k = tup.m_pk1 - g_opt.m_pk1off;
       CHK(k < g_opt.m_rows && g_tups[k].m_exists);
       g_tups[k].m_exists = false;
-      CHK(g_ops->deleteCurrentTuple(g_con, g_key_record) != NULL);
+      if (api == API_RECATTR)
+        CHK(g_ops->deleteCurrentTuple() == 0);
+      else
+        CHK(g_ops->deleteCurrentTuple(g_con, g_key_record) != NULL);
       rows++;
       tup.m_pk1 = (Uint32)-1;
       memset(tup.m_pk2, 'x', g_opt.m_pk2chr.m_len);
-      CHK((ret = g_ops->nextResult(out_row, false)) == 0 || ret == 1 || ret == 2);
-      if (ret == 0)
-      {
-        memcpy(&tup.m_pk1, &out_row[g_pk1_offset], sizeof(tup.m_pk1));
-        if (g_opt.m_pk2chr.m_len != 0)
+      tup.m_pk3 = -1;
+      if (api == API_RECATTR)
+        CHK((ret = g_ops->nextResult(false)) == 0 || ret == 1 || ret == 2);
+      else
+      {      
+        const char *out_row= NULL;
+        CHK((ret = g_ops->nextResult(&out_row, false, false)) == 0 || ret == 1 || ret == 2);
+        if (ret == 0)
         {
-          memcpy(tup.m_pk2, &out_row[g_pk2_offset], g_opt.m_pk2chr.m_totlen);
-          memcpy(&tup.m_pk3, &out_row[g_pk3_offset], sizeof(tup.m_pk3));
+          memcpy(&tup.m_pk1, &out_row[g_pk1_offset], sizeof(tup.m_pk1));
+          if (g_opt.m_pk2chr.m_len != 0)
+          {
+            memcpy(tup.m_pk2, &out_row[g_pk2_offset], g_opt.m_pk2chr.m_totlen);
+            memcpy(&tup.m_pk3, &out_row[g_pk3_offset], sizeof(tup.m_pk3));
+          }
         }
       }
       if (++n == g_opt.m_batch || ret == 2) {
@@ -1726,6 +2041,7 @@ testmain()
   }
   for (g_loop = 0; g_opt.m_loop == 0 || g_loop < g_opt.m_loop; g_loop++) {
     int style;
+    int api;
     DBG("=== loop " << g_loop << " ===");
     if (g_opt.m_seed == 0)
       srandom(g_loop);
@@ -1734,121 +2050,127 @@ testmain()
       CHK((*g_opt.m_bugtest)() == 0);
       continue;
     }
-    // pk
-    for (style = 0; style <= 2; style++) {
-      if (! testcase('k') || ! testcase(style))
+    /* Loop over API styles */
+    for (api = 0; api <=1; api++) {
+      // pk
+      if (! testcase(apiSymbol[api]))
         continue;
-      DBG("--- pk ops " << stylename[style] << " ---");
-      if (testcase('n')) {
-        calcTups(true);
-        CHK(insertPk(style) == 0);
-        CHK(verifyBlob() == 0);
-        CHK(readPk(style) == 0);
-        if (testcase('u')) {
-          calcTups(false);
-          CHK(updatePk(style) == 0);
+      for (style = 0; style <= 2; style++) {
+        if (! testcase('k') || ! testcase(style) )
+          continue;
+        DBG("--- pk ops " << stylename[style] << " " << apiName[api] << " ---");
+        if (testcase('n')) {
+          calcTups(true);
+          CHK(insertPk(style, api) == 0);
           CHK(verifyBlob() == 0);
-          CHK(readPk(style) == 0);
+          CHK(readPk(style, api) == 0);
+          if (testcase('u')) {
+            calcTups(false);
+            CHK(updatePk(style, api) == 0);
+            CHK(verifyBlob() == 0);
+            CHK(readPk(style, api) == 0);
+          }
+          if (testcase('d')) {
+            CHK(deletePk(api) == 0);
+            CHK(deleteNoPk() == 0);
+            CHK(verifyBlob() == 0);
+          }
         }
-        if (testcase('d')) {
-          CHK(deletePk() == 0);
-          CHK(deleteNoPk() == 0);
+        if (testcase('w')) {
+          calcTups(true);
+          CHK(writePk(style, api) == 0);
           CHK(verifyBlob() == 0);
+          CHK(readPk(style, api) == 0);
+          if (testcase('u')) {
+            calcTups(false);
+            CHK(writePk(style, api) == 0);
+            CHK(verifyBlob() == 0);
+            CHK(readPk(style, api) == 0);
+          }
+          if (testcase('d')) {
+            CHK(deletePk(api) == 0);
+            CHK(deleteNoPk() == 0);
+            CHK(verifyBlob() == 0);
+          }
         }
       }
-      if (testcase('w')) {
-        calcTups(true);
-        CHK(writePk(style) == 0);
-        CHK(verifyBlob() == 0);
-        CHK(readPk(style) == 0);
-        if (testcase('u')) {
-          calcTups(false);
-          CHK(writePk(style) == 0);
+
+      // hash index
+      for (style = 0; style <= 2; style++) {
+        if (! testcase('i') || ! testcase(style))
+          continue;
+        DBG("--- idx ops " << stylename[style] << " " << apiName[api] << " ---");
+        if (testcase('n')) {
+          calcTups(true);
+          CHK(insertPk(style, api) == 0);
           CHK(verifyBlob() == 0);
-          CHK(readPk(style) == 0);
+          CHK(readIdx(style, api) == 0);
+          if (testcase('u')) {
+            calcTups(false);
+            CHK(updateIdx(style, api) == 0);
+            CHK(verifyBlob() == 0);
+            CHK(readIdx(style, api) == 0);
+          }
+          if (testcase('d')) {
+            CHK(deleteIdx(api) == 0);
+            CHK(verifyBlob() == 0);
+          }
         }
-        if (testcase('d')) {
-          CHK(deletePk() == 0);
-          CHK(deleteNoPk() == 0);
+        if (testcase('w')) {
+          calcTups(false);
+          CHK(writePk(style, api) == 0);
           CHK(verifyBlob() == 0);
+          CHK(readIdx(style, api) == 0);
+          if (testcase('u')) {
+            calcTups(false);
+            CHK(writeIdx(style, api) == 0);
+            CHK(verifyBlob() == 0);
+            CHK(readIdx(style, api) == 0);
+          }
+          if (testcase('d')) {
+            CHK(deleteIdx(api) == 0);
+            CHK(verifyBlob() == 0);
+          }
         }
       }
-    }
-    // hash index
-    for (style = 0; style <= 2; style++) {
-      if (! testcase('i') || ! testcase(style))
-        continue;
-      DBG("--- idx ops " << stylename[style] << " ---");
-      if (testcase('n')) {
+      // scan table
+      for (style = 0; style <= 2; style++) {
+        if (! testcase('s') || ! testcase(style))
+          continue;
+        DBG("--- table scan " << stylename[style] << " " << apiName[api] << " ---");
         calcTups(true);
-        CHK(insertPk(style) == 0);
+        CHK(insertPk(style, api) == 0);
         CHK(verifyBlob() == 0);
-        CHK(readIdx(style) == 0);
+        CHK(readScan(style, api, false) == 0);
         if (testcase('u')) {
-          calcTups(false);
-          CHK(updateIdx(style) == 0);
+          CHK(updateScan(style, api, false) == 0);
           CHK(verifyBlob() == 0);
-          CHK(readIdx(style) == 0);
         }
         if (testcase('d')) {
-          CHK(deleteIdx() == 0);
+          CHK(deleteScan(api, false) == 0);
           CHK(verifyBlob() == 0);
         }
       }
-      if (testcase('w')) {
-        calcTups(false);
-        CHK(writePk(style) == 0);
+      // scan index
+      for (style = 0; style <= 2; style++) {
+        if (! testcase('r') || ! testcase(style))
+          continue;
+        DBG("--- index scan " << stylename[style] << " " << apiName[api] << " ---");
+        calcTups(true);
+        CHK(insertPk(style, api) == 0);
         CHK(verifyBlob() == 0);
-        CHK(readIdx(style) == 0);
+        CHK(readScan(style, api, true) == 0);
         if (testcase('u')) {
-          calcTups(false);
-          CHK(writeIdx(style) == 0);
+          CHK(updateScan(style, api, true) == 0);
           CHK(verifyBlob() == 0);
-          CHK(readIdx(style) == 0);
         }
         if (testcase('d')) {
-          CHK(deleteIdx() == 0);
+          CHK(deleteScan(api, true) == 0);
           CHK(verifyBlob() == 0);
         }
       }
-    }
-    // scan table
-    for (style = 0; style <= 2; style++) {
-      if (! testcase('s') || ! testcase(style))
-        continue;
-      DBG("--- table scan " << stylename[style] << " ---");
-      calcTups(true);
-      CHK(insertPk(style) == 0);
-      CHK(verifyBlob() == 0);
-      CHK(readScan(style, false) == 0);
-      if (testcase('u')) {
-        CHK(updateScan(style, false) == 0);
-        CHK(verifyBlob() == 0);
-      }
-      if (testcase('d')) {
-        CHK(deleteScan(false) == 0);
-        CHK(verifyBlob() == 0);
-      }
-    }
-    // scan index
-    for (style = 0; style <= 2; style++) {
-      if (! testcase('r') || ! testcase(style))
-        continue;
-      DBG("--- index scan " << stylename[style] << " ---");
-      calcTups(true);
-      CHK(insertPk(style) == 0);
-      CHK(verifyBlob() == 0);
-      CHK(readScan(style, true) == 0);
-      if (testcase('u')) {
-        CHK(updateScan(style, true) == 0);
-        CHK(verifyBlob() == 0);
-      }
-      if (testcase('d')) {
-        CHK(deleteScan(true) == 0);
-        CHK(verifyBlob() == 0);
-      }
-    }
-  }
+    } // for (api
+  } // for (loop
   delete g_ndb;
   return 0;
 }
@@ -2161,7 +2483,7 @@ bugtest_4088()
   DBG("bug test 4088 - ndb api hang with mixed ops on index table");
   // insert rows
   calcTups(true);
-  CHK(insertPk(false) == 0);
+  CHK(insertPk(0, API_NDBRECORD) == 0);
   // new trans
   CHK((g_con = g_ndb->startTransaction()) != 0);
   for (unsigned k = 0; k < g_opt.m_rows; k++) {
@@ -2204,7 +2526,7 @@ bugtest_27018()
 
   // insert rows
   calcTups(true);
-  CHK(insertPk(false) == 0);
+  CHK(insertPk(0, API_NDBRECORD) == 0);
   // new trans
   for (unsigned k= 0; k < g_opt.m_rows; k++)
   {
@@ -2224,9 +2546,9 @@ bugtest_27018()
       memcpy(&tup.m_key_row[g_pk2_offset], tup.m_pk2, g_opt.m_pk2chr.m_totlen);
       memcpy(&tup.m_key_row[g_pk3_offset], &tup.m_pk3, sizeof(tup.m_pk3));
     }
-    CHK((g_opr= g_con->updateTuple(g_key_record, tup.m_key_row,
-                                   g_blob_record, tup.m_row)) != 0);
-    CHK(getBlobHandles(g_opr) == 0);
+    CHK((g_const_opr= g_con->updateTuple(g_key_record, tup.m_key_row,
+                                         g_blob_record, tup.m_row)) != 0);
+    CHK(getBlobHandles(g_const_opr) == 0);
     CHK(g_con->execute(NoCommit) == 0);
 
     tup.m_bval1.m_buf[0]= 0xff ^ tup.m_bval1.m_val[offset];
@@ -2236,9 +2558,9 @@ bugtest_27018()
     g_ndb->closeTransaction(g_con);
 
     CHK((g_con= g_ndb->startTransaction()) != 0);
-    CHK((g_opr= g_con->readTuple(g_key_record, tup.m_key_row,
-                                 g_blob_record, tup.m_row)) != 0);
-    CHK(getBlobHandles(g_opr) == 0);
+    CHK((g_const_opr= g_con->readTuple(g_key_record, tup.m_key_row,
+                                       g_blob_record, tup.m_row)) != 0);
+    CHK(getBlobHandles(g_const_opr) == 0);
 
     CHK(g_bh1->getValue(tup.m_bval1.m_buf, tup.m_bval1.m_len) == 0);
     CHK(g_con->execute(Commit) == 0);
@@ -2254,8 +2576,10 @@ bugtest_27018()
     }
 
     g_ndb->closeTransaction(g_con);
+    g_con=0;
+    g_const_opr=0;
   }
-  CHK(deletePk() == 0);
+  CHK(deletePk(API_NDBRECORD) == 0);
 
   return 0;
 }
@@ -2284,7 +2608,7 @@ void *bugtest_27370_thread(void *arg)
     NdbConnection *con;
     if ((con= data->m_ndb->startTransaction()) == 0)
       return (void *)"Failed to create transaction";
-    NdbOperation *opr;
+    const NdbOperation *opr;
     memcpy(data->m_write_row, data->m_key_row, g_rowsize);
     if ((opr= con->writeTuple(g_key_record, data->m_key_row,
                               g_full_record, data->m_write_row)) == 0)
@@ -2337,9 +2661,9 @@ bugtest_27370()
 
   CHK((g_con= g_ndb->startTransaction()) != 0);
   memcpy(data.m_write_row, data.m_key_row, g_rowsize);
-  CHK((g_opr= g_con->writeTuple(g_key_record, data.m_key_row,
-                                g_full_record, data.m_write_row)) != 0);
-  CHK((g_bh1= g_opr->getBlobHandle("BL1")) != 0);
+  CHK((g_const_opr= g_con->writeTuple(g_key_record, data.m_key_row,
+                                      g_full_record, data.m_write_row)) != 0);
+  CHK((g_bh1= g_const_opr->getBlobHandle("BL1")) != 0);
   CHK(g_bh1->setValue(data.m_writebuf, data.m_blob1_size) == 0);
   CHK(g_con->execute(Commit) == 0);
   g_ndb->closeTransaction(g_con);
@@ -2353,10 +2677,10 @@ bugtest_27370()
   while (seen_updates < 50)
   {
     CHK((g_con= g_ndb->startTransaction()) != 0);
-    CHK((g_opr= g_con->readTuple(g_key_record, data.m_key_row,
-                                 g_blob_record, data.m_read_row,
-                                 NdbOperation::LM_CommittedRead)) != 0);
-    CHK((g_bh1= g_opr->getBlobHandle("BL1")) != 0);
+    CHK((g_const_opr= g_con->readTuple(g_key_record, data.m_key_row,
+                                       g_blob_record, data.m_read_row,
+                                       NdbOperation::LM_CommittedRead)) != 0);
+    CHK((g_bh1= g_const_opr->getBlobHandle("BL1")) != 0);
     CHK(g_con->execute(NoCommit, AbortOnError, 1) == 0);
 
     const Uint32 loop_max= 10;
@@ -2406,8 +2730,8 @@ bugtest_27370()
                                  NdbOperation::LM_CommittedRead)) != 0);
     CHK((g_bh1= g_ops->getBlobHandle("BL1")) != 0);
     CHK(g_con->execute(NoCommit, AbortOnError, 1) == 0);
-    const char *out_row;
-    CHK(g_ops->nextResult(out_row, true) == 0);
+    const char *out_row= NULL;
+    CHK(g_ops->nextResult(&out_row, true, false) == 0);
 
     const Uint32 loop_max= 10;
     char read_char;
@@ -2443,7 +2767,7 @@ bugtest_27370()
       CHK(g_con->execute(NoCommit, AbortOnError, 1) == 0);
     }
 
-    CHK(g_ops->nextResult(out_row, true) == 1);
+    CHK(g_ops->nextResult(&out_row, true, false) == 1);
     g_ndb->closeTransaction(g_con);
     g_con= NULL;
   }
@@ -2457,7 +2781,7 @@ bugtest_27370()
 
   delete [] data.m_key_row;
   g_con= NULL;
-  g_opr= NULL;
+  g_const_opr= NULL;
   g_bh1= NULL;
   return 0;
 }
diff -Nrup a/storage/ndb/test/ndbapi/testScan.cpp b/storage/ndb/test/ndbapi/testScan.cpp
--- a/storage/ndb/test/ndbapi/testScan.cpp	2007-10-14 20:42:47 +04:00
+++ b/storage/ndb/test/ndbapi/testScan.cpp	2008-02-02 18:28:01 +03:00
@@ -1009,94 +1009,6 @@ int runCheckInactivityBeforeClose(NDBT_C
 
 }
 
-int runScanRestart(NDBT_Context* ctx, NDBT_Step* step){
-  int loops = ctx->getNumLoops();
-  int records = ctx->getNumRecords();
-  Ndb * pNdb = GETNDB(step);
-  const NdbDictionary::Table*  pTab = ctx->getTab();
-
-  HugoCalculator calc(* pTab);
-  NDBT_ResultRow tmpRow(* pTab);
-
-  int i = 0;
-  while (i<loops && !ctx->isTestStopped()) {
-    g_info << i++ << ": ";
-    const int record = (rand() % records);
-    g_info << " row=" << record;
-
-    NdbConnection* pCon = pNdb->startTransaction();
-    NdbScanOperation* pOp = pCon->getNdbScanOperation(pTab->getName());	
-    if (pOp == NULL) {
-      ERR(pCon->getNdbError());
-      return NDBT_FAILED;
-    }
-    
-    if( pOp->readTuples() ) {
-      ERR(pCon->getNdbError());
-      return NDBT_FAILED;
-    }
-  
-    int check = pOp->interpret_exit_ok();
-    if( check == -1 ) {
-      ERR(pCon->getNdbError());
-      return NDBT_FAILED;
-    }
-    
-    // Define attributes to read  
-    for(int a = 0; a<pTab->getNoOfColumns(); a++){
-      if((tmpRow.attributeStore(a) = 
-	  pOp->getValue(pTab->getColumn(a)->getName())) == 0) {
-	ERR(pCon->getNdbError());
-	return NDBT_FAILED;
-      }
-    } 
-    
-    check = pCon->execute(NoCommit);
-    if( check == -1 ) {
-      ERR(pCon->getNdbError());
-      return NDBT_FAILED;
-    }
-
-    int res;
-    int row = 0;
-    while(row < record && (res = pOp->nextResult()) == 0) {
-      if(calc.verifyRowValues(&tmpRow) != 0){
-	abort();
-	return NDBT_FAILED;
-      }
-      row++;
-    }
-    if(row != record){
-      ERR(pCon->getNdbError());
-      abort();
-      return NDBT_FAILED;
-    }
-    g_info << " restarting" << endl;
-    if((res = pOp->restart()) != 0){
-      ERR(pCon->getNdbError());
-      abort();
-      return NDBT_FAILED;
-    }      
-
-    row = 0;
-    while((res = pOp->nextResult()) == 0) {
-      if(calc.verifyRowValues(&tmpRow) != 0){
-	abort();
-	return NDBT_FAILED;
-      }
-      row++;
-    }
-    if(res != 1 || row != records){
-      ERR(pCon->getNdbError());
-      abort();
-      return NDBT_FAILED;
-    }
-    pCon->close();
-  }
-  return NDBT_OK;
-}
-
-
 int 
 runScanParallelism(NDBT_Context* ctx, NDBT_Step* step){
   int loops = ctx->getNumLoops() + 3;
@@ -1719,12 +1631,6 @@ TESTCASE("ScanReadWhileNodeIsDown", 
   INITIALIZER(runLoadTable);
   STEP(runScanReadUntilStoppedPrintTime);
   STEP(runStopAndStartNode);
-  FINALIZER(runClearTable);
-}
-TESTCASE("ScanRestart", 
-	 "Verify restart functionallity"){
-  INITIALIZER(runLoadTable);
-  STEP(runScanRestart);
   FINALIZER(runClearTable);
 }
 TESTCASE("ScanParallelism", 
Thread
bk commit into 6.0 tree (sergefp:1.2798) WL#2771Sergey Petrunia2 Feb