List:Commits« Previous MessageNext Message »
From:tomas Date:October 30 2007 3:09pm
Subject:bk commit into 5.1 tree (tomas:1.2687)
View as plain text  
Below is the list of changes that have just been committed into a local
5.1 repository of tomas. When tomas does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html

ChangeSet@stripped, 2007-10-30 16:09:42+01:00, tomas@stripped +2 -0
  wl#3686 - remove read before update/delete
  - handler part (disabled)

  sql/ha_ndbcluster.cc@stripped, 2007-10-30 16:09:39+01:00, tomas@stripped +395 -118
    wl#3686 - remove read before update/delete
    - handler part (disabled)

  sql/ha_ndbcluster.h@stripped, 2007-10-30 16:09:39+01:00, tomas@stripped +22 -2
    wl#3686 - remove read before update/delete
    - handler part (disabled)

diff -Nrup a/sql/ha_ndbcluster.cc b/sql/ha_ndbcluster.cc
--- a/sql/ha_ndbcluster.cc	2007-10-29 15:07:05 +01:00
+++ b/sql/ha_ndbcluster.cc	2007-10-30 16:09:39 +01:00
@@ -361,8 +361,11 @@ ha_ndbcluster::write_conflict_row(NdbTra
 #endif
 
 inline int
-check_completed_operations_pre_commit(Thd_ndb *thd_ndb, NdbTransaction *trans, const NdbOperation *first)
+check_completed_operations_pre_commit(Thd_ndb *thd_ndb, NdbTransaction *trans,
+                                      const NdbOperation *first,
+                                      uint *ignore_count)
 {
+  uint ignores= 0;
   DBUG_ENTER("check_completed_operations_pre_commit");
   /*
     Check that all errors are "accepted" errors
@@ -447,8 +450,12 @@ check_completed_operations_pre_commit(Th
 #endif
         DBUG_RETURN(err.code);
     }
+    if (err.classification != NdbError::NoError)
+      ignores++;
     first= trans->getNextCompletedOperation(first);
   }
+  if (ignore_count)
+    *ignore_count= ignores;
 #ifdef HAVE_NDB_BINLOG
   if (conflict_rows_written)
   {
@@ -465,8 +472,11 @@ check_completed_operations_pre_commit(Th
 }
 
 inline int
-check_completed_operations(Thd_ndb *thd_ndb, NdbTransaction *trans, const NdbOperation *first)
+check_completed_operations(Thd_ndb *thd_ndb, NdbTransaction *trans,
+                           const NdbOperation *first,
+                           uint *ignore_count)
 {
+  uint ignores= 0;
   DBUG_ENTER("check_completed_operations");
   /*
     Check that all errors are "accepted" errors
@@ -483,8 +493,12 @@ check_completed_operations(Thd_ndb *thd_
 #endif
       DBUG_RETURN(err.code);
     }
+    if (err.classification != NdbError::NoError)
+      ignores++;
     first= trans->getNextCompletedOperation(first);
   }
+  if (ignore_count)
+    *ignore_count= ignores;
   DBUG_RETURN(0);
 }
 
@@ -513,9 +527,13 @@ ha_ndbcluster::release_completed_operati
   trans->releaseCompletedOperations();
 }
 
+int execute_no_commit(Thd_ndb *thd_ndb, NdbTransaction *trans,
+                      bool force_release, bool ignore_no_key,
+                      uint *ignore_count= 0);
 inline
 int execute_no_commit(Thd_ndb *thd_ndb, NdbTransaction *trans,
-		      bool force_release, bool ignore_no_key)
+                      bool force_release, bool ignore_no_key,
+                      uint *ignore_count)
 {
   DBUG_ENTER("execute_no_commit");
   ha_ndbcluster::release_completed_operations(thd_ndb, trans, force_release);
@@ -532,12 +550,15 @@ int execute_no_commit(Thd_ndb *thd_ndb, 
   if (!ignore_no_key || trans->getNdbError().code == 0)
     DBUG_RETURN(trans->getNdbError().code);
 
-  DBUG_RETURN(check_completed_operations_pre_commit(thd_ndb, trans, first));
+  DBUG_RETURN(check_completed_operations_pre_commit(thd_ndb, trans, first,
+                                                    ignore_count));
 }
 
+int execute_commit(Thd_ndb *thd_ndb, NdbTransaction *trans,
+                   int force_send, int ignore_error, uint *ignore_count= 0);
 inline
 int execute_commit(Thd_ndb *thd_ndb, NdbTransaction *trans,
-                   int force_send, int ignore_error)
+                   int force_send, int ignore_error, uint *ignore_count)
 {
   DBUG_ENTER("execute_commit");
   const NdbOperation *first= trans->getFirstDefinedOperation();
@@ -560,7 +581,7 @@ int execute_commit(Thd_ndb *thd_ndb, Ndb
   thd_ndb->m_unsent_bytes= 0;
   if (!ignore_error || trans->getNdbError().code == 0)
     DBUG_RETURN(trans->getNdbError().code);
-  DBUG_RETURN(check_completed_operations(thd_ndb, trans, first));
+  DBUG_RETURN(check_completed_operations(thd_ndb, trans, first, ignore_count));
 }
 
 inline
@@ -1497,28 +1518,14 @@ int ha_ndbcluster::get_metadata(const ch
   DBUG_PRINT("info", ("fetched table %s", tab->getName()));
   m_table= tab;
 
-  if (bitmap_init(&m_bitmap, m_bitmap_buf, table_share->fields, 0) ||
-      bitmap_init(&m_pk_bitmap, m_pk_bitmap_buf, table_share->fields, 0))
+  if (bitmap_init(&m_bitmap, m_bitmap_buf, table_share->fields, 0))
   {
     error= HA_ERR_OUT_OF_MEM;
     goto err;
   }
-  if (table_share->primary_key != MAX_KEY)
-  {
-    KEY *pk_info= table->key_info + table_share->primary_key;
-    uint i;
-    for (i= 0; i < pk_info->key_parts; i++)
-    {
-      KEY_PART_INFO *kp= &pk_info->key_part[i];
-      bitmap_set_bit(&m_pk_bitmap, kp->fieldnr - 1);
-    }
-  }
-  else
+  if (table_share->primary_key == MAX_KEY)
   {
     /* Hidden primary key. */
-    uint field_no= table_share->fields;
-    ((uchar *)m_pk_bitmap_buf)[field_no>>3]|= (1 << (field_no & 7));
-
     if ((error= add_hidden_pk_ndb_record(dict)) != 0)
       goto err;
   }
@@ -2481,17 +2488,7 @@ int ha_ndbcluster::complemented_read(con
 
   const NdbRecord *key_rec;
   const uchar *key_row;
-  if (table_share->primary_key != MAX_KEY)
-  {
-    key_rec= m_index[table->s->primary_key].ndb_unique_record_row;
-    key_row= old_data;
-  }
-  else
-  {
-    /* Hidden primary key, previously read into m_ref. */
-    key_rec= m_ndb_hidden_key_record;
-    key_row= (const uchar*)(&m_ref);
-  }
+  setup_key_ref_for_ndb_record(&key_rec, &key_row, old_data, FALSE);
 
   /*
     Use mask only with columns that are not in write_set, not in
@@ -2674,11 +2671,12 @@ int ha_ndbcluster::peek_indexed_rows(con
   KEY* key_info;
   for (i= 0, key_info= table->key_info; i < table->s->keys; i++, key_info++)
   {
-    if (i != table->s->primary_key &&
-        key_info->flags & HA_NOSAME)
+    if (i != table_share->primary_key &&
+        key_info->flags & HA_NOSAME &&
+        bitmap_is_overlapping(table->write_set, m_key_fields[i]))
     {
       /*
-        A unique index is defined on table.
+        A unique index is defined on table and it's being updated
         We cannot look up a NULL field value in a unique index. But since
         keys with NULLs are not indexed, such rows cannot conflict anyway, so
         we just skip the index in this case.
@@ -4016,12 +4014,18 @@ int ha_ndbcluster::exec_bulk_update(uint
 {
   DBUG_ENTER("ha_ndbcluster::exec_bulk_update");
   *dup_key_found= 0;
-  if (m_thd_ndb->m_unsent_bytes &&
-      execute_no_commit(m_thd_ndb, m_thd_ndb->trans,
-                        FALSE, m_ignore_no_key) != 0)
+  if (m_thd_ndb->m_unsent_bytes)
   {
-    no_uncommitted_rows_execute_failure();
-    DBUG_RETURN(ndb_err(m_thd_ndb->trans));
+    uint ignore_count= 0;
+    if (execute_no_commit(m_thd_ndb, m_thd_ndb->trans, FALSE,
+                          m_ignore_no_key || m_read_before_write_removal_used,
+                          &ignore_count) != 0)
+    {
+      no_uncommitted_rows_execute_failure();
+      DBUG_RETURN(ndb_err(m_thd_ndb->trans));
+    }
+    m_rows_changed-= ignore_count;
+    m_rows_updated-= ignore_count;
   }
   DBUG_RETURN(0);
 }
@@ -4037,6 +4041,43 @@ int ha_ndbcluster::update_row(const ucha
   return ndb_update_row(old_data, new_data, 0);
 }
 
+uint
+ha_ndbcluster::setup_key_ref_for_ndb_record(const NdbRecord **key_rec,
+                                            const uchar **key_row,
+                                            const uchar *record,
+                                            bool use_active_index)
+{
+  DBUG_ENTER("setup_key_ref_for_ndb_record");
+  if (table_share->primary_key != MAX_KEY)
+  {
+    if (use_active_index)
+    {
+      /*
+        Using unique key and getting read before write removal
+        optimisation working. Use key_rec according to this
+        unique index instead of primary key index
+      */
+      *key_rec= m_index[active_index].ndb_unique_record_row;
+    }
+    else
+      *key_rec= m_index[table_share->primary_key].ndb_unique_record_row;
+    *key_row= record;
+    DBUG_RETURN(table->s->reclength);
+  }
+  else
+  {
+    /* Use hidden primary key previously read into m_ref. */
+    *key_rec= m_ndb_hidden_key_record;
+    *key_row= (const uchar *)(&m_ref);
+    DBUG_RETURN(sizeof(m_ref));
+  }
+}
+
+
+/*
+  Update one record in NDB using primary key
+*/
+
 int ha_ndbcluster::ndb_update_row(const uchar *old_data, uchar *new_data,
                                   int is_bulk_update)
 {
@@ -4049,8 +4090,9 @@ int ha_ndbcluster::ndb_update_row(const 
   int error;
   longlong func_value;
   bool have_pk= (table_share->primary_key != MAX_KEY);
-  bool pk_update= (have_pk &&
-                   bitmap_is_overlapping(table->write_set, &m_pk_bitmap) &&
+  bool pk_update= (!m_read_before_write_removal_possible &&
+                   have_pk &&
+                   bitmap_is_overlapping(table->write_set, m_pk_bitmap_p) &&
                    primary_key_cmp(old_data, new_data));
   bool batch_allowed= is_bulk_update || (thd->options & OPTION_ALLOW_BATCH);
   DBUG_ENTER("ndb_update_row");
@@ -4151,7 +4193,7 @@ int ha_ndbcluster::ndb_update_row(const 
     real changes.
   */
   bitmap_copy(&m_bitmap, table->write_set);
-  bitmap_subtract(&m_bitmap, &m_pk_bitmap);
+  bitmap_subtract(&m_bitmap, m_pk_bitmap_p);
   uchar *mask= (uchar *)(m_bitmap.bitmap);
   /* Need to initialize bits for any extra hidden columns. */
   if (!have_pk || m_user_defined_partitioning)
@@ -4216,20 +4258,9 @@ int ha_ndbcluster::ndb_update_row(const 
   {  
     const NdbRecord *key_rec;
     const uchar *key_row;
-    uint key_len;
-    if (have_pk)
-    {
-      key_rec= m_index[table_share->primary_key].ndb_unique_record_row;
-      key_row= row;
-      key_len= table->s->reclength;
-    }
-    else
-    {
-      /* Use hidden primary key previously read into m_ref. */
-      key_rec= m_ndb_hidden_key_record;
-      key_row= (const uchar *)(&m_ref);
-      key_len= sizeof(m_ref);
-    }
+    uint key_len=
+      setup_key_ref_for_ndb_record(&key_rec, &key_row, row,
+                                   m_read_before_write_removal_used);
 
     if (!need_execute)
     {
@@ -4300,15 +4331,17 @@ int ha_ndbcluster::ndb_update_row(const 
 
   eventSetAnyValue(thd, op);
 
-  m_rows_changed++;
-
+  uint ignore_count= 0;
   if (need_execute &&
-      execute_no_commit(m_thd_ndb, trans, FALSE, m_ignore_no_key) != 0)
+      execute_no_commit(m_thd_ndb, trans, FALSE,
+                        m_ignore_no_key || m_read_before_write_removal_used,
+                        &ignore_count) != 0)
   {
     no_uncommitted_rows_execute_failure();
     DBUG_RETURN(ndb_err(trans));
   }
-  
+  m_rows_changed+= 1 - ignore_count;
+  m_rows_updated+= 1 - ignore_count;
   DBUG_RETURN(0);
 }
 
@@ -4337,12 +4370,17 @@ int ha_ndbcluster::bulk_delete_row(const
 int ha_ndbcluster::end_bulk_delete()
 {
   DBUG_ENTER("end_bulk_delete");
-  if (m_thd_ndb->m_unsent_bytes &&
-      execute_no_commit(m_thd_ndb, m_thd_ndb->trans,
-                        FALSE, m_ignore_no_key) != 0)
+  if (m_thd_ndb->m_unsent_bytes)
   {
-    no_uncommitted_rows_execute_failure();
-    DBUG_RETURN(ndb_err(m_thd_ndb->trans));
+    uint ignore_count= 0;
+    if (execute_no_commit(m_thd_ndb, m_thd_ndb->trans, FALSE,
+                          m_ignore_no_key || m_read_before_write_removal_used,
+                          &ignore_count) != 0)
+    {
+      no_uncommitted_rows_execute_failure();
+      DBUG_RETURN(ndb_err(m_thd_ndb->trans));
+    }
+    m_rows_deleted-= ignore_count;
   }
   DBUG_RETURN(0);
 }
@@ -4400,29 +4438,23 @@ int ha_ndbcluster::ndb_delete_row(const 
     eventSetAnyValue(thd, op);
 
     if (!(primary_key_update || m_delete_cannot_batch))
+    {
       // If deleting from cursor, NoCommit will be handled in next_result
+      m_rows_deleted++;
       DBUG_RETURN(0);
+    }
   }
   else
   {
     const NdbRecord *key_rec;
     const uchar *key_row;
-    uint key_len;
-    if (table_share->primary_key != MAX_KEY)
-    {
-      key_rec= m_index[table_share->primary_key].ndb_unique_record_row;
-      key_row= record;
-      key_len= table->s->reclength;
-    }
-    else
-    {
-      key_rec= m_ndb_hidden_key_record;
-      key_row= (const uchar *)(&m_ref);
-      key_len= sizeof(m_ref);
-    }
+    uint key_len=
+      setup_key_ref_for_ndb_record(&key_rec, &key_row, record,
+                                   m_read_before_write_removal_used);
     /*
       Check if we can batch the delete; if so we need to buffer the key.
 
+      We don't batch deletes as part of primary key updates.
       We do not batch deletes on tables with no primary key. For such tables,
       replication uses full table scan to locate the row to delete. The
       problem is the following scenario when deleting 2 (or more) rows:
@@ -4441,7 +4473,8 @@ int ha_ndbcluster::ndb_delete_row(const 
     */
     bool need_execute;
     if (allow_batch &&
-        table_share->primary_key != MAX_KEY)
+        table_share->primary_key != MAX_KEY &&
+        !primary_key_update)
     {
       /*
         Poor approx. let delete ~ tabsize / 4
@@ -4466,15 +4499,23 @@ int ha_ndbcluster::ndb_delete_row(const 
     eventSetAnyValue(thd, op);
 
     if (!need_execute)
+    {
+      m_rows_deleted++;
       DBUG_RETURN(0);
+    }
   }
 
   // Execute delete operation
-  if (execute_no_commit(m_thd_ndb, trans, FALSE, m_ignore_no_key) != 0)
+  uint ignore_count= 0;
+  if (execute_no_commit(m_thd_ndb, trans, FALSE,
+                        m_ignore_no_key || m_read_before_write_removal_used,
+                        &ignore_count) != 0)
   {
     no_uncommitted_rows_execute_failure();
     DBUG_RETURN(ndb_err(trans));
   }
+  if (!primary_key_update)
+    m_rows_deleted+= 1 - ignore_count;
   DBUG_RETURN(0);
 }
   
@@ -4652,6 +4693,11 @@ int ha_ndbcluster::index_init(uint index
     and no sub-sequent call to unlock_row()
   */
   m_lock_tuple= FALSE;
+  if (table_share->primary_key == MAX_KEY &&
+      m_use_partition_pruning)
+    include_partition_fields_in_used_fields(
+      m_part_info->full_part_field_array,
+      table->read_set);
   DBUG_RETURN(0);
 }
 
@@ -5151,6 +5197,11 @@ int ha_ndbcluster::info(uint flag)
         stats.auto_increment_value= (ulonglong)auto_increment_value64;
     }
   }
+  if (flag & HA_STATUS_WRITTEN_ROWS)
+  {
+    stats.rows_updated= m_rows_updated;
+    stats.rows_deleted= m_rows_deleted;
+  }
 
   if(result == -1)
     result= HA_ERR_NO_CONNECTION;
@@ -5225,6 +5276,52 @@ int ha_ndbcluster::extra(enum ha_extra_f
 }
 
 
+bool ha_ndbcluster::read_before_write_removal_possible(List<Item> *fields,
+                                                       List<Item> *values)
+{
+  THD *thd= table->in_use;
+  DBUG_ENTER("read_before_write_removal_possible");
+  /*
+    We need to verify a large number of things before accepting to remove
+    the read before the update. We cannot avoid read before when primary
+    key is updated, when a unique key is updated, when a BLOB is updated,
+    for deletes on tables with BLOB's it is also not possible to avoid
+    the read before the update and finally it is necessary that the
+    update expressions only contain constant expressions.
+  */
+  if (uses_blob_value(table->write_set) ||
+      (thd->lex->sql_command == SQLCOM_DELETE &&
+       table_share->blob_fields) ||
+      (values && !check_constant_expressions(values)) ||
+      (table_share->primary_key != MAX_KEY &&
+       bitmap_is_overlapping(table->write_set, m_pk_bitmap_p)))
+  {
+    DBUG_RETURN(FALSE);
+  }
+  if (m_has_unique_index)
+  {
+    KEY *key;
+    uint i;
+    for (i= 0; i < table_share->keys; i++)
+    {
+      key= table->key_info + i;
+      if ((key->flags & HA_NOSAME) &&
+          bitmap_is_overlapping(table->write_set,
+                                m_key_fields[key - table->key_info]))
+      {
+        DBUG_RETURN(FALSE);
+      }
+    }
+  }
+  /* disable read_before_write_removal until commit ack marker patch is in */
+  DBUG_RETURN(FALSE);
+
+  DBUG_PRINT("info", ("read_before_write_removal_possible TRUE"));
+  m_read_before_write_removal_possible= TRUE;
+  DBUG_RETURN(TRUE);
+}
+
+
 int ha_ndbcluster::reset()
 {
   DBUG_ENTER("ha_ndbcluster::reset");
@@ -5242,6 +5339,9 @@ int ha_ndbcluster::reset()
     bitmap_set_all(&m_part_info->used_partitions);
 
   /* reset flags set by extra calls */
+  m_read_before_write_removal_possible= FALSE;
+  m_read_before_write_removal_used= FALSE;
+  m_rows_updated= m_rows_deleted= 0;
   m_ignore_dup_key= FALSE;
   m_use_write= FALSE;
   m_ignore_no_key= FALSE;
@@ -5533,7 +5633,10 @@ int ha_ndbcluster::start_statement(THD *
     if ((!trans) &&
         (thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)))
       trans_register_ha(thd, TRUE, ndbcluster_hton);
+    thd_ndb->m_handler= this;
   }
+  else
+    thd_ndb->m_handler= NULL;
   if (!trans)
   {
     DBUG_PRINT("trans",("Possibly starting transaction"));
@@ -5867,7 +5970,7 @@ ha_ndbcluster::start_transaction_part_id
   Commit a transaction started in NDB
  */
 
-static int ndbcluster_commit(handlerton *hton, THD *thd, bool all)
+int ndbcluster_commit(handlerton *hton, THD *thd, bool all)
 {
   int res= 0;
   Thd_ndb *thd_ndb= get_thd_ndb(thd);
@@ -5913,7 +6016,20 @@ static int ndbcluster_commit(handlerton 
   }
   else
   {
-    res= execute_commit(thd_ndb, trans, thd->variables.ndb_force_send, FALSE);
+    if (thd_ndb->m_handler &&
+        thd_ndb->m_handler->m_read_before_write_removal_possible)
+    {
+      uint ignore_count= 0;
+      res= execute_commit(thd_ndb, trans, thd->variables.ndb_force_send,
+                          TRUE, &ignore_count);
+      if (!res && ignore_count)
+        if (thd->lex->sql_command == SQLCOM_DELETE)
+          thd_ndb->m_handler->m_rows_deleted-= ignore_count;
+        else
+          thd_ndb->m_handler->m_rows_updated-= ignore_count;
+    }
+    else
+      res= execute_commit(thd_ndb, trans, thd->variables.ndb_force_send, FALSE);
   }
 
   if (res != 0)
@@ -7571,6 +7687,10 @@ ha_ndbcluster::ha_ndbcluster(handlerton 
   m_ignore_dup_key(FALSE),
   m_has_unique_index(FALSE),
   m_ignore_no_key(FALSE),
+  m_read_before_write_removal_possible(FALSE),
+  m_read_before_write_removal_used(FALSE),
+  m_rows_updated(0),
+  m_rows_deleted(0),
   m_rows_to_insert((ha_rows) 1),
   m_rows_inserted((ha_rows) 0),
   m_rows_changed((ha_rows) 0),
@@ -7585,6 +7705,7 @@ ha_ndbcluster::ha_ndbcluster(handlerton 
   m_row_buffer(0),
   m_row_buffer_size(0),
   m_extra_reclength(0),
+  m_key_fields(NULL),
   m_ndb_record(0),
   m_ndb_record_fragment(0),
   m_ndb_hidden_key_record(0),
@@ -7668,20 +7789,29 @@ ha_ndbcluster::~ha_ndbcluster() 
 void
 ha_ndbcluster::column_bitmaps_signal(uint sig_type)
 {
-  DBUG_ENTER("ha_ndbcluster::column_bitmaps_signal");
-  /*
-    We need to make sure we always read all of the primary key.
-    Otherwise we cannot support position() and rnd_pos().
-  */
-  bitmap_union(table->read_set, &m_pk_bitmap);
+  THD *thd= table->in_use;
+  bool write_query= (thd->lex->sql_command == SQLCOM_UPDATE ||
+                     thd->lex->sql_command == SQLCOM_DELETE);
+  DBUG_ENTER("column_bitmaps_signal");
+  DBUG_PRINT("enter", ("read_set: 0x%lx  write_set: 0x%lx",
+             (long) table->read_set->bitmap[0],
+             (long) table->write_set->bitmap[0]));
+  if (sig_type & HA_COMPLETE_TABLE_READ_BITMAP)
+    bitmap_copy(&m_save_read_set, table->read_set);
+  if (!write_query || (sig_type & HA_COMPLETE_TABLE_READ_BITMAP))
+  {
+    /*
+      We need to make sure we always read all of the primary key.
+      Otherwise we cannot support position() and rnd_pos().
+  
+      Alternatively, we could just set a flag, and in the reader methods set
+      the extra bits as required if the flag is set, followed by clearing the
+      flag.  This to save doing the work of setting bits twice or more.
+      On the other hand this is quite fast in itself.
+    */
+    bitmap_union(table->read_set, m_pk_bitmap_p);
+  }
   DBUG_VOID_RETURN;
-
-  /*
-    Alternatively, we could just set a flag, and in the reader methods set the
-    extra bits as required if the flag is set, followed by clearing the flag.
-    This to save doing the work of setting bits twice or more.
-    On the other hand this is quite fast in itself.
-  */
 }
 
 /*
@@ -7698,6 +7828,8 @@ int ha_ndbcluster::open(const char *name
 {
   int res;
   KEY *key;
+  KEY_PART_INFO *key_part_info;
+  uint key_parts, i, j;
   DBUG_ENTER("ha_ndbcluster::open");
   DBUG_PRINT("enter", ("name: %s  mode: %d  test_if_locked: %d",
                        name, mode, test_if_locked));
@@ -7707,6 +7839,10 @@ int ha_ndbcluster::open(const char *name
     primary key to be written in the ref variable
   */
   
+  if (bitmap_init(&m_save_read_set, NULL, table_share->fields, FALSE))
+  {
+    DBUG_RETURN(1);
+  }
   if (table_share->primary_key != MAX_KEY) 
   {
     key= table->key_info+table_share->primary_key;
@@ -7721,11 +7857,62 @@ int ha_ndbcluster::open(const char *name
   }
 
   DBUG_PRINT("info", ("ref_length: %d", ref_length));
+  {
+    char* bitmap_array;
+    uint extra_hidden_keys= table_share->primary_key != MAX_KEY ? 0 : 1;
+    uint n_keys= table_share->keys + extra_hidden_keys;
+    uint ptr_size= sizeof(MY_BITMAP*) * (n_keys + 1 /* null termination */);
+    uint map_size= sizeof(MY_BITMAP) * n_keys;
+    m_key_fields= (MY_BITMAP**)my_malloc(ptr_size + map_size,
+                                         MYF(MY_WME + MY_ZEROFILL));
+    if (!m_key_fields)
+    {
+      local_close(NULL, FALSE);
+      DBUG_RETURN(1);
+    } 
+    bitmap_array= ((char*)m_key_fields) + ptr_size;
+    for (i= 0; i < n_keys; i++)
+    {
+      my_bitmap_map *bitbuf= NULL;
+      bool is_hidden_key= (i == table_share->keys);
+      m_key_fields[i]= (MY_BITMAP*)bitmap_array;
+      if (is_hidden_key || (i == table_share->primary_key))
+      {
+        m_pk_bitmap_p= m_key_fields[i];
+        bitbuf= m_pk_bitmap_buf;
+      }
+      if (bitmap_init(m_key_fields[i], bitbuf,
+                      table_share->fields, FALSE))
+      {
+        m_key_fields[i]= NULL;
+        local_close(NULL, FALSE);
+        DBUG_RETURN(1);
+      }
+      if (!is_hidden_key)
+      {
+        key= table->key_info + i;
+        key_part_info= key->key_part;
+        key_parts= key->key_parts;
+        for (j= 0; j < key_parts; j++, key_part_info++)
+          bitmap_set_bit(m_key_fields[i], key_part_info->fieldnr-1);
+      }
+      else
+      {
+        uint field_no= table_share->fields;
+        ((uchar *)m_pk_bitmap_buf)[field_no>>3]|= (1 << (field_no & 7));
+      }
+      bitmap_array+= sizeof(MY_BITMAP);
+    }
+    m_key_fields[i]= NULL;
+  }
 
   // Init table lock structure 
   /* ndb_share reference handler */
   if (!(m_share=get_share(name, table)))
+  {
+    local_close(NULL, FALSE);
     DBUG_RETURN(1);
+  }
   DBUG_PRINT("NDB_SHARE", ("%s handler  use_count: %u",
                            m_share->key, m_share->use_count));
   thr_lock_data_init(&m_share->lock,&m_lock,(void*) 0);
@@ -7736,11 +7923,7 @@ int ha_ndbcluster::open(const char *name
   if ((res= check_ndb_connection()) ||
       (res= get_metadata(name)))
   {
-    /* ndb_share reference handler free */
-    DBUG_PRINT("NDB_SHARE", ("%s handler free  use_count: %u",
-                             m_share->key, m_share->use_count));
-    free_share(&m_share);
-    m_share= 0;
+    local_close(NULL, FALSE);
     DBUG_RETURN(res);
   }
   while (1)
@@ -7764,9 +7947,7 @@ int ha_ndbcluster::open(const char *name
   }
   if (res)
   {
-    free_share(&m_share);
-    m_share= 0;
-    release_metadata(current_thd, get_ndb());
+    local_close(current_thd, TRUE);
     DBUG_RETURN(res);
   }
 #ifdef HAVE_NDB_BINLOG
@@ -7835,17 +8016,43 @@ void ha_ndbcluster::set_part_info(partit
   - release resources setup by open()
  */
 
+void ha_ndbcluster::local_close(THD *thd, bool release_metadata_flag)
+{
+  Ndb *ndb;
+  DBUG_ENTER("ha_ndbcluster::local_close");
+  if (m_key_fields)
+  {
+    MY_BITMAP **inx_bitmap;
+    for (inx_bitmap= m_key_fields;
+         (inx_bitmap != NULL) && ((*inx_bitmap) != NULL);
+         inx_bitmap++)
+      if ((*inx_bitmap)->bitmap != m_pk_bitmap_buf)
+        bitmap_free(*inx_bitmap);
+    my_free((char*)m_key_fields, MYF(0));
+    m_key_fields= NULL;
+  }
+  bitmap_free(&m_save_read_set);
+  if (m_share)
+  {
+    /* ndb_share reference handler free */
+    DBUG_PRINT("NDB_SHARE", ("%s handler free  use_count: %u",
+                             m_share->key, m_share->use_count));
+    free_share(&m_share);
+  }
+  m_share= 0;
+  if (release_metadata_flag)
+  {
+    ndb= thd ? check_ndb_in_thd(thd) : g_ndb;
+    release_metadata(thd, ndb);
+  }
+  DBUG_VOID_RETURN;
+}
+
 int ha_ndbcluster::close(void)
 {
   DBUG_ENTER("close");
   THD *thd= table->in_use;
-  Ndb *ndb= thd ? check_ndb_in_thd(thd) : g_ndb;
-  /* ndb_share reference handler free */
-  DBUG_PRINT("NDB_SHARE", ("%s handler free  use_count: %u",
-                           m_share->key, m_share->use_count));
-  free_share(&m_share);
-  m_share= 0;
-  release_metadata(thd, ndb);
+  local_close(thd, TRUE);
   DBUG_RETURN(0);
 }
 
@@ -10314,6 +10521,46 @@ ha_ndbcluster::null_value_index_search(K
   DBUG_RETURN(FALSE);
 }
 
+void ha_ndbcluster::check_read_before_write_removal()
+{
+  bool use_removal= TRUE;
+  DBUG_ENTER("check_read_before_write_removal");
+  DBUG_ASSERT(m_read_before_write_removal_possible);
+  /*
+    We are doing an update or delete and it is possible that we
+    can ignore the read before the update or delete. This is
+    possible here since we are not updating the primary key and
+    if the index used is unique or primary and if the WHERE clause
+    only involves fields from this index we are ok to go. At this
+    moment we can only updates where all SET expressions are
+    constants. Thus no read set will come from SET expressions.
+  */
+  if (table_share->primary_key == active_index)
+  {
+    if (!bitmap_cmp(&m_save_read_set, m_pk_bitmap_p))
+      use_removal= FALSE;
+  }
+  else
+  {
+    KEY *key= table->key_info + active_index;
+    if (!(key->flags & HA_NOSAME))
+    {
+      /* Optimisation not applicable on non-unique indexes */
+      use_removal= FALSE;
+    }
+    else if (!bitmap_cmp(&m_save_read_set,
+                         m_key_fields[active_index]))
+    {
+      use_removal= FALSE;
+    }
+  }
+  m_read_before_write_removal_used= use_removal;
+  DBUG_PRINT("info", ("m_read_before_write_removal_used: %d",
+                      m_read_before_write_removal_used));
+  DBUG_VOID_RETURN;
+}
+
+
 /*
   This is used to check if an ordered index scan is needed for a range in
   a multi range read.
@@ -10355,7 +10602,8 @@ read_multi_bounds_callback(void *arg, Ui
     (struct read_multi_callback_data *)arg;
 
   /* Skip any ranges not to be included in the scan. */
-  while (data->range->range_flag & (SKIP_RANGE|UNIQUE_RANGE))
+  while (data->range->range_flag &
+         (SKIP_RANGE|UNIQUE_RANGE|READ_KEY_FROM_RANGE))
     data->range++;
 
   compute_index_bounds(bound, data->key_info,
@@ -10446,7 +10694,10 @@ ha_ndbcluster::read_multi_range_first(KE
   uint num_scan_ranges= 0;
   uint i;
   int error;
+  bool any_real_read= FALSE;
 
+  if (m_read_before_write_removal_possible)
+    check_read_before_write_removal();
   for (i= 0; i < range_count; i++)
   {
     KEY_MULTI_RANGE *r= &ranges[i];
@@ -10497,6 +10748,7 @@ ha_ndbcluster::read_multi_range_first(KE
 
     if (read_multi_needs_scan(cur_index_type, key_info, r))
     {
+      any_real_read= TRUE;
       /*
         If we reach the limit of ranges allowed in a single scan: stop
         here, send what we have so far, and continue when done with that.
@@ -10519,6 +10771,14 @@ ha_ndbcluster::read_multi_range_first(KE
       if (row_buf + reclength > end_of_buffer)
         break;
 
+      if (m_read_before_write_removal_used)
+      {
+        r->range_flag|= READ_KEY_FROM_RANGE;
+        continue;
+      }
+      else
+        any_real_read= TRUE;
+
       r->range_flag|= UNIQUE_RANGE;
 
       if (!(op= pk_unique_index_read_key(active_index,
@@ -10590,12 +10850,15 @@ ha_ndbcluster::read_multi_range_first(KE
   /**
    * Set first operation in multi range
    */
-  m_current_multi_operation= 
-    lastOp ? lastOp->next() : trans->getFirstDefinedOperation();
-  if (execute_no_commit_ie(m_thd_ndb, trans, TRUE))
-    ERR_RETURN(trans->getNdbError());
+  if (any_real_read)
+  {
+    m_current_multi_operation= 
+      lastOp ? lastOp->next() : trans->getFirstDefinedOperation();
+    if (execute_no_commit_ie(m_thd_ndb, trans, TRUE))
+      ERR_RETURN(trans->getNdbError());
 
-  m_multi_range_result_ptr= buffer->buffer;
+    m_multi_range_result_ptr= buffer->buffer;
+  }
 
   DBUG_RETURN(read_multi_range_next(found_range_p));
 }
@@ -10616,6 +10879,16 @@ ha_ndbcluster::read_multi_range_next(KEY
       /* Nothing in this range, move to next one. */
       multi_range_curr++;
     }
+    else if (multi_range_curr->range_flag & READ_KEY_FROM_RANGE)
+    {
+      DBUG_PRINT("info", ("using read before write removal optimisation"));
+      KEY* key_info= table->key_info + active_index;
+      key_restore(table->record[0], (uchar*)multi_range_curr->start_key.key,
+                  key_info, key_info->key_length);
+      table->status= 0;
+      multi_range_curr++;
+      DBUG_RETURN(0);
+    }
     else if (multi_range_curr->range_flag & UNIQUE_RANGE)
     {
       /*
@@ -10783,6 +11056,10 @@ ha_ndbcluster::read_multi_range_fetch_ne
       return res;
     }
   }
+  /*
+    potentially not needed call to this function
+  */
+  DBUG_ASSERT(!table->in_use->slave_thread || (m_ignore_no_key == TRUE));
   return 0;
 }
 
diff -Nrup a/sql/ha_ndbcluster.h b/sql/ha_ndbcluster.h
--- a/sql/ha_ndbcluster.h	2007-10-29 15:07:06 +01:00
+++ b/sql/ha_ndbcluster.h	2007-10-30 16:09:39 +01:00
@@ -278,6 +278,8 @@ class Thd_ndb 
 
   Ndb_cluster_connection *connection;
   Ndb *ndb;
+  /* this */
+  ha_ndbcluster *m_handler;
   ulong count;
   uint lock_count;
   uint start_stmt_count;
@@ -310,6 +312,7 @@ class Thd_ndb 
   uint m_conflict_fn_usage_count;
 };
 
+int ndbcluster_commit(handlerton *hton, THD *thd, bool all);
 class ha_ndbcluster: public handler
 {
  public:
@@ -320,6 +323,7 @@ class ha_ndbcluster: public handler
   void column_bitmaps_signal(uint sig_type);
   int open(const char *name, int mode, uint test_if_locked);
   int close(void);
+  void local_close(THD *thd, bool release_metadata);
 
   int write_row(uchar *buf);
   int update_row(const uchar *old_data, uchar *new_data);
@@ -366,6 +370,8 @@ class ha_ndbcluster: public handler
   int info(uint);
   void get_dynamic_partition_info(PARTITION_INFO *stat_info, uint part_id);
   uint32 calculate_key_hash_value(Field **field_array);
+  bool read_before_write_removal_possible(List<Item> *fields,
+                                          List<Item> *values);
   int extra(enum ha_extra_function operation);
   int extra_opt(enum ha_extra_function operation, ulong cache_size);
   int reset();
@@ -519,11 +525,16 @@ private:
                                  uchar *new_data,
                                  NdbInterpretedCode *);
 #endif
+  uint setup_key_ref_for_ndb_record(const NdbRecord **key_rec,
+                                    const uchar **key_row,
+                                    const uchar *record,
+                                    bool use_active_index);
   friend int ndbcluster_drop_database_impl(const char *path);
   friend int ndb_handle_schema_change(THD *thd, 
                                       Ndb *ndb, NdbEventOperation *pOp,
                                       NDB_SHARE *share);
 
+  void check_read_before_write_removal();
   static int delete_table(ha_ndbcluster *h, Ndb *ndb,
 			  const char *path,
 			  const char *db,
@@ -687,7 +698,9 @@ private:
   int write_conflict_row(NdbTransaction*, const uchar* row, NdbError&);
   friend int check_completed_operations_pre_commit(Thd_ndb*,
                                                    NdbTransaction*,
-                                                   const NdbOperation*);
+                                                   const NdbOperation*,
+                                                   uint *ignore_count);
+  friend int ndbcluster_commit(handlerton *hton, THD *thd, bool all);
   int start_statement(THD *thd, Thd_ndb *thd_ndb, uint table_count);
   int init_handler_for_statement(THD *thd);
 
@@ -715,7 +728,7 @@ private:
                               8*sizeof(my_bitmap_map) - 1) /
                              (8*sizeof(my_bitmap_map))]; // Buffer for m_bitmap
   /* Bitmap with bit set for all primary key columns. */
-  MY_BITMAP m_pk_bitmap;
+  MY_BITMAP *m_pk_bitmap_p;
   my_bitmap_map m_pk_bitmap_buf[(NDB_MAX_ATTRIBUTES_IN_TABLE +
                                  8*sizeof(my_bitmap_map) - 1) /
                                 (8*sizeof(my_bitmap_map))]; // Buffer for m_pk_bitmap
@@ -750,6 +763,9 @@ private:
   char *m_row_buffer_current;
   /* Extra bytes needed in row for hidden fields. */
   uint m_extra_reclength;
+
+  MY_BITMAP **m_key_fields;
+  MY_BITMAP m_save_read_set;
   // NdbRecAttr has no reference to blob
   NdbValue m_value[NDB_MAX_ATTRIBUTES_IN_TABLE];
   Uint64 m_ref;
@@ -762,6 +778,10 @@ private:
   bool m_ignore_dup_key;
   bool m_has_unique_index;
   bool m_ignore_no_key;
+  bool m_read_before_write_removal_possible;
+  bool m_read_before_write_removal_used;
+  ha_rows m_rows_updated;
+  ha_rows m_rows_deleted;
   ha_rows m_rows_to_insert; // TODO: merge it with handler::estimation_rows_to_insert?
   ha_rows m_rows_inserted;
   ha_rows m_rows_changed;
Thread
bk commit into 5.1 tree (tomas:1.2687)tomas30 Oct