List: Commits
From: Luis Soares  Date: November 5 2010 12:31am
Subject: bzr commit into mysql-next-mr branch (luis.soares:3204)
#At file:///home/lsoares/Workspace/bzr/work/features/wl5597/mysql-next-mr/ based on revid:alexander.nozdrin@stripped

 3204 Luis Soares	2010-11-05
      WL 5597: Work in progress.

    modified:
      sql/log_event.cc
      sql/log_event.h
=== modified file 'sql/log_event.cc'
--- a/sql/log_event.cc	2010-10-17 23:27:40 +0000
+++ b/sql/log_event.cc	2010-11-05 00:31:22 +0000
@@ -7556,2165 +7556,2418 @@ int Rows_log_event::do_add_row_data(ucha
 #endif
 
 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
-int Rows_log_event::do_apply_event(Relay_log_info const *rli)
-{
-  DBUG_ENTER("Rows_log_event::do_apply_event(Relay_log_info*)");
-  int error= 0;
-  /*
-    If m_table_id == ~0UL, then we have a dummy event that does not
-    contain any data.  In that case, we just remove all tables in the
-    tables_to_lock list, close the thread tables, and return with
-    success.
-   */
-  if (m_table_id == ~0UL)
-  {
-    /*
-       This one is supposed to be set: just an extra check so that
-       nothing strange has happened.
-     */
-    DBUG_ASSERT(get_flags(STMT_END_F));
 
-    const_cast<Relay_log_info*>(rli)->slave_close_thread_tables(thd);
-    thd->clear_error();
-    DBUG_RETURN(0);
-  }
 
-  /*
-    'thd' has been set by exec_relay_log_event(), just before calling
-    do_apply_event(). We still check here to prevent future coding
-    errors.
-  */
-  DBUG_ASSERT(rli->info_thd == thd);
+/**
+  Checks if any of the columns in the given table is
+  signaled in the bitmap.
 
-  /*
-    If there is no locks taken, this is the first binrow event seen
-    after the table map events.  We should then lock all the tables
-    used in the transaction and proceed with execution of the actual
-    event.
-  */
-  if (!thd->lock)
-  {
-    /*
-      Lock_tables() reads the contents of thd->lex, so they must be
-      initialized.
+  Checks, for each column in the given table, whether it is
+  signaled in the bitmap. This is most useful when deciding
+  whether a before image (BI) can be used to search for a
+  row. If no column is signaled, the image cannot be used
+  to search for a record (regardless of whether position(),
+  an index scan or a table scan is used). Here is
+  an example:
 
-      We also call the mysql_reset_thd_for_next_command(), since this
-      is the logical start of the next "statement". Note that this
-      call might reset the value of current_stmt_binlog_format, so
-      we need to do any changes to that value after this function.
-    */
-    lex_start(thd);
-    mysql_reset_thd_for_next_command(thd);
-    /*
-      The current statement is just about to begin and 
-      has not yet modified anything. Note, all.modified is reset
-      by mysql_reset_thd_for_next_command.
-    */
-    thd->transaction.stmt.modified_non_trans_table= FALSE;
-    /*
-      This is a row injection, so we flag the "statement" as
-      such. Note that this code is called both when the slave does row
-      injections and when the BINLOG statement is used to do row
-      injections.
-    */
-    thd->lex->set_stmt_row_injection();
+  MASTER> SET @@binlog_row_image='MINIMAL';
+  MASTER> CREATE TABLE t1 (a int, b int, c int, primary key(c));
+  SLAVE> CREATE TABLE t1 (a int, b int);
+  MASTER> INSERT INTO t1 VALUES (1,2,3);
+  MASTER> UPDATE t1 SET a=2 WHERE b=2;
 
-    /*
-      There are a few flags that are replicated with each row event.
-      Make sure to set/clear them before executing the main body of
-      the event.
-    */
-    if (get_flags(NO_FOREIGN_KEY_CHECKS_F))
-        thd->variables.option_bits|= OPTION_NO_FOREIGN_KEY_CHECKS;
-    else
-        thd->variables.option_bits&= ~OPTION_NO_FOREIGN_KEY_CHECKS;
+  For the update statement only the PK (column c) is
+  logged in the before image (BI). As such, given that
+  the slave has no column c, it will not be able to
+  find the row, because the BI has no values for the
+  columns the slave knows about (columns a and b).
 
-    if (get_flags(RELAXED_UNIQUE_CHECKS_F))
-        thd->variables.option_bits|= OPTION_RELAXED_UNIQUE_CHECKS;
-    else
-        thd->variables.option_bits&= ~OPTION_RELAXED_UNIQUE_CHECKS;
-    /* A small test to verify that objects have consistent types */
-    DBUG_ASSERT(sizeof(thd->variables.option_bits) == sizeof(OPTION_RELAXED_UNIQUE_CHECKS));
+  @param table   the table reference on the slave.
+  @param cols    the bitmap signaling columns available in
+                 the BI.
 
-    if (open_and_lock_tables(thd, rli->tables_to_lock, FALSE, 0))
-    {
-      uint actual_error= thd->stmt_da->sql_errno();
-      if (thd->is_slave_error || thd->is_fatal_error)
-      {
-        /*
-          Error reporting borrowed from Query_log_event with many excessive
-          simplifications. 
-          We should not honour --slave-skip-errors at this point as we are
-          having severe errors which should not be skiped.
-        */
-        rli->report(ERROR_LEVEL, actual_error,
-                    "Error executing row event: '%s'",
-                    (actual_error ? thd->stmt_da->message() :
-                     "unexpected success or fatal error"));
-        thd->is_slave_error= 1;
-      }
-      const_cast<Relay_log_info*>(rli)->slave_close_thread_tables(thd);
-      DBUG_RETURN(actual_error);
-    }
+  @return TRUE if the BI contains usable columns for searching,
+          FALSE otherwise.
+*/
+static
+my_bool is_any_column_signaled_for_table(TABLE *table, MY_BITMAP *cols)
+{
 
-    /*
-      When the open and locking succeeded, we check all tables to
-      ensure that they still have the correct type.
+  int nfields_set= 0;
+  for (Field **ptr=table->field ;
+       *ptr && ((*ptr)->field_index < cols->n_bits);
+       ptr++)
+  {
+    if (bitmap_is_set(cols, (*ptr)->field_index))
+      nfields_set++;
+  }
 
-      We can use a down cast here since we know that every table added
-      to the tables_to_lock is a RPL_TABLE_LIST.
-    */
+  return (nfields_set != 0);
+}
 
-    {
-      DBUG_PRINT("debug", ("Checking compability of tables to lock - tables_to_lock: %p",
-                           rli->tables_to_lock));
-      RPL_TABLE_LIST *ptr= rli->tables_to_lock;
-      for ( ; ptr ; ptr= static_cast<RPL_TABLE_LIST*>(ptr->next_global))
-      {
-        TABLE *conv_table;
-        if (!ptr->m_tabledef.compatible_with(thd, const_cast<Relay_log_info*>(rli),
-                                             ptr->table, &conv_table))
-        {
-          DBUG_PRINT("debug", ("Table: %s.%s is not compatible with master",
-                               ptr->table->s->db.str,
-                               ptr->table->s->table_name.str));
-          /*
-            We should not honour --slave-skip-errors at this point as we are
-            having severe errors which should not be skiped.
-          */
-          thd->is_slave_error= 1;
-          const_cast<Relay_log_info*>(rli)->slave_close_thread_tables(thd);
-          DBUG_RETURN(ERR_BAD_TABLE_DEF);
-        }
-        DBUG_PRINT("debug", ("Table: %s.%s is compatible with master"
-                             " - conv_table: %p",
-                             ptr->table->s->db.str,
-                             ptr->table->s->table_name.str, conv_table));
-        ptr->m_conv_table= conv_table;
-      }
-    }
+/**
+  Checks if the fields in the given key are signaled in
+  the bitmap.
 
-    /*
-      ... and then we add all the tables to the table map and but keep
-      them in the tables to lock list.
+  Validates whether the before image is usable for the
+  given key. It can be the case that the before image
+  does not contain values for the key (e.g., the master was
+  using the 'minimal' option for image logging and the slave
+  has a different index structure on the table). Here is an
+  example:
 
-      We also invalidate the query cache for all the tables, since
-      they will now be changed.
+  MASTER> SET @@binlog_row_image='MINIMAL';
+  MASTER> CREATE TABLE t1 (a int, b int, c int, primary key(c));
+  SLAVE> CREATE TABLE t1 (a int, b int, c int, key(a,c));
+  MASTER> INSERT INTO t1 VALUES (1,2,3);
+  MASTER> UPDATE t1 SET a=2 WHERE b=2;
 
-      TODO [/Matz]: Maybe the query cache should not be invalidated
-      here? It might be that a table is not changed, even though it
-      was locked for the statement.  We do know that each
-      Rows_log_event contain at least one row, so after processing one
-      Rows_log_event, we can invalidate the query cache for the
-      associated table.
-     */
-    for (TABLE_LIST *ptr= rli->tables_to_lock ; ptr ; ptr= ptr->next_global)
-    {
-      const_cast<Relay_log_info*>(rli)->m_table_map.set_table(ptr->table_id, ptr->table);
-    }
-#ifdef HAVE_QUERY_CACHE
-    query_cache.invalidate_locked_for_write(rli->tables_to_lock);
-#endif
+  When finding the row on the slave, one cannot use the
+  index (a,c) to search for the row, because there is only
+  data in the before image for column c. This function
+  checks the fields needed for a given key and searches
+  the bitmap to see if all the fields required are
+  signaled.
+
+  @param keyinfo  reference to key.
+  @param cols     the bitmap signaling which columns
+                  have available data.
+
+  @return TRUE if all fields are signaled in the bitmap
+          for the given key, FALSE otherwise.
+*/
+static
+my_bool are_all_columns_signaled_for_key(KEY *keyinfo, MY_BITMAP *cols)
+{
+  for (uint i=0 ; i < keyinfo->key_parts ;i++)
+  {
+    uint fieldnr= keyinfo->key_part[i].fieldnr - 1;
+    if (fieldnr >= cols->n_bits ||
+        !bitmap_is_set(cols, fieldnr))
+      return FALSE;
   }
 
-  TABLE* 
-    table= 
-    m_table= const_cast<Relay_log_info*>(rli)->m_table_map.get_table(m_table_id);
+  return TRUE;
+}
 
-  DBUG_PRINT("debug", ("m_table: 0x%lx, m_table_id: %lu", (ulong) m_table, m_table_id));
+/**
+  Searches the table for a given key that can be used
+  according to the existing values, ie, columns set
+  in the bitmap.
 
-  if (table)
-  {
-    bool transactional_table= table->file->has_transactions();
-    /*
-      table == NULL means that this table should not be replicated
-      (this was set up by Table_map_log_event::do_apply_event()
-      which tested replicate-* rules).
-    */
+  The caller can specify which type of key to find by
+  setting the following flags in the key_type parameter:
 
-    /*
-      It's not needed to set_time() but
-      1) it continues the property that "Time" in SHOW PROCESSLIST shows how
-      much slave is behind
-      2) it will be needed when we allow replication from a table with no
-      TIMESTAMP column to a table with one.
-      So we call set_time(), like in SBR. Presently it changes nothing.
-    */
-    thd->set_time((time_t)when);
+    - PRI_KEY_FLAG
+      Returns the primary key.
 
-    /*
-      Now we are in a statement and will stay in a statement until we
-      see a STMT_END_F.
+    - UNIQUE_KEY_FLAG
+      Returns a unique key (flagged with HA_NOSAME)
 
-      We set this flag here, before actually applying any rows, in
-      case the SQL thread is stopped and we need to detect that we're
-      inside a statement and halting abruptly might cause problems
-      when restarting.
-     */
-    const_cast<Relay_log_info*>(rli)->set_flag(Relay_log_info::IN_STMT);
+    - MULTIPLE_KEY_FLAG
+      Returns a key that is neither unique (i.e., flagged with
+      HA_NOSAME and without HA_NULL_PART_KEY) nor the PK.
 
-     if ( m_width == table->s->fields && bitmap_is_set_all(&m_cols))
-      set_flags(COMPLETE_ROWS_F);
+  The above flags can be used together, in which case the
+  search is conducted in the order listed above. E.g., the
+  following flag:
 
-    /* 
-      Set tables write and read sets.
-      
-      Read_set contains all slave columns (in case we are going to fetch
-      a complete record from slave)
-      
-      Write_set equals the m_cols bitmap sent from master but it can be 
-      longer if slave has extra columns. 
-     */ 
+    (PRI_KEY_FLAG | UNIQUE_KEY_FLAG | MULTIPLE_KEY_FLAG)
 
-    DBUG_PRINT_BITSET("debug", "Setting table's write_set from: %s", &m_cols);
-    
-    bitmap_set_all(table->read_set);
-    if (get_type_code() == DELETE_ROWS_EVENT)
-        bitmap_intersect(table->read_set,&m_cols);
+  means that a primary key is returned if it is suitable. If
+  not, the unique keys are searched. If no unique key is
+  suitable, the remaining (non-unique) keys are searched.
+  Finally, if no key is suitable, MAX_KEY is returned.
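+
+  A sketch of a typical lookup (hypothetical caller; 'table' is the
+  slave table and 'cols' the bitmap of columns available in the BI):
+
+    uint key= search_key_in_table(table, cols,
+                                  (PRI_KEY_FLAG | UNIQUE_KEY_FLAG));
+    if (key == MAX_KEY)
+      ... fall back to a hash or table scan ...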
 
-    bitmap_set_all(table->write_set);
-    if (!get_flags(COMPLETE_ROWS_F))
-    {
-      if (get_type_code() == UPDATE_ROWS_EVENT)
-        bitmap_intersect(table->write_set,&m_cols_ai);
-      else /* WRITE ROWS EVENTS store the bitmap in m_cols instead of m_cols_ai */
-        bitmap_intersect(table->write_set,&m_cols);
-    }
+  @param table    reference to the table.
+  @param bi_cols  a bitmap that filters out columns that should
+                  not be considered while searching the key.
+                  Columns that should be considered are set.
+  @param key_type the type of key to search for.
 
-    this->slave_exec_mode= slave_exec_mode_options; // fix the mode
+  @return MAX_KEY if no key matching the specified key_type is
+          suitable; the number of the key otherwise.
 
-    // Do event specific preparations 
-    error= do_before_row_operations(rli);
+*/
+static
+uint
+search_key_in_table(TABLE *table, MY_BITMAP *bi_cols, uint key_type)
+{
+  KEY *keyinfo;
+  uint res= MAX_KEY;
+  uint key;
 
-    // row processing loop
+  if (key_type & PRI_KEY_FLAG && (table->s->primary_key < MAX_KEY))
+  {
+    keyinfo= table->s->key_info + (uint) table->s->primary_key;
+    if (are_all_columns_signaled_for_key(keyinfo, bi_cols))
+      return table->s->primary_key;
+  }
 
-    while (error == 0)
+  if (key_type & UNIQUE_KEY_FLAG && table->s->uniques)
+  {
+    for (key=0,keyinfo= table->key_info ;
+         (key < table->s->keys) && (res == MAX_KEY);
+         key++,keyinfo++)
     {
-      /* in_use can have been set to NULL in close_tables_for_reopen */
-      THD* old_thd= table->in_use;
-      if (!table->in_use)
-        table->in_use= thd;
-
-      error= do_exec_row(rli);
-
-      DBUG_PRINT("info", ("error: %s", HA_ERR(error)));
-      DBUG_ASSERT(error != HA_ERR_RECORD_DELETED);
-
-      table->in_use = old_thd;
-
-      if (error)
-      {
-        int actual_error= convert_handler_error(error, thd, table);
-        bool idempotent_error= (idempotent_error_code(error) &&
-                               (slave_exec_mode == SLAVE_EXEC_MODE_IDEMPOTENT));
-        bool ignored_error= (idempotent_error == 0 ?
-                             ignored_error_code(actual_error) : 0);
-
-        if (idempotent_error || ignored_error)
-        {
-          if (global_system_variables.log_warnings)
-            slave_rows_error_report(WARNING_LEVEL, error, rli, thd, table,
-                                    get_type_str(),
-                                    const_cast<Relay_log_info*>(rli)->get_rpl_log_name(),
-                                    (ulong) log_pos);
-          clear_all_errors(thd, const_cast<Relay_log_info*>(rli));
-          error= 0;
-          if (idempotent_error == 0)
-            break;
-        }
-      }
-
       /*
-       If m_curr_row_end  was not set during event execution (e.g., because
-       of errors) we can't proceed to the next row. If the error is transient
-       (i.e., error==0 at this point) we must call unpack_current_row() to set 
-       m_curr_row_end.
-      */ 
-   
-      DBUG_PRINT("info", ("curr_row: 0x%lu; curr_row_end: 0x%lu; rows_end: 0x%lu",
-                          (ulong) m_curr_row, (ulong) m_curr_row_end, (ulong) m_rows_end));
-
-      if (!m_curr_row_end && !error)
-        error= unpack_current_row(rli, &m_cols);
-  
-      // at this moment m_curr_row_end should be set
-      DBUG_ASSERT(error || m_curr_row_end != NULL); 
-      DBUG_ASSERT(error || m_curr_row <= m_curr_row_end);
-      DBUG_ASSERT(error || m_curr_row_end <= m_rows_end);
-  
-      m_curr_row= m_curr_row_end;
- 
-      if (error == 0 && !transactional_table)
-        thd->transaction.all.modified_non_trans_table=
-          thd->transaction.stmt.modified_non_trans_table= TRUE;
-
-      if (m_curr_row == m_rows_end)
-        break;
-    } // row processing loop
+        - Unique keys cannot be disabled, hence we skip that check.
+        - Skip unique keys with nullable parts
+        - Skip primary keys
+      */
+      if (!((keyinfo->flags & (HA_NOSAME | HA_NULL_PART_KEY)) == HA_NOSAME) ||
+          (key == table->s->primary_key))
+        continue;
+      res= are_all_columns_signaled_for_key(keyinfo, bi_cols) ?
+           key : MAX_KEY;
 
-    {/**
-         The following failure injecion works in cooperation with tests 
-         setting @@global.debug= 'd,stop_slave_middle_group'.
-         The sql thread receives the killed status and will proceed 
-         to shutdown trying to finish incomplete events group.
-     */
-      DBUG_EXECUTE_IF("stop_slave_middle_group",
-                      if (thd->transaction.all.modified_non_trans_table)
-                        const_cast<Relay_log_info*>(rli)->abort_slave= 1;);
+      if (res < MAX_KEY)
+        return res;
     }
+  }
 
-    if ((error= do_after_row_operations(rli, error)) &&
-        ignored_error_code(convert_handler_error(error, thd, table)))
+  if (key_type & MULTIPLE_KEY_FLAG && table->s->keys)
+  {
+    for (key=0,keyinfo= table->key_info ;
+         (key < table->s->keys) && (res == MAX_KEY);
+         key++,keyinfo++)
     {
+      /*
+        - Skip inactive keys
+        - Skip unique keys without nullable parts
+        - Skip primary keys
+      */
+      if (!(table->s->keys_in_use.is_set(key)) ||
+          ((keyinfo->flags & (HA_NOSAME | HA_NULL_PART_KEY)) == HA_NOSAME) ||
+          (key == table->s->primary_key))
+        continue;
 
-      if (global_system_variables.log_warnings)
-        slave_rows_error_report(WARNING_LEVEL, error, rli, thd, table,
-                                get_type_str(),
-                                const_cast<Relay_log_info*>(rli)->get_rpl_log_name(),
-                                (ulong) log_pos);
-      clear_all_errors(thd, const_cast<Relay_log_info*>(rli));
-      error= 0;
-    }
-  } // if (table)
-
-  
-  if (error)
-  {
-    slave_rows_error_report(ERROR_LEVEL, error, rli, thd, table,
-                             get_type_str(),
-                             const_cast<Relay_log_info*>(rli)->get_rpl_log_name(),
-                             (ulong) log_pos);
-    /*
-      @todo We should probably not call
-      reset_current_stmt_binlog_format_row() from here.
+      res= are_all_columns_signaled_for_key(keyinfo, bi_cols) ?
+           key : MAX_KEY;
 
-      Note: this applies to log_event_old.cc too.
-      /Sven
-    */
-    thd->reset_current_stmt_binlog_format_row();
-    const_cast<Relay_log_info*>(rli)->cleanup_context(thd, error);
-    thd->is_slave_error= 1;
-    DBUG_RETURN(error);
+      if (res < MAX_KEY)
+        return res;
+    }
   }
 
-  if (get_flags(STMT_END_F))
-    if ((error= rows_event_stmt_cleanup(rli, thd)))
-      rli->report(ERROR_LEVEL, error,
-                  "Error in %s event: commit of row events failed, "
-                  "table `%s`.`%s`",
-                  get_type_str(), m_table->s->db.str,
-                  m_table->s->table_name.str);
-
-  DBUG_RETURN(error);
+  return res;
 }
 
-Log_event::enum_skip_reason
-Rows_log_event::do_shall_skip(Relay_log_info *rli)
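+/**
+  Decides which row lookup method to use when applying this event.
+  Writes need no lookup at all. For the remaining event types an
+  index scan is chosen if there is a key usable with the columns
+  available in the BI; otherwise we fall back to a hash scan.
+*/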
+static uint decide_row_lookup_method(TABLE* table, MY_BITMAP *cols, uint event_type)
 {
-  /*
-    If the slave skip counter is 1 and this event does not end a
-    statement, then we should not start executing on the next event.
-    Otherwise, we defer the decision to the normal skipping logic.
-  */
-  if (rli->slave_skip_counter == 1 && !get_flags(STMT_END_F))
-    return Log_event::EVENT_SKIP_IGNORE;
+  uint res= Rows_log_event::ROW_LOOKUP_NOT_NEEDED;
+  if (event_type == WRITE_ROWS_EVENT)
+    return res;
+
+  uint key_index= search_key_in_table(table, cols, (PRI_KEY_FLAG | UNIQUE_KEY_FLAG | MULTIPLE_KEY_FLAG));
+
+  /* No index */
+  if (key_index == MAX_KEY /* TODO: || key_index > number of keys in the table */)
+    // TODO: change so that it takes into account the slave_exec_mode flag
+    //res= slave_exec_mode & TABLE_SCAN ? TABLE_SCAN : HASH_SCAN;
+    res= Rows_log_event::ROW_LOOKUP_HASH_SCAN;
+
   else
-    return Log_event::do_shall_skip(rli);
+    // TODO: change so that it takes into account the slave_exec_mode flag
+    //res= slave_exec_mode & INDEX_SEARCH ? INDEX_SEARCH : HASH_SCAN;
+    res= Rows_log_event::ROW_LOOKUP_INDEX_SCAN;
+
+  return res;
 }
 
-/**
-   The function is called at Rows_log_event statement commit time,
-   normally from Rows_log_event::do_update_pos() and possibly from
-   Query_log_event::do_apply_event() of the COMMIT.
-   The function commits the last statement for engines, binlog and
-   releases resources have been allocated for the statement.
-  
-   @retval  0         Ok.
-   @retval  non-zero  Error at the commit.
- */
+/*
+  Compares table->record[0] and table->record[1] on the columns
+  signaled in cols.
 
-static int rows_event_stmt_cleanup(Relay_log_info const *rli, THD * thd)
+  Returns TRUE if they differ.
+*/
+static bool record_compare(TABLE *table, MY_BITMAP *cols)
 {
-  int error;
-  {
-    /*
-      This is the end of a statement or transaction, so close (and
-      unlock) the tables we opened when processing the
-      Table_map_log_event starting the statement.
+  /*
+    Need to set the X bit and the filler bits in both records since
+    there are engines that do not set them correctly.
 
-      OBSERVER.  This will clear *all* mappings, not only those that
-      are open for the table. There is not good handle for on-close
-      actions for tables.
+    In addition, since MyISAM checks that one hasn't tampered with the
+    record, it is necessary to restore the old bytes into the record
+    after doing the comparison.
 
-      NOTE. Even if we have no table ('table' == 0) we still need to be
-      here, so that we increase the group relay log position. If we didn't, we
-      could have a group relay log position which lags behind "forever"
-      (assume the last master's transaction is ignored by the slave because of
-      replicate-ignore rules).
-    */
-    error= thd->binlog_flush_pending_rows_event(TRUE);
+    TODO[record format ndb]: Remove it once NDB returns correct
+    records. Check that the other engines also return correct records.
+   */
 
-    /*
-      If this event is not in a transaction, the call below will, if some
-      transactional storage engines are involved, commit the statement into
-      them and flush the pending event to binlog.
-      If this event is in a transaction, the call will do nothing, but a
-      Xid_log_event will come next which will, if some transactional engines
-      are involved, commit the transaction and flush the pending event to the
-      binlog.
-    */
-    error|= (error ? trans_rollback_stmt(thd) : trans_commit_stmt(thd));
+  DBUG_DUMP("record[0]", table->record[0], table->s->reclength);
+  DBUG_DUMP("record[1]", table->record[1], table->s->reclength);
 
-    /*
-      Now what if this is not a transactional engine? we still need to
-      flush the pending event to the binlog; we did it with
-      thd->binlog_flush_pending_rows_event(). Note that we imitate
-      what is done for real queries: a call to
-      ha_autocommit_or_rollback() (sometimes only if involves a
-      transactional engine), and a call to be sure to have the pending
-      event flushed.
-    */
+  bool result= FALSE;
+  uchar saved_x[2]= {0, 0}, saved_filler[2]= {0, 0};
 
-    /*
-      @todo We should probably not call
-      reset_current_stmt_binlog_format_row() from here.
+  if (table->s->null_bytes > 0)
+  {
+    for (int i = 0 ; i < 2 ; ++i)
+    {
+      /*
+        If we have an X bit then we need to take care of it.
+      */
+      if (!(table->s->db_options_in_use & HA_OPTION_PACK_RECORD))
+      {
+        saved_x[i]= table->record[i][0];
+        table->record[i][0]|= 1U;
+      }
 
-      Note: this applies to log_event_old.cc too
+      /*
+         If (last_null_bit_pos == 0 && null_bytes > 1), then:
 
-      Btw, the previous comment about transactional engines does not
-      seem related to anything that happens here.
-      /Sven
-    */
-    thd->reset_current_stmt_binlog_format_row();
+         X bit (if any) + N nullable fields + M Field_bit fields = 8 bits
 
-    const_cast<Relay_log_info*>(rli)->cleanup_context(thd, 0);
+         I.e., the entire byte is used.
+      */
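+      /*
+        A worked example: with last_null_bit_pos == 3, the mask
+        below is 256 - (1 << 3) = 0xF8, i.e., it sets the unused
+        filler bits 3..7 of the last null byte.
+      */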
+      if (table->s->last_null_bit_pos > 0)
+      {
+        saved_filler[i]= table->record[i][table->s->null_bytes - 1];
+        table->record[i][table->s->null_bytes - 1]|=
+          256U - (1U << table->s->last_null_bit_pos);
+      }
+    }
   }
-  return error;
-}
-
-/**
-   The method either increments the relay log position or
-   commits the current statement and increments the master group 
-   possition if the event is STMT_END_F flagged and
-   the statement corresponds to the autocommit query (i.e replicated
-   without wrapping in BEGIN/COMMIT)
 
-   @retval 0         Success
-   @retval non-zero  Error in the statement commit
- */
-int
-Rows_log_event::do_update_pos(Relay_log_info *rli)
-{
-  DBUG_ENTER("Rows_log_event::do_update_pos");
-  int error= 0;
+  if (table->s->blob_fields + table->s->varchar_fields == 0 &&
+      bitmap_is_set_all(cols))
+  {
+    result= cmp_record(table,record[1]);
+    goto record_compare_exit;
+  }
 
-  DBUG_PRINT("info", ("flags: %s",
-                      get_flags(STMT_END_F) ? "STMT_END_F " : ""));
+  /* Compare null bits */
+  if (bitmap_is_set_all(cols) &&
+      memcmp(table->null_flags,
+       table->null_flags+table->s->rec_buff_length,
+       table->s->null_bytes))
+  {
+    result= TRUE;       // Diff in NULL value
+    goto record_compare_exit;
+  }
 
-  if (get_flags(STMT_END_F))
+  /* Compare the fields signaled in the bitmap */
+  for (Field **ptr=table->field ;
+       *ptr && ((*ptr)->field_index < cols->n_bits);
+       ptr++)
   {
-    /*
-      Indicate that a statement is finished.
-      Step the group log position if we are not in a transaction,
-      otherwise increase the event log position.
-    */
-    rli->stmt_done(log_pos);
-    /*
-      Clear any errors in thd->net.last_err*. It is not known if this is
-      needed or not. It is believed that any errors that may exist in
-      thd->net.last_err* are allowed. Examples of errors are "key not
-      found", which is produced in the test case rpl_row_conflicts.test
-    */
-    thd->clear_error();
+    if (bitmap_is_set(cols, (*ptr)->field_index))
+    {
+      if ((*ptr)->cmp_binary_offset(table->s->rec_buff_length))
+      {
+        result= TRUE;
+        goto record_compare_exit;
+      }
+    }
   }
-  else
+
+record_compare_exit:
+  /*
+    Restore the saved bytes.
+
+    TODO[record format ndb]: Remove this code once NDB returns the
+    correct record format.
+  */
+  if (table->s->null_bytes > 0)
   {
-    rli->inc_event_relay_log_pos();
+    for (int i = 0 ; i < 2 ; ++i)
+    {
+      if (!(table->s->db_options_in_use & HA_OPTION_PACK_RECORD))
+        table->record[i][0]= saved_x[i];
+
+      if (table->s->last_null_bit_pos)
+        table->record[i][table->s->null_bytes - 1]= saved_filler[i];
+    }
   }
 
-  DBUG_RETURN(error);
+  return result;
 }
 
-#endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */
 
-#ifndef MYSQL_CLIENT
-bool Rows_log_event::write_data_header(IO_CACHE *file)
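+/**
+  Entry of the hash table used by do_hash_scan_and_update(): a copy
+  of the record serving as hash key, its length, and the position of
+  the row inside the event's row buffer (so the row can be unpacked
+  again when a matching record is fetched from the table).
+*/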
+typedef struct row_entry
+{
-  uchar buf[ROWS_HEADER_LEN];	// No need to init the buffer
-  DBUG_ASSERT(m_table_id != ~0UL);
-  DBUG_EXECUTE_IF("old_row_based_repl_4_byte_map_id_master",
-                  {
-                    int4store(buf + 0, m_table_id);
-                    int2store(buf + 4, m_flags);
-                    return (my_b_safe_write(file, buf, 6));
-                  });
-  int6store(buf + RW_MAPID_OFFSET, (ulonglong)m_table_id);
-  int2store(buf + RW_FLAGS_OFFSET, m_flags);
-  return (my_b_safe_write(file, buf, ROWS_HEADER_LEN));
-}
+  uchar *key;
+  uint length;
+  const uchar *m_curr_row;
+} row_entry;
 
-bool Rows_log_event::write_data_body(IO_CACHE*file)
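+/**
+  Hash callback returning the key (the copied record) and its length
+  for a row_entry stored in m_hash.
+*/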
+extern "C" uchar *rows_log_event_get_key(const uchar *record, size_t *length,
+                          my_bool not_used __attribute__((unused)))
 {
-  /*
-     Note that this should be the number of *bits*, not the number of
-     bytes.
-  */
-  uchar sbuf[sizeof(m_width) + 1];
-  my_ptrdiff_t const data_size= m_rows_cur - m_rows_buf;
-  bool res= false;
-  uchar *const sbuf_end= net_store_length(sbuf, (size_t) m_width);
-  DBUG_ASSERT(static_cast<size_t>(sbuf_end - sbuf) <= sizeof(sbuf));
-
-  DBUG_DUMP("m_width", sbuf, (size_t) (sbuf_end - sbuf));
-  res= res || my_b_safe_write(file, sbuf, (size_t) (sbuf_end - sbuf));
-
-  DBUG_DUMP("m_cols", (uchar*) m_cols.bitmap, no_bytes_in_map(&m_cols));
-  res= res || my_b_safe_write(file, (uchar*) m_cols.bitmap,
-                              no_bytes_in_map(&m_cols));
-  /*
-    TODO[refactor write]: Remove the "down cast" here (and elsewhere).
-   */
-  if (get_type_code() == UPDATE_ROWS_EVENT)
-  {
-    DBUG_DUMP("m_cols_ai", (uchar*) m_cols_ai.bitmap,
-              no_bytes_in_map(&m_cols_ai));
-    res= res || my_b_safe_write(file, (uchar*) m_cols_ai.bitmap,
-                                no_bytes_in_map(&m_cols_ai));
-  }
-  DBUG_DUMP("rows", m_rows_buf, data_size);
-  res= res || my_b_safe_write(file, m_rows_buf, (size_t) data_size);
+  DBUG_ENTER("rows_log_event_get_key");
 
-  return res;
+  row_entry *entry=(row_entry *) record;
+  *length= entry->length;
 
+  DBUG_RETURN((uchar*) entry->key);
 }
-#endif
 
-#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
-void Rows_log_event::pack_info(Protocol *protocol)
+static void rows_log_event_free_entry(row_entry *entry)
 {
-  char buf[256];
-  char const *const flagstr=
-    get_flags(STMT_END_F) ? " flags: STMT_END_F" : "";
-  size_t bytes= my_snprintf(buf, sizeof(buf),
-                               "table_id: %lu%s", m_table_id, flagstr);
-  protocol->store(buf, bytes, &my_charset_bin);
+  DBUG_ENTER("rows_log_event_free_entry");
+  my_free(entry->key);
+  my_free(entry);
+  DBUG_VOID_RETURN;
 }
-#endif
 
-#ifdef MYSQL_CLIENT
-void Rows_log_event::print_helper(FILE *file,
-                                  PRINT_EVENT_INFO *print_event_info,
-                                  char const *const name)
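+/**
+  Unpacks the current row into m_table->record[0] and stores a copy of
+  it, together with its position in the event buffer, in m_hash. For
+  UPDATE events the after image is unpacked and skipped as well, so
+  that m_curr_row ends up pointing at the next row.
+*/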
+int Rows_log_event::hash_row(Relay_log_info const *rli)
 {
-  IO_CACHE *const head= &print_event_info->head_cache;
-  IO_CACHE *const body= &print_event_info->body_cache;
-  if (!print_event_info->short_form)
+  int error= 0;
+
+  if ((error= unpack_current_row(rli, &m_cols)))
+    goto err;
+  else
   {
-    bool const last_stmt_event= get_flags(STMT_END_F);
-    print_header(head, print_event_info, !last_stmt_event);
-    my_b_printf(head, "\t%s: table id %lu%s\n",
-                name, m_table_id,
-                last_stmt_event ? " flags: STMT_END_F" : "");
-    print_base64(body, print_event_info, !last_stmt_event);
+    // TODO: remove blobs
+
+    /* Use my_malloc/my_free consistently with rows_log_event_free_entry(). */
+    row_entry *entry= (row_entry*) my_malloc(sizeof(row_entry), MYF(MY_WME));
+    uchar *key= (uchar*) my_malloc(m_table->s->reclength, MYF(MY_WME));
+    if (!entry || !key)
+    {
+      my_free(entry);
+      my_free(key);
+      error= HA_ERR_OUT_OF_MEM;
+      goto err;
+    }
+    entry->key= key;
+    memcpy(entry->key, m_table->record[0], m_table->s->reclength);
+    entry->length= m_table->s->reclength;
+    entry->m_curr_row= m_curr_row;
+    my_hash_insert(&m_hash, (uchar*) entry);
+
+    if (get_type_code() == UPDATE_ROWS_EVENT)
+    {
+      /*
+        This is the situation after locating BI:
+
+        ===|=== before image ====|=== after image ===|===
+           ^                     ^
+           m_curr_row            m_curr_row_end
+
+         We need to skip the AI as well, before moving on to the
+         next row.
+       */
+      m_curr_row=m_curr_row_end;
+      error= unpack_current_row(rli, &m_cols_ai);
+    }
+
+    m_curr_row=m_curr_row_end;
   }
 
-  if (get_flags(STMT_END_F))
+err:
+  return error;
+}
+
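+/**
+  Checks whether the given error can be skipped, either because the
+  slave runs in idempotent mode or because the error code is in the
+  ignored set. If so, a warning may be logged, the error state is
+  cleared and *err is reset to 0.
+
+  @return true if the error was merely ignored and row processing
+          should stop, false if processing may continue.
+*/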
+int Rows_log_event::handle_idempotent_errors(Relay_log_info const *rli, int *err)
+{
+  int error= *err;
+  if (error)
   {
-    copy_event_cache_to_file_and_reinit(head, file);
-    copy_event_cache_to_file_and_reinit(body, file);
+    int actual_error= convert_handler_error(error, thd, m_table);
+    bool idempotent_error= (idempotent_error_code(error) &&
+                           (slave_exec_mode == SLAVE_EXEC_MODE_IDEMPOTENT));
+    bool ignored_error= (idempotent_error == 0 ?
+                         ignored_error_code(actual_error) : 0);
+
+    if (idempotent_error || ignored_error)
+    {
+      if (global_system_variables.log_warnings)
+        slave_rows_error_report(WARNING_LEVEL, error, rli, thd, m_table,
+                                get_type_str(),
+                                const_cast<Relay_log_info*>(rli)->get_rpl_log_name(),
+                                (ulong) log_pos);
+      clear_all_errors(thd, const_cast<Relay_log_info*>(rli));
+      *err= 0;
+      if (idempotent_error == 0)
+        return true;
+    }
   }
+
+  return false;
 }
-#endif
 
-/**************************************************************************
-	Table_map_log_event member functions and support functions
-**************************************************************************/
+int Rows_log_event::do_apply_row(Relay_log_info const *rli)
+{
+   int error= 0;
 
-/**
-  @page How replication of field metadata works.
-  
-  When a table map is created, the master first calls 
-  Table_map_log_event::save_field_metadata() which calculates how many 
-  values will be in the field metadata. Only those fields that require the 
-  extra data are added. The method also loops through all of the fields in 
-  the table calling the method Field::save_field_metadata() which returns the
-  values for the field that will be saved in the metadata and replicated to
-  the slave. Once all fields have been processed, the table map is written to
-  the binlog adding the size of the field metadata and the field metadata to
-  the end of the body of the table map.
+   /* in_use can have been set to NULL in close_tables_for_reopen */
+   THD* old_thd= m_table->in_use;
+   if (!m_table->in_use)
+     m_table->in_use= thd;
 
-  When a table map is read on the slave, the field metadata is read from the 
-  table map and passed to the table_def class constructor which saves the 
-  field metadata from the table map into an array based on the type of the 
-  field. Field metadata values not present (those fields that do not use extra 
-  data) in the table map are initialized as zero (0). The array size is the 
-  same as the columns for the table on the slave.
+   error= do_exec_row(rli);
 
-  Additionally, values saved for field metadata on the master are saved as a 
-  string of bytes (uchar) in the binlog. A field may require 1 or more bytes
-  to store the information. In cases where values require multiple bytes 
-  (e.g. values > 255), the endian-safe methods are used to properly encode 
-  the values on the master and decode them on the slave. When the field
-  metadata values are captured on the slave, they are stored in an array of
-  type uint16. This allows the least number of casts to prevent casting bugs
-  when the field metadata is used in comparisons of field attributes. When
-  the field metadata is used for calculating addresses in pointer math, the
-  type used is uint32. 
-*/
+   DBUG_PRINT("info", ("error: %s", HA_ERR(error)));
+   DBUG_ASSERT(error != HA_ERR_RECORD_DELETED);
 
-#if !defined(MYSQL_CLIENT)
-/**
-  Save the field metadata based on the real_type of the field.
-  The metadata saved depends on the type of the field. Some fields
-  store a single byte for pack_length() while others store two bytes
-  for field_length (max length).
-  
-  @retval  0  Ok.
+   m_table->in_use = old_thd;
 
-  @todo
-  We may want to consider changing the encoding of the information.
-  Currently, the code attempts to minimize the number of bytes written to 
-  the tablemap. There are at least two other alternatives; 1) using 
-  net_store_length() to store the data allowing it to choose the number of
-  bytes that are appropriate thereby making the code much easier to 
-  maintain (only 1 place to change the encoding), or 2) use a fixed number
-  of bytes for each field. The problem with option 1 is that net_store_length()
-  will use one byte if the value < 251, but 3 bytes if it is > 250. Thus,
-  for fields like CHAR which can be no larger than 255 characters, the method
-  will use 3 bytes when the value is > 250. Further, every value that is
-  encoded using 2 parts (e.g., pack_length, field_length) will be numerically
-  > 250 therefore will use 3 bytes for eah value. The problem with option 2
-  is less wasteful for space but does waste 1 byte for every field that does
-  not encode 2 parts. 
-*/
-int Table_map_log_event::save_field_metadata()
-{
-  DBUG_ENTER("Table_map_log_event::save_field_metadata");
-  int index= 0;
-  for (unsigned int i= 0 ; i < m_table->s->fields ; i++)
-  {
-    DBUG_PRINT("debug", ("field_type: %d", m_coltype[i]));
-    index+= m_table->s->field[i]->save_field_metadata(&m_field_metadata[index]);
-  }
-  DBUG_RETURN(index);
+   if (handle_idempotent_errors(rli, &error))
+     return error;
+
+   /*
+    If m_curr_row_end  was not set during event execution (e.g., because
+    of errors) we can't proceed to the next row. If the error is transient
+    (i.e., error==0 at this point) we must call unpack_current_row() to set
+    m_curr_row_end.
+   */
+
+   DBUG_PRINT("info", ("curr_row: 0x%lu; curr_row_end: 0x%lu; rows_end: 0x%lu",
+                       (ulong) m_curr_row, (ulong) m_curr_row_end, (ulong) m_rows_end));
+
+   if (!m_curr_row_end && !error)
+     error= unpack_current_row(rli, &m_cols);
+
+   // at this moment m_curr_row_end should be set
+   DBUG_ASSERT(error || m_curr_row_end != NULL);
+   DBUG_ASSERT(error || m_curr_row <= m_curr_row_end);
+   DBUG_ASSERT(error || m_curr_row_end <= m_rows_end);
+
+   m_curr_row= m_curr_row_end;
+
+   if (error == 0 && !m_table->file->has_transactions())
+     thd->transaction.all.modified_non_trans_table=
+       thd->transaction.stmt.modified_non_trans_table= TRUE;
+
+   return error;
 }
-#endif /* !defined(MYSQL_CLIENT) */
 
-/*
-  Constructor used to build an event for writing to the binary log.
-  Mats says tbl->s lives longer than this event so it's ok to copy pointers
-  (tbl->s->db etc) and not pointer content.
- */
-#if !defined(MYSQL_CLIENT)
-Table_map_log_event::Table_map_log_event(THD *thd, TABLE *tbl, ulong tid,
-                                         bool is_transactional)
-  : Log_event(thd, 0, is_transactional),
-    m_table(tbl),
-    m_dbnam(tbl->s->db.str),
-    m_dblen(m_dbnam ? tbl->s->db.length : 0),
-    m_tblnam(tbl->s->table_name.str),
-    m_tbllen(tbl->s->table_name.length),
-    m_colcnt(tbl->s->fields),
-    m_memory(NULL),
-    m_table_id(tid),
-    m_flags(TM_BIT_LEN_EXACT_F),
-    m_data_size(0),
-    m_field_metadata(0),
-    m_field_metadata_size(0),
-    m_null_bits(0),
-    m_meta_memory(NULL)
+
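+/**
+  Locates the row to change using an index. If the BI contains a
+  usable PK and the engine positions rows by primary key
+  (HA_PRIMARY_KEY_REQUIRED_FOR_POSITION), the row is fetched directly
+  with rnd_pos_by_record(). Otherwise the best key usable with the BI
+  is chosen and the row is looked up with ha_index_read_map(),
+  iterating over duplicates with record_compare() when the key is not
+  unique. The row operation is then applied to the record found.
+*/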
+int Rows_log_event::do_index_scan_and_update(Relay_log_info const *rli)
 {
-  uchar cbuf[sizeof(m_colcnt) + 1];
-  uchar *cbuf_end;
-  DBUG_ASSERT(m_table_id != ~0UL);
+  DBUG_ENTER("Rows_log_event::do_index_scan_and_update");
+  DBUG_ASSERT(m_table && m_table->in_use != NULL);
+
+  TABLE *table= m_table;
+  int error= 0;
+  KEY *keyinfo;
+  uint key;
+
   /*
-    In TABLE_SHARE, "db" and "table_name" are 0-terminated (see this comment in
-    table.cc / alloc_table_share():
-      Use the fact the key is db/0/table_name/0
-    As we rely on this let's assert it.
+    rpl_row_tabledefs.test specifies that an extra field on the
+    slave may lack a default value, and that this is okay for
+    Delete and Update events.
+    Todo: fix WL#3228 HLD that requires defaults for all types of events
   */
-  DBUG_ASSERT((tbl->s->db.str == 0) ||
-              (tbl->s->db.str[tbl->s->db.length] == 0));
-  DBUG_ASSERT(tbl->s->table_name.str[tbl->s->table_name.length] == 0);
 
+  prepare_record(table, &m_cols, FALSE);
+  error= unpack_current_row(rli, &m_cols);
 
-  m_data_size=  TABLE_MAP_HEADER_LEN;
-  DBUG_EXECUTE_IF("old_row_based_repl_4_byte_map_id_master", m_data_size= 6;);
-  m_data_size+= m_dblen + 2;	// Include length and terminating \0
-  m_data_size+= m_tbllen + 2;	// Include length and terminating \0
-  cbuf_end= net_store_length(cbuf, (size_t) m_colcnt);
-  DBUG_ASSERT(static_cast<size_t>(cbuf_end - cbuf) <= sizeof(cbuf));
-  m_data_size+= (cbuf_end - cbuf) + m_colcnt;	// COLCNT and column types
+  // Temporary fix to find out why it fails [/Matz]
+  memcpy(m_table->read_set->bitmap, m_cols.bitmap, (m_table->read_set->n_bits + 7) / 8);
 
-  /* If malloc fails, caught in is_valid() */
-  if ((m_memory= (uchar*) my_malloc(m_colcnt, MYF(MY_WME))))
+  if (!is_any_column_signaled_for_table(table, &m_cols))
   {
-    m_coltype= reinterpret_cast<uchar*>(m_memory);
-    for (unsigned int i= 0 ; i < m_table->s->fields ; ++i)
-      m_coltype[i]= m_table->field[i]->type();
+    error= HA_ERR_END_OF_FILE;
+    goto err;
   }
 
-  /*
-    Calculate a bitmap for the results of maybe_null() for all columns.
-    The bitmap is used to determine when there is a column from the master
-    that is not on the slave and is null and thus not in the row data during
-    replication.
-  */
-  uint num_null_bytes= (m_table->s->fields + 7) / 8;
-  m_data_size+= num_null_bytes;
-  m_meta_memory= (uchar *)my_multi_malloc(MYF(MY_WME),
-                                 &m_null_bits, num_null_bytes,
-                                 &m_field_metadata, (m_colcnt * 2),
-                                 NULL);
+#ifndef DBUG_OFF
+  DBUG_PRINT("info",("looking for the following record"));
+  DBUG_DUMP("record[0]", table->record[0], table->s->reclength);
+#endif
 
-  bzero(m_field_metadata, (m_colcnt * 2));
+  if ((key= search_key_in_table(table, &m_cols, PRI_KEY_FLAG)) >= MAX_KEY)
+    /* we don't have a PK, or the PK is not usable with the BI values */
+    goto INDEX_SCAN;
 
-  /*
-    Create an array for the field metadata and store it.
-  */
-  m_field_metadata_size= save_field_metadata();
-  DBUG_ASSERT(m_field_metadata_size <= (m_colcnt * 2));
+  if ((table->file->ha_table_flags() & HA_PRIMARY_KEY_REQUIRED_FOR_POSITION))
+  {
+    /*
+      Use a more efficient method to fetch the record given by
+      table->record[0] if the engine allows it.  We first compute a
+      row reference using the position() member function (it will be
+      stored in table->file->ref) and then use rnd_pos() to position
+      the "cursor" (i.e., record[0] in this case) at the correct row.
 
-  /*
-    Now set the size of the data to the size of the field metadata array
-    plus one or three bytes (see pack.c:net_store_length) for number of 
-    elements in the field metadata array.
-  */
-  if (m_field_metadata_size < 251)
-    m_data_size+= m_field_metadata_size + 1; 
-  else
-    m_data_size+= m_field_metadata_size + 3; 
+      TODO: Add a check that the correct record has been fetched by
+      comparing with the original record. Take into account that the
+      record on the master and slave can be of different
+      length. Something along these lines should work:
 
-  bzero(m_null_bits, num_null_bytes);
-  for (unsigned int i= 0 ; i < m_table->s->fields ; ++i)
-    if (m_table->field[i]->maybe_null())
-      m_null_bits[(i / 8)]+= 1 << (i % 8);
+      ADD>>>  store_record(table,record[1]);
+              int error= table->file->rnd_pos(table->record[0], table->file->ref);
+      ADD>>>  DBUG_ASSERT(memcmp(table->record[1], table->record[0],
+                                 table->s->reclength) == 0);
 
-}
-#endif /* !defined(MYSQL_CLIENT) */
+    */
+    DBUG_PRINT("info",("locating record using primary key (position)"));
+    int error;
+    if (table->file->inited && (error= table->file->ha_index_end()))
+      DBUG_RETURN(error);
+    if ((error= table->file->ha_rnd_init(FALSE)))
+      DBUG_RETURN(error);
 
-/*
-  Constructor used by slave to read the event from the binary log.
- */
-#if defined(HAVE_REPLICATION)
-Table_map_log_event::Table_map_log_event(const char *buf, uint event_len,
-                                         const Format_description_log_event
-                                         *description_event)
+    error= table->file->rnd_pos_by_record(table->record[0]);
 
-  : Log_event(buf, description_event),
-#ifndef MYSQL_CLIENT
-    m_table(NULL),
-#endif
-    m_dbnam(NULL), m_dblen(0), m_tblnam(NULL), m_tbllen(0),
-    m_colcnt(0), m_coltype(0),
-    m_memory(NULL), m_table_id(ULONG_MAX), m_flags(0),
-    m_data_size(0), m_field_metadata(0), m_field_metadata_size(0),
-    m_null_bits(0), m_meta_memory(NULL)
-{
-  unsigned int bytes_read= 0;
-  DBUG_ENTER("Table_map_log_event::Table_map_log_event(const char*,uint,...)");
+    table->file->ha_rnd_end();
+    if (error)
+    {
+      DBUG_PRINT("info",("rnd_pos returns error %d",error));
+      if (error == HA_ERR_RECORD_DELETED)
+        error= HA_ERR_KEY_NOT_FOUND;
+      table->file->print_error(error, MYF(0));
+    }
+    DBUG_RETURN(error);
+  }
 
-  uint8 common_header_len= description_event->common_header_len;
-  uint8 post_header_len= description_event->post_header_len[TABLE_MAP_EVENT-1];
-  DBUG_PRINT("info",("event_len: %u  common_header_len: %d  post_header_len: %d",
-                     event_len, common_header_len, post_header_len));
+  // We can't use position() - try other methods.
+
+INDEX_SCAN:
 
   /*
-    Don't print debug messages when running valgrind since they can
-    trigger false warnings.
+    Save a copy of the record in table->record[1]. It might be needed
+    later if a linear search is used to find an exact match.
    */
-#ifndef HAVE_purify
-  DBUG_DUMP("event buffer", (uchar*) buf, event_len);
-#endif
+  store_record(table,record[1]);
 
-  /* Read the post-header */
-  const char *post_start= buf + common_header_len;
-
-  post_start+= TM_MAPID_OFFSET;
-  if (post_header_len == 6)
-  {
-    /* Master is of an intermediate source tree before 5.1.4. Id is 4 bytes */
-    m_table_id= uint4korr(post_start);
-    post_start+= 4;
-  }
-  else
+  if ((key= search_key_in_table(table, &m_cols,
+                                (PRI_KEY_FLAG | UNIQUE_KEY_FLAG | MULTIPLE_KEY_FLAG)))
+       >= MAX_KEY)
+    /* we don't have a key, or no key is suitable for the BI values */
   {
-    DBUG_ASSERT(post_header_len == TABLE_MAP_HEADER_LEN);
-    m_table_id= (ulong) uint6korr(post_start);
-    post_start+= TM_FLAGS_OFFSET;
+    error= HA_ERR_KEY_NOT_FOUND;
+    goto err;
   }
 
-  DBUG_ASSERT(m_table_id != ~0UL);
-
-  m_flags= uint2korr(post_start);
+  {
+    keyinfo= table->key_info + key;
 
-  /* Read the variable part of the event */
-  const char *const vpart= buf + common_header_len + post_header_len;
 
-  /* Extract the length of the various parts from the buffer */
-  uchar const *const ptr_dblen= (uchar const*)vpart + 0;
-  m_dblen= *(uchar*) ptr_dblen;
+    DBUG_PRINT("info",("locating record using a key (index_read)"));
 
-  /* Length of database name + counter + terminating null */
-  uchar const *const ptr_tbllen= ptr_dblen + m_dblen + 2;
-  m_tbllen= *(uchar*) ptr_tbllen;
+    /* Key number 'key' is active and usable: search the table using it */
+    if (!table->file->inited && (error= table->file->ha_index_init(key, FALSE)))
+    {
+      DBUG_PRINT("info",("ha_index_init returns error %d",error));
+      table->file->print_error(error, MYF(0));
+      goto err;
+    }
 
-  /* Length of table name + counter + terminating null */
-  uchar const *const ptr_colcnt= ptr_tbllen + m_tbllen + 2;
-  uchar *ptr_after_colcnt= (uchar*) ptr_colcnt;
-  m_colcnt= net_field_length(&ptr_after_colcnt);
+    /* Fill key data for the row */
 
-  DBUG_PRINT("info",("m_dblen: %lu  off: %ld  m_tbllen: %lu  off: %ld  m_colcnt: %lu  off: %ld",
-                     (ulong) m_dblen, (long) (ptr_dblen-(const uchar*)vpart), 
-                     (ulong) m_tbllen, (long) (ptr_tbllen-(const uchar*)vpart),
-                     m_colcnt, (long) (ptr_colcnt-(const uchar*)vpart)));
+    DBUG_ASSERT(m_key);
+    key_copy(m_key, table->record[0], keyinfo, 0);
 
-  /* Allocate mem for all fields in one go. If fails, caught in is_valid() */
-  m_memory= (uchar*) my_multi_malloc(MYF(MY_WME),
-                                     &m_dbnam, (uint) m_dblen + 1,
-                                     &m_tblnam, (uint) m_tbllen + 1,
-                                     &m_coltype, (uint) m_colcnt,
-                                     NullS);
+    /*
+      Don't print debug messages when running valgrind since they can
+      trigger false warnings.
+     */
+#ifndef HAVE_purify
+    DBUG_DUMP("key data", m_key, keyinfo->key_length);
+#endif
 
-  if (m_memory)
-  {
-    /* Copy the different parts into their memory */
-    strncpy(const_cast<char*>(m_dbnam), (const char*)ptr_dblen  + 1, m_dblen + 1);
-    strncpy(const_cast<char*>(m_tblnam), (const char*)ptr_tbllen + 1, m_tbllen + 1);
-    memcpy(m_coltype, ptr_after_colcnt, m_colcnt);
+    /*
+      We need to set the null bytes to ensure that the filler bit are
+      all set when returning.  There are storage engines that just set
+      the necessary bits on the bytes and don't set the filler bits
+      correctly.
+    */
+    if (table->s->null_bytes > 0)
+      table->record[0][table->s->null_bytes - 1]|=
+        256U - (1U << table->s->last_null_bit_pos);
 
-    ptr_after_colcnt= ptr_after_colcnt + m_colcnt;
-    bytes_read= (uint) (ptr_after_colcnt - (uchar *)buf);
-    DBUG_PRINT("info", ("Bytes read: %d.\n", bytes_read));
-    if (bytes_read < event_len)
+    if ((error= table->file->ha_index_read_map(table->record[0], m_key,
+                                               HA_WHOLE_KEY,
+                                               HA_READ_KEY_EXACT)))
     {
-      m_field_metadata_size= net_field_length(&ptr_after_colcnt);
-      DBUG_ASSERT(m_field_metadata_size <= (m_colcnt * 2));
-      uint num_null_bytes= (m_colcnt + 7) / 8;
-      m_meta_memory= (uchar *)my_multi_malloc(MYF(MY_WME),
-                                     &m_null_bits, num_null_bytes,
-                                     &m_field_metadata, m_field_metadata_size,
-                                     NULL);
-      memcpy(m_field_metadata, ptr_after_colcnt, m_field_metadata_size);
-      ptr_after_colcnt= (uchar*)ptr_after_colcnt + m_field_metadata_size;
-      memcpy(m_null_bits, ptr_after_colcnt, num_null_bytes);
+      DBUG_PRINT("info",("no record matching the key found in the table"));
+      if (error == HA_ERR_RECORD_DELETED)
+        error= HA_ERR_KEY_NOT_FOUND;
+      table->file->print_error(error, MYF(0));
+      table->file->ha_index_end();
+      goto err;
     }
-  }
 
-  DBUG_VOID_RETURN;
-}
+  /*
+    Don't print debug messages when running valgrind since they can
+    trigger false warnings.
+   */
+#ifndef HAVE_purify
+    DBUG_PRINT("info",("found first matching record"));
+    DBUG_DUMP("record[0]", table->record[0], table->s->reclength);
 #endif
+    /*
+      Below is a minor "optimization".  If the key (i.e., key number
+      0) has the HA_NOSAME flag set, we know that we have found the
+      correct record (since there can be no duplicates); otherwise, we
+      have to compare the record with the one found to see if it is
+      the correct one.
 
-Table_map_log_event::~Table_map_log_event()
-{
-  my_free(m_meta_memory);
-  my_free(m_memory);
-}
+      CAVEAT! This behaviour is essential for the replication of,
+      e.g., the mysql.proc table since the correct record *shall* be
+      found using the primary key *only*.  There shall be no
+      comparison of non-PK columns to decide if the correct record is
+      found.  I can see no scenario where it would be incorrect to
+      choose the row to change using only a PK or an UNNI.
+    */
+    if (keyinfo->flags & HA_NOSAME || key == table->s->primary_key)
+    {
+      /* The unique key has no nullable part */
+      if (!(keyinfo->flags & (HA_NULL_PART_KEY)))
+      {
+        table->file->ha_index_end();
+        goto record_found;
+      }
+      else
+      {
+        /*
+          The unique key has a nullable part. We need to check whether
+          any field of the BI image that is part of the UNNI is null.
+        */
+        bool null_found= FALSE;
+        for (uint i=0; i < keyinfo->key_parts && !null_found; i++)
+        {
+          uint fieldnr= keyinfo->key_part[i].fieldnr - 1;
+          Field **f= table->field+fieldnr;
+          null_found= (*f)->is_null();
+        }
 
-/*
-  Return value is an error code, one of:
+        if (!null_found)
+        {
+          table->file->ha_index_end();
+          goto record_found;
+        }
 
-      -1     Failure to open table   [from open_tables()]
-       0     Success
-       1     No room for more tables [from set_table()]
-       2     Out of memory           [from set_table()]
-       3     Wrong table definition
-       4     Daisy-chaining RBR with SBR not possible
- */
+        /* else fall through to index scan */
+      }
+    }
 
-#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
-int Table_map_log_event::do_apply_event(Relay_log_info const *rli)
-{
-  RPL_TABLE_LIST *table_list;
-  char *db_mem, *tname_mem;
-  size_t dummy_len;
-  void *memory;
-  DBUG_ENTER("Table_map_log_event::do_apply_event(Relay_log_info*)");
-  DBUG_ASSERT(rli->info_thd == thd);
+    /*
+      In case key is not unique, we still have to iterate over records found
+      and find the one which is identical to the row given. A copy of the
+      record we are looking for is stored in record[1].
+     */
+    DBUG_PRINT("info",("non-unique index, scanning it to find matching record"));
 
-  /* Step the query id to mark what columns that are actually used. */
-  thd->set_query_id(next_query_id());
+    while (record_compare(table, &m_cols))
+    {
+      /*
+        We need to set the null bytes to ensure that the filler bit
+        are all set when returning.  There are storage engines that
+        just set the necessary bits on the bytes and don't set the
+        filler bits correctly.
 
-  if (!(memory= my_multi_malloc(MYF(MY_WME),
-                                &table_list, (uint) sizeof(RPL_TABLE_LIST),
-                                &db_mem, (uint) NAME_LEN + 1,
-                                &tname_mem, (uint) NAME_LEN + 1,
-                                NullS)))
-    DBUG_RETURN(HA_ERR_OUT_OF_MEM);
+        TODO[record format ndb]: Remove this code once NDB returns the
+        correct record format.
+      */
+      if (table->s->null_bytes > 0)
+      {
+        table->record[0][table->s->null_bytes - 1]|=
+          256U - (1U << table->s->last_null_bit_pos);
+      }
 
-  strmov(db_mem, rpl_filter->get_rewrite_db(m_dbnam, &dummy_len));
-  strmov(tname_mem, m_tblnam);
+      while ((error= table->file->ha_index_next(table->record[0])))
+      {
+        /* We just skip records that have already been deleted */
+        if (error == HA_ERR_RECORD_DELETED)
+          continue;
+        DBUG_PRINT("info",("no record matching the given row found"));
+        table->file->print_error(error, MYF(0));
+        table->file->ha_index_end();
+        goto err;
+      }
+    }
 
-  table_list->init_one_table(db_mem, strlen(db_mem),
-                             tname_mem, strlen(tname_mem),
-                             tname_mem, TL_WRITE);
+    /*
+      Have to restart the scan to be able to fetch the next row.
+    */
+    table->file->ha_index_end();
+  }
 
-  table_list->table_id= m_table_id;
-  table_list->updating= 1;
+record_found:
+  error= do_apply_row(rli);
+
+err:
+  table->default_column_bitmaps();
+  DBUG_RETURN(error);
+
+}
 
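+/**
+  Hashes every row of the event and, once the last row has been
+  hashed, scans the table a single time, matching each fetched record
+  against the hash table and applying the row operation on every
+  match.
+*/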
+int Rows_log_event::do_hash_scan_and_update(Relay_log_info const *rli)
+{
   int error= 0;
 
-  if (rli->info_thd->slave_thread /* filtering is for slave only */ &&
-      (!rpl_filter->db_ok(table_list->db) ||
-       (rpl_filter->is_on() && !rpl_filter->tables_ok("", table_list))))
+  DBUG_ENTER("Rows_log_event::do_hash_scan_and_update");
+
+  if ((error= hash_row(rli)))
+    goto err;
+
+  /**
+    Last row hashed.
+
+    Now do the table scan and update according to the hash table
+    matches.
+   */
+  if (m_curr_row == m_rows_end)
   {
-    my_free(memory);
+
+    TABLE* table= m_table;
+    MY_BITMAP* read_set= &m_cols;
+
+    if ((error= table->file->ha_rnd_init(1)))
+    {
+      DBUG_PRINT("info",("error initializing table scan"
+          " (ha_rnd_init returns %d)",error));
+      table->file->print_error(error, MYF(0));
+      goto err;
+    }
+
+    /* Continue until we find the right record or reach the end of the table */
+    do
+    {
+      error= table->file->ha_rnd_next(table->record[0]);
+
+      // TODO: remove blobs from the record read from the engine
+
+      DBUG_PRINT("info", ("error: %s", HA_ERR(error)));
+      switch (error) {
+        case 0:
+        {
+          bool found_in_hash= false;
+          HASH_SEARCH_STATE state;
+          /* Save a copy of the record read from the engine. */
+          store_record(table, record[1]);
+
+          /**
+             This is only needed because records are hashed without blobs, so
+             we may have false positives.
+           */
+          row_entry *entry= (row_entry *) my_hash_first(&m_hash, table->record[0], table->s->reclength, &state);
+          while (entry)
+          {
+            /**
+               Unpack the full record again into table->record[0], so that
+               both table->record[0] and table->record[1] hold complete rows.
+             */
+            m_curr_row= entry->m_curr_row;
+            if ((error= unpack_current_row(rli, &m_cols)))
+              goto close_table_and_err;
+
+            /*
+              Compare the row_entry with the row, taking blobs into
+              account if there are any. If there is a match, do the
+              operation and remove the entry from the hash table.
+            */
+            if (!record_compare(table, read_set))
+            {
+              found_in_hash= true;
+
+              my_hash_delete(&m_hash, (uchar *)entry);
+              break;
+            }
+
+            // find next
+            entry= (row_entry *)my_hash_next(&m_hash, table->record[0], table->s->reclength,  &state);
+          }
+
+          if (found_in_hash)
+            if ((error= do_apply_row(rli)))
+              goto err;
+        }
+        break;
+
+          /*
+            If the record was deleted, we pick the next one without doing
+            any comparisons.
+          */
+        case HA_ERR_RECORD_DELETED:
+          break;
+
+        case HA_ERR_END_OF_FILE: // to make it clear
+        default:
+          DBUG_PRINT("info", ("Failed to get next record"
+              " (ha_rnd_next returns %d)",error));
+          goto close_table_and_err;
+      }
+    }
+
+    while ((m_hash.records > 0) && (!error || (error == HA_ERR_RECORD_DELETED)));
   }
-  else
+
+err:
+  DBUG_RETURN(error);
+
+close_table_and_err:
+  m_table->file->print_error(error, MYF(0));
+  m_table->file->ha_rnd_end();
+  DBUG_RETURN(error);
+
+}
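
For reference, the control flow of do_hash_scan_and_update reduces to:
hash every row of the event first (without blobs), then make a single
pass over the table, probing the hash for each fetched record and
re-running a full comparison on every candidate to reject false
positives. A self-contained analogue using the standard library; Row,
key_part and blob_part are made-up stand-ins for the event rows, the
blob-less hash key and the full record:

  #include <string>
  #include <unordered_map>
  #include <vector>

  /* Hypothetical event row: key_part is what gets hashed (no blobs),
     blob_part only takes part in the full comparison. */
  struct Row { std::string key_part; std::string blob_part; };

  static void hash_scan_and_update(const std::vector<Row>& event_rows,
                                   const std::vector<Row>& table_rows)
  {
    /* Phase 1: hash every row of the event; false positives are
       possible because blobs are left out of the key. */
    std::unordered_multimap<std::string, const Row*> hash;
    for (const Row& r : event_rows)
      hash.emplace(r.key_part, &r);

    /* Phase 2: a single pass over the table, probing the hash. */
    for (const Row& rec : table_rows)
    {
      auto range= hash.equal_range(rec.key_part);
      for (auto it= range.first; it != range.second; ++it)
      {
        /* Full comparison (blobs included) rejects false positives,
           like the record_compare() call in the patch. */
        if (it->second->blob_part == rec.blob_part)
        {
          /* ... apply the row here ..., then drop the entry so the
             outer loop can stop once the hash is empty. */
          hash.erase(it);
          break;
        }
      }
      if (hash.empty())
        break;
    }
  }

  int main()
  {
    hash_scan_and_update({{"k1", "b1"}, {"k2", "b2"}},
                         {{"k2", "b2"}, {"k1", "b1"}});
    return 0;
  }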
+
+int Rows_log_event::do_table_scan_and_update(Relay_log_info const *rli)
+{
+  int error= 0;
+  DBUG_ENTER("Rows_log_event::do_table_scan_and_update");
+  DBUG_ASSERT(m_curr_row != m_rows_end);
+  DBUG_PRINT("info",("locating record using table scan (ha_rnd_next)"));
+
+  int restart_count= 0; // Number of times scanning has restarted from top
+
+  /* We don't have a key: search the table using ha_rnd_next() */
+  if ((error= m_table->file->ha_rnd_init(1)))
+  {
+    DBUG_PRINT("info",("error initializing table scan"
+                       " (ha_rnd_init returns %d)",error));
+    m_table->file->print_error(error, MYF(0));
+    goto err;
+  }
+
+  /* Continue until we find the right record or have made a full loop */
+  do
   {
-    DBUG_ASSERT(thd->lex->query_tables != table_list);
+  restart_ha_rnd_next:
+    error= m_table->file->ha_rnd_next(m_table->record[0]);
 
-    /*
-      Use placement new to construct the table_def instance in the
-      memory allocated for it inside table_list.
+    DBUG_PRINT("info", ("error: %s", HA_ERR(error)));
+    switch (error) {
 
-      The memory allocated by the table_def structure (i.e., not the
-      memory allocated *for* the table_def structure) is released
-      inside Relay_log_info::clear_tables_to_lock() by calling the
-      table_def destructor explicitly.
-    */
-    new (&table_list->m_tabledef)
-      table_def(m_coltype, m_colcnt,
-                m_field_metadata, m_field_metadata_size,
-                m_null_bits, m_flags);
-    table_list->m_tabledef_valid= TRUE;
+    case 0:
+      break;
 
     /*
-      We record in the slave's information that the table should be
-      locked by linking the table into the list of tables to lock.
+      If the record was deleted, we pick the next one without doing
+      any comparisons.
     */
-    table_list->next_global= table_list->next_local= rli->tables_to_lock;
-    const_cast<Relay_log_info*>(rli)->tables_to_lock= table_list;
-    const_cast<Relay_log_info*>(rli)->tables_to_lock_count++;
-    /* 'memory' is freed in clear_tables_to_lock */
+    case HA_ERR_RECORD_DELETED:
+      goto restart_ha_rnd_next;
+
+    case HA_ERR_END_OF_FILE:
+      if (++restart_count < 2)
+        m_table->file->ha_rnd_init(1);
+      break;
+
+    default:
+      DBUG_PRINT("info", ("Failed to get next record"
+                          " (ha_rnd_next returns %d)",error));
+      m_table->file->print_error(error, MYF(0));
+      m_table->file->ha_rnd_end();
+      goto err;
+    }
   }
+  while (restart_count < 2 && record_compare(m_table, &m_cols));
 
-  DBUG_RETURN(error);
-}
+  /*
+    Note: the above record_compare takes into account all record fields,
+    which might be incorrect if a partial row was given in the event.
+   */
 
-Log_event::enum_skip_reason
-Table_map_log_event::do_shall_skip(Relay_log_info *rli)
-{
   /*
-    If the slave skip counter is 1, then we should not start executing
-    on the next event.
+    Have to restart the scan to be able to fetch the next row.
   */
-  return continue_group(rli);
-}
+  if (restart_count == 2)
+    DBUG_PRINT("info", ("Record not found"));
+  else
+    DBUG_DUMP("record found", m_table->record[0], m_table->s->reclength);
+  m_table->file->ha_rnd_end();
 
-int Table_map_log_event::do_update_pos(Relay_log_info *rli)
-{
-  rli->inc_event_relay_log_pos();
-  return 0;
-}
+  DBUG_ASSERT(error == HA_ERR_END_OF_FILE || error == 0);
 
-#endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */
+  /* If the row was found, apply it */
+  if (!error)
+  {
+    DBUG_ASSERT(restart_count < 2);
+    error= do_apply_row(rli);
+  }
+
+err:
+  m_table->default_column_bitmaps();
+  DBUG_RETURN(error);
 
-#ifndef MYSQL_CLIENT
-bool Table_map_log_event::write_data_header(IO_CACHE *file)
-{
-  DBUG_ASSERT(m_table_id != ~0UL);
-  uchar buf[TABLE_MAP_HEADER_LEN];
-  DBUG_EXECUTE_IF("old_row_based_repl_4_byte_map_id_master",
-                  {
-                    int4store(buf + 0, m_table_id);
-                    int2store(buf + 4, m_flags);
-                    return (my_b_safe_write(file, buf, 6));
-                  });
-  int6store(buf + TM_MAPID_OFFSET, (ulonglong)m_table_id);
-  int2store(buf + TM_FLAGS_OFFSET, m_flags);
-  return (my_b_safe_write(file, buf, TABLE_MAP_HEADER_LEN));
 }
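
The restart_count logic above gives the scan exactly one free restart:
the SQL thread may begin the scan mid-table, so the first
HA_ERR_END_OF_FILE re-initializes it from the top, and a second
end-of-file means the row really is absent. A compact standalone model
of that loop (Cursor and the int rows are invented for illustration):

  #include <cstdio>
  #include <optional>
  #include <vector>

  /* Minimal cursor over a vector, shaped like ha_rnd_init/ha_rnd_next. */
  struct Cursor
  {
    const std::vector<int>* rows;
    size_t pos;
    std::optional<int> next()
    { return pos < rows->size() ? std::optional<int>((*rows)[pos++])
                                : std::nullopt; }
    void reset() { pos= 0; }
  };

  static bool table_scan_find(Cursor& c, int wanted)
  {
    int restart_count= 0;         /* times the scan restarted from top */
    for (;;)
    {
      std::optional<int> row= c.next();
      if (!row)                   /* "HA_ERR_END_OF_FILE" */
      {
        if (++restart_count == 2)
          return false;           /* full loop made: row is absent */
        c.reset();                /* first EOF: restart from the top */
        continue;
      }
      if (*row == wanted)         /* stands in for !record_compare() */
        return true;
    }
  }

  int main()
  {
    std::vector<int> t= {7, 3, 9};
    Cursor c= {&t, 1};            /* start mid-table on purpose */
    std::printf("%d %d\n", table_scan_find(c, 7), table_scan_find(c, 42));
    return 0;
  }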
 
-bool Table_map_log_event::write_data_body(IO_CACHE *file)
+int Rows_log_event::do_apply_event(Relay_log_info const *rli)
 {
-  DBUG_ASSERT(m_dbnam != NULL);
-  DBUG_ASSERT(m_tblnam != NULL);
-  /* We use only one byte per length for storage in event: */
-  DBUG_ASSERT(m_dblen < 128);
-  DBUG_ASSERT(m_tbllen < 128);
+  DBUG_ENTER("Rows_log_event::do_apply_event(Relay_log_info*)");
+  int error= 0;
+  /*
+    If m_table_id == ~0UL, then we have a dummy event that does not
+    contain any data.  In that case, we just remove all tables in the
+    tables_to_lock list, close the thread tables, and return with
+    success.
+   */
+  if (m_table_id == ~0UL)
+  {
+    /*
+       This one is supposed to be set: just an extra check so that
+       nothing strange has happened.
+     */
+    DBUG_ASSERT(get_flags(STMT_END_F));
 
-  uchar const dbuf[]= { (uchar) m_dblen };
-  uchar const tbuf[]= { (uchar) m_tbllen };
+    const_cast<Relay_log_info*>(rli)->slave_close_thread_tables(thd);
+    thd->clear_error();
+    DBUG_RETURN(0);
+  }
 
-  uchar cbuf[sizeof(m_colcnt) + 1];
-  uchar *const cbuf_end= net_store_length(cbuf, (size_t) m_colcnt);
-  DBUG_ASSERT(static_cast<size_t>(cbuf_end - cbuf) <= sizeof(cbuf));
+  /*
+    'thd' has been set by exec_relay_log_event(), just before calling
+    do_apply_event(). We still check here to prevent future coding
+    errors.
+  */
+  DBUG_ASSERT(rli->info_thd == thd);
 
   /*
-    Store the size of the field metadata.
+    If there are no locks taken, this is the first binrow event seen
+    after the table map events.  We should then lock all the tables
+    used in the transaction and proceed with execution of the actual
+    event.
   */
-  uchar mbuf[sizeof(m_field_metadata_size)];
-  uchar *const mbuf_end= net_store_length(mbuf, m_field_metadata_size);
+  if (!thd->lock)
+  {
+    /*
+      Lock_tables() reads the contents of thd->lex, so they must be
+      initialized.
 
-  return (my_b_safe_write(file, dbuf,      sizeof(dbuf)) ||
-          my_b_safe_write(file, (const uchar*)m_dbnam,   m_dblen+1) ||
-          my_b_safe_write(file, tbuf,      sizeof(tbuf)) ||
-          my_b_safe_write(file, (const uchar*)m_tblnam,  m_tbllen+1) ||
-          my_b_safe_write(file, cbuf, (size_t) (cbuf_end - cbuf)) ||
-          my_b_safe_write(file, m_coltype, m_colcnt) ||
-          my_b_safe_write(file, mbuf, (size_t) (mbuf_end - mbuf)) ||
-          my_b_safe_write(file, m_field_metadata, m_field_metadata_size),
-          my_b_safe_write(file, m_null_bits, (m_colcnt + 7) / 8));
- }
-#endif
+      We also call the mysql_reset_thd_for_next_command(), since this
+      is the logical start of the next "statement". Note that this
+      call might reset the value of current_stmt_binlog_format, so
+      we need to do any changes to that value after this function.
+    */
+    lex_start(thd);
+    mysql_reset_thd_for_next_command(thd);
+    /*
+      The current statement is just about to begin and 
+      has not yet modified anything. Note, all.modified is reset
+      by mysql_reset_thd_for_next_command.
+    */
+    thd->transaction.stmt.modified_non_trans_table= FALSE;
+    /*
+      This is a row injection, so we flag the "statement" as
+      such. Note that this code is called both when the slave does row
+      injections and when the BINLOG statement is used to do row
+      injections.
+    */
+    thd->lex->set_stmt_row_injection();
 
-#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
+    /*
+      There are a few flags that are replicated with each row event.
+      Make sure to set/clear them before executing the main body of
+      the event.
+    */
+    if (get_flags(NO_FOREIGN_KEY_CHECKS_F))
+        thd->variables.option_bits|= OPTION_NO_FOREIGN_KEY_CHECKS;
+    else
+        thd->variables.option_bits&= ~OPTION_NO_FOREIGN_KEY_CHECKS;
 
-/*
-  Print some useful information for the SHOW BINARY LOG information
-  field.
- */
+    if (get_flags(RELAXED_UNIQUE_CHECKS_F))
+        thd->variables.option_bits|= OPTION_RELAXED_UNIQUE_CHECKS;
+    else
+        thd->variables.option_bits&= ~OPTION_RELAXED_UNIQUE_CHECKS;
+    /* A small test to verify that objects have consistent types */
+    DBUG_ASSERT(sizeof(thd->variables.option_bits) == sizeof(OPTION_RELAXED_UNIQUE_CHECKS));
 
-#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
-void Table_map_log_event::pack_info(Protocol *protocol)
-{
-    char buf[256];
-    size_t bytes= my_snprintf(buf, sizeof(buf),
-                                 "table_id: %lu (%s.%s)",
-                              m_table_id, m_dbnam, m_tblnam);
-    protocol->store(buf, bytes, &my_charset_bin);
-}
-#endif
+    if (open_and_lock_tables(thd, rli->tables_to_lock, FALSE, 0))
+    {
+      uint actual_error= thd->stmt_da->sql_errno();
+      if (thd->is_slave_error || thd->is_fatal_error)
+      {
+        /*
+          Error reporting borrowed from Query_log_event with many excessive
+          simplifications. 
+          We should not honour --slave-skip-errors at this point as these
+          are severe errors which should not be skipped.
+        */
+        rli->report(ERROR_LEVEL, actual_error,
+                    "Error executing row event: '%s'",
+                    (actual_error ? thd->stmt_da->message() :
+                     "unexpected success or fatal error"));
+        thd->is_slave_error= 1;
+      }
+      const_cast<Relay_log_info*>(rli)->slave_close_thread_tables(thd);
+      DBUG_RETURN(actual_error);
+    }
 
+    /*
+      When the open and locking succeeded, we check all tables to
+      ensure that they still have the correct type.
 
-#endif
+      We can use a down cast here since we know that every table added
+      to the tables_to_lock is a RPL_TABLE_LIST.
+    */
 
+    {
+      DBUG_PRINT("debug", ("Checking compability of tables to lock - tables_to_lock: %p",
+                           rli->tables_to_lock));
+      RPL_TABLE_LIST *ptr= rli->tables_to_lock;
+      for ( ; ptr ; ptr= static_cast<RPL_TABLE_LIST*>(ptr->next_global))
+      {
+        TABLE *conv_table;
+        if (!ptr->m_tabledef.compatible_with(thd, const_cast<Relay_log_info*>(rli),
+                                             ptr->table, &conv_table))
+        {
+          DBUG_PRINT("debug", ("Table: %s.%s is not compatible with master",
+                               ptr->table->s->db.str,
+                               ptr->table->s->table_name.str));
+          /*
+            We should not honour --slave-skip-errors at this point as these
+            are severe errors which should not be skipped.
+          */
+          thd->is_slave_error= 1;
+          const_cast<Relay_log_info*>(rli)->slave_close_thread_tables(thd);
+          DBUG_RETURN(ERR_BAD_TABLE_DEF);
+        }
+        DBUG_PRINT("debug", ("Table: %s.%s is compatible with master"
+                             " - conv_table: %p",
+                             ptr->table->s->db.str,
+                             ptr->table->s->table_name.str, conv_table));
+        ptr->m_conv_table= conv_table;
+      }
+    }
 
-#ifdef MYSQL_CLIENT
-void Table_map_log_event::print(FILE *file, PRINT_EVENT_INFO *print_event_info)
-{
-  if (!print_event_info->short_form)
-  {
-    print_header(&print_event_info->head_cache, print_event_info, TRUE);
-    my_b_printf(&print_event_info->head_cache,
-                "\tTable_map: `%s`.`%s` mapped to number %lu\n",
-                m_dbnam, m_tblnam, m_table_id);
-    print_base64(&print_event_info->body_cache, print_event_info, TRUE);
-  }
-}
-#endif
+    /*
+      ... and then we add all the tables to the table map but keep
+      them in the tables-to-lock list.
 
-/**************************************************************************
-	Write_rows_log_event member functions
-**************************************************************************/
+      We also invalidate the query cache for all the tables, since
+      they will now be changed.
 
-/*
-  Constructor used to build an event for writing to the binary log.
- */
-#if !defined(MYSQL_CLIENT)
-Write_rows_log_event::Write_rows_log_event(THD *thd_arg, TABLE *tbl_arg,
-                                           ulong tid_arg,
-                                           bool is_transactional)
-  : Rows_log_event(thd_arg, tbl_arg, tid_arg, tbl_arg->write_set, is_transactional)
-{
-}
+      TODO [/Matz]: Maybe the query cache should not be invalidated
+      here? It might be that a table is not changed, even though it
+      was locked for the statement.  We do know that each
+      Rows_log_event contains at least one row, so after processing one
+      Rows_log_event, we can invalidate the query cache for the
+      associated table.
+     */
+    for (TABLE_LIST *ptr= rli->tables_to_lock ; ptr ; ptr= ptr->next_global)
+    {
+      const_cast<Relay_log_info*>(rli)->m_table_map.set_table(ptr->table_id, ptr->table);
+    }
+#ifdef HAVE_QUERY_CACHE
+    query_cache.invalidate_locked_for_write(rli->tables_to_lock);
 #endif
+  }
 
-/*
-  Constructor used by slave to read the event from the binary log.
- */
-#ifdef HAVE_REPLICATION
-Write_rows_log_event::Write_rows_log_event(const char *buf, uint event_len,
-                                           const Format_description_log_event
-                                           *description_event)
-: Rows_log_event(buf, event_len, WRITE_ROWS_EVENT, description_event)
-{
-}
-#endif
+  TABLE *table= m_table=
+    const_cast<Relay_log_info*>(rli)->m_table_map.get_table(m_table_id);
 
-#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
-int 
-Write_rows_log_event::do_before_row_operations(const Slave_reporting_capability *const)
-{
-  int error= 0;
+  DBUG_PRINT("debug", ("m_table: 0x%lx, m_table_id: %lu", (ulong) m_table, m_table_id));
 
-  /**
-     todo: to introduce a property for the event (handler?) which forces
-     applying the event in the replace (idempotent) fashion.
-  */
-  if ((slave_exec_mode == SLAVE_EXEC_MODE_IDEMPOTENT) ||
-      (m_table->s->db_type()->db_type == DB_TYPE_NDBCLUSTER))
+  if (table)
   {
     /*
-      We are using REPLACE semantics and not INSERT IGNORE semantics
-      when writing rows, that is: new rows replace old rows.  We need to
-      inform the storage engine that it should use this behaviour.
+      table == NULL means that this table should not be replicated
+      (this was set up by Table_map_log_event::do_apply_event()
+      which tested replicate-* rules).
     */
-    
-    /* Tell the storage engine that we are using REPLACE semantics. */
-    thd->lex->duplicates= DUP_REPLACE;
-    
+
     /*
-      Pretend we're executing a REPLACE command: this is needed for
-      InnoDB and NDB Cluster since they are not (properly) checking the
-      lex->duplicates flag.
+      It's not needed to set_time() but
+      1) it continues the property that "Time" in SHOW PROCESSLIST shows how
+      much the slave is behind
+      2) it will be needed when we allow replication from a table with no
+      TIMESTAMP column to a table with one.
+      So we call set_time(), like in SBR. Presently it changes nothing.
     */
-    thd->lex->sql_command= SQLCOM_REPLACE;
-    /* 
-       Do not raise the error flag in case of hitting to an unique attribute
-    */
-    m_table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
-    /* 
-       NDB specific: update from ndb master wrapped as Write_rows
-       so that the event should be applied to replace slave's row
-    */
-    m_table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
-    /* 
-       NDB specific: if update from ndb master wrapped as Write_rows
-       does not find the row it's assumed idempotent binlog applying
-       is taking place; don't raise the error.
-    */
-    m_table->file->extra(HA_EXTRA_IGNORE_NO_KEY);
-    /*
-      TODO: the cluster team (Tomas?) says that it's better if the engine knows
-      how many rows are going to be inserted, then it can allocate needed memory
-      from the start.
-    */
-  }
-
-  /*
-    We need TIMESTAMP_NO_AUTO_SET otherwise ha_write_row() will not use fill
-    any TIMESTAMP column with data from the row but instead will use
-    the event's current time.
-    As we replicate from TIMESTAMP to TIMESTAMP and slave has no extra
-    columns, we know that all TIMESTAMP columns on slave will receive explicit
-    data from the row, so TIMESTAMP_NO_AUTO_SET is ok.
-    When we allow a table without TIMESTAMP to be replicated to a table having
-    more columns including a TIMESTAMP column, or when we allow a TIMESTAMP
-    column to be replicated into a BIGINT column and the slave's table has a
-    TIMESTAMP column, then the slave's TIMESTAMP column will take its value
-    from set_time() which we called earlier (consistent with SBR). And then in
-    some cases we won't want TIMESTAMP_NO_AUTO_SET (will require some code to
-    analyze if explicit data is provided for slave's TIMESTAMP columns).
-  */
-  m_table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
-  
-  /* Honor next number column if present */
-  m_table->next_number_field= m_table->found_next_number_field;
-  /*
-   * Fixed Bug#45999, In RBR, Store engine of Slave auto-generates new
-   * sequence numbers for auto_increment fields if the values of them are 0.
-   * If generateing a sequence number is decided by the values of
-   * table->auto_increment_field_not_null and SQL_MODE(if includes
-   * MODE_NO_AUTO_VALUE_ON_ZERO) in update_auto_increment function.
-   * SQL_MODE of slave sql thread is always consistency with master's.
-   * In RBR, auto_increment fields never are NULL.
-   */
-  m_table->auto_increment_field_not_null= TRUE;
-  return error;
-}
+    thd->set_time((time_t)when);
 
-int 
-Write_rows_log_event::do_after_row_operations(const Slave_reporting_capability *const,
-                                              int error)
-{
-  int local_error= 0;
-  m_table->next_number_field=0;
-  m_table->auto_increment_field_not_null= FALSE;
-  if ((slave_exec_mode == SLAVE_EXEC_MODE_IDEMPOTENT) ||
-      m_table->s->db_type()->db_type == DB_TYPE_NDBCLUSTER)
-  {
-    m_table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
-    m_table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
     /*
-      resetting the extra with 
-      table->file->extra(HA_EXTRA_NO_IGNORE_NO_KEY); 
-      fires bug#27077
-      explanation: file->reset() performs this duty
-      ultimately. Still todo: fix
-    */
-  }
-  if ((local_error= m_table->file->ha_end_bulk_insert()))
-  {
-    m_table->file->print_error(local_error, MYF(0));
-  }
-  return error? error : local_error;
-}
+      Now we are in a statement and will stay in a statement until we
+      see a STMT_END_F.
 
-#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
+      We set this flag here, before actually applying any rows, in
+      case the SQL thread is stopped and we need to detect that we're
+      inside a statement and halting abruptly might cause problems
+      when restarting.
+     */
+    const_cast<Relay_log_info*>(rli)->set_flag(Relay_log_info::IN_STMT);
 
-/*
-  Check if there are more UNIQUE keys after the given key.
-*/
-static int
-last_uniq_key(TABLE *table, uint keyno)
-{
-  while (++keyno < table->s->keys)
-    if (table->key_info[keyno].flags & HA_NOSAME)
-      return 0;
-  return 1;
-}
+    if (m_width == table->s->fields && bitmap_is_set_all(&m_cols))
+      set_flags(COMPLETE_ROWS_F);
 
-/**
-   Check if an error is a duplicate key error.
+    /* 
+      Set the table's write and read sets.
+      
+      Read_set contains all slave columns (in case we are going to fetch
+      a complete record from slave)
+      
+      Write_set equals the m_cols bitmap sent from master but it can be 
+      longer if slave has extra columns. 
+     */ 
 
-   This function is used to check if an error code is one of the
-   duplicate key error, i.e., and error code for which it is sensible
-   to do a <code>get_dup_key()</code> to retrieve the duplicate key.
+    DBUG_PRINT_BITSET("debug", "Setting table's write_set from: %s", &m_cols);
+    
+    bitmap_set_all(table->read_set);
+    if (get_type_code() == DELETE_ROWS_EVENT)
+        bitmap_intersect(table->read_set,&m_cols);
 
-   @param errcode The error code to check.
+    bitmap_set_all(table->write_set);
+    if (!get_flags(COMPLETE_ROWS_F))
+    {
+      if (get_type_code() == UPDATE_ROWS_EVENT)
+        bitmap_intersect(table->write_set,&m_cols_ai);
+      else /* WRITE ROWS EVENTS store the bitmap in m_cols instead of m_cols_ai */
+        bitmap_intersect(table->write_set,&m_cols);
+    }
 
-   @return <code>true</code> if the error code is such that
-   <code>get_dup_key()</code> will return true, <code>false</code>
-   otherwise.
- */
-bool
-is_duplicate_key_error(int errcode)
-{
-  switch (errcode)
-  {
-  case HA_ERR_FOUND_DUPP_KEY:
-  case HA_ERR_FOUND_DUPP_UNIQUE:
-    return true;
-  }
-  return false;
-}
+    this->slave_exec_mode= slave_exec_mode_options; // fix the mode
+    // Do event specific preparations
 
-/**
-  Write the current row into event's table.
+    error= do_before_row_operations(rli);
 
-  The row is located in the row buffer, pointed by @c m_curr_row member.
-  Number of columns of the row is stored in @c m_width member (it can be 
-  different from the number of columns in the table to which we insert). 
-  Bitmap @c m_cols indicates which columns are present in the row. It is assumed 
-  that event's table is already open and pointed by @c m_table.
+    /**
+       Check if update contains only values in AI for columns that do
+       not exist on the slave. If it does, we can just unpack the rows
+       and return (do nothing on the local table).
+
+       NOTE: We do the following optimization and check only if there
+       are usable values on the AI and disregard the fact that there
+       might be usable values in the BI. In practice this means that
+       the slave will not go through find_row (since we have nothing
+       on the record to update, why go looking for it?).
+
+       If we wanted find_row to run anyway, we could move this
+       check after find_row, but then we would have to face the fact
+       that the slave might stop without finding the proper record
+       (because it might have incomplete BI), even though there were
+       no values in AI.
+
+       On the other hand, if AI has usable values but BI has not,
+       then find_row will return an error (and the error is then
+       propagated as it was already).
+     */
+    if (get_type_code() != UPDATE_ROWS_EVENT ||
+        is_any_column_signaled_for_table(table, &m_cols_ai))
+    {
+      uint row_lookup_method= decide_row_lookup_method(table, &m_cols, get_type_code());
 
-  If the same record already exists in the table it can be either overwritten 
-  or an error is reported depending on the value of @c overwrite flag 
-  (error reporting not yet implemented). Note that the matching record can be
-  different from the row we insert if we use primary keys to identify records in
-  the table.
+      // row processing loop
+      while (!error && (m_curr_row != m_rows_end))
+      {
+        switch (row_lookup_method)
+        {
+          case ROW_LOOKUP_HASH_SCAN:
+            /**
+               Scan the table and, for each record that also
+               exists in the hash, apply the row.
+             */
+            error= do_hash_scan_and_update(rli);
+            break;
 
-  The row to be inserted can contain values only for selected columns. The 
-  missing columns are filled with default values using @c prepare_record() 
-  function. If a matching record is found in the table and @c overwritte is
-  true, the missing columns are taken from it.
+          case ROW_LOOKUP_INDEX_SCAN:
+            error= do_index_scan_and_update(rli);
+            break;
 
-  @param  rli   Relay log info (needed for row unpacking).
-  @param  overwrite  
-                Shall we overwrite if the row already exists or signal 
-                error (currently ignored).
+          case ROW_LOOKUP_TABLE_SCAN:
+            error= do_table_scan_and_update(rli);
+            break;
 
-  @returns Error code on failure, 0 on success.
+          case ROW_LOOKUP_NOT_NEEDED:
+            DBUG_ASSERT(get_type_code() == WRITE_ROWS_EVENT);
 
-  This method, if successful, sets @c m_curr_row_end pointer to point at the
-  next row in the rows buffer. This is done when unpacking the row to be 
-  inserted.
+            /* No need to scan for rows, just apply it */
+            error= do_apply_row(rli);
+            break;
+        }
+      }
+    }
 
-  @note If a matching record is found, it is either updated using 
-  @c ha_update_row() or first deleted and then new record written.
-*/ 
+    {
+      /**
+         The following failure injection works in cooperation with tests
+         setting @@global.debug= 'd,stop_slave_middle_group'.
+         The SQL thread receives the killed status and will proceed
+         to shut down, trying to finish the incomplete event group.
+       */
+      DBUG_EXECUTE_IF("stop_slave_middle_group",
+                      if (thd->transaction.all.modified_non_trans_table)
+                        const_cast<Relay_log_info*>(rli)->abort_slave= 1;);
+    }
 
-int
-Rows_log_event::write_row(const Relay_log_info *const rli,
-                          const bool overwrite)
-{
-  DBUG_ENTER("write_row");
-  DBUG_ASSERT(m_table != NULL && thd != NULL);
+    if ((error= do_after_row_operations(rli, error)) &&
+        ignored_error_code(convert_handler_error(error, thd, table)))
+    {
 
-  TABLE *table= m_table;  // pointer to event's table
-  int error;
-  int UNINIT_VAR(keynum);
-  auto_afree_ptr<char> key(NULL);
+      if (global_system_variables.log_warnings)
+        slave_rows_error_report(WARNING_LEVEL, error, rli, thd, table,
+                                get_type_str(),
+                                const_cast<Relay_log_info*>(rli)->get_rpl_log_name(),
+                                (ulong) log_pos);
+      clear_all_errors(thd, const_cast<Relay_log_info*>(rli));
+      error= 0;
+    }
+  } // if (table)
 
-  /* fill table->record[0] with default values */
-  bool abort_on_warnings= (rli->info_thd->variables.sql_mode &
-                           (MODE_STRICT_TRANS_TABLES | MODE_STRICT_ALL_TABLES));
-  if ((error= prepare_record(table, &m_cols,
-                             table->file->ht->db_type != DB_TYPE_NDBCLUSTER,
-                             abort_on_warnings, m_curr_row == m_rows_buf)))
-    DBUG_RETURN(error);
   
-  /* unpack row into table->record[0] */
-  if ((error= unpack_current_row(rli, &m_cols, abort_on_warnings)))
-    DBUG_RETURN(error);
-
-  // Temporary fix to find out why it fails [/Matz]
-  memcpy(m_table->write_set->bitmap, m_cols.bitmap, (m_table->write_set->n_bits + 7) / 8);
-
-  if (m_curr_row == m_rows_buf)
+  if (error)
   {
-    /* this is the first row to be inserted, we estimate the rows with
-       the size of the first row and use that value to initialize
-       storage engine for bulk insertion */
-    DBUG_ASSERT(!(m_curr_row > m_curr_row_end));
-    ulong estimated_rows= 0;
-    if (m_curr_row < m_curr_row_end)
-      estimated_rows= (m_rows_end - m_curr_row) / (m_curr_row_end - m_curr_row);
-    else if (m_curr_row == m_curr_row_end)
-      estimated_rows= 1;
+    slave_rows_error_report(ERROR_LEVEL, error, rli, thd, table,
+                             get_type_str(),
+                             const_cast<Relay_log_info*>(rli)->get_rpl_log_name(),
+                             (ulong) log_pos);
+    /*
+      @todo We should probably not call
+      reset_current_stmt_binlog_format_row() from here.
 
-    m_table->file->ha_start_bulk_insert(estimated_rows);
+      Note: this applies to log_event_old.cc too.
+      /Sven
+    */
+    thd->reset_current_stmt_binlog_format_row();
+    const_cast<Relay_log_info*>(rli)->cleanup_context(thd, error);
+    thd->is_slave_error= 1;
+    DBUG_RETURN(error);
   }
-  
-  
-#ifndef DBUG_OFF
-  DBUG_DUMP("record[0]", table->record[0], table->s->reclength);
-  DBUG_PRINT_BITSET("debug", "write_set = %s", table->write_set);
-  DBUG_PRINT_BITSET("debug", "read_set = %s", table->read_set);
-#endif
 
-  /* 
-    Try to write record. If a corresponding record already exists in the table,
-    we try to change it using ha_update_row() if possible. Otherwise we delete
-    it and repeat the whole process again. 
+  if (get_flags(STMT_END_F))
+    if ((error= rows_event_stmt_cleanup(rli, thd)))
+      rli->report(ERROR_LEVEL, error,
+                  "Error in %s event: commit of row events failed, "
+                  "table `%s`.`%s`",
+                  get_type_str(), m_table->s->db.str,
+                  m_table->s->table_name.str);
 
-    TODO: Add safety measures against infinite looping. 
-   */
+  DBUG_RETURN(error);
+}
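
The read_set/write_set preparation in do_apply_event boils down to
starting from "all columns" and intersecting with the bitmap shipped in
the event (m_cols, or m_cols_ai for an UPDATE's after image). A
std::bitset stand-in for MY_BITMAP showing just the intersection step
(the 8-column width and bit pattern are arbitrary):

  #include <bitset>
  #include <cstdio>

  int main()
  {
    const size_t n= 8;                    /* assumed slave column count */
    std::bitset<n> write_set;             /* table->write_set */
    std::bitset<n> m_cols("00000111");    /* 3 master columns present */

    write_set.set();                      /* bitmap_set_all(write_set) */
    write_set&= m_cols;                   /* bitmap_intersect(write_set, &m_cols) */

    /* Only columns present in the event row remain writable. */
    std::printf("write_set=%s\n", write_set.to_string().c_str());
    return 0;
  }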
 
-  m_table->mark_columns_per_binlog_row_image();
+Log_event::enum_skip_reason
+Rows_log_event::do_shall_skip(Relay_log_info *rli)
+{
+  /*
+    If the slave skip counter is 1 and this event does not end a
+    statement, then we should not start executing on the next event.
+    Otherwise, we defer the decision to the normal skipping logic.
+  */
+  if (rli->slave_skip_counter == 1 && !get_flags(STMT_END_F))
+    return Log_event::EVENT_SKIP_IGNORE;
+  else
+    return Log_event::do_shall_skip(rli);
+}
 
-  while ((error= table->file->ha_write_row(table->record[0])))
+/**
+   The function is called at Rows_log_event statement commit time,
+   normally from Rows_log_event::do_update_pos() and possibly from
+   Query_log_event::do_apply_event() of the COMMIT.
+   The function commits the last statement for engines and binlog, and
+   releases resources that have been allocated for the statement.
+  
+   @retval  0         Ok.
+   @retval  non-zero  Error at the commit.
+ */
+
+static int rows_event_stmt_cleanup(Relay_log_info const *rli, THD * thd)
+{
+  int error;
   {
-    if (error == HA_ERR_LOCK_DEADLOCK ||
-        error == HA_ERR_LOCK_WAIT_TIMEOUT ||
-        (keynum= table->file->get_dup_key(error)) < 0 ||
-        !overwrite)
-    {
-      DBUG_PRINT("info",("get_dup_key returns %d)", keynum));
-      /*
-        Deadlock, waiting for lock or just an error from the handler
-        such as HA_ERR_FOUND_DUPP_KEY when overwrite is false.
-        Retrieval of the duplicate key number may fail
-        - either because the error was not "duplicate key" error
-        - or because the information which key is not available
-      */
-      table->file->print_error(error, MYF(0));
-      goto error;
-    }
     /*
-       We need to retrieve the old row into record[1] to be able to
-       either update or delete the offending record.  We either:
-
-       - use ha_rnd_pos() with a row-id (available as dupp_row) to the
-         offending row, if that is possible (MyISAM and Blackhole), or else
-
-       - use ha_index_read_idx_map() with the key that is duplicated, to
-         retrieve the offending row.
-     */
-    if (table->file->ha_table_flags() & HA_DUPLICATE_POS)
-    {
-      DBUG_PRINT("info",("Locating offending record using ha_rnd_pos()"));
-
-      if (table->file->inited && (error= table->file->ha_index_end()))
-        DBUG_RETURN(error);
-      if ((error= table->file->ha_rnd_init(FALSE)))
-        DBUG_RETURN(error);
-
-      error= table->file->ha_rnd_pos(table->record[1], table->file->dup_ref);
-
-      table->file->ha_rnd_end();
-      if (error)
-      {
-        DBUG_PRINT("info",("ha_rnd_pos() returns error %d",error));
-        if (error == HA_ERR_RECORD_DELETED)
-          error= HA_ERR_KEY_NOT_FOUND;
-        table->file->print_error(error, MYF(0));
-        goto error;
-      }
-    }
-    else
-    {
-      DBUG_PRINT("info",("Locating offending record using index_read_idx()"));
-
-      if (table->file->extra(HA_EXTRA_FLUSH_CACHE))
-      {
-        DBUG_PRINT("info",("Error when setting HA_EXTRA_FLUSH_CACHE"));
-        error= my_errno;
-        goto error;
-      }
+      This is the end of a statement or transaction, so close (and
+      unlock) the tables we opened when processing the
+      Table_map_log_event starting the statement.
 
-      if (key.get() == NULL)
-      {
-        key.assign(static_cast<char*>(my_alloca(table->s->max_unique_length)));
-        if (key.get() == NULL)
-        {
-          DBUG_PRINT("info",("Can't allocate key buffer"));
-          error= ENOMEM;
-          goto error;
-        }
-      }
+      OBSERVER.  This will clear *all* mappings, not only those that
+      are open for the table. There is no good handle for on-close
+      actions for tables.
 
-      key_copy((uchar*)key.get(), table->record[0], table->key_info + keynum,
-               0);
-      error= table->file->ha_index_read_idx_map(table->record[1], keynum,
-                                                (const uchar*)key.get(),
-                                                HA_WHOLE_KEY,
-                                                HA_READ_KEY_EXACT);
-      if (error)
-      {
-        DBUG_PRINT("info",("ha_index_read_idx_map() returns %s", HA_ERR(error)));
-        if (error == HA_ERR_RECORD_DELETED)
-          error= HA_ERR_KEY_NOT_FOUND;
-        table->file->print_error(error, MYF(0));
-        goto error;
-      }
-    }
+      NOTE. Even if we have no table ('table' == 0) we still need to be
+      here, so that we increase the group relay log position. If we didn't, we
+      could have a group relay log position which lags behind "forever"
+      (assume the last master's transaction is ignored by the slave because of
+      replicate-ignore rules).
+    */
+    error= thd->binlog_flush_pending_rows_event(TRUE);
 
     /*
-       Now, record[1] should contain the offending row.  That
-       will enable us to update it or, alternatively, delete it (so
-       that we can insert the new row afterwards).
-     */
+      If this event is not in a transaction, the call below will, if some
+      transactional storage engines are involved, commit the statement into
+      them and flush the pending event to binlog.
+      If this event is in a transaction, the call will do nothing, but a
+      Xid_log_event will come next which will, if some transactional engines
+      are involved, commit the transaction and flush the pending event to the
+      binlog.
+    */
+    error|= (error ? trans_rollback_stmt(thd) : trans_commit_stmt(thd));
 
     /*
-      If row is incomplete we will use the record found to fill 
-      missing columns.  
+      Now what if this is not a transactional engine? We still need to
+      flush the pending event to the binlog; we did it with
+      thd->binlog_flush_pending_rows_event(). Note that we imitate
+      what is done for real queries: a call to
+      ha_autocommit_or_rollback() (sometimes only if it involves a
+      transactional engine), and a call to be sure to have the pending
+      event flushed.
     */
-    if (!get_flags(COMPLETE_ROWS_F))
-    {
-      restore_record(table,record[1]);
-      error= unpack_current_row(rli, &m_cols);
-    }
-
-#ifndef DBUG_OFF
-    DBUG_PRINT("debug",("preparing for update: before and after image"));
-    DBUG_DUMP("record[1] (before)", table->record[1], table->s->reclength);
-    DBUG_DUMP("record[0] (after)", table->record[0], table->s->reclength);
-#endif
 
     /*
-       REPLACE is defined as either INSERT or DELETE + INSERT.  If
-       possible, we can replace it with an UPDATE, but that will not
-       work on InnoDB if FOREIGN KEY checks are necessary.
+      @todo We should probably not call
+      reset_current_stmt_binlog_format_row() from here.
 
-       I (Matz) am not sure of the reason for the last_uniq_key()
-       check as, but I'm guessing that it's something along the
-       following lines.
+      Note: this applies to log_event_old.cc too
 
-       Suppose that we got the duplicate key to be a key that is not
-       the last unique key for the table and we perform an update:
-       then there might be another key for which the unique check will
-       fail, so we're better off just deleting the row and inserting
-       the correct row.
-     */
-    if (last_uniq_key(table, keynum) &&
-        !table->file->referenced_by_foreign_key())
-    {
-      DBUG_PRINT("info",("Updating row using ha_update_row()"));
-      error=table->file->ha_update_row(table->record[1],
-                                       table->record[0]);
-      switch (error) {
-                
-      case HA_ERR_RECORD_IS_THE_SAME:
-        DBUG_PRINT("info",("ignoring HA_ERR_RECORD_IS_THE_SAME error from"
-                           " ha_update_row()"));
-        error= 0;
-      
-      case 0:
-        break;
-        
-      default:    
-        DBUG_PRINT("info",("ha_update_row() returns error %d",error));
-        table->file->print_error(error, MYF(0));
-      }
-      
-      goto error;
-    }
-    else
-    {
-      DBUG_PRINT("info",("Deleting offending row and trying to write new one again"));
-      if ((error= table->file->ha_delete_row(table->record[1])))
-      {
-        DBUG_PRINT("info",("ha_delete_row() returns error %d",error));
-        table->file->print_error(error, MYF(0));
-        goto error;
-      }
-      /* Will retry ha_write_row() with the offending row removed. */
-    }
-  }
+      Btw, the previous comment about transactional engines does not
+      seem related to anything that happens here.
+      /Sven
+    */
+    thd->reset_current_stmt_binlog_format_row();
 
-error:
-  m_table->default_column_bitmaps();
-  DBUG_RETURN(error);
+    const_cast<Relay_log_info*>(rli)->cleanup_context(thd, 0);
+  }
+  return error;
 }
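
The statement-commit line above,
error|= (error ? trans_rollback_stmt(thd) : trans_commit_stmt(thd)),
commits on success-so-far and rolls back otherwise, and the
OR-assignment keeps the original error visible even when the rollback
itself succeeds. A tiny model of the idiom (the stub return values are
only illustrative):

  #include <cstdio>

  /* Stubs for trans_commit_stmt()/trans_rollback_stmt(): zero means
     success; the return values here are illustrative. */
  static int commit_stmt()   { return 0; }
  static int rollback_stmt() { return 0; }

  int main()
  {
    for (int error : {0, 1})
    {
      int e= error;
      /* Commit on success so far, roll back on failure; OR-ing keeps
         the original error visible either way. */
      e|= (e ? rollback_stmt() : commit_stmt());
      std::printf("in=%d out=%d\n", error, e);
    }
    return 0;
  }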
 
-#endif
+/**
+   The method either increments the relay log position or
+   commits the current statement and increments the master group 
+   position if the event is STMT_END_F flagged and
+   the statement corresponds to an autocommit query (i.e. replicated
+   without being wrapped in BEGIN/COMMIT).
 
+   @retval 0         Success
+   @retval non-zero  Error in the statement commit
+ */
 int
-Write_rows_log_event::do_exec_row(const Relay_log_info *const rli)
+Rows_log_event::do_update_pos(Relay_log_info *rli)
 {
-  DBUG_ASSERT(m_table != NULL);
-  int error= write_row(rli, slave_exec_mode == SLAVE_EXEC_MODE_IDEMPOTENT);
+  DBUG_ENTER("Rows_log_event::do_update_pos");
+  int error= 0;
 
-  if (error && !thd->is_error())
-  {
-    DBUG_ASSERT(0);
-    my_error(ER_UNKNOWN_ERROR, MYF(0));
+  DBUG_PRINT("info", ("flags: %s",
+                      get_flags(STMT_END_F) ? "STMT_END_F " : ""));
+
+  if (get_flags(STMT_END_F))
+  {
+    /*
+      Indicate that a statement is finished.
+      Step the group log position if we are not in a transaction,
+      otherwise increase the event log position.
+    */
+    rli->stmt_done(log_pos);
+    /*
+      Clear any errors in thd->net.last_err*. It is not known if this is
+      needed or not. It is believed that any errors that may exist in
+      thd->net.last_err* are allowed. Examples of errors are "key not
+      found", which is produced in the test case rpl_row_conflicts.test
+    */
+    thd->clear_error();
+  }
+  else
+  {
+    rli->inc_event_relay_log_pos();
   }
 
-  return error;
+  DBUG_RETURN(error);
 }
 
 #endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */
 
-#ifdef MYSQL_CLIENT
-void Write_rows_log_event::print(FILE *file, PRINT_EVENT_INFO* print_event_info)
+#ifndef MYSQL_CLIENT
+bool Rows_log_event::write_data_header(IO_CACHE *file)
 {
-  Rows_log_event::print_helper(file, print_event_info, "Write_rows");
+  uchar buf[ROWS_HEADER_LEN];	// No need to init the buffer
+  DBUG_ASSERT(m_table_id != ~0UL);
+  DBUG_EXECUTE_IF("old_row_based_repl_4_byte_map_id_master",
+                  {
+                    int4store(buf + 0, m_table_id);
+                    int2store(buf + 4, m_flags);
+                    return (my_b_safe_write(file, buf, 6));
+                  });
+  int6store(buf + RW_MAPID_OFFSET, (ulonglong)m_table_id);
+  int2store(buf + RW_FLAGS_OFFSET, m_flags);
+  return (my_b_safe_write(file, buf, ROWS_HEADER_LEN));
 }
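
write_data_header packs the post-5.1 rows event header as a 6-byte
little-endian table id followed by a 2-byte flags word. A standalone
sketch of that byte layout, with the stores re-implemented for the
example (offsets 0 and 6 mirror RW_MAPID_OFFSET and RW_FLAGS_OFFSET,
assuming ROWS_HEADER_LEN is 8):

  #include <cstdint>
  #include <cstdio>

  /* Little-endian stores with the same shape as int6store/int2store. */
  static void store6(unsigned char* p, uint64_t v)
  {
    for (int i= 0; i < 6; i++)
      p[i]= (unsigned char) (v >> (8 * i));
  }
  static void store2(unsigned char* p, uint16_t v)
  {
    p[0]= (unsigned char) v;
    p[1]= (unsigned char) (v >> 8);
  }

  int main()
  {
    unsigned char buf[8];               /* buf[ROWS_HEADER_LEN] */
    store6(buf + 0, 0xABCDEFULL);       /* table id at RW_MAPID_OFFSET */
    store2(buf + 6, 0x0001);            /* flags at RW_FLAGS_OFFSET */
    for (int i= 0; i < 8; i++)
      std::printf("%02X ", buf[i]);
    std::printf("\n");
    return 0;
  }
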
-#endif
-
-/**************************************************************************
-	Delete_rows_log_event member functions
-**************************************************************************/
-
-#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
-/*
-  Compares table->record[0] and table->record[1]
 
-  Returns TRUE if different.
-*/
-static bool record_compare(TABLE *table, MY_BITMAP *cols)
+bool Rows_log_event::write_data_body(IO_CACHE *file)
 {
   /*
-    Need to set the X bit and the filler bits in both records since
-    there are engines that do not set it correctly.
+     Note that this should be the number of *bits*, not the number of
+     bytes.
+  */
+  uchar sbuf[sizeof(m_width) + 1];
+  my_ptrdiff_t const data_size= m_rows_cur - m_rows_buf;
+  bool res= false;
+  uchar *const sbuf_end= net_store_length(sbuf, (size_t) m_width);
+  DBUG_ASSERT(static_cast<size_t>(sbuf_end - sbuf) <= sizeof(sbuf));
 
-    In addition, since MyISAM checks that one hasn't tampered with the
-    record, it is necessary to restore the old bytes into the record
-    after doing the comparison.
+  DBUG_DUMP("m_width", sbuf, (size_t) (sbuf_end - sbuf));
+  res= res || my_b_safe_write(file, sbuf, (size_t) (sbuf_end - sbuf));
 
-    TODO[record format ndb]: Remove it once NDB returns correct
-    records. Check that the other engines also return correct records.
+  DBUG_DUMP("m_cols", (uchar*) m_cols.bitmap, no_bytes_in_map(&m_cols));
+  res= res || my_b_safe_write(file, (uchar*) m_cols.bitmap,
+                              no_bytes_in_map(&m_cols));
+  /*
+    TODO[refactor write]: Remove the "down cast" here (and elsewhere).
    */
-
-  DBUG_DUMP("record[0]", table->record[0], table->s->reclength);
-  DBUG_DUMP("record[1]", table->record[1], table->s->reclength);
-
-  bool result= FALSE;
-  uchar saved_x[2]= {0, 0}, saved_filler[2]= {0, 0};
-
-  if (table->s->null_bytes > 0)
+  if (get_type_code() == UPDATE_ROWS_EVENT)
   {
-    for (int i = 0 ; i < 2 ; ++i)
-    {
-      /* 
-        If we have an X bit then we need to take care of it.
-      */
-      if (!(table->s->db_options_in_use & HA_OPTION_PACK_RECORD))
-      {
-        saved_x[i]= table->record[i][0];
-        table->record[i][0]|= 1U;
-      }
-
-      /*
-         If (last_null_bit_pos == 0 && null_bytes > 1), then:
-
-         X bit (if any) + N nullable fields + M Field_bit fields = 8 bits 
-
-         Ie, the entire byte is used.
-      */
-      if (table->s->last_null_bit_pos > 0)
-      {
-        saved_filler[i]= table->record[i][table->s->null_bytes - 1];
-        table->record[i][table->s->null_bytes - 1]|=
-          256U - (1U << table->s->last_null_bit_pos);
-      }
-    }
+    DBUG_DUMP("m_cols_ai", (uchar*) m_cols_ai.bitmap,
+              no_bytes_in_map(&m_cols_ai));
+    res= res || my_b_safe_write(file, (uchar*) m_cols_ai.bitmap,
+                                no_bytes_in_map(&m_cols_ai));
   }
+  DBUG_DUMP("rows", m_rows_buf, data_size);
+  res= res || my_b_safe_write(file, m_rows_buf, (size_t) data_size);
 
-  if (table->s->blob_fields + table->s->varchar_fields == 0 &&
-      bitmap_is_set_all(cols))
-  {
-    result= cmp_record(table,record[1]);
-    goto record_compare_exit;
-  }
+  return res;
 
-  /* Compare null bits */
-  if (bitmap_is_set_all(cols) &&
-      memcmp(table->null_flags,
-	     table->null_flags+table->s->rec_buff_length,
-	     table->s->null_bytes))
-  {
-    result= TRUE;				// Diff in NULL value
-    goto record_compare_exit;
-  }
+}
+#endif
 
-  /* Compare updated fields */
-  for (Field **ptr=table->field ; 
-       *ptr && ((*ptr)->field_index < cols->n_bits);
-       ptr++)
+#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
+void Rows_log_event::pack_info(Protocol *protocol)
+{
+  char buf[256];
+  char const *const flagstr=
+    get_flags(STMT_END_F) ? " flags: STMT_END_F" : "";
+  size_t bytes= my_snprintf(buf, sizeof(buf),
+                               "table_id: %lu%s", m_table_id, flagstr);
+  protocol->store(buf, bytes, &my_charset_bin);
+}
+#endif
+
+#ifdef MYSQL_CLIENT
+void Rows_log_event::print_helper(FILE *file,
+                                  PRINT_EVENT_INFO *print_event_info,
+                                  char const *const name)
+{
+  IO_CACHE *const head= &print_event_info->head_cache;
+  IO_CACHE *const body= &print_event_info->body_cache;
+  if (!print_event_info->short_form)
   {
-    if (bitmap_is_set(cols, (*ptr)->field_index))
-    {
-      if ((*ptr)->cmp_binary_offset(table->s->rec_buff_length))
-      {
-        result= TRUE;
-        goto record_compare_exit;
-      }
-    }
+    bool const last_stmt_event= get_flags(STMT_END_F);
+    print_header(head, print_event_info, !last_stmt_event);
+    my_b_printf(head, "\t%s: table id %lu%s\n",
+                name, m_table_id,
+                last_stmt_event ? " flags: STMT_END_F" : "");
+    print_base64(body, print_event_info, !last_stmt_event);
   }
 
-record_compare_exit:
-  /*
-    Restore the saved bytes.
-
-    TODO[record format ndb]: Remove this code once NDB returns the
-    correct record format.
-  */
-  if (table->s->null_bytes > 0)
+  if (get_flags(STMT_END_F))
   {
-    for (int i = 0 ; i < 2 ; ++i)
-    {
-      if (!(table->s->db_options_in_use & HA_OPTION_PACK_RECORD))
-        table->record[i][0]= saved_x[i];
-
-      if (table->s->last_null_bit_pos)
-        table->record[i][table->s->null_bytes - 1]= saved_filler[i];
-    }
+    copy_event_cache_to_file_and_reinit(head, file);
+    copy_event_cache_to_file_and_reinit(body, file);
   }
-
-  return result;
 }
+#endif
 
+/**************************************************************************
+	Table_map_log_event member functions and support functions
+**************************************************************************/
 
 /**
-  Checks if any of the columns in the given table is
-  signaled in the bitmap.
-
-  For each column in the given table checks if it is
-  signaled in the bitmap. This is most useful when deciding
-  whether a before image (BI) can be used or not for 
-  searching a row. If no column is signaled, then the 
-  image cannot be used for searching a record (regardless 
-  of using position(), index scan or table scan). Here is 
-  an example:
-
-  MASTER> SET @@binlog_row_image='MINIMAL';
-  MASTER> CREATE TABLE t1 (a int, b int, c int, primary key(c));
-  SLAVE> CREATE TABLE t1 (a int, b int);
-  MASTER> INSERT INTO t1 VALUES (1,2,3);
-  MASTER> UPDATE t1 SET a=2 WHERE b=2;
-
-  For the update statement only the PK (column c) is 
-  logged in the before image (BI). As such, given that 
-  the slave has no column c, it will not be able to 
-  find the row, because BI has no values for the columns
-  the slave knows about (column a and b).
+  @page How replication of field metadata works.
+  
+  When a table map is created, the master first calls 
+  Table_map_log_event::save_field_metadata() which calculates how many 
+  values will be in the field metadata. Only those fields that require the 
+  extra data are added. The method also loops through all of the fields in 
+  the table calling the method Field::save_field_metadata() which returns the
+  values for the field that will be saved in the metadata and replicated to
+  the slave. Once all fields have been processed, the table map is written to
+  the binlog adding the size of the field metadata and the field metadata to
+  the end of the body of the table map.
 
-  @param table   the table reference on the slave.
-  @param cols the bitmap signaling columns available in 
-                 the BI.
+  When a table map is read on the slave, the field metadata is read from the 
+  table map and passed to the table_def class constructor which saves the 
+  field metadata from the table map into an array based on the type of the 
+  field. Field metadata values not present (those fields that do not use extra 
+  data) in the table map are initialized as zero (0). The array size is the 
+  same as the columns for the table on the slave.
 
-  @return TRUE if BI contains usable colums for searching, 
-          FALSE otherwise.
+  Additionally, values saved for field metadata on the master are saved as a 
+  string of bytes (uchar) in the binlog. A field may require 1 or more bytes
+  to store the information. In cases where values require multiple bytes 
+  (e.g. values > 255), the endian-safe methods are used to properly encode 
+  the values on the master and decode them on the slave. When the field
+  metadata values are captured on the slave, they are stored in an array of
+  type uint16. This allows the least number of casts to prevent casting bugs
+  when the field metadata is used in comparisons of field attributes. When
+  the field metadata is used for calculating addresses in pointer math, the
+  type used is uint32. 
 */
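
As a concrete illustration of the round trip described above, here is a
2-byte metadata value (say, a VARCHAR max length of 300, which does not
fit in one byte) encoded on the master and decoded back into the
slave's uint16 array; the explicit byte stores below stand in for the
endian-safe helpers:

  #include <cstdint>
  #include <cstdio>

  int main()
  {
    uint16_t master_value= 300;   /* e.g. a VARCHAR field_length > 255 */
    unsigned char wire[2];
    wire[0]= (unsigned char) master_value;          /* low byte first */
    wire[1]= (unsigned char) (master_value >> 8);   /* then high byte */

    /* The slave decodes into its uint16 metadata array. */
    uint16_t slave_value= (uint16_t) (wire[0] | (wire[1] << 8));
    std::printf("%u -> %02X %02X -> %u\n",
                (unsigned) master_value, wire[0], wire[1],
                (unsigned) slave_value);
    return 0;
  }
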
-static
-my_bool is_any_column_signaled_for_table(TABLE *table, MY_BITMAP *cols)
-{
-
-  int nfields_set= 0;
-  for (Field **ptr=table->field ; 
-       *ptr && ((*ptr)->field_index < cols->n_bits);
-       ptr++)
-  {
-    if (bitmap_is_set(cols, (*ptr)->field_index))
-      nfields_set++;
-  }
-
-  return (nfields_set != 0);
-}
 
+#if !defined(MYSQL_CLIENT)
 /**
-  Checks if the fields in the given key are signaled in
-  the bitmap.
-
-  Validates whether the before image is usable for the
-  given key. It can be the case that the before image
-  does not contain values for the key (eg, master was
-  using 'minimal' option for image logging and slave has
-  different index structure on the table). Here is an
-  example:
-
-  MASTER> SET @@binlog_row_image='MINIMAL';
-  MASTER> CREATE TABLE t1 (a int, b int, c int, primary key(c));
-  SLAVE> CREATE TABLE t1 (a int, b int, c int, key(a,c));
-  MASTER> INSERT INTO t1 VALUES (1,2,3);
-  MASTER> UPDATE t1 SET a=2 WHERE b=2;
-
-  When finding the row on the slave, one cannot use the
-  index (a,c) to search for the row, because there is only
-  data in the before image for column c. This function
-  checks the fields needed for a given key and searches
-  the bitmap to see if all the fields required are 
-  signaled.
+  Save the field metadata based on the real_type of the field.
+  The metadata saved depends on the type of the field. Some fields
+  store a single byte for pack_length() while others store two bytes
+  for field_length (max length).
   
-  @param keyinfo  reference to key.
-  @param cols     the bitmap signaling which columns 
-                  have available data.
+  @retval  0  Ok.
 
-  @return TRUE if all fields are signaled in the bitmap 
-          for the given key, FALSE otherwise.
+  @todo
+  We may want to consider changing the encoding of the information.
+  Currently, the code attempts to minimize the number of bytes written to 
+  the tablemap. There are at least two other alternatives; 1) using 
+  net_store_length() to store the data allowing it to choose the number of
+  bytes that are appropriate thereby making the code much easier to 
+  maintain (only 1 place to change the encoding), or 2) use a fixed number
+  of bytes for each field. The problem with option 1 is that net_store_length()
+  will use one byte if the value < 251, but 3 bytes if it is > 250. Thus,
+  for fields like CHAR which can be no larger than 255 characters, the method
+  will use 3 bytes when the value is > 250. Further, every value that is
+  encoded using 2 parts (e.g., pack_length, field_length) will be numerically
+  > 250 and will therefore use 3 bytes for each value. Option 2
+  is less wasteful of space but wastes 1 byte for every field that does
+  not encode 2 parts.
 */
-static
-my_bool are_all_columns_signaled_for_key(KEY *keyinfo, MY_BITMAP *cols)
+int Table_map_log_event::save_field_metadata()
 {
-  for (uint i=0 ; i < keyinfo->key_parts ;i++)
+  DBUG_ENTER("Table_map_log_event::save_field_metadata");
+  int index= 0;
+  for (unsigned int i= 0 ; i < m_table->s->fields ; i++)
   {
-    uint fieldnr= keyinfo->key_part[i].fieldnr - 1;
-    if (fieldnr >= cols->n_bits || 
-        !bitmap_is_set(cols, fieldnr))
-      return FALSE;
+    DBUG_PRINT("debug", ("field_type: %d", m_coltype[i]));
+    index+= m_table->s->field[i]->save_field_metadata(&m_field_metadata[index]);
   }
- 
-  return TRUE;
+  DBUG_RETURN(index);
 }
+#endif /* !defined(MYSQL_CLIENT) */
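
To make the 1-versus-3-byte trade-off in the @todo above concrete, here
is a sketch of the packed-length rule for values up to 65535: anything
below 251 fits in one byte, larger values take a marker byte plus two
length bytes. The helper below is a simplified re-implementation for
illustration; the real function is net_store_length() in pack.c:

  #include <cstdio>

  static size_t store_length(unsigned char* p, unsigned long v)
  {
    if (v < 251)
    {
      p[0]= (unsigned char) v;
      return 1;                     /* single byte */
    }
    p[0]= 252;                      /* marker: 2-byte length follows */
    p[1]= (unsigned char) v;
    p[2]= (unsigned char) (v >> 8);
    return 3;
  }

  int main()
  {
    unsigned char buf[3];
    for (unsigned long v : {250UL, 251UL, 300UL})
      std::printf("value=%lu -> %zu byte(s)\n", v, store_length(buf, v));
    return 0;
  }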
 
-/**
-  Searches the table for a given key that can be used
-  according to the existing values, ie, columns set
-  in the bitmap.
+/*
+  Constructor used to build an event for writing to the binary log.
+  Mats says tbl->s lives longer than this event so it's ok to copy pointers
+  (tbl->s->db etc) and not pointer content.
+ */
+#if !defined(MYSQL_CLIENT)
+Table_map_log_event::Table_map_log_event(THD *thd, TABLE *tbl, ulong tid,
+                                         bool is_transactional)
+  : Log_event(thd, 0, is_transactional),
+    m_table(tbl),
+    m_dbnam(tbl->s->db.str),
+    m_dblen(m_dbnam ? tbl->s->db.length : 0),
+    m_tblnam(tbl->s->table_name.str),
+    m_tbllen(tbl->s->table_name.length),
+    m_colcnt(tbl->s->fields),
+    m_memory(NULL),
+    m_table_id(tid),
+    m_flags(TM_BIT_LEN_EXACT_F),
+    m_data_size(0),
+    m_field_metadata(0),
+    m_field_metadata_size(0),
+    m_null_bits(0),
+    m_meta_memory(NULL)
+{
+  uchar cbuf[sizeof(m_colcnt) + 1];
+  uchar *cbuf_end;
+  DBUG_ASSERT(m_table_id != ~0UL);
+  /*
+    In TABLE_SHARE, "db" and "table_name" are 0-terminated (see this comment in
+    table.cc / alloc_table_share():
+      Use the fact the key is db/0/table_name/0
+    As we rely on this let's assert it.
+  */
+  DBUG_ASSERT((tbl->s->db.str == 0) ||
+              (tbl->s->db.str[tbl->s->db.length] == 0));
+  DBUG_ASSERT(tbl->s->table_name.str[tbl->s->table_name.length] == 0);
 
-  The caller can specify which type of key to find by
-  setting the following flags in the key_type parameter:
 
-    - PRI_KEY_FLAG
-      Returns the primary key.
+  m_data_size=  TABLE_MAP_HEADER_LEN;
+  DBUG_EXECUTE_IF("old_row_based_repl_4_byte_map_id_master", m_data_size= 6;);
+  m_data_size+= m_dblen + 2;	// Include length and terminating \0
+  m_data_size+= m_tbllen + 2;	// Include length and terminating \0
+  cbuf_end= net_store_length(cbuf, (size_t) m_colcnt);
+  DBUG_ASSERT(static_cast<size_t>(cbuf_end - cbuf) <= sizeof(cbuf));
+  m_data_size+= (cbuf_end - cbuf) + m_colcnt;	// COLCNT and column types
 
-    - UNIQUE_KEY_FLAG
-      Returns a unique key (flagged with HA_NOSAME)
+  /* If malloc fails, caught in is_valid() */
+  if ((m_memory= (uchar*) my_malloc(m_colcnt, MYF(MY_WME))))
+  {
+    m_coltype= reinterpret_cast<uchar*>(m_memory);
+    for (unsigned int i= 0 ; i < m_table->s->fields ; ++i)
+      m_coltype[i]= m_table->field[i]->type();
+  }
 
-    - MULTIPLE_KEY_FLAG
-      Returns a key that is not unique (flagged with HA_NOSAME 
-      and without HA_NULL_PART_KEY) nor PK.
+  /*
+    Calculate a bitmap for the results of maybe_null() for all columns.
+    The bitmap is used to determine when there is a column from the master
+    that is not on the slave and is null and thus not in the row data during
+    replication.
+  */
+  uint num_null_bytes= (m_table->s->fields + 7) / 8;
+  m_data_size+= num_null_bytes;
+  m_meta_memory= (uchar *)my_multi_malloc(MYF(MY_WME),
+                                 &m_null_bits, num_null_bytes,
+                                 &m_field_metadata, (m_colcnt * 2),
+                                 NULL);
 
-  The above flags can be used together, in which case, the
-  search is conducted in the above listed order. Eg, the 
-  following flag:
+  bzero(m_field_metadata, (m_colcnt * 2));
 
-    (PRI_KEY_FLAG | UNIQUE_KEY_FLAG | MULTIPLE_KEY_FLAG)
+  /*
+    Create an array for the field metadata and store it.
+  */
+  m_field_metadata_size= save_field_metadata();
+  DBUG_ASSERT(m_field_metadata_size <= (m_colcnt * 2));
 
-  means that a primary key is returned if it is suitable. If
-  not then the unique keys are searched. If no unique key is
-  suitable, then the keys are searched. Finally, if no key
-  is suitable, MAX_KEY is returned.
+  /*
+    Now set the size of the data to the size of the field metadata array
+    plus one or three bytes (see pack.c:net_store_length) for number of 
+    elements in the field metadata array.
+  */
+  if (m_field_metadata_size < 251)
+    m_data_size+= m_field_metadata_size + 1; 
+  else
+    m_data_size+= m_field_metadata_size + 3; 
 
-  @param table    reference to the table.
-  @param bi_cols  a bitmap that filters out columns that should
-                  not be considered while searching the key. 
-                  Columns that should be considered are set.
-  @param key_type the type of key to search for.
+  bzero(m_null_bits, num_null_bytes);
+  for (unsigned int i= 0 ; i < m_table->s->fields ; ++i)
+    if (m_table->field[i]->maybe_null())
+      m_null_bits[(i / 8)]+= 1 << (i % 8);
 
-  @return MAX_KEY if no key, according to the key_type specified
-          is suitable. Returns the key otherwise.
+}
+#endif /* !defined(MYSQL_CLIENT) */
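The null-bits loop at the end of the constructor above packs one
maybe_null() flag per column, least-significant bit first within each byte.
The same packing in a self-contained form (the column layout below is made
up for illustration):

  #include <stdio.h>
  #include <string.h>

  int main()
  {
    /* Pretend columns 1 and 4 (0-based) are nullable. */
    const unsigned int fields= 6;
    const bool maybe_null[6]= { false, true, false, false, true, false };

    unsigned char null_bits[(6 + 7) / 8];
    memset(null_bits, 0, sizeof(null_bits));

    for (unsigned int i= 0; i < fields; i++)
      if (maybe_null[i])
        null_bits[i / 8]|= 1 << (i % 8);  /* equivalent to the += above */

    printf("0x%02x\n", null_bits[0]);     /* prints 0x12: bits 1 and 4 */
    return 0;
  }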
 
-*/
-static
-uint
-search_key_in_table(TABLE *table, MY_BITMAP *bi_cols, uint key_type)
+/*
+  Constructor used by slave to read the event from the binary log.
+ */
+#if defined(HAVE_REPLICATION)
+Table_map_log_event::Table_map_log_event(const char *buf, uint event_len,
+                                         const Format_description_log_event
+                                         *description_event)
+
+  : Log_event(buf, description_event),
+#ifndef MYSQL_CLIENT
+    m_table(NULL),
+#endif
+    m_dbnam(NULL), m_dblen(0), m_tblnam(NULL), m_tbllen(0),
+    m_colcnt(0), m_coltype(0),
+    m_memory(NULL), m_table_id(ULONG_MAX), m_flags(0),
+    m_data_size(0), m_field_metadata(0), m_field_metadata_size(0),
+    m_null_bits(0), m_meta_memory(NULL)
 {
-  KEY *keyinfo;
-  uint res= MAX_KEY;
-  uint key;
+  unsigned int bytes_read= 0;
+  DBUG_ENTER("Table_map_log_event::Table_map_log_event(const char*,uint,...)");
 
-  if (key_type & PRI_KEY_FLAG && (table->s->primary_key < MAX_KEY))
+  uint8 common_header_len= description_event->common_header_len;
+  uint8 post_header_len= description_event->post_header_len[TABLE_MAP_EVENT-1];
+  DBUG_PRINT("info",("event_len: %u  common_header_len: %d  post_header_len: %d",
+                     event_len, common_header_len, post_header_len));
+
+  /*
+    Don't print debug messages when running valgrind since they can
+    trigger false warnings.
+   */
+#ifndef HAVE_purify
+  DBUG_DUMP("event buffer", (uchar*) buf, event_len);
+#endif
+
+  /* Read the post-header */
+  const char *post_start= buf + common_header_len;
+
+  post_start+= TM_MAPID_OFFSET;
+  if (post_header_len == 6)
   {
-    keyinfo= table->s->key_info + (uint) table->s->primary_key;
-    if (are_all_columns_signaled_for_key(keyinfo, bi_cols)) 
-      return table->s->primary_key;
+    /* Master is of an intermediate source tree before 5.1.4. Id is 4 bytes */
+    m_table_id= uint4korr(post_start);
+    post_start+= 4;
   }
-
-  if (key_type & UNIQUE_KEY_FLAG && table->s->uniques)
+  else
   {
-    for (key=0,keyinfo= table->key_info ; 
-         (key < table->s->keys) && (res == MAX_KEY);
-         key++,keyinfo++)
-    {
-      /*
-        - Unique keys cannot be disabled, thence we skip the check.
-        - Skip unique keys with nullable parts
-        - Skip primary keys
-      */
-      if (!((keyinfo->flags & (HA_NOSAME | HA_NULL_PART_KEY)) != HA_NOSAME) ||
-          (key == table->s->primary_key))
-        continue;
-      res= are_all_columns_signaled_for_key(keyinfo, bi_cols) ? 
-           key : MAX_KEY;
-
-      if (res < MAX_KEY)
-        return res;
-    }
+    DBUG_ASSERT(post_header_len == TABLE_MAP_HEADER_LEN);
+    m_table_id= (ulong) uint6korr(post_start);
+    post_start+= TM_FLAGS_OFFSET;
   }
 
-  if (key_type & MULTIPLE_KEY_FLAG && table->s->keys)
-  {
-    for (key=0,keyinfo= table->key_info ; 
-         (key < table->s->keys) && (res == MAX_KEY);
-         key++,keyinfo++)
-    {
-      /*
-        - Skip innactive keys
-        - Skip unique keys without nullable parts
-        - Skip primary keys
-      */
-      if (!(table->s->keys_in_use.is_set(key)) ||
-          ((keyinfo->flags & (HA_NOSAME | HA_NULL_PART_KEY)) == HA_NOSAME) ||
-          (key == table->s->primary_key))
-        continue;
+  DBUG_ASSERT(m_table_id != ~0UL);
 
-      res= are_all_columns_signaled_for_key(keyinfo, bi_cols) ? 
-           key : MAX_KEY;
+  m_flags= uint2korr(post_start);
 
-      if (res < MAX_KEY)
-        return res;
-    }
+  /* Read the variable part of the event */
+  const char *const vpart= buf + common_header_len + post_header_len;
+
+  /* Extract the length of the various parts from the buffer */
+  uchar const *const ptr_dblen= (uchar const*)vpart + 0;
+  m_dblen= *(uchar*) ptr_dblen;
+
+  /* Length of database name + counter + terminating null */
+  uchar const *const ptr_tbllen= ptr_dblen + m_dblen + 2;
+  m_tbllen= *(uchar*) ptr_tbllen;
+
+  /* Length of table name + counter + terminating null */
+  uchar const *const ptr_colcnt= ptr_tbllen + m_tbllen + 2;
+  uchar *ptr_after_colcnt= (uchar*) ptr_colcnt;
+  m_colcnt= net_field_length(&ptr_after_colcnt);
+
+  DBUG_PRINT("info",("m_dblen: %lu  off: %ld  m_tbllen: %lu  off: %ld  m_colcnt: %lu  off: %ld",
+                     (ulong) m_dblen, (long) (ptr_dblen-(const uchar*)vpart), 
+                     (ulong) m_tbllen, (long) (ptr_tbllen-(const uchar*)vpart),
+                     m_colcnt, (long) (ptr_colcnt-(const uchar*)vpart)));
+
+  /* Allocate mem for all fields in one go. If fails, caught in is_valid() */
+  m_memory= (uchar*) my_multi_malloc(MYF(MY_WME),
+                                     &m_dbnam, (uint) m_dblen + 1,
+                                     &m_tblnam, (uint) m_tbllen + 1,
+                                     &m_coltype, (uint) m_colcnt,
+                                     NullS);
+
+  if (m_memory)
+  {
+    /* Copy the different parts into their memory */
+    strncpy(const_cast<char*>(m_dbnam), (const char*)ptr_dblen  + 1, m_dblen + 1);
+    strncpy(const_cast<char*>(m_tblnam), (const char*)ptr_tbllen + 1, m_tbllen + 1);
+    memcpy(m_coltype, ptr_after_colcnt, m_colcnt);
+
+    ptr_after_colcnt= ptr_after_colcnt + m_colcnt;
+    bytes_read= (uint) (ptr_after_colcnt - (uchar *)buf);
+    DBUG_PRINT("info", ("Bytes read: %d.\n", bytes_read));
+    if (bytes_read < event_len)
+    {
+      m_field_metadata_size= net_field_length(&ptr_after_colcnt);
+      DBUG_ASSERT(m_field_metadata_size <= (m_colcnt * 2));
+      uint num_null_bytes= (m_colcnt + 7) / 8;
+      m_meta_memory= (uchar *)my_multi_malloc(MYF(MY_WME),
+                                     &m_null_bits, num_null_bytes,
+                                     &m_field_metadata, m_field_metadata_size,
+                                     NULL);
+      memcpy(m_field_metadata, ptr_after_colcnt, m_field_metadata_size);
+      ptr_after_colcnt= (uchar*)ptr_after_colcnt + m_field_metadata_size;
+      memcpy(m_null_bits, ptr_after_colcnt, num_null_bytes);
+    }
+  }
+
+  DBUG_VOID_RETURN;
+}
+#endif
+
+Table_map_log_event::~Table_map_log_event()
+{
+  my_free(m_meta_memory);
+  my_free(m_memory);
+}
+
+/*
+  Return value is an error code, one of:
+
+      -1     Failure to open table   [from open_tables()]
+       0     Success
+       1     No room for more tables [from set_table()]
+       2     Out of memory           [from set_table()]
+       3     Wrong table definition
+       4     Daisy-chaining RBR with SBR not possible
+ */
+
+#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
+int Table_map_log_event::do_apply_event(Relay_log_info const *rli)
+{
+  RPL_TABLE_LIST *table_list;
+  char *db_mem, *tname_mem;
+  size_t dummy_len;
+  void *memory;
+  DBUG_ENTER("Table_map_log_event::do_apply_event(Relay_log_info*)");
+  DBUG_ASSERT(rli->info_thd == thd);
+
+  /* Step the query id to mark which columns are actually used. */
+  thd->set_query_id(next_query_id());
+
+  if (!(memory= my_multi_malloc(MYF(MY_WME),
+                                &table_list, (uint) sizeof(RPL_TABLE_LIST),
+                                &db_mem, (uint) NAME_LEN + 1,
+                                &tname_mem, (uint) NAME_LEN + 1,
+                                NullS)))
+    DBUG_RETURN(HA_ERR_OUT_OF_MEM);
+
+  strmov(db_mem, rpl_filter->get_rewrite_db(m_dbnam, &dummy_len));
+  strmov(tname_mem, m_tblnam);
+
+  table_list->init_one_table(db_mem, strlen(db_mem),
+                             tname_mem, strlen(tname_mem),
+                             tname_mem, TL_WRITE);
+
+  table_list->table_id= m_table_id;
+  table_list->updating= 1;
+
+  int error= 0;
+
+  if (rli->info_thd->slave_thread /* filtering is for slave only */ &&
+      (!rpl_filter->db_ok(table_list->db) ||
+       (rpl_filter->is_on() && !rpl_filter->tables_ok("", table_list))))
+  {
+    my_free(memory);
+  }
+  else
+  {
+    DBUG_ASSERT(thd->lex->query_tables != table_list);
+
+    /*
+      Use placement new to construct the table_def instance in the
+      memory allocated for it inside table_list.
+
+      The memory allocated by the table_def structure (i.e., not the
+      memory allocated *for* the table_def structure) is released
+      inside Relay_log_info::clear_tables_to_lock() by calling the
+      table_def destructor explicitly.
+    */
+    new (&table_list->m_tabledef)
+      table_def(m_coltype, m_colcnt,
+                m_field_metadata, m_field_metadata_size,
+                m_null_bits, m_flags);
+    table_list->m_tabledef_valid= TRUE;
+
+    /*
+      We record in the slave's information that the table should be
+      locked by linking the table into the list of tables to lock.
+    */
+    table_list->next_global= table_list->next_local= rli->tables_to_lock;
+    const_cast<Relay_log_info*>(rli)->tables_to_lock= table_list;
+    const_cast<Relay_log_info*>(rli)->tables_to_lock_count++;
+    /* 'memory' is freed in clear_tables_to_lock */
+  }
+
+  DBUG_RETURN(error);
+}
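The placement-new/explicit-destructor pairing described in the comment
above is a general C++ pattern: construct an object inside memory that is
owned elsewhere, and later run the destructor by hand before that memory is
released. A minimal sketch of the pattern itself:

  #include <new>       /* placement new */
  #include <cstdio>
  #include <cstdlib>

  struct Payload
  {
    Payload(int v) : value(v) { std::printf("ctor %d\n", value); }
    ~Payload()                { std::printf("dtor %d\n", value); }
    int value;
  };

  int main()
  {
    /* Raw storage allocated separately, as with table_list's block. */
    void *raw= std::malloc(sizeof(Payload));

    Payload *p= new (raw) Payload(42); /* construct in place, no allocation */

    p->~Payload();                     /* explicit destructor call ...     */
    std::free(raw);                    /* ... then release the raw storage */
    return 0;
  }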
+
+Log_event::enum_skip_reason
+Table_map_log_event::do_shall_skip(Relay_log_info *rli)
+{
+  /*
+    If the slave skip counter is 1, then we should not start executing
+    on the next event.
+  */
+  return continue_group(rli);
+}
+
+int Table_map_log_event::do_update_pos(Relay_log_info *rli)
+{
+  rli->inc_event_relay_log_pos();
+  return 0;
+}
+
+#endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */
+
+#ifndef MYSQL_CLIENT
+bool Table_map_log_event::write_data_header(IO_CACHE *file)
+{
+  DBUG_ASSERT(m_table_id != ~0UL);
+  uchar buf[TABLE_MAP_HEADER_LEN];
+  DBUG_EXECUTE_IF("old_row_based_repl_4_byte_map_id_master",
+                  {
+                    int4store(buf + 0, m_table_id);
+                    int2store(buf + 4, m_flags);
+                    return (my_b_safe_write(file, buf, 6));
+                  });
+  int6store(buf + TM_MAPID_OFFSET, (ulonglong)m_table_id);
+  int2store(buf + TM_FLAGS_OFFSET, m_flags);
+  return (my_b_safe_write(file, buf, TABLE_MAP_HEADER_LEN));
+}
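write_data_header() stores the table id with int6store(), a fixed-width
little-endian store. A simplified stand-in showing the byte order (the real
macros live in MySQL's portability headers; this is only an illustration):

  #include <stdio.h>

  /* Sketch of a 6-byte little-endian store: the layout a table id
     occupies in the table map post-header. */
  static void store6_le(unsigned char *buf, unsigned long long v)
  {
    for (int i= 0; i < 6; i++)
      buf[i]= (unsigned char) (v >> (8 * i));
  }

  int main()
  {
    unsigned char buf[6];
    store6_le(buf, 0xA1B2C3D4E5F6ULL);
    for (int i= 0; i < 6; i++)
      printf("%02x ", buf[i]);         /* f6 e5 d4 c3 b2 a1 */
    printf("\n");
    return 0;
  }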
+
+bool Table_map_log_event::write_data_body(IO_CACHE *file)
+{
+  DBUG_ASSERT(m_dbnam != NULL);
+  DBUG_ASSERT(m_tblnam != NULL);
+  /* We use only one byte per length for storage in event: */
+  DBUG_ASSERT(m_dblen < 128);
+  DBUG_ASSERT(m_tbllen < 128);
+
+  uchar const dbuf[]= { (uchar) m_dblen };
+  uchar const tbuf[]= { (uchar) m_tbllen };
+
+  uchar cbuf[sizeof(m_colcnt) + 1];
+  uchar *const cbuf_end= net_store_length(cbuf, (size_t) m_colcnt);
+  DBUG_ASSERT(static_cast<size_t>(cbuf_end - cbuf) <= sizeof(cbuf));
+
+  /*
+    Store the size of the field metadata.
+  */
+  uchar mbuf[sizeof(m_field_metadata_size)];
+  uchar *const mbuf_end= net_store_length(mbuf, m_field_metadata_size);
+
+  return (my_b_safe_write(file, dbuf,      sizeof(dbuf)) ||
+          my_b_safe_write(file, (const uchar*)m_dbnam,   m_dblen+1) ||
+          my_b_safe_write(file, tbuf,      sizeof(tbuf)) ||
+          my_b_safe_write(file, (const uchar*)m_tblnam,  m_tbllen+1) ||
+          my_b_safe_write(file, cbuf, (size_t) (cbuf_end - cbuf)) ||
+          my_b_safe_write(file, m_coltype, m_colcnt) ||
+          my_b_safe_write(file, mbuf, (size_t) (mbuf_end - mbuf)) ||
+          my_b_safe_write(file, m_field_metadata, m_field_metadata_size) ||
+          my_b_safe_write(file, m_null_bits, (m_colcnt + 7) / 8));
+}
+#endif
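One thing worth spelling out about the return expression in
write_data_body(): the writes must be chained with ||, because the comma
operator evaluates and then discards its left operand, so a failed write
joined by a comma would go unreported. A tiny standalone illustration:

  #include <stdio.h>

  static bool failed_write()     { return true;  }  /* write that fails    */
  static bool successful_write() { return false; }  /* write that succeeds */

  int main()
  {
    bool with_comma= (failed_write(), successful_write());   /* -> false */
    bool with_or=    (failed_write() || successful_write()); /* -> true  */
    printf("%d %d\n", with_comma, with_or);                  /* 0 1 */
    return 0;
  }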
+
+#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
+
+/*
+  Print some useful information for the Info column of SHOW BINLOG EVENTS.
+ */
+
+void Table_map_log_event::pack_info(Protocol *protocol)
+{
+  char buf[256];
+  size_t bytes= my_snprintf(buf, sizeof(buf),
+                            "table_id: %lu (%s.%s)",
+                            m_table_id, m_dbnam, m_tblnam);
+  protocol->store(buf, bytes, &my_charset_bin);
+}
+#endif
+
+
+#ifdef MYSQL_CLIENT
+void Table_map_log_event::print(FILE *file, PRINT_EVENT_INFO *print_event_info)
+{
+  if (!print_event_info->short_form)
+  {
+    print_header(&print_event_info->head_cache, print_event_info, TRUE);
+    my_b_printf(&print_event_info->head_cache,
+                "\tTable_map: `%s`.`%s` mapped to number %lu\n",
+                m_dbnam, m_tblnam, m_table_id);
+    print_base64(&print_event_info->body_cache, print_event_info, TRUE);
+  }
+}
+#endif
+
+/**************************************************************************
+	Write_rows_log_event member functions
+**************************************************************************/
+
+/*
+  Constructor used to build an event for writing to the binary log.
+ */
+#if !defined(MYSQL_CLIENT)
+Write_rows_log_event::Write_rows_log_event(THD *thd_arg, TABLE *tbl_arg,
+                                           ulong tid_arg,
+                                           bool is_transactional)
+  : Rows_log_event(thd_arg, tbl_arg, tid_arg, tbl_arg->write_set, is_transactional)
+{
+}
+#endif
+
+/*
+  Constructor used by slave to read the event from the binary log.
+ */
+#ifdef HAVE_REPLICATION
+Write_rows_log_event::Write_rows_log_event(const char *buf, uint event_len,
+                                           const Format_description_log_event
+                                           *description_event)
+: Rows_log_event(buf, event_len, WRITE_ROWS_EVENT, description_event)
+{
+}
+#endif
+
+#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
+int 
+Write_rows_log_event::do_before_row_operations(const Slave_reporting_capability *const)
+{
+  int error= 0;
+
+  /**
+     TODO: introduce a property for the event (handler?) that forces
+     applying the event in the REPLACE (idempotent) fashion.
+  */
+  if ((slave_exec_mode == SLAVE_EXEC_MODE_IDEMPOTENT) ||
+      (m_table->s->db_type()->db_type == DB_TYPE_NDBCLUSTER))
+  {
+    /*
+      We are using REPLACE semantics and not INSERT IGNORE semantics
+      when writing rows, that is: new rows replace old rows.  We need to
+      inform the storage engine that it should use this behaviour.
+    */
+    
+    /* Tell the storage engine that we are using REPLACE semantics. */
+    thd->lex->duplicates= DUP_REPLACE;
+    
+    /*
+      Pretend we're executing a REPLACE command: this is needed for
+      InnoDB and NDB Cluster since they are not (properly) checking the
+      lex->duplicates flag.
+    */
+    thd->lex->sql_command= SQLCOM_REPLACE;
+    /* 
+       Do not raise the error flag in case of hitting a unique attribute
+    */
+    m_table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
+    /* 
+       NDB specific: an update from an NDB master is wrapped as Write_rows,
+       so the event should be applied by replacing the slave's row
+    */
+    m_table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
+    /* 
+       NDB specific: if an update from an NDB master wrapped as Write_rows
+       does not find the row, it is assumed that idempotent binlog applying
+       is taking place; don't raise the error.
+    */
+    m_table->file->extra(HA_EXTRA_IGNORE_NO_KEY);
+    /*
+      TODO: the cluster team (Tomas?) says that it's better if the engine
+      knows how many rows are going to be inserted; then it can allocate
+      the needed memory from the start.
+    */
+  }
+
+  /*
+    We need TIMESTAMP_NO_AUTO_SET, otherwise ha_write_row() will not fill
+    any TIMESTAMP column with data from the row but will instead use
+    the event's current time.
+    As we replicate from TIMESTAMP to TIMESTAMP and slave has no extra
+    columns, we know that all TIMESTAMP columns on slave will receive explicit
+    data from the row, so TIMESTAMP_NO_AUTO_SET is ok.
+    When we allow a table without TIMESTAMP to be replicated to a table having
+    more columns including a TIMESTAMP column, or when we allow a TIMESTAMP
+    column to be replicated into a BIGINT column and the slave's table has a
+    TIMESTAMP column, then the slave's TIMESTAMP column will take its value
+    from set_time() which we called earlier (consistent with SBR). And then in
+    some cases we won't want TIMESTAMP_NO_AUTO_SET (will require some code to
+    analyze if explicit data is provided for slave's TIMESTAMP columns).
+  */
+  m_table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
+  
+  /* Honor next number column if present */
+  m_table->next_number_field= m_table->found_next_number_field;
+  /*
+    Fixes Bug#45999: in RBR, the slave's storage engine auto-generated new
+    sequence numbers for auto_increment fields when their values were 0.
+    Whether a sequence number is generated is decided by the value of
+    table->auto_increment_field_not_null and by SQL_MODE (whether it
+    includes MODE_NO_AUTO_VALUE_ON_ZERO) in the update_auto_increment
+    function. The SQL_MODE of the slave SQL thread is always consistent
+    with the master's. In RBR, auto_increment fields are never NULL.
+  */
+  m_table->auto_increment_field_not_null= TRUE;
+  return error;
+}
+
+int 
+Write_rows_log_event::do_after_row_operations(const Slave_reporting_capability *const,
+                                              int error)
+{
+  int local_error= 0;
+  m_table->next_number_field=0;
+  m_table->auto_increment_field_not_null= FALSE;
+  if ((slave_exec_mode == SLAVE_EXEC_MODE_IDEMPOTENT) ||
+      m_table->s->db_type()->db_type == DB_TYPE_NDBCLUSTER)
+  {
+    m_table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
+    m_table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
+    /*
+      Resetting with
+      table->file->extra(HA_EXTRA_NO_IGNORE_NO_KEY);
+      triggers Bug#27077. Explanation: file->reset() ultimately
+      performs this duty. Still TODO: fix.
+    */
   }
-
-  return res;
+  if ((local_error= m_table->file->ha_end_bulk_insert()))
+  {
+    m_table->file->print_error(local_error, MYF(0));
+  }
+  return error ? error : local_error;
 }
 
+#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
+
+/*
+  Check if there are more UNIQUE keys after the given key.
+*/
+static int
+last_uniq_key(TABLE *table, uint keyno)
+{
+  while (++keyno < table->s->keys)
+    if (table->key_info[keyno].flags & HA_NOSAME)
+      return 0;
+  return 1;
+}
 
 /**
-  Locate the current row in event's table.
+   Check if an error is a duplicate key error.
 
-  The current row is pointed by @c m_curr_row. Member @c m_width tells how many 
-  columns are there in the row (this can be differnet from the number of columns 
-  in the table). It is assumed that event's table is already open and pointed 
-  by @c m_table.
-
-  If a corresponding record is found in the table it is stored in 
-  @c m_table->record[0]. Note that when record is located based on a primary 
-  key, it is possible that the record found differs from the row being located.
-
-  If no key is specified or table does not have keys, a table scan is used to 
-  find the row. In that case the row should be complete and contain values for
-  all columns. However, it can still be shorter than the table, i.e. the table 
-  can contain extra columns not present in the row. It is also possible that 
-  the table has fewer columns than the row being located. 
+   This function is used to check if an error code is one of the
+   duplicate key errors, i.e., an error code for which it is sensible
+   to do a <code>get_dup_key()</code> to retrieve the duplicate key.
 
-  @returns Error code on failure, 0 on success. 
-  
-  @post In case of success @c m_table->record[0] contains the record found. 
-  Also, the internal "cursor" of the table is positioned at the record found.
+   @param errcode The error code to check.
 
-  @note If the engine allows random access of the records, a combination of
-  @c position() and @c rnd_pos() will be used. 
+   @return <code>true</code> if the error code is such that
+   <code>get_dup_key()</code> can retrieve the duplicate key,
+   <code>false</code> otherwise.
  */
-
-
-int Rows_log_event::find_row(const Relay_log_info *rli)
+bool
+is_duplicate_key_error(int errcode)
 {
-  DBUG_ENTER("Rows_log_event::find_row");
-
-  DBUG_ASSERT(m_table && m_table->in_use != NULL);
-
-  TABLE *table= m_table;
-  int error= 0;
-  KEY *keyinfo;
-  uint key;
+  switch (errcode)
+  {
+  case HA_ERR_FOUND_DUPP_KEY:
+  case HA_ERR_FOUND_DUPP_UNIQUE:
+    return true;
+  }
+  return false;
+}
 
-  /*
-    rpl_row_tabledefs.test specifies that
-    if the extra field on the slave does not have a default value
-    and this is okay with Delete or Update events.
-    Todo: fix wl3228 hld that requires defauls for all types of events
-  */
-  
-  prepare_record(table, &m_cols, FALSE);
-  error= unpack_current_row(rli, &m_cols);
+/**
+  Write the current row into event's table.
 
-  // Temporary fix to find out why it fails [/Matz]
-  memcpy(m_table->read_set->bitmap, m_cols.bitmap, (m_table->read_set->n_bits + 7) / 8);
+  The row is located in the row buffer, pointed to by the @c m_curr_row member.
+  The number of columns in the row is stored in the @c m_width member (it can
+  be different from the number of columns in the table into which we insert).
+  Bitmap @c m_cols indicates which columns are present in the row. It is assumed
+  that the event's table is already open and pointed to by @c m_table.
 
-  if (!is_any_column_signaled_for_table(table, &m_cols))
-  {
-    error= HA_ERR_END_OF_FILE;
-    goto err;
-  }
+  If the same record already exists in the table, it is either overwritten
+  or an error is reported, depending on the value of the @c overwrite flag
+  (error reporting is not yet implemented). Note that the matching record can be
+  different from the row we insert if we use primary keys to identify records in
+  the table.
 
-#ifndef DBUG_OFF
-  DBUG_PRINT("info",("looking for the following record"));
-  DBUG_DUMP("record[0]", table->record[0], table->s->reclength);
-#endif
+  The row to be inserted can contain values only for selected columns. The 
+  missing columns are filled with default values using the @c prepare_record()
+  function. If a matching record is found in the table and @c overwrite is
+  true, the missing columns are taken from it.
 
-  if ((key= search_key_in_table(table, &m_cols, PRI_KEY_FLAG)) >= MAX_KEY)
-    /* we dont have a PK, or PK is not usable with BI values */
-    goto INDEX_SCAN;
+  @param  rli   Relay log info (needed for row unpacking).
+  @param  overwrite  
+                Shall we overwrite if the row already exists or signal 
+                error (currently ignored).
 
-  if ((table->file->ha_table_flags() & HA_PRIMARY_KEY_REQUIRED_FOR_POSITION))
-  {
-    /*
-      Use a more efficient method to fetch the record given by
-      table->record[0] if the engine allows it.  We first compute a
-      row reference using the position() member function (it will be
-      stored in table->file->ref) and the use rnd_pos() to position
-      the "cursor" (i.e., record[0] in this case) at the correct row.
+  @returns Error code on failure, 0 on success.
 
-      TODO: Add a check that the correct record has been fetched by
-      comparing with the original record. Take into account that the
-      record on the master and slave can be of different
-      length. Something along these lines should work:
+  This method, if successful, sets the @c m_curr_row_end pointer to point at
+  the next row in the rows buffer. This is done when unpacking the row to be
+  inserted.
 
-      ADD>>>  store_record(table,record[1]);
-              int error= table->file->rnd_pos(table->record[0], table->file->ref);
-      ADD>>>  DBUG_ASSERT(memcmp(table->record[1], table->record[0],
-                                 table->s->reclength) == 0);
+  @note If a matching record is found, it is either updated using 
+  @c ha_update_row() or first deleted and then the new record is written.
+*/ 
 
-    */
-    DBUG_PRINT("info",("locating record using primary key (position)"));
-    int error;
-    if (table->file->inited && (error= table->file->ha_index_end()))
-      DBUG_RETURN(error);
-    if ((error= table->file->ha_rnd_init(FALSE)))
-      DBUG_RETURN(error);
+int
+Rows_log_event::write_row(const Relay_log_info *const rli,
+                          const bool overwrite)
+{
+  DBUG_ENTER("write_row");
+  DBUG_ASSERT(m_table != NULL && thd != NULL);
 
-    error= table->file->rnd_pos_by_record(table->record[0]);
+  TABLE *table= m_table;  // pointer to event's table
+  int error;
+  int UNINIT_VAR(keynum);
+  auto_afree_ptr<char> key(NULL);
 
-    table->file->ha_rnd_end();
-    if (error)
-    {
-      DBUG_PRINT("info",("rnd_pos returns error %d",error));
-      if (error == HA_ERR_RECORD_DELETED)
-        error= HA_ERR_KEY_NOT_FOUND;
-      table->file->print_error(error, MYF(0));
-    }
+  /* fill table->record[0] with default values */
+  bool abort_on_warnings= (rli->info_thd->variables.sql_mode &
+                           (MODE_STRICT_TRANS_TABLES | MODE_STRICT_ALL_TABLES));
+  if ((error= prepare_record(table, &m_cols,
+                             table->file->ht->db_type != DB_TYPE_NDBCLUSTER,
+                             abort_on_warnings, m_curr_row == m_rows_buf)))
     DBUG_RETURN(error);
-  }
-
-  // We can't use position() - try other methods.
   
-INDEX_SCAN:
-
-  /*
-    Save copy of the record in table->record[1]. It might be needed 
-    later if linear search is used to find exact match.
-   */ 
-  store_record(table,record[1]);    
+  /* unpack row into table->record[0] */
+  if ((error= unpack_current_row(rli, &m_cols, abort_on_warnings)))
+    DBUG_RETURN(error);
 
-  if ((key= search_key_in_table(table, &m_cols, 
-                                (PRI_KEY_FLAG | UNIQUE_KEY_FLAG | MULTIPLE_KEY_FLAG))) 
-       >= MAX_KEY)
-    /* we dont have a key, or no key is suitable for the BI values */
-    goto TABLE_SCAN; 
+  // Temporary fix to find out why it fails [/Matz]
+  memcpy(m_table->write_set->bitmap, m_cols.bitmap, (m_table->write_set->n_bits + 7) / 8);
 
+  if (m_curr_row == m_rows_buf)
   {
-    keyinfo= table->key_info + key;
-
-
-    DBUG_PRINT("info",("locating record using primary key (index_read)"));
-
-    /* The key'th key is active and usable: search the table using the index */
-    if (!table->file->inited && (error= table->file->ha_index_init(key, FALSE)))
-    {
-      DBUG_PRINT("info",("ha_index_init returns error %d",error));
-      table->file->print_error(error, MYF(0));
-      goto err;
-    }
+    /* This is the first row to be inserted; estimate the number of rows
+       from the size of the first row and use that value to initialize
+       the storage engine for bulk insertion. */
+    DBUG_ASSERT(!(m_curr_row > m_curr_row_end));
+    ulong estimated_rows= 0;
+    if (m_curr_row < m_curr_row_end)
+      estimated_rows= (m_rows_end - m_curr_row) / (m_curr_row_end - m_curr_row);
+    else if (m_curr_row == m_curr_row_end)
+      estimated_rows= 1;
 
-    /* Fill key data for the row */
+    m_table->file->ha_start_bulk_insert(estimated_rows);
+  }
+  
+  
+#ifndef DBUG_OFF
+  DBUG_DUMP("record[0]", table->record[0], table->s->reclength);
+  DBUG_PRINT_BITSET("debug", "write_set = %s", table->write_set);
+  DBUG_PRINT_BITSET("debug", "read_set = %s", table->read_set);
+#endif
 
-    DBUG_ASSERT(m_key);
-    key_copy(m_key, table->record[0], keyinfo, 0);
+  /* 
+    Try to write record. If a corresponding record already exists in the table,
+    we try to change it using ha_update_row() if possible. Otherwise we delete
+    it and repeat the whole process again. 
 
-    /*
-      Don't print debug messages when running valgrind since they can
-      trigger false warnings.
-     */
-#ifndef HAVE_purify
-    DBUG_DUMP("key data", m_key, keyinfo->key_length);
-#endif
+    TODO: Add safety measures against infinite looping. 
+   */
 
-    /*
-      We need to set the null bytes to ensure that the filler bit are
-      all set when returning.  There are storage engines that just set
-      the necessary bits on the bytes and don't set the filler bits
-      correctly.
-    */
-    if (table->s->null_bytes > 0)
-      table->record[0][table->s->null_bytes - 1]|=
-        256U - (1U << table->s->last_null_bit_pos);
+  m_table->mark_columns_per_binlog_row_image();
 
-    if ((error= table->file->ha_index_read_map(table->record[0], m_key,
-                                               HA_WHOLE_KEY,
-                                               HA_READ_KEY_EXACT)))
+  while ((error= table->file->ha_write_row(table->record[0])))
+  {
+    if (error == HA_ERR_LOCK_DEADLOCK ||
+        error == HA_ERR_LOCK_WAIT_TIMEOUT ||
+        (keynum= table->file->get_dup_key(error)) < 0 ||
+        !overwrite)
     {
-      DBUG_PRINT("info",("no record matching the key found in the table"));
-      if (error == HA_ERR_RECORD_DELETED)
-        error= HA_ERR_KEY_NOT_FOUND;
+      DBUG_PRINT("info",("get_dup_key returns %d", keynum));
+      /*
+        Deadlock, waiting for lock or just an error from the handler
+        such as HA_ERR_FOUND_DUPP_KEY when overwrite is false.
+        Retrieval of the duplicate key number may fail
+        - either because the error was not a "duplicate key" error,
+        - or because the information about which key failed is not available.
+      */
       table->file->print_error(error, MYF(0));
-      table->file->ha_index_end();
-      goto err;
+      goto error;
     }
+    /*
+       We need to retrieve the old row into record[1] to be able to
+       either update or delete the offending record.  We either:
 
-  /*
-    Don't print debug messages when running valgrind since they can
-    trigger false warnings.
-   */
-#ifndef HAVE_purify
-    DBUG_PRINT("info",("found first matching record")); 
-    DBUG_DUMP("record[0]", table->record[0], table->s->reclength);
-#endif
-    /*
-      Below is a minor "optimization".  If the key (i.e., key number
-      0) has the HA_NOSAME flag set, we know that we have found the
-      correct record (since there can be no duplicates); otherwise, we
-      have to compare the record with the one found to see if it is
-      the correct one.
-
-      CAVEAT! This behaviour is essential for the replication of,
-      e.g., the mysql.proc table since the correct record *shall* be
-      found using the primary key *only*.  There shall be no
-      comparison of non-PK columns to decide if the correct record is
-      found.  I can see no scenario where it would be incorrect to
-      chose the row to change only using a PK or an UNNI.
-    */
-    if (keyinfo->flags & HA_NOSAME || key == table->s->primary_key)
+       - use ha_rnd_pos() with a row-id (available as dup_ref) to the
+         offending row, if that is possible (MyISAM and Blackhole), or else
+
+       - use ha_index_read_idx_map() with the key that is duplicated, to
+         retrieve the offending row.
+     */
+    if (table->file->ha_table_flags() & HA_DUPLICATE_POS)
     {
-      /* Unique does not have non nullable part */
-      if (!(table->key_info->flags & (HA_NULL_PART_KEY)))
-      {
-        table->file->ha_index_end();
-        goto ok;
-      }
-      else
-      {
-        KEY *keyinfo= table->key_info;
-        /*
-          Unique has nullable part. We need to check if there is any field in the
-          BI image that is null and part of UNNI.
-        */
-        bool null_found= FALSE;
-        for (uint i=0; i < keyinfo->key_parts && !null_found; i++)
-        {
-          uint fieldnr= keyinfo->key_part[i].fieldnr - 1;
-          Field **f= table->field+fieldnr;
-          null_found= (*f)->is_null();
-        }
+      DBUG_PRINT("info",("Locating offending record using ha_rnd_pos()"));
 
-        if (!null_found)
-        {
-          table->file->ha_index_end();
-          goto ok;
-        }
+      if (table->file->inited && (error= table->file->ha_index_end()))
+        DBUG_RETURN(error);
+      if ((error= table->file->ha_rnd_init(FALSE)))
+        DBUG_RETURN(error);
 
-        /* else fall through to index scan */
+      error= table->file->ha_rnd_pos(table->record[1], table->file->dup_ref);
+
+      table->file->ha_rnd_end();
+      if (error)
+      {
+        DBUG_PRINT("info",("ha_rnd_pos() returns error %d",error));
+        if (error == HA_ERR_RECORD_DELETED)
+          error= HA_ERR_KEY_NOT_FOUND;
+        table->file->print_error(error, MYF(0));
+        goto error;
       }
     }
-
-    /*
-      In case key is not unique, we still have to iterate over records found
-      and find the one which is identical to the row given. A copy of the 
-      record we are looking for is stored in record[1].
-     */ 
-    DBUG_PRINT("info",("non-unique index, scanning it to find matching record")); 
-
-    while (record_compare(table, &m_cols))
+    else
     {
-      /*
-        We need to set the null bytes to ensure that the filler bit
-        are all set when returning.  There are storage engines that
-        just set the necessary bits on the bytes and don't set the
-        filler bits correctly.
+      DBUG_PRINT("info",("Locating offending record using index_read_idx()"));
 
-        TODO[record format ndb]: Remove this code once NDB returns the
-        correct record format.
-      */
-      if (table->s->null_bytes > 0)
+      if (table->file->extra(HA_EXTRA_FLUSH_CACHE))
       {
-        table->record[0][table->s->null_bytes - 1]|=
-          256U - (1U << table->s->last_null_bit_pos);
+        DBUG_PRINT("info",("Error when setting HA_EXTRA_FLUSH_CACHE"));
+        error= my_errno;
+        goto error;
       }
 
-      while ((error= table->file->ha_index_next(table->record[0])))
+      if (key.get() == NULL)
       {
-        /* We just skip records that has already been deleted */
+        key.assign(static_cast<char*>(my_alloca(table->s->max_unique_length)));
+        if (key.get() == NULL)
+        {
+          DBUG_PRINT("info",("Can't allocate key buffer"));
+          error= ENOMEM;
+          goto error;
+        }
+      }
+
+      key_copy((uchar*)key.get(), table->record[0], table->key_info + keynum,
+               0);
+      error= table->file->ha_index_read_idx_map(table->record[1], keynum,
+                                                (const uchar*)key.get(),
+                                                HA_WHOLE_KEY,
+                                                HA_READ_KEY_EXACT);
+      if (error)
+      {
+        DBUG_PRINT("info",("ha_index_read_idx_map() returns %s", HA_ERR(error)));
         if (error == HA_ERR_RECORD_DELETED)
-          continue;
-        DBUG_PRINT("info",("no record matching the given row found"));
+          error= HA_ERR_KEY_NOT_FOUND;
         table->file->print_error(error, MYF(0));
-        table->file->ha_index_end();
-        goto err;
+        goto error;
       }
     }
 
     /*
-      Have to restart the scan to be able to fetch the next row.
-    */
-    table->file->ha_index_end();
-    goto ok;
-  }
+       Now, record[1] should contain the offending row.  That
+       will enable us to update it or, alternatively, delete it (so
+       that we can insert the new row afterwards).
+     */
 
-TABLE_SCAN:
+    /*
+      If row is incomplete we will use the record found to fill 
+      missing columns.  
+    */
+    if (!get_flags(COMPLETE_ROWS_F))
+    {
+      restore_record(table,record[1]);
+      error= unpack_current_row(rli, &m_cols);
+    }
 
-  /* All that we can do now is rely on a table scan */
-  {
-    DBUG_PRINT("info",("locating record using table scan (ha_rnd_next)"));
+#ifndef DBUG_OFF
+    DBUG_PRINT("debug",("preparing for update: before and after image"));
+    DBUG_DUMP("record[1] (before)", table->record[1], table->s->reclength);
+    DBUG_DUMP("record[0] (after)", table->record[0], table->s->reclength);
+#endif
 
-    int restart_count= 0; // Number of times scanning has restarted from top
+    /*
+       REPLACE is defined as either INSERT or DELETE + INSERT.  If
+       possible, we can replace it with an UPDATE, but that will not
+       work on InnoDB if FOREIGN KEY checks are necessary.
 
-    /* We don't have a key: search the table using ha_rnd_next() */
-    if ((error= table->file->ha_rnd_init(1)))
-    {
-      DBUG_PRINT("info",("error initializing table scan"
-                         " (ha_rnd_init returns %d)",error));
-      table->file->print_error(error, MYF(0));
-      goto err;
-    }
+       I (Matz) am not sure of the reason for the last_uniq_key()
+       check, but I'm guessing that it's something along the
+       following lines.
 
-    /* Continue until we find the right record or have made a full loop */
-    do
+       Suppose that we got the duplicate key to be a key that is not
+       the last unique key for the table and we perform an update:
+       then there might be another key for which the unique check will
+       fail, so we're better off just deleting the row and inserting
+       the correct row.
+     */
+    if (last_uniq_key(table, keynum) &&
+        !table->file->referenced_by_foreign_key())
     {
-  restart_ha_rnd_next:
-      error= table->file->ha_rnd_next(table->record[0]);
-
-      DBUG_PRINT("info", ("error: %s", HA_ERR(error)));
+      DBUG_PRINT("info",("Updating row using ha_update_row()"));
+      error=table->file->ha_update_row(table->record[1],
+                                       table->record[0]);
       switch (error) {
-
+                
+      case HA_ERR_RECORD_IS_THE_SAME:
+        DBUG_PRINT("info",("ignoring HA_ERR_RECORD_IS_THE_SAME error from"
+                           " ha_update_row()"));
+        error= 0;
+      
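+        /* fall through */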
       case 0:
         break;
 
-      /*
-        If the record was deleted, we pick the next one without doing
-        any comparisons.
-      */
-      case HA_ERR_RECORD_DELETED:
-        goto restart_ha_rnd_next;
-
-      case HA_ERR_END_OF_FILE:
-        if (++restart_count < 2)
-          table->file->ha_rnd_init(1);
-        break;
-
       default:
-        DBUG_PRINT("info", ("Failed to get next record"
-                            " (ha_rnd_next returns %d)",error));
+        DBUG_PRINT("info",("ha_update_row() returns error %d",error));
         table->file->print_error(error, MYF(0));
-        table->file->ha_rnd_end();
-        goto err;
       }
-    }
-    while (restart_count < 2 && record_compare(table, &m_cols));
-    
-    /* 
-      Note: above record_compare will take into accout all record fields 
-      which might be incorrect in case a partial row was given in the event
-     */
 
-    /*
-      Have to restart the scan to be able to fetch the next row.
-    */
-    if (restart_count == 2)
-      DBUG_PRINT("info", ("Record not found"));
+      goto error;
+    }
     else
-      DBUG_DUMP("record found", table->record[0], table->s->reclength);
-    table->file->ha_rnd_end();
-
-    DBUG_ASSERT(error == HA_ERR_END_OF_FILE || error == 0);
-    goto err;
+    {
+      DBUG_PRINT("info",("Deleting offending row and trying to write new one again"));
+      if ((error= table->file->ha_delete_row(table->record[1])))
+      {
+        DBUG_PRINT("info",("ha_delete_row() returns error %d",error));
+        table->file->print_error(error, MYF(0));
+        goto error;
+      }
+      /* Will retry ha_write_row() with the offending row removed. */
+    }
   }
-ok:
-  table->default_column_bitmaps();
-  DBUG_RETURN(0);
 
-err:
-  table->default_column_bitmaps();
+error:
+  m_table->default_column_bitmaps();
   DBUG_RETURN(error);
 }
 
 #endif
 
+int
+Write_rows_log_event::do_exec_row(const Relay_log_info *const rli)
+{
+  DBUG_ASSERT(m_table != NULL);
+  int error= write_row(rli, slave_exec_mode == SLAVE_EXEC_MODE_IDEMPOTENT);
+
+  if (error && !thd->is_error())
+  {
+    DBUG_ASSERT(0);
+    my_error(ER_UNKNOWN_ERROR, MYF(0));
+  }
+
+  return error;
+}
+
+#endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */
+
+#ifdef MYSQL_CLIENT
+void Write_rows_log_event::print(FILE *file, PRINT_EVENT_INFO* print_event_info)
+{
+  Rows_log_event::print_helper(file, print_event_info, "Write_rows");
+}
+#endif
+
+/**************************************************************************
+	Delete_rows_log_event member functions
+**************************************************************************/
+
 /*
   Constructor used to build an event for writing to the binary log.
  */
@@ -9753,6 +10006,17 @@ Delete_rows_log_event::do_before_row_ope
       return HA_ERR_OUT_OF_MEM;
   }
 
+  /* we will be using a hash to look up rows, initialize it */
+  if (decide_row_lookup_method(m_table, &m_cols, get_type_code()) == ROW_LOOKUP_HASH_SCAN)
+    my_hash_init(&m_hash,
+        &my_charset_bin,            /* the character set information */
+        16 /* FIXME */,             /* growth size */
+        0,                          /* key offset */
+        0,                          /* key length */
+        rows_log_event_get_key,     /* get function pointer */
+        (my_hash_free_key) rows_log_event_free_entry,  /* freefunction pointer */
+        MYF(0));                    /* flags */
+
   return 0;
 }
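For context on the hash that do_before_row_operations() now initializes:
with ROW_LOOKUP_HASH_SCAN (see the enum added to log_event.h further down),
before images can be hashed once and then matched during a single pass over
the table, instead of one lookup per row. A rough standalone sketch of that
idea, using a standard container in place of the server's HASH (all names
below are illustrative, not from the patch):

  #include <cstdio>
  #include <string>
  #include <unordered_map>
  #include <vector>

  int main()
  {
    /* Before images from the event, keyed by their packed row bytes. */
    std::unordered_multimap<std::string, int> bi_hash;
    bi_hash.emplace("row-A", 1);  /* the value could carry an after image */
    bi_hash.emplace("row-C", 2);

    /* One scan over the table: probe each stored row against the hash. */
    const std::vector<std::string> table_rows= { "row-A", "row-B", "row-C" };
    for (const std::string &row : table_rows)
    {
      auto it= bi_hash.find(row);
      if (it != bi_hash.end())
      {
        std::printf("match: %s -> apply change %d\n", row.c_str(), it->second);
        bi_hash.erase(it);        /* each before image is consumed once */
      }
    }
    /* Anything left in bi_hash found no matching row in the table. */
    return 0;
  }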
 
@@ -9765,6 +10029,11 @@ Delete_rows_log_event::do_after_row_oper
   my_free(m_key);
   m_key= NULL;
 
+  /* we don't need the hash anymore, free it */
+  if ((decide_row_lookup_method(m_table, &m_cols, get_type_code()) == ROW_LOOKUP_HASH_SCAN) &&
+      my_hash_inited(&m_hash))
+    my_hash_free(&m_hash);
+
   return error;
 }
 
@@ -9773,16 +10042,11 @@ int Delete_rows_log_event::do_exec_row(c
   int error;
   DBUG_ASSERT(m_table != NULL);
 
-  if (!(error= find_row(rli))) 
-  { 
+  /* m_table->record[0] contains the BI */
+  m_table->mark_columns_per_binlog_row_image();
+  error= m_table->file->ha_delete_row(m_table->record[1]);
+  m_table->default_column_bitmaps();
 
-    m_table->mark_columns_per_binlog_row_image();
-    /*
-      Delete the record found, located in record[0]
-    */
-    error= m_table->file->ha_delete_row(m_table->record[0]);
-    m_table->default_column_bitmaps();
-  }
   return error;
 }
 
@@ -9868,6 +10132,16 @@ Update_rows_log_event::do_before_row_ope
 
   m_table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
 
+  if (decide_row_lookup_method(m_table, &m_cols, get_type_code()) == ROW_LOOKUP_HASH_SCAN)
+    my_hash_init(&m_hash,
+        &my_charset_bin,            /* the character set information */
+        16 /* FIXME */,             /* growth size */
+        0,                          /* key offset */
+        0,                          /* key length */
+        rows_log_event_get_key,     /* get function pointer */
+        (my_hash_free_key) rows_log_event_free_entry,  /* freefunction pointer */
+        MYF(0));                    /* flags */
+
   return 0;
 }
 
@@ -9880,6 +10154,11 @@ Update_rows_log_event::do_after_row_oper
   my_free(m_key); // Free for multi_malloc
   m_key= NULL;
 
+  /* we don't need the hash anymore, free it */
+  if ((decide_row_lookup_method(m_table, &m_cols, get_type_code()) == ROW_LOOKUP_HASH_SCAN) &&
+      my_hash_inited(&m_hash))
+    my_hash_free(&m_hash);
+
   return error;
 }
 
@@ -9889,53 +10168,6 @@ Update_rows_log_event::do_exec_row(const
   DBUG_ASSERT(m_table != NULL);
   int error= 0;
 
-  /**
-     Check if update contains only values in AI for columns that do 
-     not exist on the slave. If it does, we can just unpack the rows 
-     and return (do nothing on the local table).
-
-     NOTE: We do the following optimization and check only if there 
-     are usable values on the AI and disregard the fact that there 
-     might be usable values in the BI. In practice this means that 
-     the slave will not go through find_row (since we have nothing
-     on the record to update, why go looking for it?).
-
-     If we wanted find_row to run anyway, we could move this
-     check after find_row, but then we would have to face the fact
-     that the slave might stop without finding the proper record 
-     (because it might have incomplete BI), even though there were
-     no values in AI.
-
-     On the other hand, if AI has usable values but BI has not,
-     then find_row will return an error (and the error is then
-     propagated as it was already).
-   */
-  if (!is_any_column_signaled_for_table(m_table, &m_cols_ai))
-  {
-    /* 
-      Read and discard images, because:
-      1. AI does not contain any useful values to replay;
-      2. BI is irrelevant if there is nothing useful in AI.
-    */
-    error = unpack_current_row(rli, &m_cols);
-    m_curr_row= m_curr_row_end;
-    error = error | unpack_current_row(rli, &m_cols_ai);
-
-    return error;
-  }
-
-  error= find_row(rli); 
-  if (error)
-  {
-    /*
-      We need to read the second image in the event of error to be
-      able to skip to the next pair of updates
-    */
-    m_curr_row= m_curr_row_end;
-    unpack_current_row(rli, &m_cols_ai);
-    return error;
-  }
-
   /*
     This is the situation after locating BI:
 

=== modified file 'sql/log_event.h'
--- a/sql/log_event.h	2010-10-08 14:35:24 +0000
+++ b/sql/log_event.h	2010-11-05 00:31:22 +0000
@@ -3528,6 +3528,13 @@ private:
 class Rows_log_event : public Log_event
 {
 public:
+  enum row_lookup_mode {
+       ROW_LOOKUP_NOT_NEEDED= 0,
+       ROW_LOOKUP_INDEX_SCAN= 1,
+       ROW_LOOKUP_TABLE_SCAN= 2,
+       ROW_LOOKUP_HASH_SCAN= 3,
+  };
+
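decide_row_lookup_method() itself is not part of this patch, so the
following is only a guess at its shape, inferred from the call sites above;
the inputs and their ordering are assumptions, not the actual
implementation:

  /* Hypothetical sketch only: decide which row_lookup_mode to use,
     assuming the choice depends on whether a lookup is needed at all,
     whether the before image covers a usable key, and whether hash
     scans are enabled. */
  static enum row_lookup_mode
  decide(bool needs_lookup, bool has_usable_key, bool hash_scan_enabled)
  {
    if (!needs_lookup)       /* e.g. Write_rows: nothing to locate */
      return ROW_LOOKUP_NOT_NEEDED;
    if (has_usable_key)      /* PK/unique/other key covered by the BI */
      return ROW_LOOKUP_INDEX_SCAN;
    if (hash_scan_enabled)   /* batch rows, one scan with hash probes */
      return ROW_LOOKUP_HASH_SCAN;
    return ROW_LOOKUP_TABLE_SCAN;
  }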
   /**
      Enumeration of the errors that can be returned.
    */
@@ -3703,6 +3710,7 @@ protected:
   ulong       m_table_id;	/* Table ID */
   MY_BITMAP   m_cols;		/* Bitmap denoting columns available */
   ulong       m_width;          /* The width of the columns bitmap */
+  HASH        m_hash;
   /*
     Bitmap for columns available in the after image, if present. These
     fields are only available for Update_rows events. Observe that the
@@ -3810,6 +3818,13 @@ private:
       
   */
   virtual int do_exec_row(const Relay_log_info *const rli) = 0;
+
+  int hash_row(Relay_log_info const *rli);
+  int handle_idempotent_errors(Relay_log_info const *rli, int *err);
+  int do_apply_row(Relay_log_info const *rli);
+  int do_index_scan_and_update(Relay_log_info const *rli);
+  int do_hash_scan_and_update(Relay_log_info const *rli);
+  int do_table_scan_and_update(Relay_log_info const *rli);
 #endif /* defined(MYSQL_SERVER) && defined(HAVE_REPLICATION) */
 
   friend class Old_rows_log_event;


Attachment: [text/bzr-bundle] bzr/luis.soares@oracle.com-20101105003122-ed89mbefikon0om7.bundle