#At file:///home/lsoares/Workspace/bzr/work/features/wl5597/mysql-next-mr/ based on revid:alexander.nozdrin@stripped
3204 Luis Soares 2010-11-05
WL 5597: Work in progress.
modified:
sql/log_event.cc
sql/log_event.h
=== modified file 'sql/log_event.cc'
--- a/sql/log_event.cc 2010-10-17 23:27:40 +0000
+++ b/sql/log_event.cc 2010-11-05 00:31:22 +0000
@@ -7556,2165 +7556,2418 @@ int Rows_log_event::do_add_row_data(ucha
#endif
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
-int Rows_log_event::do_apply_event(Relay_log_info const *rli)
-{
- DBUG_ENTER("Rows_log_event::do_apply_event(Relay_log_info*)");
- int error= 0;
- /*
- If m_table_id == ~0UL, then we have a dummy event that does not
- contain any data. In that case, we just remove all tables in the
- tables_to_lock list, close the thread tables, and return with
- success.
- */
- if (m_table_id == ~0UL)
- {
- /*
- This one is supposed to be set: just an extra check so that
- nothing strange has happened.
- */
- DBUG_ASSERT(get_flags(STMT_END_F));
- const_cast<Relay_log_info*>(rli)->slave_close_thread_tables(thd);
- thd->clear_error();
- DBUG_RETURN(0);
- }
- /*
- 'thd' has been set by exec_relay_log_event(), just before calling
- do_apply_event(). We still check here to prevent future coding
- errors.
- */
- DBUG_ASSERT(rli->info_thd == thd);
+/**
+ Checks if any of the columns in the given table is
+ signaled in the bitmap.
- /*
- If there is no locks taken, this is the first binrow event seen
- after the table map events. We should then lock all the tables
- used in the transaction and proceed with execution of the actual
- event.
- */
- if (!thd->lock)
- {
- /*
- Lock_tables() reads the contents of thd->lex, so they must be
- initialized.
+ For each column in the given table checks if it is
+ signaled in the bitmap. This is most useful when deciding
+ whether a before image (BI) can be used or not for
+ searching a row. If no column is signaled, then the
+ image cannot be used for searching a record (regardless
+ of using position(), index scan or table scan). Here is
+ an example:
- We also call the mysql_reset_thd_for_next_command(), since this
- is the logical start of the next "statement". Note that this
- call might reset the value of current_stmt_binlog_format, so
- we need to do any changes to that value after this function.
- */
- lex_start(thd);
- mysql_reset_thd_for_next_command(thd);
- /*
- The current statement is just about to begin and
- has not yet modified anything. Note, all.modified is reset
- by mysql_reset_thd_for_next_command.
- */
- thd->transaction.stmt.modified_non_trans_table= FALSE;
- /*
- This is a row injection, so we flag the "statement" as
- such. Note that this code is called both when the slave does row
- injections and when the BINLOG statement is used to do row
- injections.
- */
- thd->lex->set_stmt_row_injection();
+ MASTER> SET @@binlog_row_image='MINIMAL';
+ MASTER> CREATE TABLE t1 (a int, b int, c int, primary key(c));
+ SLAVE> CREATE TABLE t1 (a int, b int);
+ MASTER> INSERT INTO t1 VALUES (1,2,3);
+ MASTER> UPDATE t1 SET a=2 WHERE b=2;
- /*
- There are a few flags that are replicated with each row event.
- Make sure to set/clear them before executing the main body of
- the event.
- */
- if (get_flags(NO_FOREIGN_KEY_CHECKS_F))
- thd->variables.option_bits|= OPTION_NO_FOREIGN_KEY_CHECKS;
- else
- thd->variables.option_bits&= ~OPTION_NO_FOREIGN_KEY_CHECKS;
+ For the update statement only the PK (column c) is
+ logged in the before image (BI). As such, given that
+ the slave has no column c, it will not be able to
+ find the row, because BI has no values for the columns
+ the slave knows about (columns a and b).
- if (get_flags(RELAXED_UNIQUE_CHECKS_F))
- thd->variables.option_bits|= OPTION_RELAXED_UNIQUE_CHECKS;
- else
- thd->variables.option_bits&= ~OPTION_RELAXED_UNIQUE_CHECKS;
- /* A small test to verify that objects have consistent types */
- DBUG_ASSERT(sizeof(thd->variables.option_bits) == sizeof(OPTION_RELAXED_UNIQUE_CHECKS));
+ @param table the table reference on the slave.
+ @param cols the bitmap signaling columns available in
+ the BI.
- if (open_and_lock_tables(thd, rli->tables_to_lock, FALSE, 0))
- {
- uint actual_error= thd->stmt_da->sql_errno();
- if (thd->is_slave_error || thd->is_fatal_error)
- {
- /*
- Error reporting borrowed from Query_log_event with many excessive
- simplifications.
- We should not honour --slave-skip-errors at this point as we are
- having severe errors which should not be skiped.
- */
- rli->report(ERROR_LEVEL, actual_error,
- "Error executing row event: '%s'",
- (actual_error ? thd->stmt_da->message() :
- "unexpected success or fatal error"));
- thd->is_slave_error= 1;
- }
- const_cast<Relay_log_info*>(rli)->slave_close_thread_tables(thd);
- DBUG_RETURN(actual_error);
- }
+ @return TRUE if BI contains usable columns for searching,
+ FALSE otherwise.
+*/
+static
+my_bool is_any_column_signaled_for_table(TABLE *table, MY_BITMAP *cols)
+{
- /*
- When the open and locking succeeded, we check all tables to
- ensure that they still have the correct type.
+ int nfields_set= 0;
+ for (Field **ptr=table->field ;
+ *ptr && ((*ptr)->field_index < cols->n_bits);
+ ptr++)
+ {
+ if (bitmap_is_set(cols, (*ptr)->field_index))
+ nfields_set++;
+ }
- We can use a down cast here since we know that every table added
- to the tables_to_lock is a RPL_TABLE_LIST.
- */
+ return (nfields_set != 0);
+}
- {
- DBUG_PRINT("debug", ("Checking compability of tables to lock - tables_to_lock: %p",
- rli->tables_to_lock));
- RPL_TABLE_LIST *ptr= rli->tables_to_lock;
- for ( ; ptr ; ptr= static_cast<RPL_TABLE_LIST*>(ptr->next_global))
- {
- TABLE *conv_table;
- if (!ptr->m_tabledef.compatible_with(thd, const_cast<Relay_log_info*>(rli),
- ptr->table, &conv_table))
- {
- DBUG_PRINT("debug", ("Table: %s.%s is not compatible with master",
- ptr->table->s->db.str,
- ptr->table->s->table_name.str));
- /*
- We should not honour --slave-skip-errors at this point as we are
- having severe errors which should not be skiped.
- */
- thd->is_slave_error= 1;
- const_cast<Relay_log_info*>(rli)->slave_close_thread_tables(thd);
- DBUG_RETURN(ERR_BAD_TABLE_DEF);
- }
- DBUG_PRINT("debug", ("Table: %s.%s is compatible with master"
- " - conv_table: %p",
- ptr->table->s->db.str,
- ptr->table->s->table_name.str, conv_table));
- ptr->m_conv_table= conv_table;
- }
- }
+/**
+ Checks if the fields in the given key are signaled in
+ the bitmap.
- /*
- ... and then we add all the tables to the table map and but keep
- them in the tables to lock list.
+ Validates whether the before image is usable for the
+ given key. It can be the case that the before image
+ does not contain values for the key (eg, master was
+ using 'minimal' option for image logging and slave has
+ different index structure on the table). Here is an
+ example:
- We also invalidate the query cache for all the tables, since
- they will now be changed.
+ MASTER> SET @@binlog_row_image='MINIMAL';
+ MASTER> CREATE TABLE t1 (a int, b int, c int, primary key(c));
+ SLAVE> CREATE TABLE t1 (a int, b int, c int, key(a,c));
+ MASTER> INSERT INTO t1 VALUES (1,2,3);
+ MASTER> UPDATE t1 SET a=2 WHERE b=2;
- TODO [/Matz]: Maybe the query cache should not be invalidated
- here? It might be that a table is not changed, even though it
- was locked for the statement. We do know that each
- Rows_log_event contain at least one row, so after processing one
- Rows_log_event, we can invalidate the query cache for the
- associated table.
- */
- for (TABLE_LIST *ptr= rli->tables_to_lock ; ptr ; ptr= ptr->next_global)
- {
- const_cast<Relay_log_info*>(rli)->m_table_map.set_table(ptr->table_id, ptr->table);
- }
-#ifdef HAVE_QUERY_CACHE
- query_cache.invalidate_locked_for_write(rli->tables_to_lock);
-#endif
+ When finding the row on the slave, one cannot use the
+ index (a,c) to search for the row, because there is only
+ data in the before image for column c. This function
+ checks the fields needed for a given key and searches
+ the bitmap to see if all the fields required are
+ signaled.
+
+ @param keyinfo reference to key.
+ @param cols the bitmap signaling which columns
+ have available data.
+
+ @return TRUE if all fields are signaled in the bitmap
+ for the given key, FALSE otherwise.
+*/
+static
+my_bool are_all_columns_signaled_for_key(KEY *keyinfo, MY_BITMAP *cols)
+{
+ for (uint i=0 ; i < keyinfo->key_parts ;i++)
+ {
+ uint fieldnr= keyinfo->key_part[i].fieldnr - 1;
+ if (fieldnr >= cols->n_bits ||
+ !bitmap_is_set(cols, fieldnr))
+ return FALSE;
}
- TABLE*
- table=
- m_table= const_cast<Relay_log_info*>(rli)->m_table_map.get_table(m_table_id);
+ return TRUE;
+}
- DBUG_PRINT("debug", ("m_table: 0x%lx, m_table_id: %lu", (ulong) m_table, m_table_id));
+/**
+ Searches the table for a given key that can be used
+ according to the existing values, ie, columns set
+ in the bitmap.
- if (table)
- {
- bool transactional_table= table->file->has_transactions();
- /*
- table == NULL means that this table should not be replicated
- (this was set up by Table_map_log_event::do_apply_event()
- which tested replicate-* rules).
- */
+ The caller can specify which type of key to find by
+ setting the following flags in the key_type parameter:
- /*
- It's not needed to set_time() but
- 1) it continues the property that "Time" in SHOW PROCESSLIST shows how
- much slave is behind
- 2) it will be needed when we allow replication from a table with no
- TIMESTAMP column to a table with one.
- So we call set_time(), like in SBR. Presently it changes nothing.
- */
- thd->set_time((time_t)when);
+ - PRI_KEY_FLAG
+ Returns the primary key.
- /*
- Now we are in a statement and will stay in a statement until we
- see a STMT_END_F.
+ - UNIQUE_KEY_FLAG
+ Returns a unique key (flagged with HA_NOSAME)
- We set this flag here, before actually applying any rows, in
- case the SQL thread is stopped and we need to detect that we're
- inside a statement and halting abruptly might cause problems
- when restarting.
- */
- const_cast<Relay_log_info*>(rli)->set_flag(Relay_log_info::IN_STMT);
+ - MULTIPLE_KEY_FLAG
+ Returns a key that is neither the PK nor a unique key (ie, a
+ key flagged with HA_NOSAME and without HA_NULL_PART_KEY).
- if ( m_width == table->s->fields && bitmap_is_set_all(&m_cols))
- set_flags(COMPLETE_ROWS_F);
+ The above flags can be used together, in which case, the
+ search is conducted in the above listed order. Eg, the
+ following flag:
- /*
- Set tables write and read sets.
-
- Read_set contains all slave columns (in case we are going to fetch
- a complete record from slave)
-
- Write_set equals the m_cols bitmap sent from master but it can be
- longer if slave has extra columns.
- */
+ (PRI_KEY_FLAG | UNIQUE_KEY_FLAG | MULTIPLE_KEY_FLAG)
- DBUG_PRINT_BITSET("debug", "Setting table's write_set from: %s", &m_cols);
-
- bitmap_set_all(table->read_set);
- if (get_type_code() == DELETE_ROWS_EVENT)
- bitmap_intersect(table->read_set,&m_cols);
+ means that a primary key is returned if it is suitable. If
+ not, then the unique keys are searched. If no unique key is
+ suitable, then the remaining non-unique keys are searched.
+ Finally, if no key is suitable, MAX_KEY is returned.
- bitmap_set_all(table->write_set);
- if (!get_flags(COMPLETE_ROWS_F))
- {
- if (get_type_code() == UPDATE_ROWS_EVENT)
- bitmap_intersect(table->write_set,&m_cols_ai);
- else /* WRITE ROWS EVENTS store the bitmap in m_cols instead of m_cols_ai */
- bitmap_intersect(table->write_set,&m_cols);
- }
+ @param table reference to the table.
+ @param bi_cols a bitmap that filters out columns that should
+ not be considered while searching the key.
+ Columns that should be considered are set.
+ @param key_type the type of key to search for.
- this->slave_exec_mode= slave_exec_mode_options; // fix the mode
+ @return MAX_KEY if no key of the requested key_type is suitable;
+ otherwise, the index of the first suitable key found.
- // Do event specific preparations
- error= do_before_row_operations(rli);
+*/
+static
+uint
+search_key_in_table(TABLE *table, MY_BITMAP *bi_cols, uint key_type)
+{
+ KEY *keyinfo;
+ uint res= MAX_KEY;
+ uint key;
- // row processing loop
+ if (key_type & PRI_KEY_FLAG && (table->s->primary_key < MAX_KEY))
+ {
+ keyinfo= table->s->key_info + (uint) table->s->primary_key;
+ if (are_all_columns_signaled_for_key(keyinfo, bi_cols))
+ return table->s->primary_key;
+ }
- while (error == 0)
+ if (key_type & UNIQUE_KEY_FLAG && table->s->uniques)
+ {
+ for (key=0,keyinfo= table->key_info ;
+ (key < table->s->keys) && (res == MAX_KEY);
+ key++,keyinfo++)
{
- /* in_use can have been set to NULL in close_tables_for_reopen */
- THD* old_thd= table->in_use;
- if (!table->in_use)
- table->in_use= thd;
-
- error= do_exec_row(rli);
-
- DBUG_PRINT("info", ("error: %s", HA_ERR(error)));
- DBUG_ASSERT(error != HA_ERR_RECORD_DELETED);
-
- table->in_use = old_thd;
-
- if (error)
- {
- int actual_error= convert_handler_error(error, thd, table);
- bool idempotent_error= (idempotent_error_code(error) &&
- (slave_exec_mode == SLAVE_EXEC_MODE_IDEMPOTENT));
- bool ignored_error= (idempotent_error == 0 ?
- ignored_error_code(actual_error) : 0);
-
- if (idempotent_error || ignored_error)
- {
- if (global_system_variables.log_warnings)
- slave_rows_error_report(WARNING_LEVEL, error, rli, thd, table,
- get_type_str(),
- const_cast<Relay_log_info*>(rli)->get_rpl_log_name(),
- (ulong) log_pos);
- clear_all_errors(thd, const_cast<Relay_log_info*>(rli));
- error= 0;
- if (idempotent_error == 0)
- break;
- }
- }
-
/*
- If m_curr_row_end was not set during event execution (e.g., because
- of errors) we can't proceed to the next row. If the error is transient
- (i.e., error==0 at this point) we must call unpack_current_row() to set
- m_curr_row_end.
- */
-
- DBUG_PRINT("info", ("curr_row: 0x%lu; curr_row_end: 0x%lu; rows_end: 0x%lu",
- (ulong) m_curr_row, (ulong) m_curr_row_end, (ulong) m_rows_end));
-
- if (!m_curr_row_end && !error)
- error= unpack_current_row(rli, &m_cols);
-
- // at this moment m_curr_row_end should be set
- DBUG_ASSERT(error || m_curr_row_end != NULL);
- DBUG_ASSERT(error || m_curr_row <= m_curr_row_end);
- DBUG_ASSERT(error || m_curr_row_end <= m_rows_end);
-
- m_curr_row= m_curr_row_end;
-
- if (error == 0 && !transactional_table)
- thd->transaction.all.modified_non_trans_table=
- thd->transaction.stmt.modified_non_trans_table= TRUE;
-
- if (m_curr_row == m_rows_end)
- break;
- } // row processing loop
+ - Unique keys cannot be disabled, hence the keys_in_use check is skipped.
+ - Skip unique keys with nullable parts
+ - Skip primary keys
+ */
+ if (!((keyinfo->flags & (HA_NOSAME | HA_NULL_PART_KEY)) != HA_NOSAME) ||
+ (key == table->s->primary_key))
+ continue;
+ res= are_all_columns_signaled_for_key(keyinfo, bi_cols) ?
+ key : MAX_KEY;
- {/**
- The following failure injecion works in cooperation with tests
- setting @@global.debug= 'd,stop_slave_middle_group'.
- The sql thread receives the killed status and will proceed
- to shutdown trying to finish incomplete events group.
- */
- DBUG_EXECUTE_IF("stop_slave_middle_group",
- if (thd->transaction.all.modified_non_trans_table)
- const_cast<Relay_log_info*>(rli)->abort_slave= 1;);
+ if (res < MAX_KEY)
+ return res;
}
+ }
- if ((error= do_after_row_operations(rli, error)) &&
- ignored_error_code(convert_handler_error(error, thd, table)))
+ if (key_type & MULTIPLE_KEY_FLAG && table->s->keys)
+ {
+ for (key=0,keyinfo= table->key_info ;
+ (key < table->s->keys) && (res == MAX_KEY);
+ key++,keyinfo++)
{
+ /*
+ - Skip inactive keys
+ - Skip unique keys without nullable parts
+ - Skip primary keys
+ */
+ if (!(table->s->keys_in_use.is_set(key)) ||
+ ((keyinfo->flags & (HA_NOSAME | HA_NULL_PART_KEY)) == HA_NOSAME) ||
+ (key == table->s->primary_key))
+ continue;
- if (global_system_variables.log_warnings)
- slave_rows_error_report(WARNING_LEVEL, error, rli, thd, table,
- get_type_str(),
- const_cast<Relay_log_info*>(rli)->get_rpl_log_name(),
- (ulong) log_pos);
- clear_all_errors(thd, const_cast<Relay_log_info*>(rli));
- error= 0;
- }
- } // if (table)
-
-
- if (error)
- {
- slave_rows_error_report(ERROR_LEVEL, error, rli, thd, table,
- get_type_str(),
- const_cast<Relay_log_info*>(rli)->get_rpl_log_name(),
- (ulong) log_pos);
- /*
- @todo We should probably not call
- reset_current_stmt_binlog_format_row() from here.
+ res= are_all_columns_signaled_for_key(keyinfo, bi_cols) ?
+ key : MAX_KEY;
- Note: this applies to log_event_old.cc too.
- /Sven
- */
- thd->reset_current_stmt_binlog_format_row();
- const_cast<Relay_log_info*>(rli)->cleanup_context(thd, error);
- thd->is_slave_error= 1;
- DBUG_RETURN(error);
+ if (res < MAX_KEY)
+ return res;
+ }
}
- if (get_flags(STMT_END_F))
- if ((error= rows_event_stmt_cleanup(rli, thd)))
- rli->report(ERROR_LEVEL, error,
- "Error in %s event: commit of row events failed, "
- "table `%s`.`%s`",
- get_type_str(), m_table->s->db.str,
- m_table->s->table_name.str);
-
- DBUG_RETURN(error);
+ return res;
}
-Log_event::enum_skip_reason
-Rows_log_event::do_shall_skip(Relay_log_info *rli)
+static uint decide_row_lookup_method(TABLE* table, MY_BITMAP *cols, uint event_type)
{
- /*
- If the slave skip counter is 1 and this event does not end a
- statement, then we should not start executing on the next event.
- Otherwise, we defer the decision to the normal skipping logic.
- */
- if (rli->slave_skip_counter == 1 && !get_flags(STMT_END_F))
- return Log_event::EVENT_SKIP_IGNORE;
+ uint res= Rows_log_event::ROW_LOOKUP_NOT_NEEDED;
+ if (event_type == WRITE_ROWS_EVENT)
+ return res;
+
+ uint key_index= search_key_in_table(table, cols, (PRI_KEY_FLAG | UNIQUE_KEY_FLAG | MULTIPLE_KEY_FLAG));
+
+ /* No index */
+ if (key_index == MAX_KEY /* TODO: || key_index > number of keys in the table */)
+ // TODO: change so that it takes into account the slave_exec_mode flag
+ //res= slave_exec_mode & TABLE_SCAN ? TABLE_SCAN : HASH_SCAN;
+ res= Rows_log_event::ROW_LOOKUP_HASH_SCAN;
+
else
- return Log_event::do_shall_skip(rli);
+ // TODO: change so that it takes into account the slave_exec_mode flag
+ //res= slave_exec_mode & INDEX_SEARCH ? INDEX_SEARCH : HASH_SCAN;
+ res= Rows_log_event::ROW_LOOKUP_INDEX_SCAN;
+
+ return res;
}
-/**
- The function is called at Rows_log_event statement commit time,
- normally from Rows_log_event::do_update_pos() and possibly from
- Query_log_event::do_apply_event() of the COMMIT.
- The function commits the last statement for engines, binlog and
- releases resources have been allocated for the statement.
-
- @retval 0 Ok.
- @retval non-zero Error at the commit.
- */
+/*
+ Compares table->record[0] and table->record[1]
-static int rows_event_stmt_cleanup(Relay_log_info const *rli, THD * thd)
+ Returns TRUE if different.
+*/
+static bool record_compare(TABLE *table, MY_BITMAP *cols)
{
- int error;
- {
- /*
- This is the end of a statement or transaction, so close (and
- unlock) the tables we opened when processing the
- Table_map_log_event starting the statement.
+ /*
+ Need to set the X bit and the filler bits in both records since
+ there are engines that do not set them correctly.
- OBSERVER. This will clear *all* mappings, not only those that
- are open for the table. There is not good handle for on-close
- actions for tables.
+ In addition, since MyISAM checks that one hasn't tampered with the
+ record, it is necessary to restore the old bytes into the record
+ after doing the comparison.
- NOTE. Even if we have no table ('table' == 0) we still need to be
- here, so that we increase the group relay log position. If we didn't, we
- could have a group relay log position which lags behind "forever"
- (assume the last master's transaction is ignored by the slave because of
- replicate-ignore rules).
- */
- error= thd->binlog_flush_pending_rows_event(TRUE);
+ TODO[record format ndb]: Remove it once NDB returns correct
+ records. Check that the other engines also return correct records.
+ */
- /*
- If this event is not in a transaction, the call below will, if some
- transactional storage engines are involved, commit the statement into
- them and flush the pending event to binlog.
- If this event is in a transaction, the call will do nothing, but a
- Xid_log_event will come next which will, if some transactional engines
- are involved, commit the transaction and flush the pending event to the
- binlog.
- */
- error|= (error ? trans_rollback_stmt(thd) : trans_commit_stmt(thd));
+ DBUG_DUMP("record[0]", table->record[0], table->s->reclength);
+ DBUG_DUMP("record[1]", table->record[1], table->s->reclength);
- /*
- Now what if this is not a transactional engine? we still need to
- flush the pending event to the binlog; we did it with
- thd->binlog_flush_pending_rows_event(). Note that we imitate
- what is done for real queries: a call to
- ha_autocommit_or_rollback() (sometimes only if involves a
- transactional engine), and a call to be sure to have the pending
- event flushed.
- */
+ bool result= FALSE;
+ uchar saved_x[2]= {0, 0}, saved_filler[2]= {0, 0};
- /*
- @todo We should probably not call
- reset_current_stmt_binlog_format_row() from here.
+ if (table->s->null_bytes > 0)
+ {
+ for (int i = 0 ; i < 2 ; ++i)
+ {
+ /*
+ If we have an X bit then we need to take care of it.
+ */
+ if (!(table->s->db_options_in_use & HA_OPTION_PACK_RECORD))
+ {
+ saved_x[i]= table->record[i][0];
+ table->record[i][0]|= 1U;
+ }
- Note: this applies to log_event_old.cc too
+ /*
+ If (last_null_bit_pos == 0 && null_bytes > 1), then:
- Btw, the previous comment about transactional engines does not
- seem related to anything that happens here.
- /Sven
- */
- thd->reset_current_stmt_binlog_format_row();
+ X bit (if any) + N nullable fields + M Field_bit fields = 8 bits
- const_cast<Relay_log_info*>(rli)->cleanup_context(thd, 0);
+ Ie, the entire byte is used.
+ */
+ if (table->s->last_null_bit_pos > 0)
+ {
+ saved_filler[i]= table->record[i][table->s->null_bytes - 1];
+ table->record[i][table->s->null_bytes - 1]|=
+ 256U - (1U << table->s->last_null_bit_pos);
+ }
+ }
}
- return error;
-}
-
-/**
- The method either increments the relay log position or
- commits the current statement and increments the master group
- possition if the event is STMT_END_F flagged and
- the statement corresponds to the autocommit query (i.e replicated
- without wrapping in BEGIN/COMMIT)
- @retval 0 Success
- @retval non-zero Error in the statement commit
- */
-int
-Rows_log_event::do_update_pos(Relay_log_info *rli)
-{
- DBUG_ENTER("Rows_log_event::do_update_pos");
- int error= 0;
+ if (table->s->blob_fields + table->s->varchar_fields == 0 &&
+ bitmap_is_set_all(cols))
+ {
+ result= cmp_record(table,record[1]);
+ goto record_compare_exit;
+ }
- DBUG_PRINT("info", ("flags: %s",
- get_flags(STMT_END_F) ? "STMT_END_F " : ""));
+ /* Compare null bits */
+ if (bitmap_is_set_all(cols) &&
+ memcmp(table->null_flags,
+ table->null_flags+table->s->rec_buff_length,
+ table->s->null_bytes))
+ {
+ result= TRUE; // Diff in NULL value
+ goto record_compare_exit;
+ }
- if (get_flags(STMT_END_F))
+ /* Compare updated fields */
+ for (Field **ptr=table->field ;
+ *ptr && ((*ptr)->field_index < cols->n_bits);
+ ptr++)
{
- /*
- Indicate that a statement is finished.
- Step the group log position if we are not in a transaction,
- otherwise increase the event log position.
- */
- rli->stmt_done(log_pos);
- /*
- Clear any errors in thd->net.last_err*. It is not known if this is
- needed or not. It is believed that any errors that may exist in
- thd->net.last_err* are allowed. Examples of errors are "key not
- found", which is produced in the test case rpl_row_conflicts.test
- */
- thd->clear_error();
+ if (bitmap_is_set(cols, (*ptr)->field_index))
+ {
+ if ((*ptr)->cmp_binary_offset(table->s->rec_buff_length))
+ {
+ result= TRUE;
+ goto record_compare_exit;
+ }
+ }
}
- else
+
+record_compare_exit:
+ /*
+ Restore the saved bytes.
+
+ TODO[record format ndb]: Remove this code once NDB returns the
+ correct record format.
+ */
+ if (table->s->null_bytes > 0)
{
- rli->inc_event_relay_log_pos();
+ for (int i = 0 ; i < 2 ; ++i)
+ {
+ if (!(table->s->db_options_in_use & HA_OPTION_PACK_RECORD))
+ table->record[i][0]= saved_x[i];
+
+ if (table->s->last_null_bit_pos)
+ table->record[i][table->s->null_bytes - 1]= saved_filler[i];
+ }
}
- DBUG_RETURN(error);
+ return result;
}
-#endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */
-#ifndef MYSQL_CLIENT
-bool Rows_log_event::write_data_header(IO_CACHE *file)
+struct row_entry
{
- uchar buf[ROWS_HEADER_LEN]; // No need to init the buffer
- DBUG_ASSERT(m_table_id != ~0UL);
- DBUG_EXECUTE_IF("old_row_based_repl_4_byte_map_id_master",
- {
- int4store(buf + 0, m_table_id);
- int2store(buf + 4, m_flags);
- return (my_b_safe_write(file, buf, 6));
- });
- int6store(buf + RW_MAPID_OFFSET, (ulonglong)m_table_id);
- int2store(buf + RW_FLAGS_OFFSET, m_flags);
- return (my_b_safe_write(file, buf, ROWS_HEADER_LEN));
-}
+ uchar *key;
+ uint length;
+ const uchar *m_curr_row;
+} typedef row_entry;
-bool Rows_log_event::write_data_body(IO_CACHE*file)
+extern "C" uchar *rows_log_event_get_key(const uchar *record, size_t *length,
+ my_bool not_used __attribute__((unused)))
{
- /*
- Note that this should be the number of *bits*, not the number of
- bytes.
- */
- uchar sbuf[sizeof(m_width) + 1];
- my_ptrdiff_t const data_size= m_rows_cur - m_rows_buf;
- bool res= false;
- uchar *const sbuf_end= net_store_length(sbuf, (size_t) m_width);
- DBUG_ASSERT(static_cast<size_t>(sbuf_end - sbuf) <= sizeof(sbuf));
-
- DBUG_DUMP("m_width", sbuf, (size_t) (sbuf_end - sbuf));
- res= res || my_b_safe_write(file, sbuf, (size_t) (sbuf_end - sbuf));
-
- DBUG_DUMP("m_cols", (uchar*) m_cols.bitmap, no_bytes_in_map(&m_cols));
- res= res || my_b_safe_write(file, (uchar*) m_cols.bitmap,
- no_bytes_in_map(&m_cols));
- /*
- TODO[refactor write]: Remove the "down cast" here (and elsewhere).
- */
- if (get_type_code() == UPDATE_ROWS_EVENT)
- {
- DBUG_DUMP("m_cols_ai", (uchar*) m_cols_ai.bitmap,
- no_bytes_in_map(&m_cols_ai));
- res= res || my_b_safe_write(file, (uchar*) m_cols_ai.bitmap,
- no_bytes_in_map(&m_cols_ai));
- }
- DBUG_DUMP("rows", m_rows_buf, data_size);
- res= res || my_b_safe_write(file, m_rows_buf, (size_t) data_size);
+ DBUG_ENTER("get_key");
- return res;
+ row_entry *entry=(row_entry *) record;
+ *length= entry->length;
+ DBUG_RETURN((uchar*) entry->key);
}
-#endif
-#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
-void Rows_log_event::pack_info(Protocol *protocol)
+static void rows_log_event_free_entry(row_entry *entry)
{
- char buf[256];
- char const *const flagstr=
- get_flags(STMT_END_F) ? " flags: STMT_END_F" : "";
- size_t bytes= my_snprintf(buf, sizeof(buf),
- "table_id: %lu%s", m_table_id, flagstr);
- protocol->store(buf, bytes, &my_charset_bin);
+ DBUG_ENTER("free_entry");
+ my_free(entry->key);
+ my_free(entry);
+ DBUG_VOID_RETURN;
}
-#endif
-#ifdef MYSQL_CLIENT
-void Rows_log_event::print_helper(FILE *file,
- PRINT_EVENT_INFO *print_event_info,
- char const *const name)
+int Rows_log_event::hash_row(Relay_log_info const *rli)
{
- IO_CACHE *const head= &print_event_info->head_cache;
- IO_CACHE *const body= &print_event_info->body_cache;
- if (!print_event_info->short_form)
+ int error= 0;
+
+ if ((error= unpack_current_row(rli, &m_cols)))
+ goto err;
+ else
{
- bool const last_stmt_event= get_flags(STMT_END_F);
- print_header(head, print_event_info, !last_stmt_event);
- my_b_printf(head, "\t%s: table id %lu%s\n",
- name, m_table_id,
- last_stmt_event ? " flags: STMT_END_F" : "");
- print_base64(body, print_event_info, !last_stmt_event);
+ // TODO: remove blobs
+
+ row_entry *entry= (row_entry*)malloc(sizeof(row_entry));
+ entry->key= (uchar*) malloc(m_table->s->reclength);
+ memcpy(entry->key, m_table->record[0], m_table->s->reclength);
+ entry->length= m_table->s->reclength;
+ entry->m_curr_row=m_curr_row;
+ my_hash_insert(&m_hash, (uchar*)entry);
+
+
+ if (get_type_code() == UPDATE_ROWS_EVENT)
+ {
+ /*
+ This is the situation after locating BI:
+
+ ===|=== before image ====|=== after image ===|===
+ ^ ^
+ m_curr_row m_curr_row_end
+
+ We need to skip the AI as well, before moving on to the
+ next row.
+ */
+ m_curr_row=m_curr_row_end;
+ error= unpack_current_row(rli, &m_cols_ai);
+ }
+
+ m_curr_row=m_curr_row_end;
}
- if (get_flags(STMT_END_F))
+err:
+ return error;
+}
+
+int Rows_log_event::handle_idempotent_errors(Relay_log_info const *rli, int *err)
+{
+ int error= *err;
+ if (error)
{
- copy_event_cache_to_file_and_reinit(head, file);
- copy_event_cache_to_file_and_reinit(body, file);
+ int actual_error= convert_handler_error(error, thd, m_table);
+ bool idempotent_error= (idempotent_error_code(error) &&
+ (slave_exec_mode == SLAVE_EXEC_MODE_IDEMPOTENT));
+ bool ignored_error= (idempotent_error == 0 ?
+ ignored_error_code(actual_error) : 0);
+
+ if (idempotent_error || ignored_error)
+ {
+ if (global_system_variables.log_warnings)
+ slave_rows_error_report(WARNING_LEVEL, error, rli, thd, m_table,
+ get_type_str(),
+ const_cast<Relay_log_info*>(rli)->get_rpl_log_name(),
+ (ulong) log_pos);
+ clear_all_errors(thd, const_cast<Relay_log_info*>(rli));
+ *err= 0;
+ if (idempotent_error == 0)
+ return true;
+ }
}
+
+ return false;
}
-#endif
-/**************************************************************************
- Table_map_log_event member functions and support functions
-**************************************************************************/
+int Rows_log_event::do_apply_row(Relay_log_info const *rli)
+{
+ int error= 0;
-/**
- @page How replication of field metadata works.
-
- When a table map is created, the master first calls
- Table_map_log_event::save_field_metadata() which calculates how many
- values will be in the field metadata. Only those fields that require the
- extra data are added. The method also loops through all of the fields in
- the table calling the method Field::save_field_metadata() which returns the
- values for the field that will be saved in the metadata and replicated to
- the slave. Once all fields have been processed, the table map is written to
- the binlog adding the size of the field metadata and the field metadata to
- the end of the body of the table map.
+ /* in_use can have been set to NULL in close_tables_for_reopen */
+ THD* old_thd= m_table->in_use;
+ if (!m_table->in_use)
+ m_table->in_use= thd;
- When a table map is read on the slave, the field metadata is read from the
- table map and passed to the table_def class constructor which saves the
- field metadata from the table map into an array based on the type of the
- field. Field metadata values not present (those fields that do not use extra
- data) in the table map are initialized as zero (0). The array size is the
- same as the columns for the table on the slave.
+ error= do_exec_row(rli);
- Additionally, values saved for field metadata on the master are saved as a
- string of bytes (uchar) in the binlog. A field may require 1 or more bytes
- to store the information. In cases where values require multiple bytes
- (e.g. values > 255), the endian-safe methods are used to properly encode
- the values on the master and decode them on the slave. When the field
- metadata values are captured on the slave, they are stored in an array of
- type uint16. This allows the least number of casts to prevent casting bugs
- when the field metadata is used in comparisons of field attributes. When
- the field metadata is used for calculating addresses in pointer math, the
- type used is uint32.
-*/
+ DBUG_PRINT("info", ("error: %s", HA_ERR(error)));
+ DBUG_ASSERT(error != HA_ERR_RECORD_DELETED);
-#if !defined(MYSQL_CLIENT)
-/**
- Save the field metadata based on the real_type of the field.
- The metadata saved depends on the type of the field. Some fields
- store a single byte for pack_length() while others store two bytes
- for field_length (max length).
-
- @retval 0 Ok.
+ m_table->in_use = old_thd;
- @todo
- We may want to consider changing the encoding of the information.
- Currently, the code attempts to minimize the number of bytes written to
- the tablemap. There are at least two other alternatives; 1) using
- net_store_length() to store the data allowing it to choose the number of
- bytes that are appropriate thereby making the code much easier to
- maintain (only 1 place to change the encoding), or 2) use a fixed number
- of bytes for each field. The problem with option 1 is that net_store_length()
- will use one byte if the value < 251, but 3 bytes if it is > 250. Thus,
- for fields like CHAR which can be no larger than 255 characters, the method
- will use 3 bytes when the value is > 250. Further, every value that is
- encoded using 2 parts (e.g., pack_length, field_length) will be numerically
- > 250 therefore will use 3 bytes for eah value. The problem with option 2
- is less wasteful for space but does waste 1 byte for every field that does
- not encode 2 parts.
-*/
-int Table_map_log_event::save_field_metadata()
-{
- DBUG_ENTER("Table_map_log_event::save_field_metadata");
- int index= 0;
- for (unsigned int i= 0 ; i < m_table->s->fields ; i++)
- {
- DBUG_PRINT("debug", ("field_type: %d", m_coltype[i]));
- index+= m_table->s->field[i]->save_field_metadata(&m_field_metadata[index]);
- }
- DBUG_RETURN(index);
+ if (handle_idempotent_errors(rli, &error))
+ return error;
+
+ /*
+ If m_curr_row_end was not set during event execution (e.g., because
+ of errors) we can't proceed to the next row. If the error is transient
+ (i.e., error==0 at this point) we must call unpack_current_row() to set
+ m_curr_row_end.
+ */
+
+ DBUG_PRINT("info", ("curr_row: 0x%lu; curr_row_end: 0x%lu; rows_end: 0x%lu",
+ (ulong) m_curr_row, (ulong) m_curr_row_end, (ulong) m_rows_end));
+
+ if (!m_curr_row_end && !error)
+ error= unpack_current_row(rli, &m_cols);
+
+ // at this moment m_curr_row_end should be set
+ DBUG_ASSERT(error || m_curr_row_end != NULL);
+ DBUG_ASSERT(error || m_curr_row <= m_curr_row_end);
+ DBUG_ASSERT(error || m_curr_row_end <= m_rows_end);
+
+ m_curr_row= m_curr_row_end;
+
+ if (error == 0 && !m_table->file->has_transactions())
+ thd->transaction.all.modified_non_trans_table=
+ thd->transaction.stmt.modified_non_trans_table= TRUE;
+
+ return error;
}
-#endif /* !defined(MYSQL_CLIENT) */
-/*
- Constructor used to build an event for writing to the binary log.
- Mats says tbl->s lives longer than this event so it's ok to copy pointers
- (tbl->s->db etc) and not pointer content.
- */
-#if !defined(MYSQL_CLIENT)
-Table_map_log_event::Table_map_log_event(THD *thd, TABLE *tbl, ulong tid,
- bool is_transactional)
- : Log_event(thd, 0, is_transactional),
- m_table(tbl),
- m_dbnam(tbl->s->db.str),
- m_dblen(m_dbnam ? tbl->s->db.length : 0),
- m_tblnam(tbl->s->table_name.str),
- m_tbllen(tbl->s->table_name.length),
- m_colcnt(tbl->s->fields),
- m_memory(NULL),
- m_table_id(tid),
- m_flags(TM_BIT_LEN_EXACT_F),
- m_data_size(0),
- m_field_metadata(0),
- m_field_metadata_size(0),
- m_null_bits(0),
- m_meta_memory(NULL)
+
+int Rows_log_event::do_index_scan_and_update(Relay_log_info const *rli)
{
- uchar cbuf[sizeof(m_colcnt) + 1];
- uchar *cbuf_end;
- DBUG_ASSERT(m_table_id != ~0UL);
+ DBUG_ENTER("Rows_log_event::do_index_scan_and_update");
+ DBUG_ASSERT(m_table && m_table->in_use != NULL);
+
+ TABLE *table= m_table;
+ int error= 0;
+ KEY *keyinfo;
+ uint key;
+
/*
- In TABLE_SHARE, "db" and "table_name" are 0-terminated (see this comment in
- table.cc / alloc_table_share():
- Use the fact the key is db/0/table_name/0
- As we rely on this let's assert it.
+ rpl_row_tabledefs.test specifies that it is okay for an extra
+ field on the slave to have no default value when applying
+ Delete or Update events.
+ Todo: fix wl3228 hld that requires defaults for all types of events
*/
- DBUG_ASSERT((tbl->s->db.str == 0) ||
- (tbl->s->db.str[tbl->s->db.length] == 0));
- DBUG_ASSERT(tbl->s->table_name.str[tbl->s->table_name.length] == 0);
+ prepare_record(table, &m_cols, FALSE);
+ error= unpack_current_row(rli, &m_cols);
- m_data_size= TABLE_MAP_HEADER_LEN;
- DBUG_EXECUTE_IF("old_row_based_repl_4_byte_map_id_master", m_data_size= 6;);
- m_data_size+= m_dblen + 2; // Include length and terminating \0
- m_data_size+= m_tbllen + 2; // Include length and terminating \0
- cbuf_end= net_store_length(cbuf, (size_t) m_colcnt);
- DBUG_ASSERT(static_cast<size_t>(cbuf_end - cbuf) <= sizeof(cbuf));
- m_data_size+= (cbuf_end - cbuf) + m_colcnt; // COLCNT and column types
+ // Temporary fix to find out why it fails [/Matz]
+ memcpy(m_table->read_set->bitmap, m_cols.bitmap, (m_table->read_set->n_bits + 7) / 8);
- /* If malloc fails, caught in is_valid() */
- if ((m_memory= (uchar*) my_malloc(m_colcnt, MYF(MY_WME))))
+ if (!is_any_column_signaled_for_table(table, &m_cols))
{
- m_coltype= reinterpret_cast<uchar*>(m_memory);
- for (unsigned int i= 0 ; i < m_table->s->fields ; ++i)
- m_coltype[i]= m_table->field[i]->type();
+ error= HA_ERR_END_OF_FILE;
+ goto err;
}
- /*
- Calculate a bitmap for the results of maybe_null() for all columns.
- The bitmap is used to determine when there is a column from the master
- that is not on the slave and is null and thus not in the row data during
- replication.
- */
- uint num_null_bytes= (m_table->s->fields + 7) / 8;
- m_data_size+= num_null_bytes;
- m_meta_memory= (uchar *)my_multi_malloc(MYF(MY_WME),
- &m_null_bits, num_null_bytes,
- &m_field_metadata, (m_colcnt * 2),
- NULL);
+#ifndef DBUG_OFF
+ DBUG_PRINT("info",("looking for the following record"));
+ DBUG_DUMP("record[0]", table->record[0], table->s->reclength);
+#endif
- bzero(m_field_metadata, (m_colcnt * 2));
+ if ((key= search_key_in_table(table, &m_cols, PRI_KEY_FLAG)) >= MAX_KEY)
+ /* we don't have a PK, or PK is not usable with BI values */
+ goto INDEX_SCAN;
- /*
- Create an array for the field metadata and store it.
- */
- m_field_metadata_size= save_field_metadata();
- DBUG_ASSERT(m_field_metadata_size <= (m_colcnt * 2));
+ if ((table->file->ha_table_flags() & HA_PRIMARY_KEY_REQUIRED_FOR_POSITION))
+ {
+ /*
+ Use a more efficient method to fetch the record given by
+ table->record[0] if the engine allows it. We first compute a
+ row reference using the position() member function (it will be
+ stored in table->file->ref) and then use rnd_pos() to position
+ the "cursor" (i.e., record[0] in this case) at the correct row.
- /*
- Now set the size of the data to the size of the field metadata array
- plus one or three bytes (see pack.c:net_store_length) for number of
- elements in the field metadata array.
- */
- if (m_field_metadata_size < 251)
- m_data_size+= m_field_metadata_size + 1;
- else
- m_data_size+= m_field_metadata_size + 3;
+ TODO: Add a check that the correct record has been fetched by
+ comparing with the original record. Take into account that the
+ record on the master and slave can be of different
+ length. Something along these lines should work:
- bzero(m_null_bits, num_null_bytes);
- for (unsigned int i= 0 ; i < m_table->s->fields ; ++i)
- if (m_table->field[i]->maybe_null())
- m_null_bits[(i / 8)]+= 1 << (i % 8);
+ ADD>>> store_record(table,record[1]);
+ int error= table->file->rnd_pos(table->record[0], table->file->ref);
+ ADD>>> DBUG_ASSERT(memcmp(table->record[1], table->record[0],
+ table->s->reclength) == 0);
-}
-#endif /* !defined(MYSQL_CLIENT) */
+ */
+ DBUG_PRINT("info",("locating record using primary key (position)"));
+ int error;
+ if (table->file->inited && (error= table->file->ha_index_end()))
+ DBUG_RETURN(error);
+ if ((error= table->file->ha_rnd_init(FALSE)))
+ DBUG_RETURN(error);
-/*
- Constructor used by slave to read the event from the binary log.
- */
-#if defined(HAVE_REPLICATION)
-Table_map_log_event::Table_map_log_event(const char *buf, uint event_len,
- const Format_description_log_event
- *description_event)
+ error= table->file->rnd_pos_by_record(table->record[0]);
- : Log_event(buf, description_event),
-#ifndef MYSQL_CLIENT
- m_table(NULL),
-#endif
- m_dbnam(NULL), m_dblen(0), m_tblnam(NULL), m_tbllen(0),
- m_colcnt(0), m_coltype(0),
- m_memory(NULL), m_table_id(ULONG_MAX), m_flags(0),
- m_data_size(0), m_field_metadata(0), m_field_metadata_size(0),
- m_null_bits(0), m_meta_memory(NULL)
-{
- unsigned int bytes_read= 0;
- DBUG_ENTER("Table_map_log_event::Table_map_log_event(const char*,uint,...)");
+ table->file->ha_rnd_end();
+ if (error)
+ {
+ DBUG_PRINT("info",("rnd_pos returns error %d",error));
+ if (error == HA_ERR_RECORD_DELETED)
+ error= HA_ERR_KEY_NOT_FOUND;
+ table->file->print_error(error, MYF(0));
+ }
+ DBUG_RETURN(error);
+ }
- uint8 common_header_len= description_event->common_header_len;
- uint8 post_header_len= description_event->post_header_len[TABLE_MAP_EVENT-1];
- DBUG_PRINT("info",("event_len: %u common_header_len: %d post_header_len: %d",
- event_len, common_header_len, post_header_len));
+ // We can't use position() - try other methods.
+
+INDEX_SCAN:
/*
- Don't print debug messages when running valgrind since they can
- trigger false warnings.
+ Save copy of the record in table->record[1]. It might be needed
+ later if linear search is used to find exact match.
*/
-#ifndef HAVE_purify
- DBUG_DUMP("event buffer", (uchar*) buf, event_len);
-#endif
+ store_record(table,record[1]);
- /* Read the post-header */
- const char *post_start= buf + common_header_len;
-
- post_start+= TM_MAPID_OFFSET;
- if (post_header_len == 6)
- {
- /* Master is of an intermediate source tree before 5.1.4. Id is 4 bytes */
- m_table_id= uint4korr(post_start);
- post_start+= 4;
- }
- else
+ if ((key= search_key_in_table(table, &m_cols,
+ (PRI_KEY_FLAG | UNIQUE_KEY_FLAG | MULTIPLE_KEY_FLAG)))
+ >= MAX_KEY)
+ /* we don't have a key, or no key is suitable for the BI values */
{
- DBUG_ASSERT(post_header_len == TABLE_MAP_HEADER_LEN);
- m_table_id= (ulong) uint6korr(post_start);
- post_start+= TM_FLAGS_OFFSET;
+ error= HA_ERR_KEY_NOT_FOUND;
+ goto err;
}
- DBUG_ASSERT(m_table_id != ~0UL);
-
- m_flags= uint2korr(post_start);
+ {
+ keyinfo= table->key_info + key;
- /* Read the variable part of the event */
- const char *const vpart= buf + common_header_len + post_header_len;
- /* Extract the length of the various parts from the buffer */
- uchar const *const ptr_dblen= (uchar const*)vpart + 0;
- m_dblen= *(uchar*) ptr_dblen;
+ DBUG_PRINT("info",("locating record using primary key (index_read)"));
- /* Length of database name + counter + terminating null */
- uchar const *const ptr_tbllen= ptr_dblen + m_dblen + 2;
- m_tbllen= *(uchar*) ptr_tbllen;
+ /* The key'th key is active and usable: search the table using the index */
+ if (!table->file->inited && (error= table->file->ha_index_init(key, FALSE)))
+ {
+ DBUG_PRINT("info",("ha_index_init returns error %d",error));
+ table->file->print_error(error, MYF(0));
+ goto err;
+ }
- /* Length of table name + counter + terminating null */
- uchar const *const ptr_colcnt= ptr_tbllen + m_tbllen + 2;
- uchar *ptr_after_colcnt= (uchar*) ptr_colcnt;
- m_colcnt= net_field_length(&ptr_after_colcnt);
+ /* Fill key data for the row */
- DBUG_PRINT("info",("m_dblen: %lu off: %ld m_tbllen: %lu off: %ld m_colcnt: %lu off: %ld",
- (ulong) m_dblen, (long) (ptr_dblen-(const uchar*)vpart),
- (ulong) m_tbllen, (long) (ptr_tbllen-(const uchar*)vpart),
- m_colcnt, (long) (ptr_colcnt-(const uchar*)vpart)));
+ DBUG_ASSERT(m_key);
+ key_copy(m_key, table->record[0], keyinfo, 0);
- /* Allocate mem for all fields in one go. If fails, caught in is_valid() */
- m_memory= (uchar*) my_multi_malloc(MYF(MY_WME),
- &m_dbnam, (uint) m_dblen + 1,
- &m_tblnam, (uint) m_tbllen + 1,
- &m_coltype, (uint) m_colcnt,
- NullS);
+ /*
+ Don't print debug messages when running valgrind since they can
+ trigger false warnings.
+ */
+#ifndef HAVE_purify
+ DBUG_DUMP("key data", m_key, keyinfo->key_length);
+#endif
- if (m_memory)
- {
- /* Copy the different parts into their memory */
- strncpy(const_cast<char*>(m_dbnam), (const char*)ptr_dblen + 1, m_dblen + 1);
- strncpy(const_cast<char*>(m_tblnam), (const char*)ptr_tbllen + 1, m_tbllen + 1);
- memcpy(m_coltype, ptr_after_colcnt, m_colcnt);
+ /*
+ We need to set the null bytes to ensure that the filler bits are
+ all set when returning. There are storage engines that just set
+ the necessary bits on the bytes and don't set the filler bits
+ correctly.
+ */
+ if (table->s->null_bytes > 0)
+ table->record[0][table->s->null_bytes - 1]|=
+ 256U - (1U << table->s->last_null_bit_pos);
- ptr_after_colcnt= ptr_after_colcnt + m_colcnt;
- bytes_read= (uint) (ptr_after_colcnt - (uchar *)buf);
- DBUG_PRINT("info", ("Bytes read: %d.\n", bytes_read));
- if (bytes_read < event_len)
+ if ((error= table->file->ha_index_read_map(table->record[0], m_key,
+ HA_WHOLE_KEY,
+ HA_READ_KEY_EXACT)))
{
- m_field_metadata_size= net_field_length(&ptr_after_colcnt);
- DBUG_ASSERT(m_field_metadata_size <= (m_colcnt * 2));
- uint num_null_bytes= (m_colcnt + 7) / 8;
- m_meta_memory= (uchar *)my_multi_malloc(MYF(MY_WME),
- &m_null_bits, num_null_bytes,
- &m_field_metadata, m_field_metadata_size,
- NULL);
- memcpy(m_field_metadata, ptr_after_colcnt, m_field_metadata_size);
- ptr_after_colcnt= (uchar*)ptr_after_colcnt + m_field_metadata_size;
- memcpy(m_null_bits, ptr_after_colcnt, num_null_bytes);
+ DBUG_PRINT("info",("no record matching the key found in the table"));
+ if (error == HA_ERR_RECORD_DELETED)
+ error= HA_ERR_KEY_NOT_FOUND;
+ table->file->print_error(error, MYF(0));
+ table->file->ha_index_end();
+ goto err;
}
- }
- DBUG_VOID_RETURN;
-}
+ /*
+ Don't print debug messages when running valgrind since they can
+ trigger false warnings.
+ */
+#ifndef HAVE_purify
+ DBUG_PRINT("info",("found first matching record"));
+ DBUG_DUMP("record[0]", table->record[0], table->s->reclength);
#endif
+ /*
+ Below is a minor "optimization". If the key (i.e., key number
+ 0) has the HA_NOSAME flag set, we know that we have found the
+ correct record (since there can be no duplicates); otherwise, we
+ have to compare the record with the one found to see if it is
+ the correct one.
-Table_map_log_event::~Table_map_log_event()
-{
- my_free(m_meta_memory);
- my_free(m_memory);
-}
+ CAVEAT! This behaviour is essential for the replication of,
+ e.g., the mysql.proc table since the correct record *shall* be
+ found using the primary key *only*. There shall be no
+ comparison of non-PK columns to decide if the correct record is
+ found. I can see no scenario where it would be incorrect to
+ choose the row to change only using a PK or an UNNI.
+ */
+ if (keyinfo->flags & HA_NOSAME || key == table->s->primary_key)
+ {
+ /* Unique key does not have a nullable part */
+ if (!(table->key_info->flags & (HA_NULL_PART_KEY)))
+ {
+ table->file->ha_index_end();
+ goto record_found;
+ }
+ else
+ {
+ KEY *keyinfo= table->key_info;
+ /*
+ Unique has nullable part. We need to check if there is any field in the
+ BI image that is null and part of UNNI.
+ */
+ bool null_found= FALSE;
+ for (uint i=0; i < keyinfo->key_parts && !null_found; i++)
+ {
+ uint fieldnr= keyinfo->key_part[i].fieldnr - 1;
+ Field **f= table->field+fieldnr;
+ null_found= (*f)->is_null();
+ }
-/*
- Return value is an error code, one of:
+ if (!null_found)
+ {
+ table->file->ha_index_end();
+ goto record_found;
+ }
- -1 Failure to open table [from open_tables()]
- 0 Success
- 1 No room for more tables [from set_table()]
- 2 Out of memory [from set_table()]
- 3 Wrong table definition
- 4 Daisy-chaining RBR with SBR not possible
- */
+ /* else fall through to index scan */
+ }
+ }
-#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
-int Table_map_log_event::do_apply_event(Relay_log_info const *rli)
-{
- RPL_TABLE_LIST *table_list;
- char *db_mem, *tname_mem;
- size_t dummy_len;
- void *memory;
- DBUG_ENTER("Table_map_log_event::do_apply_event(Relay_log_info*)");
- DBUG_ASSERT(rli->info_thd == thd);
+ /*
+ In case key is not unique, we still have to iterate over records found
+ and find the one which is identical to the row given. A copy of the
+ record we are looking for is stored in record[1].
+ */
+ DBUG_PRINT("info",("non-unique index, scanning it to find matching record"));
- /* Step the query id to mark what columns that are actually used. */
- thd->set_query_id(next_query_id());
+ while (record_compare(table, &m_cols))
+ {
+ /*
+ We need to set the null bytes to ensure that the filler bits
+ are all set when returning. There are storage engines that
+ just set the necessary bits on the bytes and don't set the
+ filler bits correctly.
- if (!(memory= my_multi_malloc(MYF(MY_WME),
- &table_list, (uint) sizeof(RPL_TABLE_LIST),
- &db_mem, (uint) NAME_LEN + 1,
- &tname_mem, (uint) NAME_LEN + 1,
- NullS)))
- DBUG_RETURN(HA_ERR_OUT_OF_MEM);
+ TODO[record format ndb]: Remove this code once NDB returns the
+ correct record format.
+ */
+ if (table->s->null_bytes > 0)
+ {
+ table->record[0][table->s->null_bytes - 1]|=
+ 256U - (1U << table->s->last_null_bit_pos);
+ }
- strmov(db_mem, rpl_filter->get_rewrite_db(m_dbnam, &dummy_len));
- strmov(tname_mem, m_tblnam);
+ while ((error= table->file->ha_index_next(table->record[0])))
+ {
+ /* We just skip records that have already been deleted */
+ if (error == HA_ERR_RECORD_DELETED)
+ continue;
+ DBUG_PRINT("info",("no record matching the given row found"));
+ table->file->print_error(error, MYF(0));
+ table->file->ha_index_end();
+ goto err;
+ }
+ }
- table_list->init_one_table(db_mem, strlen(db_mem),
- tname_mem, strlen(tname_mem),
- tname_mem, TL_WRITE);
+ /*
+ Have to restart the scan to be able to fetch the next row.
+ */
+ table->file->ha_index_end();
+ }
- table_list->table_id= m_table_id;
- table_list->updating= 1;
+record_found:
+ error= do_apply_row(rli);
+
+err:
+ table->default_column_bitmaps();
+ DBUG_RETURN(error);
+
+}
+int Rows_log_event::do_hash_scan_and_update(Relay_log_info const *rli)
+{
int error= 0;
- if (rli->info_thd->slave_thread /* filtering is for slave only */ &&
- (!rpl_filter->db_ok(table_list->db) ||
- (rpl_filter->is_on() && !rpl_filter->tables_ok("", table_list))))
+ DBUG_ENTER("Rows_log_event::do_hash_scan_and_update");
+
+ if ((error= hash_row(rli)))
+ goto err;
+
+ /**
+ Last row hashed.
+
+ Now do the table scan and update according to the hash table
+ matches.
+ */
+ if (m_curr_row == m_rows_end)
{
- my_free(memory);
+
+ TABLE* table= m_table;
+ MY_BITMAP* read_set= &m_cols;
+
+ if ((error= table->file->ha_rnd_init(1)))
+ {
+ DBUG_PRINT("info",("error initializing table scan"
+ " (ha_rnd_init returns %d)",error));
+ table->file->print_error(error, MYF(0));
+ goto err;
+ }
+
+ /* Continue until we find the right record or reached the end of the table */
+ do
+ {
+ error= table->file->ha_rnd_next(table->record[0]);
+
+ // TODO: remove blobs from record got from the engine
+
+ DBUG_PRINT("info", ("error: %s", HA_ERR(error)));
+ switch (error) {
+ case 0:
+ {
+ bool found_in_hash= false;
+ HASH_SEARCH_STATE state;
+ /* save a copy from the record got from the engine. */
+ store_record(table, record[1]);
+
+ /**
+ This is only needed because records are hashed without blobs, so
+ we may have false positives.
+ */
+ row_entry *entry= (row_entry *) my_hash_first(&m_hash, table->record[0], table->s->reclength, &state);
+ while (entry)
+ {
+ /**
+ unpack again the full record to table->record[0]. Now, both
+ table->record[0] and table->record[1] have the same contents.
+ */
+ m_curr_row= entry->m_curr_row;
+ if ((error= unpack_current_row(rli, &m_cols)))
+ goto close_table_and_err;
+
+ /*
+ compare the row_entry with row taking into account
+ blobs, if there is any. If there is a match do the
+ operation and remove the entry from the hash table.
+ */
+ if (!record_compare(table, read_set))
+ {
+ found_in_hash= true;
+
+ my_hash_delete(&m_hash, (uchar *)entry);
+ break;
+ }
+
+ // find next
+ entry= (row_entry *)my_hash_next(&m_hash, table->record[0], table->s->reclength, &state);
+ }
+
+ if (found_in_hash)
+ if ((error= do_apply_row(rli)))
+ goto err;
+ }
+ break;
+
+ /*
+ If the record was deleted, we pick the next one without doing
+ any comparisons.
+ */
+ case HA_ERR_RECORD_DELETED:
+ break;
+
+ case HA_ERR_END_OF_FILE: // to make it clear
+ default:
+ DBUG_PRINT("info", ("Failed to get next record"
+ " (ha_rnd_next returns %d)",error));
+ goto close_table_and_err;
+ }
+ }
+
+ while ((m_hash.records > 0) && (!error || (error == HA_ERR_RECORD_DELETED)));
}
- else
+
+err:
+ DBUG_RETURN(error);
+
+close_table_and_err:
+ m_table->file->print_error(error, MYF(0));
+ m_table->file->ha_rnd_end();
+ DBUG_RETURN (error);
+
+}
+
+int Rows_log_event::do_table_scan_and_update(Relay_log_info const *rli)
+{
+ int error= 0;
+ DBUG_ENTER("Rows_log_event::do_table_scan_and_update");
+ DBUG_ASSERT(m_curr_row != m_rows_end);
+ DBUG_PRINT("info",("locating record using table scan (ha_rnd_next)"));
+
+ int restart_count= 0; // Number of times scanning has restarted from top
+
+ /* We don't have a key: search the table using ha_rnd_next() */
+ if ((error= m_table->file->ha_rnd_init(1)))
+ {
+ DBUG_PRINT("info",("error initializing table scan"
+ " (ha_rnd_init returns %d)",error));
+ m_table->file->print_error(error, MYF(0));
+ goto err;
+ }
+
+ /* Continue until we find the right record or have made a full loop */
+ do
{
- DBUG_ASSERT(thd->lex->query_tables != table_list);
+ restart_ha_rnd_next:
+ error= m_table->file->ha_rnd_next(m_table->record[0]);
- /*
- Use placement new to construct the table_def instance in the
- memory allocated for it inside table_list.
+ DBUG_PRINT("info", ("error: %s", HA_ERR(error)));
+ switch (error) {
- The memory allocated by the table_def structure (i.e., not the
- memory allocated *for* the table_def structure) is released
- inside Relay_log_info::clear_tables_to_lock() by calling the
- table_def destructor explicitly.
- */
- new (&table_list->m_tabledef)
- table_def(m_coltype, m_colcnt,
- m_field_metadata, m_field_metadata_size,
- m_null_bits, m_flags);
- table_list->m_tabledef_valid= TRUE;
+ case 0:
+ break;
/*
- We record in the slave's information that the table should be
- locked by linking the table into the list of tables to lock.
+ If the record was deleted, we pick the next one without doing
+ any comparisons.
*/
- table_list->next_global= table_list->next_local= rli->tables_to_lock;
- const_cast<Relay_log_info*>(rli)->tables_to_lock= table_list;
- const_cast<Relay_log_info*>(rli)->tables_to_lock_count++;
- /* 'memory' is freed in clear_tables_to_lock */
+ case HA_ERR_RECORD_DELETED:
+ goto restart_ha_rnd_next;
+
+ case HA_ERR_END_OF_FILE:
+ if (++restart_count < 2)
+ m_table->file->ha_rnd_init(1);
+ break;
+
+ default:
+ DBUG_PRINT("info", ("Failed to get next record"
+ " (ha_rnd_next returns %d)",error));
+ m_table->file->print_error(error, MYF(0));
+ m_table->file->ha_rnd_end();
+ goto err;
+ }
}
+ while (restart_count < 2 && record_compare(m_table, &m_cols));
- DBUG_RETURN(error);
-}
+ /*
+ Note: above record_compare will take into account all record fields
+ which might be incorrect in case a partial row was given in the event
+ */
-Log_event::enum_skip_reason
-Table_map_log_event::do_shall_skip(Relay_log_info *rli)
-{
/*
- If the slave skip counter is 1, then we should not start executing
- on the next event.
+ Have to restart the scan to be able to fetch the next row.
*/
- return continue_group(rli);
-}
+ if (restart_count == 2)
+ DBUG_PRINT("info", ("Record not found"));
+ else
+ DBUG_DUMP("record found", m_table->record[0], m_table->s->reclength);
+ m_table->file->ha_rnd_end();
-int Table_map_log_event::do_update_pos(Relay_log_info *rli)
-{
- rli->inc_event_relay_log_pos();
- return 0;
-}
+ DBUG_ASSERT(error == HA_ERR_END_OF_FILE || error == 0);
-#endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */
+ /* If the row was found, apply it */
+ if (!error)
+ {
+ DBUG_ASSERT(restart_count < 2);
+ error= do_apply_row(rli);
+ }
+
+err:
+ m_table->default_column_bitmaps();
+ DBUG_RETURN(error);
-#ifndef MYSQL_CLIENT
-bool Table_map_log_event::write_data_header(IO_CACHE *file)
-{
- DBUG_ASSERT(m_table_id != ~0UL);
- uchar buf[TABLE_MAP_HEADER_LEN];
- DBUG_EXECUTE_IF("old_row_based_repl_4_byte_map_id_master",
- {
- int4store(buf + 0, m_table_id);
- int2store(buf + 4, m_flags);
- return (my_b_safe_write(file, buf, 6));
- });
- int6store(buf + TM_MAPID_OFFSET, (ulonglong)m_table_id);
- int2store(buf + TM_FLAGS_OFFSET, m_flags);
- return (my_b_safe_write(file, buf, TABLE_MAP_HEADER_LEN));
}
-bool Table_map_log_event::write_data_body(IO_CACHE *file)
+int Rows_log_event::do_apply_event(Relay_log_info const *rli)
{
- DBUG_ASSERT(m_dbnam != NULL);
- DBUG_ASSERT(m_tblnam != NULL);
- /* We use only one byte per length for storage in event: */
- DBUG_ASSERT(m_dblen < 128);
- DBUG_ASSERT(m_tbllen < 128);
+ DBUG_ENTER("Rows_log_event::do_apply_event(Relay_log_info*)");
+ int error= 0;
+ /*
+ If m_table_id == ~0UL, then we have a dummy event that does not
+ contain any data. In that case, we just remove all tables in the
+ tables_to_lock list, close the thread tables, and return with
+ success.
+ */
+ if (m_table_id == ~0UL)
+ {
+ /*
+ This one is supposed to be set: just an extra check so that
+ nothing strange has happened.
+ */
+ DBUG_ASSERT(get_flags(STMT_END_F));
- uchar const dbuf[]= { (uchar) m_dblen };
- uchar const tbuf[]= { (uchar) m_tbllen };
+ const_cast<Relay_log_info*>(rli)->slave_close_thread_tables(thd);
+ thd->clear_error();
+ DBUG_RETURN(0);
+ }
- uchar cbuf[sizeof(m_colcnt) + 1];
- uchar *const cbuf_end= net_store_length(cbuf, (size_t) m_colcnt);
- DBUG_ASSERT(static_cast<size_t>(cbuf_end - cbuf) <= sizeof(cbuf));
+ /*
+ 'thd' has been set by exec_relay_log_event(), just before calling
+ do_apply_event(). We still check here to prevent future coding
+ errors.
+ */
+ DBUG_ASSERT(rli->info_thd == thd);
/*
- Store the size of the field metadata.
+ If there is no locks taken, this is the first binrow event seen
+ after the table map events. We should then lock all the tables
+ used in the transaction and proceed with execution of the actual
+ event.
*/
- uchar mbuf[sizeof(m_field_metadata_size)];
- uchar *const mbuf_end= net_store_length(mbuf, m_field_metadata_size);
+ if (!thd->lock)
+ {
+ /*
+ Lock_tables() reads the contents of thd->lex, so they must be
+ initialized.
- return (my_b_safe_write(file, dbuf, sizeof(dbuf)) ||
- my_b_safe_write(file, (const uchar*)m_dbnam, m_dblen+1) ||
- my_b_safe_write(file, tbuf, sizeof(tbuf)) ||
- my_b_safe_write(file, (const uchar*)m_tblnam, m_tbllen+1) ||
- my_b_safe_write(file, cbuf, (size_t) (cbuf_end - cbuf)) ||
- my_b_safe_write(file, m_coltype, m_colcnt) ||
- my_b_safe_write(file, mbuf, (size_t) (mbuf_end - mbuf)) ||
- my_b_safe_write(file, m_field_metadata, m_field_metadata_size),
- my_b_safe_write(file, m_null_bits, (m_colcnt + 7) / 8));
- }
-#endif
+ We also call the mysql_reset_thd_for_next_command(), since this
+ is the logical start of the next "statement". Note that this
+ call might reset the value of current_stmt_binlog_format, so
+ we need to do any changes to that value after this function.
+ */
+ lex_start(thd);
+ mysql_reset_thd_for_next_command(thd);
+ /*
+ The current statement is just about to begin and
+ has not yet modified anything. Note, all.modified is reset
+ by mysql_reset_thd_for_next_command.
+ */
+ thd->transaction.stmt.modified_non_trans_table= FALSE;
+ /*
+ This is a row injection, so we flag the "statement" as
+ such. Note that this code is called both when the slave does row
+ injections and when the BINLOG statement is used to do row
+ injections.
+ */
+ thd->lex->set_stmt_row_injection();
-#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
+ /*
+ There are a few flags that are replicated with each row event.
+ Make sure to set/clear them before executing the main body of
+ the event.
+ */
+ if (get_flags(NO_FOREIGN_KEY_CHECKS_F))
+ thd->variables.option_bits|= OPTION_NO_FOREIGN_KEY_CHECKS;
+ else
+ thd->variables.option_bits&= ~OPTION_NO_FOREIGN_KEY_CHECKS;
-/*
- Print some useful information for the SHOW BINARY LOG information
- field.
- */
+ if (get_flags(RELAXED_UNIQUE_CHECKS_F))
+ thd->variables.option_bits|= OPTION_RELAXED_UNIQUE_CHECKS;
+ else
+ thd->variables.option_bits&= ~OPTION_RELAXED_UNIQUE_CHECKS;
+ /* A small test to verify that objects have consistent types */
+ DBUG_ASSERT(sizeof(thd->variables.option_bits) == sizeof(OPTION_RELAXED_UNIQUE_CHECKS));
-#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
-void Table_map_log_event::pack_info(Protocol *protocol)
-{
- char buf[256];
- size_t bytes= my_snprintf(buf, sizeof(buf),
- "table_id: %lu (%s.%s)",
- m_table_id, m_dbnam, m_tblnam);
- protocol->store(buf, bytes, &my_charset_bin);
-}
-#endif
+ if (open_and_lock_tables(thd, rli->tables_to_lock, FALSE, 0))
+ {
+ uint actual_error= thd->stmt_da->sql_errno();
+ if (thd->is_slave_error || thd->is_fatal_error)
+ {
+ /*
+ Error reporting borrowed from Query_log_event with many excessive
+ simplifications.
+ We should not honour --slave-skip-errors at this point as we are
+ having severe errors which should not be skipped.
+ */
+ rli->report(ERROR_LEVEL, actual_error,
+ "Error executing row event: '%s'",
+ (actual_error ? thd->stmt_da->message() :
+ "unexpected success or fatal error"));
+ thd->is_slave_error= 1;
+ }
+ const_cast<Relay_log_info*>(rli)->slave_close_thread_tables(thd);
+ DBUG_RETURN(actual_error);
+ }
+ /*
+ When the open and locking succeeded, we check all tables to
+ ensure that they still have the correct type.
-#endif
+ We can use a down cast here since we know that every table added
+ to the tables_to_lock is a RPL_TABLE_LIST.
+ */
+ {
+ DBUG_PRINT("debug", ("Checking compability of tables to lock - tables_to_lock: %p",
+ rli->tables_to_lock));
+ RPL_TABLE_LIST *ptr= rli->tables_to_lock;
+ for ( ; ptr ; ptr= static_cast<RPL_TABLE_LIST*>(ptr->next_global))
+ {
+ TABLE *conv_table;
+ if (!ptr->m_tabledef.compatible_with(thd, const_cast<Relay_log_info*>(rli),
+ ptr->table, &conv_table))
+ {
+ DBUG_PRINT("debug", ("Table: %s.%s is not compatible with master",
+ ptr->table->s->db.str,
+ ptr->table->s->table_name.str));
+ /*
+ We should not honour --slave-skip-errors at this point as we are
+ having severe errors which should not be skipped.
+ */
+ thd->is_slave_error= 1;
+ const_cast<Relay_log_info*>(rli)->slave_close_thread_tables(thd);
+ DBUG_RETURN(ERR_BAD_TABLE_DEF);
+ }
+ DBUG_PRINT("debug", ("Table: %s.%s is compatible with master"
+ " - conv_table: %p",
+ ptr->table->s->db.str,
+ ptr->table->s->table_name.str, conv_table));
+ ptr->m_conv_table= conv_table;
+ }
+ }
-#ifdef MYSQL_CLIENT
-void Table_map_log_event::print(FILE *file, PRINT_EVENT_INFO *print_event_info)
-{
- if (!print_event_info->short_form)
- {
- print_header(&print_event_info->head_cache, print_event_info, TRUE);
- my_b_printf(&print_event_info->head_cache,
- "\tTable_map: `%s`.`%s` mapped to number %lu\n",
- m_dbnam, m_tblnam, m_table_id);
- print_base64(&print_event_info->body_cache, print_event_info, TRUE);
- }
-}
-#endif
+ /*
+ ... and then we add all the tables to the table map but keep
+ them in the tables to lock list.
-/**************************************************************************
- Write_rows_log_event member functions
-**************************************************************************/
+ We also invalidate the query cache for all the tables, since
+ they will now be changed.
-/*
- Constructor used to build an event for writing to the binary log.
- */
-#if !defined(MYSQL_CLIENT)
-Write_rows_log_event::Write_rows_log_event(THD *thd_arg, TABLE *tbl_arg,
- ulong tid_arg,
- bool is_transactional)
- : Rows_log_event(thd_arg, tbl_arg, tid_arg, tbl_arg->write_set, is_transactional)
-{
-}
+ TODO [/Matz]: Maybe the query cache should not be invalidated
+ here? It might be that a table is not changed, even though it
+ was locked for the statement. We do know that each
+ Rows_log_event contains at least one row, so after processing one
+ Rows_log_event, we can invalidate the query cache for the
+ associated table.
+ */
+ for (TABLE_LIST *ptr= rli->tables_to_lock ; ptr ; ptr= ptr->next_global)
+ {
+ const_cast<Relay_log_info*>(rli)->m_table_map.set_table(ptr->table_id, ptr->table);
+ }
+#ifdef HAVE_QUERY_CACHE
+ query_cache.invalidate_locked_for_write(rli->tables_to_lock);
#endif
+ }
-/*
- Constructor used by slave to read the event from the binary log.
- */
-#ifdef HAVE_REPLICATION
-Write_rows_log_event::Write_rows_log_event(const char *buf, uint event_len,
- const Format_description_log_event
- *description_event)
-: Rows_log_event(buf, event_len, WRITE_ROWS_EVENT, description_event)
-{
-}
-#endif
+ TABLE*
+ table=
+ m_table= const_cast<Relay_log_info*>(rli)->m_table_map.get_table(m_table_id);
-#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
-int
-Write_rows_log_event::do_before_row_operations(const Slave_reporting_capability *const)
-{
- int error= 0;
+ DBUG_PRINT("debug", ("m_table: 0x%lx, m_table_id: %lu", (ulong) m_table, m_table_id));
- /**
- todo: to introduce a property for the event (handler?) which forces
- applying the event in the replace (idempotent) fashion.
- */
- if ((slave_exec_mode == SLAVE_EXEC_MODE_IDEMPOTENT) ||
- (m_table->s->db_type()->db_type == DB_TYPE_NDBCLUSTER))
+ if (table)
{
/*
- We are using REPLACE semantics and not INSERT IGNORE semantics
- when writing rows, that is: new rows replace old rows. We need to
- inform the storage engine that it should use this behaviour.
+ table == NULL means that this table should not be replicated
+ (this was set up by Table_map_log_event::do_apply_event()
+ which tested replicate-* rules).
*/
-
- /* Tell the storage engine that we are using REPLACE semantics. */
- thd->lex->duplicates= DUP_REPLACE;
-
+
/*
- Pretend we're executing a REPLACE command: this is needed for
- InnoDB and NDB Cluster since they are not (properly) checking the
- lex->duplicates flag.
+ It's not needed to set_time() but
+ 1) it continues the property that "Time" in SHOW PROCESSLIST shows how
+ much slave is behind
+ 2) it will be needed when we allow replication from a table with no
+ TIMESTAMP column to a table with one.
+ So we call set_time(), like in SBR. Presently it changes nothing.
*/
- thd->lex->sql_command= SQLCOM_REPLACE;
- /*
- Do not raise the error flag in case of hitting to an unique attribute
- */
- m_table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
- /*
- NDB specific: update from ndb master wrapped as Write_rows
- so that the event should be applied to replace slave's row
- */
- m_table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
- /*
- NDB specific: if update from ndb master wrapped as Write_rows
- does not find the row it's assumed idempotent binlog applying
- is taking place; don't raise the error.
- */
- m_table->file->extra(HA_EXTRA_IGNORE_NO_KEY);
- /*
- TODO: the cluster team (Tomas?) says that it's better if the engine knows
- how many rows are going to be inserted, then it can allocate needed memory
- from the start.
- */
- }
-
- /*
- We need TIMESTAMP_NO_AUTO_SET otherwise ha_write_row() will not use fill
- any TIMESTAMP column with data from the row but instead will use
- the event's current time.
- As we replicate from TIMESTAMP to TIMESTAMP and slave has no extra
- columns, we know that all TIMESTAMP columns on slave will receive explicit
- data from the row, so TIMESTAMP_NO_AUTO_SET is ok.
- When we allow a table without TIMESTAMP to be replicated to a table having
- more columns including a TIMESTAMP column, or when we allow a TIMESTAMP
- column to be replicated into a BIGINT column and the slave's table has a
- TIMESTAMP column, then the slave's TIMESTAMP column will take its value
- from set_time() which we called earlier (consistent with SBR). And then in
- some cases we won't want TIMESTAMP_NO_AUTO_SET (will require some code to
- analyze if explicit data is provided for slave's TIMESTAMP columns).
- */
- m_table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
-
- /* Honor next number column if present */
- m_table->next_number_field= m_table->found_next_number_field;
- /*
- * Fixed Bug#45999, In RBR, Store engine of Slave auto-generates new
- * sequence numbers for auto_increment fields if the values of them are 0.
- * If generateing a sequence number is decided by the values of
- * table->auto_increment_field_not_null and SQL_MODE(if includes
- * MODE_NO_AUTO_VALUE_ON_ZERO) in update_auto_increment function.
- * SQL_MODE of slave sql thread is always consistency with master's.
- * In RBR, auto_increment fields never are NULL.
- */
- m_table->auto_increment_field_not_null= TRUE;
- return error;
-}
+ thd->set_time((time_t)when);
-int
-Write_rows_log_event::do_after_row_operations(const Slave_reporting_capability *const,
- int error)
-{
- int local_error= 0;
- m_table->next_number_field=0;
- m_table->auto_increment_field_not_null= FALSE;
- if ((slave_exec_mode == SLAVE_EXEC_MODE_IDEMPOTENT) ||
- m_table->s->db_type()->db_type == DB_TYPE_NDBCLUSTER)
- {
- m_table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
- m_table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
/*
- resetting the extra with
- table->file->extra(HA_EXTRA_NO_IGNORE_NO_KEY);
- fires bug#27077
- explanation: file->reset() performs this duty
- ultimately. Still todo: fix
- */
- }
- if ((local_error= m_table->file->ha_end_bulk_insert()))
- {
- m_table->file->print_error(local_error, MYF(0));
- }
- return error? error : local_error;
-}
+ Now we are in a statement and will stay in a statement until we
+ see a STMT_END_F.
-#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
+ We set this flag here, before actually applying any rows, in
+ case the SQL thread is stopped and we need to detect that we're
+ inside a statement and halting abruptly might cause problems
+ when restarting.
+ */
+ const_cast<Relay_log_info*>(rli)->set_flag(Relay_log_info::IN_STMT);
-/*
- Check if there are more UNIQUE keys after the given key.
-*/
-static int
-last_uniq_key(TABLE *table, uint keyno)
-{
- while (++keyno < table->s->keys)
- if (table->key_info[keyno].flags & HA_NOSAME)
- return 0;
- return 1;
-}
+ if ( m_width == table->s->fields && bitmap_is_set_all(&m_cols))
+ set_flags(COMPLETE_ROWS_F);
-/**
- Check if an error is a duplicate key error.
+ /*
+ Set tables write and read sets.
+
+ Read_set contains all slave columns (in case we are going to fetch
+ a complete record from slave)
+
+ Write_set equals the m_cols bitmap sent from master but it can be
+ longer if slave has extra columns.
+ */
- This function is used to check if an error code is one of the
- duplicate key error, i.e., and error code for which it is sensible
- to do a <code>get_dup_key()</code> to retrieve the duplicate key.
+ DBUG_PRINT_BITSET("debug", "Setting table's write_set from: %s", &m_cols);
+
+ bitmap_set_all(table->read_set);
+ if (get_type_code() == DELETE_ROWS_EVENT)
+ bitmap_intersect(table->read_set,&m_cols);
- @param errcode The error code to check.
+ bitmap_set_all(table->write_set);
+ if (!get_flags(COMPLETE_ROWS_F))
+ {
+ if (get_type_code() == UPDATE_ROWS_EVENT)
+ bitmap_intersect(table->write_set,&m_cols_ai);
+ else /* WRITE ROWS EVENTS store the bitmap in m_cols instead of m_cols_ai */
+ bitmap_intersect(table->write_set,&m_cols);
+ }
- @return <code>true</code> if the error code is such that
- <code>get_dup_key()</code> will return true, <code>false</code>
- otherwise.
- */
-bool
-is_duplicate_key_error(int errcode)
-{
- switch (errcode)
- {
- case HA_ERR_FOUND_DUPP_KEY:
- case HA_ERR_FOUND_DUPP_UNIQUE:
- return true;
- }
- return false;
-}
+ this->slave_exec_mode= slave_exec_mode_options; // fix the mode
+ // Do event specific preparations
-/**
- Write the current row into event's table.
+ error= do_before_row_operations(rli);
- The row is located in the row buffer, pointed by @c m_curr_row member.
- Number of columns of the row is stored in @c m_width member (it can be
- different from the number of columns in the table to which we insert).
- Bitmap @c m_cols indicates which columns are present in the row. It is assumed
- that event's table is already open and pointed by @c m_table.
+ /**
+ Check if update contains only values in AI for columns that do
+ not exist on the slave. If it does, we can just unpack the rows
+ and return (do nothing on the local table).
+
+ NOTE: We do the following optimization and check only if there
+ are usable values on the AI and disregard the fact that there
+ might be usable values in the BI. In practice this means that
+ the slave will not go through find_row (since we have nothing
+ on the record to update, why go looking for it?).
+
+ If we wanted find_row to run anyway, we could move this
+ check after find_row, but then we would have to face the fact
+ that the slave might stop without finding the proper record
+ (because it might have incomplete BI), even though there were
+ no values in AI.
+
+ On the other hand, if AI has usable values but BI has not,
+ then find_row will return an error (and the error is then
+ propagated as it was already).
+ */
+ if (get_type_code() != UPDATE_ROWS_EVENT ||
+ is_any_column_signaled_for_table(table, &m_cols_ai))
+ {
+ uint row_lookup_method= decide_row_lookup_method(table, &m_cols, get_type_code());
- If the same record already exists in the table it can be either overwritten
- or an error is reported depending on the value of @c overwrite flag
- (error reporting not yet implemented). Note that the matching record can be
- different from the row we insert if we use primary keys to identify records in
- the table.
+ // row processing loop
+ while (!error && (m_curr_row != m_rows_end))
+ {
+ switch (row_lookup_method)
+ {
+ case ROW_LOOKUP_HASH_SCAN:
+ /**
+ scan the table and for each entry in the table, if
+ it exists in the hash, execute the row.
+ */
+ error= do_hash_scan_and_update(rli);
+ break;
- The row to be inserted can contain values only for selected columns. The
- missing columns are filled with default values using @c prepare_record()
- function. If a matching record is found in the table and @c overwritte is
- true, the missing columns are taken from it.
+ case ROW_LOOKUP_INDEX_SCAN:
+ error= do_index_scan_and_update(rli);
+ break;
- @param rli Relay log info (needed for row unpacking).
- @param overwrite
- Shall we overwrite if the row already exists or signal
- error (currently ignored).
+ case ROW_LOOKUP_TABLE_SCAN:
+ error= do_table_scan_and_update(rli);
+ break;
- @returns Error code on failure, 0 on success.
+ case ROW_LOOKUP_NOT_NEEDED:
+ DBUG_ASSERT(get_type_code() == WRITE_ROWS_EVENT);
- This method, if successful, sets @c m_curr_row_end pointer to point at the
- next row in the rows buffer. This is done when unpacking the row to be
- inserted.
+ /* No need to scan for rows, just apply it */
+ error= do_apply_row(rli);
+ break;
+ }
+ }
+ }
- @note If a matching record is found, it is either updated using
- @c ha_update_row() or first deleted and then new record written.
-*/
+ {/**
+ The following failure injection works in cooperation with tests
+ setting @@global.debug= 'd,stop_slave_middle_group'.
+ The sql thread receives the killed status and will proceed
+ to shutdown trying to finish incomplete events group.
+ */
+ DBUG_EXECUTE_IF("stop_slave_middle_group",
+ if (thd->transaction.all.modified_non_trans_table)
+ const_cast<Relay_log_info*>(rli)->abort_slave= 1;);
+ }
-int
-Rows_log_event::write_row(const Relay_log_info *const rli,
- const bool overwrite)
-{
- DBUG_ENTER("write_row");
- DBUG_ASSERT(m_table != NULL && thd != NULL);
+ if ((error= do_after_row_operations(rli, error)) &&
+ ignored_error_code(convert_handler_error(error, thd, table)))
+ {
- TABLE *table= m_table; // pointer to event's table
- int error;
- int UNINIT_VAR(keynum);
- auto_afree_ptr<char> key(NULL);
+ if (global_system_variables.log_warnings)
+ slave_rows_error_report(WARNING_LEVEL, error, rli, thd, table,
+ get_type_str(),
+ const_cast<Relay_log_info*>(rli)->get_rpl_log_name(),
+ (ulong) log_pos);
+ clear_all_errors(thd, const_cast<Relay_log_info*>(rli));
+ error= 0;
+ }
+ } // if (table)
- /* fill table->record[0] with default values */
- bool abort_on_warnings= (rli->info_thd->variables.sql_mode &
- (MODE_STRICT_TRANS_TABLES | MODE_STRICT_ALL_TABLES));
- if ((error= prepare_record(table, &m_cols,
- table->file->ht->db_type != DB_TYPE_NDBCLUSTER,
- abort_on_warnings, m_curr_row == m_rows_buf)))
- DBUG_RETURN(error);
- /* unpack row into table->record[0] */
- if ((error= unpack_current_row(rli, &m_cols, abort_on_warnings)))
- DBUG_RETURN(error);
-
- // Temporary fix to find out why it fails [/Matz]
- memcpy(m_table->write_set->bitmap, m_cols.bitmap, (m_table->write_set->n_bits + 7) / 8);
-
- if (m_curr_row == m_rows_buf)
+ if (error)
{
- /* this is the first row to be inserted, we estimate the rows with
- the size of the first row and use that value to initialize
- storage engine for bulk insertion */
- DBUG_ASSERT(!(m_curr_row > m_curr_row_end));
- ulong estimated_rows= 0;
- if (m_curr_row < m_curr_row_end)
- estimated_rows= (m_rows_end - m_curr_row) / (m_curr_row_end - m_curr_row);
- else if (m_curr_row == m_curr_row_end)
- estimated_rows= 1;
+ slave_rows_error_report(ERROR_LEVEL, error, rli, thd, table,
+ get_type_str(),
+ const_cast<Relay_log_info*>(rli)->get_rpl_log_name(),
+ (ulong) log_pos);
+ /*
+ @todo We should probably not call
+ reset_current_stmt_binlog_format_row() from here.
- m_table->file->ha_start_bulk_insert(estimated_rows);
+ Note: this applies to log_event_old.cc too.
+ /Sven
+ */
+ thd->reset_current_stmt_binlog_format_row();
+ const_cast<Relay_log_info*>(rli)->cleanup_context(thd, error);
+ thd->is_slave_error= 1;
+ DBUG_RETURN(error);
}
-
-
-#ifndef DBUG_OFF
- DBUG_DUMP("record[0]", table->record[0], table->s->reclength);
- DBUG_PRINT_BITSET("debug", "write_set = %s", table->write_set);
- DBUG_PRINT_BITSET("debug", "read_set = %s", table->read_set);
-#endif
- /*
- Try to write record. If a corresponding record already exists in the table,
- we try to change it using ha_update_row() if possible. Otherwise we delete
- it and repeat the whole process again.
+ if (get_flags(STMT_END_F))
+ if ((error= rows_event_stmt_cleanup(rli, thd)))
+ rli->report(ERROR_LEVEL, error,
+ "Error in %s event: commit of row events failed, "
+ "table `%s`.`%s`",
+ get_type_str(), m_table->s->db.str,
+ m_table->s->table_name.str);
- TODO: Add safety measures against infinite looping.
- */
+ DBUG_RETURN(error);
+}
- m_table->mark_columns_per_binlog_row_image();
+Log_event::enum_skip_reason
+Rows_log_event::do_shall_skip(Relay_log_info *rli)
+{
+ /*
+ If the slave skip counter is 1 and this event does not end a
+ statement, then we should not start executing on the next event.
+ Otherwise, we defer the decision to the normal skipping logic.
+ */
+ if (rli->slave_skip_counter == 1 && !get_flags(STMT_END_F))
+ return Log_event::EVENT_SKIP_IGNORE;
+ else
+ return Log_event::do_shall_skip(rli);
+}
- while ((error= table->file->ha_write_row(table->record[0])))
+/**
+ The function is called at Rows_log_event statement commit time,
+ normally from Rows_log_event::do_update_pos() and possibly from
+ Query_log_event::do_apply_event() of the COMMIT.
+ The function commits the last statement for engines and binlog, and
+ releases resources that have been allocated for the statement.
+
+ @retval 0 Ok.
+ @retval non-zero Error at the commit.
+ */
+
+static int rows_event_stmt_cleanup(Relay_log_info const *rli, THD * thd)
+{
+ int error;
{
- if (error == HA_ERR_LOCK_DEADLOCK ||
- error == HA_ERR_LOCK_WAIT_TIMEOUT ||
- (keynum= table->file->get_dup_key(error)) < 0 ||
- !overwrite)
- {
- DBUG_PRINT("info",("get_dup_key returns %d)", keynum));
- /*
- Deadlock, waiting for lock or just an error from the handler
- such as HA_ERR_FOUND_DUPP_KEY when overwrite is false.
- Retrieval of the duplicate key number may fail
- - either because the error was not "duplicate key" error
- - or because the information which key is not available
- */
- table->file->print_error(error, MYF(0));
- goto error;
- }
/*
- We need to retrieve the old row into record[1] to be able to
- either update or delete the offending record. We either:
-
- - use ha_rnd_pos() with a row-id (available as dupp_row) to the
- offending row, if that is possible (MyISAM and Blackhole), or else
-
- - use ha_index_read_idx_map() with the key that is duplicated, to
- retrieve the offending row.
- */
- if (table->file->ha_table_flags() & HA_DUPLICATE_POS)
- {
- DBUG_PRINT("info",("Locating offending record using ha_rnd_pos()"));
-
- if (table->file->inited && (error= table->file->ha_index_end()))
- DBUG_RETURN(error);
- if ((error= table->file->ha_rnd_init(FALSE)))
- DBUG_RETURN(error);
-
- error= table->file->ha_rnd_pos(table->record[1], table->file->dup_ref);
-
- table->file->ha_rnd_end();
- if (error)
- {
- DBUG_PRINT("info",("ha_rnd_pos() returns error %d",error));
- if (error == HA_ERR_RECORD_DELETED)
- error= HA_ERR_KEY_NOT_FOUND;
- table->file->print_error(error, MYF(0));
- goto error;
- }
- }
- else
- {
- DBUG_PRINT("info",("Locating offending record using index_read_idx()"));
-
- if (table->file->extra(HA_EXTRA_FLUSH_CACHE))
- {
- DBUG_PRINT("info",("Error when setting HA_EXTRA_FLUSH_CACHE"));
- error= my_errno;
- goto error;
- }
+ This is the end of a statement or transaction, so close (and
+ unlock) the tables we opened when processing the
+ Table_map_log_event starting the statement.
- if (key.get() == NULL)
- {
- key.assign(static_cast<char*>(my_alloca(table->s->max_unique_length)));
- if (key.get() == NULL)
- {
- DBUG_PRINT("info",("Can't allocate key buffer"));
- error= ENOMEM;
- goto error;
- }
- }
+ OBSERVER. This will clear *all* mappings, not only those that
+ are open for the table. There is no good handle for on-close
+ actions for tables.
- key_copy((uchar*)key.get(), table->record[0], table->key_info + keynum,
- 0);
- error= table->file->ha_index_read_idx_map(table->record[1], keynum,
- (const uchar*)key.get(),
- HA_WHOLE_KEY,
- HA_READ_KEY_EXACT);
- if (error)
- {
- DBUG_PRINT("info",("ha_index_read_idx_map() returns %s", HA_ERR(error)));
- if (error == HA_ERR_RECORD_DELETED)
- error= HA_ERR_KEY_NOT_FOUND;
- table->file->print_error(error, MYF(0));
- goto error;
- }
- }
+ NOTE. Even if we have no table ('table' == 0) we still need to be
+ here, so that we increase the group relay log position. If we didn't, we
+ could have a group relay log position which lags behind "forever"
+ (assume the last master's transaction is ignored by the slave because of
+ replicate-ignore rules).
+ */
+ error= thd->binlog_flush_pending_rows_event(TRUE);
/*
- Now, record[1] should contain the offending row. That
- will enable us to update it or, alternatively, delete it (so
- that we can insert the new row afterwards).
- */
+ If this event is not in a transaction, the call below will, if some
+ transactional storage engines are involved, commit the statement into
+ them and flush the pending event to binlog.
+ If this event is in a transaction, the call will do nothing, but a
+ Xid_log_event will come next which will, if some transactional engines
+ are involved, commit the transaction and flush the pending event to the
+ binlog.
+ */
+ error|= (error ? trans_rollback_stmt(thd) : trans_commit_stmt(thd));
/*
- If row is incomplete we will use the record found to fill
- missing columns.
+ Now what if this is not a transactional engine? we still need to
+ flush the pending event to the binlog; we did it with
+ thd->binlog_flush_pending_rows_event(). Note that we imitate
+ what is done for real queries: a call to
+ ha_autocommit_or_rollback() (sometimes only if it involves a
+ transactional engine), and a call to be sure to have the pending
+ event flushed.
*/
- if (!get_flags(COMPLETE_ROWS_F))
- {
- restore_record(table,record[1]);
- error= unpack_current_row(rli, &m_cols);
- }
-
-#ifndef DBUG_OFF
- DBUG_PRINT("debug",("preparing for update: before and after image"));
- DBUG_DUMP("record[1] (before)", table->record[1], table->s->reclength);
- DBUG_DUMP("record[0] (after)", table->record[0], table->s->reclength);
-#endif
/*
- REPLACE is defined as either INSERT or DELETE + INSERT. If
- possible, we can replace it with an UPDATE, but that will not
- work on InnoDB if FOREIGN KEY checks are necessary.
+ @todo We should probably not call
+ reset_current_stmt_binlog_format_row() from here.
- I (Matz) am not sure of the reason for the last_uniq_key()
- check as, but I'm guessing that it's something along the
- following lines.
+ Note: this applies to log_event_old.cc too
- Suppose that we got the duplicate key to be a key that is not
- the last unique key for the table and we perform an update:
- then there might be another key for which the unique check will
- fail, so we're better off just deleting the row and inserting
- the correct row.
- */
- if (last_uniq_key(table, keynum) &&
- !table->file->referenced_by_foreign_key())
- {
- DBUG_PRINT("info",("Updating row using ha_update_row()"));
- error=table->file->ha_update_row(table->record[1],
- table->record[0]);
- switch (error) {
-
- case HA_ERR_RECORD_IS_THE_SAME:
- DBUG_PRINT("info",("ignoring HA_ERR_RECORD_IS_THE_SAME error from"
- " ha_update_row()"));
- error= 0;
-
- case 0:
- break;
-
- default:
- DBUG_PRINT("info",("ha_update_row() returns error %d",error));
- table->file->print_error(error, MYF(0));
- }
-
- goto error;
- }
- else
- {
- DBUG_PRINT("info",("Deleting offending row and trying to write new one again"));
- if ((error= table->file->ha_delete_row(table->record[1])))
- {
- DBUG_PRINT("info",("ha_delete_row() returns error %d",error));
- table->file->print_error(error, MYF(0));
- goto error;
- }
- /* Will retry ha_write_row() with the offending row removed. */
- }
- }
+ Btw, the previous comment about transactional engines does not
+ seem related to anything that happens here.
+ /Sven
+ */
+ thd->reset_current_stmt_binlog_format_row();
-error:
- m_table->default_column_bitmaps();
- DBUG_RETURN(error);
+ const_cast<Relay_log_info*>(rli)->cleanup_context(thd, 0);
+ }
+ return error;
}
-#endif
+/**
+ The method either increments the relay log position or
+ commits the current statement and increments the master group
+ position if the event is STMT_END_F flagged and
+ the statement corresponds to the autocommit query (i.e. replicated
+ without wrapping in BEGIN/COMMIT)
+ @retval 0 Success
+ @retval non-zero Error in the statement commit
+ */
int
-Write_rows_log_event::do_exec_row(const Relay_log_info *const rli)
+Rows_log_event::do_update_pos(Relay_log_info *rli)
{
- DBUG_ASSERT(m_table != NULL);
- int error= write_row(rli, slave_exec_mode == SLAVE_EXEC_MODE_IDEMPOTENT);
+ DBUG_ENTER("Rows_log_event::do_update_pos");
+ int error= 0;
- if (error && !thd->is_error())
- {
- DBUG_ASSERT(0);
- my_error(ER_UNKNOWN_ERROR, MYF(0));
+ DBUG_PRINT("info", ("flags: %s",
+ get_flags(STMT_END_F) ? "STMT_END_F " : ""));
+
+ if (get_flags(STMT_END_F))
+ {
+ /*
+ Indicate that a statement is finished.
+ Step the group log position if we are not in a transaction,
+ otherwise increase the event log position.
+ */
+ rli->stmt_done(log_pos);
+ /*
+ Clear any errors in thd->net.last_err*. It is not known if this is
+ needed or not. It is believed that any errors that may exist in
+ thd->net.last_err* are allowed. Examples of errors are "key not
+ found", which is produced in the test case rpl_row_conflicts.test
+ */
+ thd->clear_error();
+ }
+ else
+ {
+ rli->inc_event_relay_log_pos();
}
- return error;
+ DBUG_RETURN(error);
}
#endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */
-#ifdef MYSQL_CLIENT
-void Write_rows_log_event::print(FILE *file, PRINT_EVENT_INFO* print_event_info)
+#ifndef MYSQL_CLIENT
+bool Rows_log_event::write_data_header(IO_CACHE *file)
{
- Rows_log_event::print_helper(file, print_event_info, "Write_rows");
+ uchar buf[ROWS_HEADER_LEN]; // No need to init the buffer
+ DBUG_ASSERT(m_table_id != ~0UL);
+ DBUG_EXECUTE_IF("old_row_based_repl_4_byte_map_id_master",
+ {
+ int4store(buf + 0, m_table_id);
+ int2store(buf + 4, m_flags);
+ return (my_b_safe_write(file, buf, 6));
+ });
+ int6store(buf + RW_MAPID_OFFSET, (ulonglong)m_table_id);
+ int2store(buf + RW_FLAGS_OFFSET, m_flags);
+ return (my_b_safe_write(file, buf, ROWS_HEADER_LEN));
}
-#endif
-
-/**************************************************************************
- Delete_rows_log_event member functions
-**************************************************************************/
-
-#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
-/*
- Compares table->record[0] and table->record[1]
- Returns TRUE if different.
-*/
-static bool record_compare(TABLE *table, MY_BITMAP *cols)
+bool Rows_log_event::write_data_body(IO_CACHE*file)
{
/*
- Need to set the X bit and the filler bits in both records since
- there are engines that do not set it correctly.
+ Note that this should be the number of *bits*, not the number of
+ bytes.
+ */
+ uchar sbuf[sizeof(m_width) + 1];
+ my_ptrdiff_t const data_size= m_rows_cur - m_rows_buf;
+ bool res= false;
+ uchar *const sbuf_end= net_store_length(sbuf, (size_t) m_width);
+ DBUG_ASSERT(static_cast<size_t>(sbuf_end - sbuf) <= sizeof(sbuf));
- In addition, since MyISAM checks that one hasn't tampered with the
- record, it is necessary to restore the old bytes into the record
- after doing the comparison.
+ DBUG_DUMP("m_width", sbuf, (size_t) (sbuf_end - sbuf));
+ res= res || my_b_safe_write(file, sbuf, (size_t) (sbuf_end - sbuf));
- TODO[record format ndb]: Remove it once NDB returns correct
- records. Check that the other engines also return correct records.
+ DBUG_DUMP("m_cols", (uchar*) m_cols.bitmap, no_bytes_in_map(&m_cols));
+ res= res || my_b_safe_write(file, (uchar*) m_cols.bitmap,
+ no_bytes_in_map(&m_cols));
+ /*
+ TODO[refactor write]: Remove the "down cast" here (and elsewhere).
*/
-
- DBUG_DUMP("record[0]", table->record[0], table->s->reclength);
- DBUG_DUMP("record[1]", table->record[1], table->s->reclength);
-
- bool result= FALSE;
- uchar saved_x[2]= {0, 0}, saved_filler[2]= {0, 0};
-
- if (table->s->null_bytes > 0)
+ if (get_type_code() == UPDATE_ROWS_EVENT)
{
- for (int i = 0 ; i < 2 ; ++i)
- {
- /*
- If we have an X bit then we need to take care of it.
- */
- if (!(table->s->db_options_in_use & HA_OPTION_PACK_RECORD))
- {
- saved_x[i]= table->record[i][0];
- table->record[i][0]|= 1U;
- }
-
- /*
- If (last_null_bit_pos == 0 && null_bytes > 1), then:
-
- X bit (if any) + N nullable fields + M Field_bit fields = 8 bits
-
- Ie, the entire byte is used.
- */
- if (table->s->last_null_bit_pos > 0)
- {
- saved_filler[i]= table->record[i][table->s->null_bytes - 1];
- table->record[i][table->s->null_bytes - 1]|=
- 256U - (1U << table->s->last_null_bit_pos);
- }
- }
+ DBUG_DUMP("m_cols_ai", (uchar*) m_cols_ai.bitmap,
+ no_bytes_in_map(&m_cols_ai));
+ res= res || my_b_safe_write(file, (uchar*) m_cols_ai.bitmap,
+ no_bytes_in_map(&m_cols_ai));
}
+ DBUG_DUMP("rows", m_rows_buf, data_size);
+ res= res || my_b_safe_write(file, m_rows_buf, (size_t) data_size);
- if (table->s->blob_fields + table->s->varchar_fields == 0 &&
- bitmap_is_set_all(cols))
- {
- result= cmp_record(table,record[1]);
- goto record_compare_exit;
- }
+ return res;
- /* Compare null bits */
- if (bitmap_is_set_all(cols) &&
- memcmp(table->null_flags,
- table->null_flags+table->s->rec_buff_length,
- table->s->null_bytes))
- {
- result= TRUE; // Diff in NULL value
- goto record_compare_exit;
- }
+}
+#endif
- /* Compare updated fields */
- for (Field **ptr=table->field ;
- *ptr && ((*ptr)->field_index < cols->n_bits);
- ptr++)
+#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
+void Rows_log_event::pack_info(Protocol *protocol)
+{
+ char buf[256];
+ char const *const flagstr=
+ get_flags(STMT_END_F) ? " flags: STMT_END_F" : "";
+ size_t bytes= my_snprintf(buf, sizeof(buf),
+ "table_id: %lu%s", m_table_id, flagstr);
+ protocol->store(buf, bytes, &my_charset_bin);
+}
+#endif
+
+#ifdef MYSQL_CLIENT
+void Rows_log_event::print_helper(FILE *file,
+ PRINT_EVENT_INFO *print_event_info,
+ char const *const name)
+{
+ IO_CACHE *const head= &print_event_info->head_cache;
+ IO_CACHE *const body= &print_event_info->body_cache;
+ if (!print_event_info->short_form)
{
- if (bitmap_is_set(cols, (*ptr)->field_index))
- {
- if ((*ptr)->cmp_binary_offset(table->s->rec_buff_length))
- {
- result= TRUE;
- goto record_compare_exit;
- }
- }
+ bool const last_stmt_event= get_flags(STMT_END_F);
+ print_header(head, print_event_info, !last_stmt_event);
+ my_b_printf(head, "\t%s: table id %lu%s\n",
+ name, m_table_id,
+ last_stmt_event ? " flags: STMT_END_F" : "");
+ print_base64(body, print_event_info, !last_stmt_event);
}
-record_compare_exit:
- /*
- Restore the saved bytes.
-
- TODO[record format ndb]: Remove this code once NDB returns the
- correct record format.
- */
- if (table->s->null_bytes > 0)
+ if (get_flags(STMT_END_F))
{
- for (int i = 0 ; i < 2 ; ++i)
- {
- if (!(table->s->db_options_in_use & HA_OPTION_PACK_RECORD))
- table->record[i][0]= saved_x[i];
-
- if (table->s->last_null_bit_pos)
- table->record[i][table->s->null_bytes - 1]= saved_filler[i];
- }
+ copy_event_cache_to_file_and_reinit(head, file);
+ copy_event_cache_to_file_and_reinit(body, file);
}
-
- return result;
}
+#endif
+/**************************************************************************
+ Table_map_log_event member functions and support functions
+**************************************************************************/
/**
- Checks if any of the columns in the given table is
- signaled in the bitmap.
-
- For each column in the given table checks if it is
- signaled in the bitmap. This is most useful when deciding
- whether a before image (BI) can be used or not for
- searching a row. If no column is signaled, then the
- image cannot be used for searching a record (regardless
- of using position(), index scan or table scan). Here is
- an example:
-
- MASTER> SET @@binlog_row_image='MINIMAL';
- MASTER> CREATE TABLE t1 (a int, b int, c int, primary key(c));
- SLAVE> CREATE TABLE t1 (a int, b int);
- MASTER> INSERT INTO t1 VALUES (1,2,3);
- MASTER> UPDATE t1 SET a=2 WHERE b=2;
-
- For the update statement only the PK (column c) is
- logged in the before image (BI). As such, given that
- the slave has no column c, it will not be able to
- find the row, because BI has no values for the columns
- the slave knows about (column a and b).
+ @page How replication of field metadata works.
+
+ When a table map is created, the master first calls
+ Table_map_log_event::save_field_metadata() which calculates how many
+ values will be in the field metadata. Only those fields that require the
+ extra data are added. The method also loops through all of the fields in
+ the table calling the method Field::save_field_metadata() which returns the
+ values for the field that will be saved in the metadata and replicated to
+ the slave. Once all fields have been processed, the table map is written to
+ the binlog adding the size of the field metadata and the field metadata to
+ the end of the body of the table map.
- @param table the table reference on the slave.
- @param cols the bitmap signaling columns available in
- the BI.
+ When a table map is read on the slave, the field metadata is read from the
+ table map and passed to the table_def class constructor which saves the
+ field metadata from the table map into an array based on the type of the
+ field. Field metadata values not present (those fields that do not use extra
+ data) in the table map are initialized as zero (0). The array size is the
+ same as the columns for the table on the slave.
- @return TRUE if BI contains usable colums for searching,
- FALSE otherwise.
+ Additionally, values saved for field metadata on the master are saved as a
+ string of bytes (uchar) in the binlog. A field may require 1 or more bytes
+ to store the information. In cases where values require multiple bytes
+ (e.g. values > 255), the endian-safe methods are used to properly encode
+ the values on the master and decode them on the slave. When the field
+ metadata values are captured on the slave, they are stored in an array of
+ type uint16. This allows the least number of casts to prevent casting bugs
+ when the field metadata is used in comparisons of field attributes. When
+ the field metadata is used for calculating addresses in pointer math, the
+ type used is uint32.
*/
-static
-my_bool is_any_column_signaled_for_table(TABLE *table, MY_BITMAP *cols)
-{
-
- int nfields_set= 0;
- for (Field **ptr=table->field ;
- *ptr && ((*ptr)->field_index < cols->n_bits);
- ptr++)
- {
- if (bitmap_is_set(cols, (*ptr)->field_index))
- nfields_set++;
- }
-
- return (nfields_set != 0);
-}
+#if !defined(MYSQL_CLIENT)
/**
- Checks if the fields in the given key are signaled in
- the bitmap.
-
- Validates whether the before image is usable for the
- given key. It can be the case that the before image
- does not contain values for the key (eg, master was
- using 'minimal' option for image logging and slave has
- different index structure on the table). Here is an
- example:
-
- MASTER> SET @@binlog_row_image='MINIMAL';
- MASTER> CREATE TABLE t1 (a int, b int, c int, primary key(c));
- SLAVE> CREATE TABLE t1 (a int, b int, c int, key(a,c));
- MASTER> INSERT INTO t1 VALUES (1,2,3);
- MASTER> UPDATE t1 SET a=2 WHERE b=2;
-
- When finding the row on the slave, one cannot use the
- index (a,c) to search for the row, because there is only
- data in the before image for column c. This function
- checks the fields needed for a given key and searches
- the bitmap to see if all the fields required are
- signaled.
+ Save the field metadata based on the real_type of the field.
+ The metadata saved depends on the type of the field. Some fields
+ store a single byte for pack_length() while others store two bytes
+ for field_length (max length).
- @param keyinfo reference to key.
- @param cols the bitmap signaling which columns
- have available data.
+ @return Number of bytes of field metadata stored in m_field_metadata.
- @return TRUE if all fields are signaled in the bitmap
- for the given key, FALSE otherwise.
+ @todo
+ We may want to consider changing the encoding of the information.
+ Currently, the code attempts to minimize the number of bytes written to
+ the tablemap. There are at least two other alternatives; 1) using
+ net_store_length() to store the data allowing it to choose the number of
+ bytes that are appropriate thereby making the code much easier to
+ maintain (only 1 place to change the encoding), or 2) use a fixed number
+ of bytes for each field. The problem with option 1 is that net_store_length()
+ will use one byte if the value < 251, but 3 bytes if it is > 250. Thus,
+ for fields like CHAR which can be no larger than 255 characters, the method
+ will use 3 bytes when the value is > 250. Further, every value that is
+ encoded using 2 parts (e.g., pack_length, field_length) will be numerically
+ > 250 and will therefore use 3 bytes for each value. Option 2 is less
+ wasteful of space, but it still wastes 1 byte for every field that does
+ not encode 2 parts.
*/
-static
-my_bool are_all_columns_signaled_for_key(KEY *keyinfo, MY_BITMAP *cols)
+int Table_map_log_event::save_field_metadata()
{
- for (uint i=0 ; i < keyinfo->key_parts ;i++)
+ DBUG_ENTER("Table_map_log_event::save_field_metadata");
+ int index= 0; /* offset into m_field_metadata where the next field stores its data */
+ for (unsigned int i= 0 ; i < m_table->s->fields ; i++)
{
- uint fieldnr= keyinfo->key_part[i].fieldnr - 1;
- if (fieldnr >= cols->n_bits ||
- !bitmap_is_set(cols, fieldnr))
- return FALSE;
+ DBUG_PRINT("debug", ("field_type: %d", m_coltype[i]));
+ index+= m_table->s->field[i]->save_field_metadata(&m_field_metadata[index]); /* advance by the field's return value */
}
-
- return TRUE;
+ DBUG_RETURN(index); /* total accumulated over all fields */
}
+#endif /* !defined(MYSQL_CLIENT) */
-/**
- Searches the table for a given key that can be used
- according to the existing values, ie, columns set
- in the bitmap.
+/*
+ Constructor used to build an event for writing to the binary log.
+ Mats says tbl->s lives longer than this event so it's ok to copy pointers
+ (tbl->s->db etc) and not pointer content.
+ */
+#if !defined(MYSQL_CLIENT)
+Table_map_log_event::Table_map_log_event(THD *thd, TABLE *tbl, ulong tid,
+                                         bool is_transactional)
+  : Log_event(thd, 0, is_transactional),
+    m_table(tbl),
+    m_dbnam(tbl->s->db.str),
+    m_dblen(m_dbnam ? tbl->s->db.length : 0),
+    m_tblnam(tbl->s->table_name.str),
+    m_tbllen(tbl->s->table_name.length),
+    m_colcnt(tbl->s->fields), m_coltype(NULL),
+    m_memory(NULL),
+    m_table_id(tid),
+    m_flags(TM_BIT_LEN_EXACT_F),
+    m_data_size(0),
+    m_field_metadata(0),
+    m_field_metadata_size(0),
+    m_null_bits(0),
+    m_meta_memory(NULL)
+{
+  uchar cbuf[sizeof(m_colcnt) + 1];
+  uchar *cbuf_end;
+  DBUG_ASSERT(m_table_id != ~0UL);
+  /*
+    In TABLE_SHARE, "db" and "table_name" are 0-terminated (see this comment in
+    table.cc / alloc_table_share():
+      Use the fact the key is db/0/table_name/0
+    As we rely on this let's assert it.
+  */
+  DBUG_ASSERT((tbl->s->db.str == 0) ||
+              (tbl->s->db.str[tbl->s->db.length] == 0));
+  DBUG_ASSERT(tbl->s->table_name.str[tbl->s->table_name.length] == 0);
- The caller can specify which type of key to find by
- setting the following flags in the key_type parameter:
- - PRI_KEY_FLAG
- Returns the primary key.
+  m_data_size= TABLE_MAP_HEADER_LEN;
+  DBUG_EXECUTE_IF("old_row_based_repl_4_byte_map_id_master", m_data_size= 6;);
+  m_data_size+= m_dblen + 2; // Include length and terminating \0
+  m_data_size+= m_tbllen + 2; // Include length and terminating \0
+  cbuf_end= net_store_length(cbuf, (size_t) m_colcnt);
+  DBUG_ASSERT(static_cast<size_t>(cbuf_end - cbuf) <= sizeof(cbuf));
+  m_data_size+= (cbuf_end - cbuf) + m_colcnt; // COLCNT and column types
- - UNIQUE_KEY_FLAG
- Returns a unique key (flagged with HA_NOSAME)
+  /* If malloc fails, stop here: m_memory stays NULL, caught in is_valid() */
+  if (!(m_memory= (uchar*) my_malloc(m_colcnt, MYF(MY_WME))))
+    return;
+
+  m_coltype= reinterpret_cast<uchar*>(m_memory);
+  for (unsigned int i= 0 ; i < m_table->s->fields ; ++i)
+    m_coltype[i]= m_table->field[i]->type();
- - MULTIPLE_KEY_FLAG
- Returns a key that is not unique (flagged with HA_NOSAME
- and without HA_NULL_PART_KEY) nor PK.
+  /*
+    Calculate a bitmap for the results of maybe_null() for all columns.
+    The bitmap is used to determine when there is a column from the master
+    that is not on the slave and is null and thus not in the row data during
+    replication.
+  */
+  uint num_null_bytes= (m_table->s->fields + 7) / 8;
+  m_data_size+= num_null_bytes;
+  m_meta_memory= (uchar *)my_multi_malloc(MYF(MY_WME),
+                                          &m_null_bits, num_null_bytes,
+                                          &m_field_metadata, (m_colcnt * 2),
+                                          NULL);
- The above flags can be used together, in which case, the
- search is conducted in the above listed order. Eg, the
- following flag:
+  if (!m_meta_memory)  /* out of memory: do not touch the unallocated buffers */
- (PRI_KEY_FLAG | UNIQUE_KEY_FLAG | MULTIPLE_KEY_FLAG)
+  {
+    my_free(m_memory);
+    m_memory= m_coltype= NULL; /* reported like the m_memory failure above */
+    return;
+  }
- means that a primary key is returned if it is suitable. If
- not then the unique keys are searched. If no unique key is
- suitable, then the keys are searched. Finally, if no key
- is suitable, MAX_KEY is returned.
+  bzero(m_field_metadata, (m_colcnt * 2));
+  /* Create an array for the field metadata and store it. */
+  m_field_metadata_size= save_field_metadata();
+  DBUG_ASSERT(m_field_metadata_size <= (m_colcnt * 2));
+  /* Add the metadata plus its length prefix: 1 or 3 bytes (pack.c:net_store_length) */
+  if (m_field_metadata_size < 251)
+    m_data_size+= m_field_metadata_size + 1;
+  else
+    m_data_size+= m_field_metadata_size + 3;
- @param table reference to the table.
- @param bi_cols a bitmap that filters out columns that should
- not be considered while searching the key.
- Columns that should be considered are set.
- @param key_type the type of key to search for.
+  bzero(m_null_bits, num_null_bytes);
+  for (unsigned int i= 0 ; i < m_table->s->fields ; ++i)
+    if (m_table->field[i]->maybe_null())
+      m_null_bits[(i / 8)]+= 1 << (i % 8);
- @return MAX_KEY if no key, according to the key_type specified
- is suitable. Returns the key otherwise.
+}
+#endif /* !defined(MYSQL_CLIENT) */
-*/
-static
-uint
-search_key_in_table(TABLE *table, MY_BITMAP *bi_cols, uint key_type)
+/*
+ Constructor used by slave to read the event from the binary log.
+ */
+#if defined(HAVE_REPLICATION)
+Table_map_log_event::Table_map_log_event(const char *buf, uint event_len,
+                                         const Format_description_log_event
+                                         *description_event)
+
+  : Log_event(buf, description_event),
+#ifndef MYSQL_CLIENT
+    m_table(NULL),
+#endif
+    m_dbnam(NULL), m_dblen(0), m_tblnam(NULL), m_tbllen(0),
+    m_colcnt(0), m_coltype(0),
+    m_memory(NULL), m_table_id(ULONG_MAX), m_flags(0),
+    m_data_size(0), m_field_metadata(0), m_field_metadata_size(0),
+    m_null_bits(0), m_meta_memory(NULL)
{
- KEY *keyinfo;
- uint res= MAX_KEY;
- uint key;
+  unsigned int bytes_read= 0;
+  DBUG_ENTER("Table_map_log_event::Table_map_log_event(const char*,uint,...)");
- if (key_type & PRI_KEY_FLAG && (table->s->primary_key < MAX_KEY))
+  uint8 common_header_len= description_event->common_header_len;
+  uint8 post_header_len= description_event->post_header_len[TABLE_MAP_EVENT-1];
+  DBUG_PRINT("info",("event_len: %u common_header_len: %d post_header_len: %d",
+                     event_len, common_header_len, post_header_len));
+
+  /*
+    Don't print debug messages when running valgrind since they can
+    trigger false warnings.
+   */
+#ifndef HAVE_purify
+  DBUG_DUMP("event buffer", (uchar*) buf, event_len);
+#endif
+
+  /* Read the post-header */
+  const char *post_start= buf + common_header_len;
+
+  post_start+= TM_MAPID_OFFSET;
+  if (post_header_len == 6)
{
- keyinfo= table->s->key_info + (uint) table->s->primary_key;
- if (are_all_columns_signaled_for_key(keyinfo, bi_cols))
- return table->s->primary_key;
+    /* Master is of an intermediate source tree before 5.1.4. Id is 4 bytes */
+    m_table_id= uint4korr(post_start);
+    post_start+= 4;
}
-
- if (key_type & UNIQUE_KEY_FLAG && table->s->uniques)
+  else
{
- for (key=0,keyinfo= table->key_info ;
- (key < table->s->keys) && (res == MAX_KEY);
- key++,keyinfo++)
- {
- /*
- - Unique keys cannot be disabled, thence we skip the check.
- - Skip unique keys with nullable parts
- - Skip primary keys
- */
- if (!((keyinfo->flags & (HA_NOSAME | HA_NULL_PART_KEY)) != HA_NOSAME) ||
- (key == table->s->primary_key))
- continue;
- res= are_all_columns_signaled_for_key(keyinfo, bi_cols) ?
- key : MAX_KEY;
-
- if (res < MAX_KEY)
- return res;
- }
+    DBUG_ASSERT(post_header_len == TABLE_MAP_HEADER_LEN);
+    m_table_id= (ulong) uint6korr(post_start);
+    post_start+= TM_FLAGS_OFFSET;
}
- if (key_type & MULTIPLE_KEY_FLAG && table->s->keys)
- {
- for (key=0,keyinfo= table->key_info ;
- (key < table->s->keys) && (res == MAX_KEY);
- key++,keyinfo++)
- {
- /*
- - Skip innactive keys
- - Skip unique keys without nullable parts
- - Skip primary keys
- */
- if (!(table->s->keys_in_use.is_set(key)) ||
- ((keyinfo->flags & (HA_NOSAME | HA_NULL_PART_KEY)) == HA_NOSAME) ||
- (key == table->s->primary_key))
- continue;
+  DBUG_ASSERT(m_table_id != ~0UL);
- res= are_all_columns_signaled_for_key(keyinfo, bi_cols) ?
- key : MAX_KEY;
+  m_flags= uint2korr(post_start);
- if (res < MAX_KEY)
- return res;
- }
+  /* Read the variable part of the event */
+  const char *const vpart= buf + common_header_len + post_header_len;
+
+  /* Extract the length of the various parts from the buffer */
+  uchar const *const ptr_dblen= (uchar const*)vpart + 0;
+  m_dblen= *(uchar*) ptr_dblen;
+
+  /* Length of database name + counter + terminating null */
+  uchar const *const ptr_tbllen= ptr_dblen + m_dblen + 2;
+  m_tbllen= *(uchar*) ptr_tbllen;
+
+  /* Length of table name + counter + terminating null */
+  uchar const *const ptr_colcnt= ptr_tbllen + m_tbllen + 2;
+  uchar *ptr_after_colcnt= (uchar*) ptr_colcnt;
+  m_colcnt= net_field_length(&ptr_after_colcnt);
+
+  DBUG_PRINT("info",("m_dblen: %lu off: %ld m_tbllen: %lu off: %ld m_colcnt: %lu off: %ld",
+                     (ulong) m_dblen, (long) (ptr_dblen-(const uchar*)vpart),
+                     (ulong) m_tbllen, (long) (ptr_tbllen-(const uchar*)vpart),
+                     m_colcnt, (long) (ptr_colcnt-(const uchar*)vpart)));
+
+  /* Allocate mem for all fields in one go. If fails, caught in is_valid() */
+  m_memory= (uchar*) my_multi_malloc(MYF(MY_WME),
+                                     &m_dbnam, (uint) m_dblen + 1,
+                                     &m_tblnam, (uint) m_tbllen + 1,
+                                     &m_coltype, (uint) m_colcnt, NullS);
+
+  if (!m_memory)
+    DBUG_VOID_RETURN;
+  /* Copy the different parts into their memory */
+  strncpy(const_cast<char*>(m_dbnam), (const char*)ptr_dblen + 1, m_dblen + 1);
+  strncpy(const_cast<char*>(m_tblnam), (const char*)ptr_tbllen + 1, m_tbllen + 1);
+  memcpy(m_coltype, ptr_after_colcnt, m_colcnt);
+  ptr_after_colcnt= ptr_after_colcnt + m_colcnt;
+  bytes_read= (uint) (ptr_after_colcnt - (uchar *)buf);
+  DBUG_PRINT("info", ("Bytes read: %d.\n", bytes_read));
+  if (bytes_read >= event_len)
+    DBUG_VOID_RETURN;           /* the event ends after the column types */
+
+  m_field_metadata_size= net_field_length(&ptr_after_colcnt);
+  DBUG_ASSERT(m_field_metadata_size <= (m_colcnt * 2));
+  uint num_null_bytes= (m_colcnt + 7) / 8;
+  m_meta_memory= (uchar *)my_multi_malloc(MYF(MY_WME), &m_null_bits, num_null_bytes,
+                                          &m_field_metadata, m_field_metadata_size, NULL);
+  if (!m_meta_memory)  /* out of memory: invalidate the event, do not copy into NULL */
+  {
+    my_free(m_memory);
+    m_memory= m_coltype= NULL;  /* reported like the m_memory failure above */
+    m_dbnam= m_tblnam= NULL;    /* they pointed into the block just freed */
+    DBUG_VOID_RETURN;
+  }
+  memcpy(m_field_metadata, ptr_after_colcnt, m_field_metadata_size);
+  memcpy(m_null_bits, ptr_after_colcnt + m_field_metadata_size, num_null_bytes);
+  DBUG_VOID_RETURN;
+}
+#endif
+
+Table_map_log_event::~Table_map_log_event()
+{
+ my_free(m_meta_memory); /* block holding m_null_bits and m_field_metadata */
+ my_free(m_memory); /* column types (plus the names when the event was read from a log) */
+}
+
+/*
+ Links the mapped table into rli->tables_to_lock (the list of tables to
+ lock), unless the replication filters reject the table.
+
+ Return value is an error code, one of:
+
+ 0 Success, also when the table is filtered out
+ HA_ERR_OUT_OF_MEM Failure to allocate the table list entry
+ (nothing else is returned by the code below)
+ */
+
+#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
+int Table_map_log_event::do_apply_event(Relay_log_info const *rli)
+{
+ RPL_TABLE_LIST *table_list;
+ char *db_mem, *tname_mem;
+ size_t dummy_len; /* out-parameter of get_rewrite_db(); not read afterwards */
+ void *memory; /* single block holding table_list, db_mem and tname_mem */
+ DBUG_ENTER("Table_map_log_event::do_apply_event(Relay_log_info*)");
+ DBUG_ASSERT(rli->info_thd == thd);
+
+ /* Step the query id to mark what columns that are actually used. */
+ thd->set_query_id(next_query_id());
+
+ if (!(memory= my_multi_malloc(MYF(MY_WME),
+ &table_list, (uint) sizeof(RPL_TABLE_LIST),
+ &db_mem, (uint) NAME_LEN + 1,
+ &tname_mem, (uint) NAME_LEN + 1,
+ NullS)))
+ DBUG_RETURN(HA_ERR_OUT_OF_MEM);
+
+ strmov(db_mem, rpl_filter->get_rewrite_db(m_dbnam, &dummy_len)); /* NOTE(review): no length check against the NAME_LEN + 1 buffer -- confirm names are bounded */
+ strmov(tname_mem, m_tblnam); /* NOTE(review): same unchecked copy for the table name */
+
+ table_list->init_one_table(db_mem, strlen(db_mem),
+ tname_mem, strlen(tname_mem),
+ tname_mem, TL_WRITE);
+
+ table_list->table_id= m_table_id;
+ table_list->updating= 1;
+
+ int error= 0; /* never modified below: this is the value returned at the end */
+
+ if (rli->info_thd->slave_thread /* filtering is for slave only */ &&
+ (!rpl_filter->db_ok(table_list->db) ||
+ (rpl_filter->is_on() && !rpl_filter->tables_ok("", table_list))))
+ {
+ my_free(memory); /* table filtered out: nothing was linked, release the block */
+ }
+ else
+ {
+ DBUG_ASSERT(thd->lex->query_tables != table_list);
+
+ /*
+ Use placement new to construct the table_def instance in the
+ memory allocated for it inside table_list.
+
+ The memory allocated by the table_def structure (i.e., not the
+ memory allocated *for* the table_def structure) is released
+ inside Relay_log_info::clear_tables_to_lock() by calling the
+ table_def destructor explicitly.
+ */
+ new (&table_list->m_tabledef)
+ table_def(m_coltype, m_colcnt,
+ m_field_metadata, m_field_metadata_size,
+ m_null_bits, m_flags);
+ table_list->m_tabledef_valid= TRUE;
+
+ /*
+ We record in the slave's information that the table should be
+ locked by linking the table into the list of tables to lock.
+ */
+ table_list->next_global= table_list->next_local= rli->tables_to_lock; /* push at the head of the list */
+ const_cast<Relay_log_info*>(rli)->tables_to_lock= table_list;
+ const_cast<Relay_log_info*>(rli)->tables_to_lock_count++;
+ /* 'memory' is freed in clear_tables_to_lock */
+ }
+
+ DBUG_RETURN(error);
+}
+
+Log_event::enum_skip_reason
+Table_map_log_event::do_shall_skip(Relay_log_info *rli)
+{
+ /*
+ If the slave skip counter is 1, then we should not start executing
+ on the next event.
+ */
+ return continue_group(rli); /* the skip decision is delegated entirely to continue_group() */
+}
+
+int Table_map_log_event::do_update_pos(Relay_log_info *rli)
+{
+ rli->inc_event_relay_log_pos(); /* the only position update done for a table map */
+ return 0; /* always reports success */
+}
+
+#endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */
+
+#ifndef MYSQL_CLIENT
+bool Table_map_log_event::write_data_header(IO_CACHE *file)
+{
+ DBUG_ASSERT(m_table_id != ~0UL);
+ uchar buf[TABLE_MAP_HEADER_LEN]; /* post-header: table id followed by the flags */
+ DBUG_EXECUTE_IF("old_row_based_repl_4_byte_map_id_master",
+ { /* debug hook: old layout, 4-byte table id + 2-byte flags = 6 bytes */
+ int4store(buf + 0, m_table_id);
+ int2store(buf + 4, m_flags);
+ return (my_b_safe_write(file, buf, 6));
+ });
+ int6store(buf + TM_MAPID_OFFSET, (ulonglong)m_table_id); /* regular layout: 6-byte table id */
+ int2store(buf + TM_FLAGS_OFFSET, m_flags);
+ return (my_b_safe_write(file, buf, TABLE_MAP_HEADER_LEN)); /* result of the write is returned as is */
+}
+
+bool Table_map_log_event::write_data_body(IO_CACHE *file)
+{
+  DBUG_ASSERT(m_dbnam != NULL);
+  DBUG_ASSERT(m_tblnam != NULL);
+  /* We use only one byte per length for storage in event: */
+  DBUG_ASSERT(m_dblen < 128);
+  DBUG_ASSERT(m_tbllen < 128);
+
+  uchar const dbuf[]= { (uchar) m_dblen };
+  uchar const tbuf[]= { (uchar) m_tbllen };
+
+  uchar cbuf[sizeof(m_colcnt) + 1];
+  uchar *const cbuf_end= net_store_length(cbuf, (size_t) m_colcnt);
+  DBUG_ASSERT(static_cast<size_t>(cbuf_end - cbuf) <= sizeof(cbuf));
+
+  /* Store the size of the field metadata, length-encoded like the column count. */
+  uchar mbuf[sizeof(m_field_metadata_size)];
+  uchar *const mbuf_end= net_store_length(mbuf, m_field_metadata_size);
+  DBUG_ASSERT(static_cast<size_t>(mbuf_end - mbuf) <= sizeof(mbuf));
+
+  /* One || chain: the first failing write stops the rest and is reported (no comma!). */
+  return (my_b_safe_write(file, dbuf, sizeof(dbuf)) ||
+          my_b_safe_write(file, (const uchar*)m_dbnam, m_dblen+1) ||
+          my_b_safe_write(file, tbuf, sizeof(tbuf)) ||
+          my_b_safe_write(file, (const uchar*)m_tblnam, m_tbllen+1) ||
+          my_b_safe_write(file, cbuf, (size_t) (cbuf_end - cbuf)) ||
+          my_b_safe_write(file, m_coltype, m_colcnt) ||
+          my_b_safe_write(file, mbuf, (size_t) (mbuf_end - mbuf)) ||
+          my_b_safe_write(file, m_field_metadata, m_field_metadata_size) ||
+          my_b_safe_write(file, m_null_bits, (m_colcnt + 7) / 8));
+}
+#endif
+
+#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
+
+/*
+ Print some useful information for the SHOW BINARY LOG information
+ field.
+ */
+
+#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
+void Table_map_log_event::pack_info(Protocol *protocol)
+{
+  char info[256];               /* formatted text handed to the protocol */
+  const size_t info_len= my_snprintf(info, sizeof(info),
+                                     "table_id: %lu (%s.%s)",
+                                     m_table_id, m_dbnam, m_tblnam);
+  protocol->store(info, info_len, &my_charset_bin);
+}
+#endif
+
+
+#endif
+
+
+#ifdef MYSQL_CLIENT
+void Table_map_log_event::print(FILE *file, PRINT_EVENT_INFO *print_event_info)
+{
+  /* In short form nothing at all is printed for a table map. */
+  if (print_event_info->short_form)
+    return;
+  print_header(&print_event_info->head_cache, print_event_info, TRUE);
+  my_b_printf(&print_event_info->head_cache,
+              "\tTable_map: `%s`.`%s` mapped to number %lu\n",
+              m_dbnam, m_tblnam, m_table_id);
+  print_base64(&print_event_info->body_cache, print_event_info, TRUE);
+}
+#endif
+
+/**************************************************************************
+ Write_rows_log_event member functions
+**************************************************************************/
+
+/*
+ Constructor used to build an event for writing to the binary log.
+ */
+#if !defined(MYSQL_CLIENT)
+Write_rows_log_event::Write_rows_log_event(THD *thd_arg, TABLE *tbl_arg,
+ ulong tid_arg,
+ bool is_transactional)
+ : Rows_log_event(thd_arg, tbl_arg, tid_arg, tbl_arg->write_set, is_transactional)
+{ /* nothing to add: the base class is constructed with the table's write_set */
+}
+#endif
+
+/*
+ Constructor used by slave to read the event from the binary log.
+ */
+#ifdef HAVE_REPLICATION
+Write_rows_log_event::Write_rows_log_event(const char *buf, uint event_len,
+ const Format_description_log_event
+ *description_event)
+: Rows_log_event(buf, event_len, WRITE_ROWS_EVENT, description_event)
+{ /* nothing to add: the base class is constructed with event type WRITE_ROWS_EVENT */
+}
+#endif
+
+#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
+int
+Write_rows_log_event::do_before_row_operations(const Slave_reporting_capability *const)
+{
+ int error= 0; /* never modified below: the function always returns 0 */
+
+ /**
+ todo: to introduce a property for the event (handler?) which forces
+ applying the event in the replace (idempotent) fashion.
+ */
+ if ((slave_exec_mode == SLAVE_EXEC_MODE_IDEMPOTENT) ||
+ (m_table->s->db_type()->db_type == DB_TYPE_NDBCLUSTER))
+ {
+ /*
+ We are using REPLACE semantics and not INSERT IGNORE semantics
+ when writing rows, that is: new rows replace old rows. We need to
+ inform the storage engine that it should use this behaviour.
+ */
+
+ /* Tell the storage engine that we are using REPLACE semantics. */
+ thd->lex->duplicates= DUP_REPLACE;
+
+ /*
+ Pretend we're executing a REPLACE command: this is needed for
+ InnoDB and NDB Cluster since they are not (properly) checking the
+ lex->duplicates flag.
+ */
+ thd->lex->sql_command= SQLCOM_REPLACE;
+ /*
+ Do not raise the error flag when a unique attribute is hit
+ */
+ m_table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
+ /*
+ NDB specific: update from ndb master wrapped as Write_rows
+ so that the event should be applied to replace slave's row
+ */
+ m_table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
+ /*
+ NDB specific: if update from ndb master wrapped as Write_rows
+ does not find the row it's assumed idempotent binlog applying
+ is taking place; don't raise the error.
+ */
+ m_table->file->extra(HA_EXTRA_IGNORE_NO_KEY);
+ /*
+ TODO: the cluster team (Tomas?) says that it's better if the engine knows
+ how many rows are going to be inserted, then it can allocate needed memory
+ from the start.
+ */
+ }
+
+ /*
+ We need TIMESTAMP_NO_AUTO_SET, otherwise ha_write_row() will not fill
+ any TIMESTAMP column with data from the row but instead will use
+ the event's current time.
+ As we replicate from TIMESTAMP to TIMESTAMP and slave has no extra
+ columns, we know that all TIMESTAMP columns on slave will receive explicit
+ data from the row, so TIMESTAMP_NO_AUTO_SET is ok.
+ When we allow a table without TIMESTAMP to be replicated to a table having
+ more columns including a TIMESTAMP column, or when we allow a TIMESTAMP
+ column to be replicated into a BIGINT column and the slave's table has a
+ TIMESTAMP column, then the slave's TIMESTAMP column will take its value
+ from set_time() which we called earlier (consistent with SBR). And then in
+ some cases we won't want TIMESTAMP_NO_AUTO_SET (will require some code to
+ analyze if explicit data is provided for slave's TIMESTAMP columns).
+ */
+ m_table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
+
+ /* Honor next number column if present */
+ m_table->next_number_field= m_table->found_next_number_field;
+ /*
+ * Fix for Bug#45999: in RBR the slave's storage engine auto-generated new
+ * sequence numbers for auto_increment fields whose value was 0.
+ * Whether a sequence number is generated is decided in update_auto_increment()
+ * from table->auto_increment_field_not_null and SQL_MODE (whether it includes
+ * MODE_NO_AUTO_VALUE_ON_ZERO).
+ * The SQL_MODE of the slave SQL thread is always consistent with the master's.
+ * In RBR, auto_increment fields are never NULL.
+ */
+ m_table->auto_increment_field_not_null= TRUE;
+ return error;
+}
+
+int
+Write_rows_log_event::do_after_row_operations(const Slave_reporting_capability *const,
+ int error)
+{
+ int local_error= 0; /* receives the result of ha_end_bulk_insert() below */
+ m_table->next_number_field=0; /* undo what do_before_row_operations() set */
+ m_table->auto_increment_field_not_null= FALSE;
+ if ((slave_exec_mode == SLAVE_EXEC_MODE_IDEMPOTENT) || /* same condition as in do_before_row_operations() */
+ m_table->s->db_type()->db_type == DB_TYPE_NDBCLUSTER)
+ {
+ m_table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
+ m_table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
+ /*
+ resetting the extra with
+ table->file->extra(HA_EXTRA_NO_IGNORE_NO_KEY);
+ fires bug#27077
+ explanation: file->reset() performs this duty
+ ultimately. Still todo: fix
+ */
}
-
- return res;
+ if ((local_error= m_table->file->ha_end_bulk_insert()))
+ {
+ m_table->file->print_error(local_error, MYF(0));
+ }
+ return error? error : local_error; /* an error passed in by the caller wins over the local one */
+}
+#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
+
+/*
+  Return 1 if no key after 'keyno' is UNIQUE (HA_NOSAME), 0 otherwise.
+*/
+static int
+last_uniq_key(TABLE *table, uint keyno)
+{
+  for (uint idx= keyno + 1; idx < table->s->keys; idx++)
+    if (table->key_info[idx].flags & HA_NOSAME)
+      return 0;
+  return 1;
+}
/**
- Locate the current row in event's table.
+ Check if an error is a duplicate key error.
- The current row is pointed by @c m_curr_row. Member @c m_width tells how many
- columns are there in the row (this can be differnet from the number of columns
- in the table). It is assumed that event's table is already open and pointed
- by @c m_table.
-
- If a corresponding record is found in the table it is stored in
- @c m_table->record[0]. Note that when record is located based on a primary
- key, it is possible that the record found differs from the row being located.
-
- If no key is specified or table does not have keys, a table scan is used to
- find the row. In that case the row should be complete and contain values for
- all columns. However, it can still be shorter than the table, i.e. the table
- can contain extra columns not present in the row. It is also possible that
- the table has fewer columns than the row being located.
+ This function is used to check if an error code is one of the
+ duplicate key errors, i.e., an error code for which it is sensible
+ to do a <code>get_dup_key()</code> to retrieve the duplicate key.
- @returns Error code on failure, 0 on success.
-
- @post In case of success @c m_table->record[0] contains the record found.
- Also, the internal "cursor" of the table is positioned at the record found.
+ @param errcode The error code to check.
- @note If the engine allows random access of the records, a combination of
- @c position() and @c rnd_pos() will be used.
+ @return <code>true</code> if the error code is such that
+ <code>get_dup_key()</code> will return true, <code>false</code>
+ otherwise.
*/
-
-
-int Rows_log_event::find_row(const Relay_log_info *rli)
+bool
+is_duplicate_key_error(int errcode)
{
- DBUG_ENTER("Rows_log_event::find_row");
-
- DBUG_ASSERT(m_table && m_table->in_use != NULL);
-
- TABLE *table= m_table;
- int error= 0;
- KEY *keyinfo;
- uint key;
+  /* Only the two handler "duplicate" codes qualify. */
+  if (errcode == HA_ERR_FOUND_DUPP_KEY)
+    return true;
+  if (errcode == HA_ERR_FOUND_DUPP_UNIQUE)
+    return true;
+
+  return false;
+}
- /*
- rpl_row_tabledefs.test specifies that
- if the extra field on the slave does not have a default value
- and this is okay with Delete or Update events.
- Todo: fix wl3228 hld that requires defauls for all types of events
- */
-
- prepare_record(table, &m_cols, FALSE);
- error= unpack_current_row(rli, &m_cols);
+/**
+ Write the current row into event's table.
- // Temporary fix to find out why it fails [/Matz]
- memcpy(m_table->read_set->bitmap, m_cols.bitmap, (m_table->read_set->n_bits + 7) / 8);
+ The row is located in the row buffer, pointed by @c m_curr_row member.
+ Number of columns of the row is stored in @c m_width member (it can be
+ different from the number of columns in the table to which we insert).
+ Bitmap @c m_cols indicates which columns are present in the row. It is assumed
+ that event's table is already open and pointed by @c m_table.
- if (!is_any_column_signaled_for_table(table, &m_cols))
- {
- error= HA_ERR_END_OF_FILE;
- goto err;
- }
+ If the same record already exists in the table it can be either overwritten
+ or an error is reported depending on the value of @c overwrite flag
+ (error reporting not yet implemented). Note that the matching record can be
+ different from the row we insert if we use primary keys to identify records in
+ the table.
-#ifndef DBUG_OFF
- DBUG_PRINT("info",("looking for the following record"));
- DBUG_DUMP("record[0]", table->record[0], table->s->reclength);
-#endif
+ The row to be inserted can contain values only for selected columns. The
+ missing columns are filled with default values using @c prepare_record()
+ function. If a matching record is found in the table and @c overwrite is
+ true, the missing columns are taken from it.
- if ((key= search_key_in_table(table, &m_cols, PRI_KEY_FLAG)) >= MAX_KEY)
- /* we dont have a PK, or PK is not usable with BI values */
- goto INDEX_SCAN;
+ @param rli Relay log info (needed for row unpacking).
+ @param overwrite
+ Shall we overwrite if the row already exists or signal
+ error (currently ignored).
- if ((table->file->ha_table_flags() & HA_PRIMARY_KEY_REQUIRED_FOR_POSITION))
- {
- /*
- Use a more efficient method to fetch the record given by
- table->record[0] if the engine allows it. We first compute a
- row reference using the position() member function (it will be
- stored in table->file->ref) and the use rnd_pos() to position
- the "cursor" (i.e., record[0] in this case) at the correct row.
+ @returns Error code on failure, 0 on success.
- TODO: Add a check that the correct record has been fetched by
- comparing with the original record. Take into account that the
- record on the master and slave can be of different
- length. Something along these lines should work:
+ This method, if successful, sets @c m_curr_row_end pointer to point at the
+ next row in the rows buffer. This is done when unpacking the row to be
+ inserted.
- ADD>>> store_record(table,record[1]);
- int error= table->file->rnd_pos(table->record[0], table->file->ref);
- ADD>>> DBUG_ASSERT(memcmp(table->record[1], table->record[0],
- table->s->reclength) == 0);
+ @note If a matching record is found, it is either updated using
+ @c ha_update_row() or first deleted and then new record written.
+*/
- */
- DBUG_PRINT("info",("locating record using primary key (position)"));
- int error;
- if (table->file->inited && (error= table->file->ha_index_end()))
- DBUG_RETURN(error);
- if ((error= table->file->ha_rnd_init(FALSE)))
- DBUG_RETURN(error);
+int
+Rows_log_event::write_row(const Relay_log_info *const rli,
+ const bool overwrite)
+{
+ DBUG_ENTER("write_row");
+ DBUG_ASSERT(m_table != NULL && thd != NULL);
- error= table->file->rnd_pos_by_record(table->record[0]);
+ TABLE *table= m_table; // pointer to event's table
+ int error;
+ int UNINIT_VAR(keynum);
+ auto_afree_ptr<char> key(NULL);
- table->file->ha_rnd_end();
- if (error)
- {
- DBUG_PRINT("info",("rnd_pos returns error %d",error));
- if (error == HA_ERR_RECORD_DELETED)
- error= HA_ERR_KEY_NOT_FOUND;
- table->file->print_error(error, MYF(0));
- }
+ /* fill table->record[0] with default values */
+ bool abort_on_warnings= (rli->info_thd->variables.sql_mode &
+ (MODE_STRICT_TRANS_TABLES | MODE_STRICT_ALL_TABLES));
+ if ((error= prepare_record(table, &m_cols,
+ table->file->ht->db_type != DB_TYPE_NDBCLUSTER,
+ abort_on_warnings, m_curr_row == m_rows_buf)))
DBUG_RETURN(error);
- }
-
- // We can't use position() - try other methods.
-INDEX_SCAN:
-
- /*
- Save copy of the record in table->record[1]. It might be needed
- later if linear search is used to find exact match.
- */
- store_record(table,record[1]);
+ /* unpack row into table->record[0] */
+ if ((error= unpack_current_row(rli, &m_cols, abort_on_warnings)))
+ DBUG_RETURN(error);
- if ((key= search_key_in_table(table, &m_cols,
- (PRI_KEY_FLAG | UNIQUE_KEY_FLAG | MULTIPLE_KEY_FLAG)))
- >= MAX_KEY)
- /* we dont have a key, or no key is suitable for the BI values */
- goto TABLE_SCAN;
+ // Temporary fix to find out why it fails [/Matz]
+ memcpy(m_table->write_set->bitmap, m_cols.bitmap, (m_table->write_set->n_bits + 7) / 8);
+ if (m_curr_row == m_rows_buf)
{
- keyinfo= table->key_info + key;
-
-
- DBUG_PRINT("info",("locating record using primary key (index_read)"));
-
- /* The key'th key is active and usable: search the table using the index */
- if (!table->file->inited && (error= table->file->ha_index_init(key, FALSE)))
- {
- DBUG_PRINT("info",("ha_index_init returns error %d",error));
- table->file->print_error(error, MYF(0));
- goto err;
- }
+ /* this is the first row to be inserted, we estimate the rows with
+ the size of the first row and use that value to initialize
+ storage engine for bulk insertion */
+ DBUG_ASSERT(!(m_curr_row > m_curr_row_end));
+ ulong estimated_rows= 0;
+ if (m_curr_row < m_curr_row_end)
+ estimated_rows= (m_rows_end - m_curr_row) / (m_curr_row_end - m_curr_row);
+ else if (m_curr_row == m_curr_row_end)
+ estimated_rows= 1;
- /* Fill key data for the row */
+ m_table->file->ha_start_bulk_insert(estimated_rows);
+ }
+
+
+#ifndef DBUG_OFF
+ DBUG_DUMP("record[0]", table->record[0], table->s->reclength);
+ DBUG_PRINT_BITSET("debug", "write_set = %s", table->write_set);
+ DBUG_PRINT_BITSET("debug", "read_set = %s", table->read_set);
+#endif
- DBUG_ASSERT(m_key);
- key_copy(m_key, table->record[0], keyinfo, 0);
+ /*
+ Try to write record. If a corresponding record already exists in the table,
+ we try to change it using ha_update_row() if possible. Otherwise we delete
+ it and repeat the whole process again.
- /*
- Don't print debug messages when running valgrind since they can
- trigger false warnings.
- */
-#ifndef HAVE_purify
- DBUG_DUMP("key data", m_key, keyinfo->key_length);
-#endif
+ TODO: Add safety measures against infinite looping.
+ */
- /*
- We need to set the null bytes to ensure that the filler bit are
- all set when returning. There are storage engines that just set
- the necessary bits on the bytes and don't set the filler bits
- correctly.
- */
- if (table->s->null_bytes > 0)
- table->record[0][table->s->null_bytes - 1]|=
- 256U - (1U << table->s->last_null_bit_pos);
+ m_table->mark_columns_per_binlog_row_image();
- if ((error= table->file->ha_index_read_map(table->record[0], m_key,
- HA_WHOLE_KEY,
- HA_READ_KEY_EXACT)))
+ while ((error= table->file->ha_write_row(table->record[0])))
+ {
+ if (error == HA_ERR_LOCK_DEADLOCK ||
+ error == HA_ERR_LOCK_WAIT_TIMEOUT ||
+ (keynum= table->file->get_dup_key(error)) < 0 ||
+ !overwrite)
{
- DBUG_PRINT("info",("no record matching the key found in the table"));
- if (error == HA_ERR_RECORD_DELETED)
- error= HA_ERR_KEY_NOT_FOUND;
+ DBUG_PRINT("info",("get_dup_key returns %d)", keynum));
+ /*
+ Deadlock, waiting for lock or just an error from the handler
+ such as HA_ERR_FOUND_DUPP_KEY when overwrite is false.
+ Retrieval of the duplicate key number may fail
+ - either because the error was not "duplicate key" error
+ - or because the information about which key is duplicated is not available
+ */
table->file->print_error(error, MYF(0));
- table->file->ha_index_end();
- goto err;
+ goto error;
}
+ /*
+ We need to retrieve the old row into record[1] to be able to
+ either update or delete the offending record. We either:
- /*
- Don't print debug messages when running valgrind since they can
- trigger false warnings.
- */
-#ifndef HAVE_purify
- DBUG_PRINT("info",("found first matching record"));
- DBUG_DUMP("record[0]", table->record[0], table->s->reclength);
-#endif
- /*
- Below is a minor "optimization". If the key (i.e., key number
- 0) has the HA_NOSAME flag set, we know that we have found the
- correct record (since there can be no duplicates); otherwise, we
- have to compare the record with the one found to see if it is
- the correct one.
-
- CAVEAT! This behaviour is essential for the replication of,
- e.g., the mysql.proc table since the correct record *shall* be
- found using the primary key *only*. There shall be no
- comparison of non-PK columns to decide if the correct record is
- found. I can see no scenario where it would be incorrect to
- chose the row to change only using a PK or an UNNI.
- */
- if (keyinfo->flags & HA_NOSAME || key == table->s->primary_key)
+ - use ha_rnd_pos() with a row-id (available as dup_ref) to the
+ offending row, if that is possible (MyISAM and Blackhole), or else
+
+ - use ha_index_read_idx_map() with the key that is duplicated, to
+ retrieve the offending row.
+ */
+ if (table->file->ha_table_flags() & HA_DUPLICATE_POS)
{
- /* Unique does not have non nullable part */
- if (!(table->key_info->flags & (HA_NULL_PART_KEY)))
- {
- table->file->ha_index_end();
- goto ok;
- }
- else
- {
- KEY *keyinfo= table->key_info;
- /*
- Unique has nullable part. We need to check if there is any field in the
- BI image that is null and part of UNNI.
- */
- bool null_found= FALSE;
- for (uint i=0; i < keyinfo->key_parts && !null_found; i++)
- {
- uint fieldnr= keyinfo->key_part[i].fieldnr - 1;
- Field **f= table->field+fieldnr;
- null_found= (*f)->is_null();
- }
+ DBUG_PRINT("info",("Locating offending record using ha_rnd_pos()"));
- if (!null_found)
- {
- table->file->ha_index_end();
- goto ok;
- }
+ if (table->file->inited && (error= table->file->ha_index_end()))
+ DBUG_RETURN(error);
+ if ((error= table->file->ha_rnd_init(FALSE)))
+ DBUG_RETURN(error);
- /* else fall through to index scan */
+ error= table->file->ha_rnd_pos(table->record[1], table->file->dup_ref);
+
+ table->file->ha_rnd_end();
+ if (error)
+ {
+ DBUG_PRINT("info",("ha_rnd_pos() returns error %d",error));
+ if (error == HA_ERR_RECORD_DELETED)
+ error= HA_ERR_KEY_NOT_FOUND;
+ table->file->print_error(error, MYF(0));
+ goto error;
}
}
-
- /*
- In case key is not unique, we still have to iterate over records found
- and find the one which is identical to the row given. A copy of the
- record we are looking for is stored in record[1].
- */
- DBUG_PRINT("info",("non-unique index, scanning it to find matching record"));
-
- while (record_compare(table, &m_cols))
+ else
{
- /*
- We need to set the null bytes to ensure that the filler bit
- are all set when returning. There are storage engines that
- just set the necessary bits on the bytes and don't set the
- filler bits correctly.
+ DBUG_PRINT("info",("Locating offending record using index_read_idx()"));
- TODO[record format ndb]: Remove this code once NDB returns the
- correct record format.
- */
- if (table->s->null_bytes > 0)
+ if (table->file->extra(HA_EXTRA_FLUSH_CACHE))
{
- table->record[0][table->s->null_bytes - 1]|=
- 256U - (1U << table->s->last_null_bit_pos);
+ DBUG_PRINT("info",("Error when setting HA_EXTRA_FLUSH_CACHE"));
+ error= my_errno;
+ goto error;
}
- while ((error= table->file->ha_index_next(table->record[0])))
+ if (key.get() == NULL)
{
- /* We just skip records that has already been deleted */
+ key.assign(static_cast<char*>(my_alloca(table->s->max_unique_length)));
+ if (key.get() == NULL)
+ {
+ DBUG_PRINT("info",("Can't allocate key buffer"));
+ error= ENOMEM;
+ goto error;
+ }
+ }
+
+ key_copy((uchar*)key.get(), table->record[0], table->key_info + keynum,
+ 0);
+ error= table->file->ha_index_read_idx_map(table->record[1], keynum,
+ (const uchar*)key.get(),
+ HA_WHOLE_KEY,
+ HA_READ_KEY_EXACT);
+ if (error)
+ {
+ DBUG_PRINT("info",("ha_index_read_idx_map() returns %s", HA_ERR(error)));
if (error == HA_ERR_RECORD_DELETED)
- continue;
- DBUG_PRINT("info",("no record matching the given row found"));
+ error= HA_ERR_KEY_NOT_FOUND;
table->file->print_error(error, MYF(0));
- table->file->ha_index_end();
- goto err;
+ goto error;
}
}
/*
- Have to restart the scan to be able to fetch the next row.
- */
- table->file->ha_index_end();
- goto ok;
- }
+ Now, record[1] should contain the offending row. That
+ will enable us to update it or, alternatively, delete it (so
+ that we can insert the new row afterwards).
+ */
-TABLE_SCAN:
+ /*
+ If row is incomplete we will use the record found to fill
+ missing columns.
+ */
+ if (!get_flags(COMPLETE_ROWS_F))
+ {
+ restore_record(table,record[1]);
+ error= unpack_current_row(rli, &m_cols);
+ }
- /* All that we can do now is rely on a table scan */
- {
- DBUG_PRINT("info",("locating record using table scan (ha_rnd_next)"));
+#ifndef DBUG_OFF
+ DBUG_PRINT("debug",("preparing for update: before and after image"));
+ DBUG_DUMP("record[1] (before)", table->record[1], table->s->reclength);
+ DBUG_DUMP("record[0] (after)", table->record[0], table->s->reclength);
+#endif
- int restart_count= 0; // Number of times scanning has restarted from top
+ /*
+ REPLACE is defined as either INSERT or DELETE + INSERT. If
+ possible, we can replace it with an UPDATE, but that will not
+ work on InnoDB if FOREIGN KEY checks are necessary.
- /* We don't have a key: search the table using ha_rnd_next() */
- if ((error= table->file->ha_rnd_init(1)))
- {
- DBUG_PRINT("info",("error initializing table scan"
- " (ha_rnd_init returns %d)",error));
- table->file->print_error(error, MYF(0));
- goto err;
- }
+ I (Matz) am not sure of the reason for the last_uniq_key()
+ check, but I'm guessing that it's something along the
+ following lines.
- /* Continue until we find the right record or have made a full loop */
- do
+ Suppose that we got the duplicate key to be a key that is not
+ the last unique key for the table and we perform an update:
+ then there might be another key for which the unique check will
+ fail, so we're better off just deleting the row and inserting
+ the correct row.
+ */
+ if (last_uniq_key(table, keynum) &&
+ !table->file->referenced_by_foreign_key())
{
- restart_ha_rnd_next:
- error= table->file->ha_rnd_next(table->record[0]);
-
- DBUG_PRINT("info", ("error: %s", HA_ERR(error)));
+ DBUG_PRINT("info",("Updating row using ha_update_row()"));
+ error=table->file->ha_update_row(table->record[1],
+ table->record[0]);
switch (error) {
-
+
+ case HA_ERR_RECORD_IS_THE_SAME:
+ DBUG_PRINT("info",("ignoring HA_ERR_RECORD_IS_THE_SAME error from"
+ " ha_update_row()"));
+ error= 0;
+
case 0:
break;
- /*
- If the record was deleted, we pick the next one without doing
- any comparisons.
- */
- case HA_ERR_RECORD_DELETED:
- goto restart_ha_rnd_next;
-
- case HA_ERR_END_OF_FILE:
- if (++restart_count < 2)
- table->file->ha_rnd_init(1);
- break;
-
default:
- DBUG_PRINT("info", ("Failed to get next record"
- " (ha_rnd_next returns %d)",error));
+ DBUG_PRINT("info",("ha_update_row() returns error %d",error));
table->file->print_error(error, MYF(0));
- table->file->ha_rnd_end();
- goto err;
}
- }
- while (restart_count < 2 && record_compare(table, &m_cols));
-
- /*
- Note: above record_compare will take into accout all record fields
- which might be incorrect in case a partial row was given in the event
- */
- /*
- Have to restart the scan to be able to fetch the next row.
- */
- if (restart_count == 2)
- DBUG_PRINT("info", ("Record not found"));
+ goto error;
+ }
else
- DBUG_DUMP("record found", table->record[0], table->s->reclength);
- table->file->ha_rnd_end();
-
- DBUG_ASSERT(error == HA_ERR_END_OF_FILE || error == 0);
- goto err;
+ {
+ DBUG_PRINT("info",("Deleting offending row and trying to write new one again"));
+ if ((error= table->file->ha_delete_row(table->record[1])))
+ {
+ DBUG_PRINT("info",("ha_delete_row() returns error %d",error));
+ table->file->print_error(error, MYF(0));
+ goto error;
+ }
+ /* Will retry ha_write_row() with the offending row removed. */
+ }
}
-ok:
- table->default_column_bitmaps();
- DBUG_RETURN(0);
-err:
- table->default_column_bitmaps();
+error:
+ m_table->default_column_bitmaps();
DBUG_RETURN(error);
}
#endif
+int
+Write_rows_log_event::do_exec_row(const Relay_log_info *const rli)
+{
+ DBUG_ASSERT(m_table != NULL);
+ int error= write_row(rli, slave_exec_mode == SLAVE_EXEC_MODE_IDEMPOTENT);
+
+ if (error && !thd->is_error())
+ {
+ DBUG_ASSERT(0);
+ my_error(ER_UNKNOWN_ERROR, MYF(0));
+ }
+
+ return error;
+}
+
+#endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */
+
+#ifdef MYSQL_CLIENT
+void Write_rows_log_event::print(FILE *file, PRINT_EVENT_INFO* print_event_info)
+{
+ Rows_log_event::print_helper(file, print_event_info, "Write_rows");
+}
+#endif
+
+/**************************************************************************
+ Delete_rows_log_event member functions
+**************************************************************************/
+
+#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
+
+#endif
+
/*
Constructor used to build an event for writing to the binary log.
*/
@@ -9753,6 +10006,17 @@ Delete_rows_log_event::do_before_row_ope
return HA_ERR_OUT_OF_MEM;
}
+ /* we will be using a hash to lookup rows, initialize it */
+ if (decide_row_lookup_method(m_table, &m_cols, get_type_code()) == ROW_LOOKUP_HASH_SCAN)
+ my_hash_init(&m_hash,
+ &my_charset_bin, /* the character set information */
+ 16 /* FIXME */, /* growth size */
+ 0, /* key offset */
+ 0, /* key length */
+ rows_log_event_get_key, /* get function pointer */
+ (my_hash_free_key) rows_log_event_free_entry, /* free function pointer */
+ MYF(0)); /* flags */
+
return 0;
}
@@ -9765,6 +10029,11 @@ Delete_rows_log_event::do_after_row_oper
my_free(m_key);
m_key= NULL;
+ /* we don't need the hash anymore, free it */
+ if ((decide_row_lookup_method(m_table, &m_cols, get_type_code()) == ROW_LOOKUP_HASH_SCAN) &&
+ my_hash_inited(&m_hash))
+ my_hash_free(&m_hash);
+
return error;
}
@@ -9773,16 +10042,11 @@ int Delete_rows_log_event::do_exec_row(c
int error;
DBUG_ASSERT(m_table != NULL);
- if (!(error= find_row(rli)))
- {
+ /* m_table->record[0] contains the BI */
+ m_table->mark_columns_per_binlog_row_image();
+ error= m_table->file->ha_delete_row(m_table->record[1]);
+ m_table->default_column_bitmaps();
- m_table->mark_columns_per_binlog_row_image();
- /*
- Delete the record found, located in record[0]
- */
- error= m_table->file->ha_delete_row(m_table->record[0]);
- m_table->default_column_bitmaps();
- }
return error;
}
@@ -9868,6 +10132,16 @@ Update_rows_log_event::do_before_row_ope
m_table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
+ if (decide_row_lookup_method(m_table, &m_cols, get_type_code()) == ROW_LOOKUP_HASH_SCAN)
+ my_hash_init(&m_hash,
+ &my_charset_bin, /* the character set information */
+ 16 /* FIXME */, /* growth size */
+ 0, /* key offset */
+ 0, /* key length */
+ rows_log_event_get_key, /* get function pointer */
+ (my_hash_free_key) rows_log_event_free_entry, /* free function pointer */
+ MYF(0)); /* flags */
+
return 0;
}
@@ -9880,6 +10154,11 @@ Update_rows_log_event::do_after_row_oper
my_free(m_key); // Free for multi_malloc
m_key= NULL;
+ /* we don't need the hash anymore, free it */
+ if ((decide_row_lookup_method(m_table, &m_cols, get_type_code()) == ROW_LOOKUP_HASH_SCAN) &&
+ my_hash_inited(&m_hash))
+ my_hash_free(&m_hash);
+
return error;
}
@@ -9889,53 +10168,6 @@ Update_rows_log_event::do_exec_row(const
DBUG_ASSERT(m_table != NULL);
int error= 0;
- /**
- Check if update contains only values in AI for columns that do
- not exist on the slave. If it does, we can just unpack the rows
- and return (do nothing on the local table).
-
- NOTE: We do the following optimization and check only if there
- are usable values on the AI and disregard the fact that there
- might be usable values in the BI. In practice this means that
- the slave will not go through find_row (since we have nothing
- on the record to update, why go looking for it?).
-
- If we wanted find_row to run anyway, we could move this
- check after find_row, but then we would have to face the fact
- that the slave might stop without finding the proper record
- (because it might have incomplete BI), even though there were
- no values in AI.
-
- On the other hand, if AI has usable values but BI has not,
- then find_row will return an error (and the error is then
- propagated as it was already).
- */
- if (!is_any_column_signaled_for_table(m_table, &m_cols_ai))
- {
- /*
- Read and discard images, because:
- 1. AI does not contain any useful values to replay;
- 2. BI is irrelevant if there is nothing useful in AI.
- */
- error = unpack_current_row(rli, &m_cols);
- m_curr_row= m_curr_row_end;
- error = error | unpack_current_row(rli, &m_cols_ai);
-
- return error;
- }
-
- error= find_row(rli);
- if (error)
- {
- /*
- We need to read the second image in the event of error to be
- able to skip to the next pair of updates
- */
- m_curr_row= m_curr_row_end;
- unpack_current_row(rli, &m_cols_ai);
- return error;
- }
-
/*
This is the situation after locating BI:
=== modified file 'sql/log_event.h'
--- a/sql/log_event.h 2010-10-08 14:35:24 +0000
+++ b/sql/log_event.h 2010-11-05 00:31:22 +0000
@@ -3528,6 +3528,13 @@ private:
class Rows_log_event : public Log_event
{
public:
+ enum row_lookup_mode {
+ ROW_LOOKUP_NOT_NEEDED= 0,
+ ROW_LOOKUP_INDEX_SCAN= 1,
+ ROW_LOOKUP_TABLE_SCAN= 2,
+ ROW_LOOKUP_HASH_SCAN= 3,
+ };
+
/**
Enumeration of the errors that can be returned.
*/
@@ -3703,6 +3710,7 @@ protected:
ulong m_table_id; /* Table ID */
MY_BITMAP m_cols; /* Bitmap denoting columns available */
ulong m_width; /* The width of the columns bitmap */
+ HASH m_hash;
/*
Bitmap for columns available in the after image, if present. These
fields are only available for Update_rows events. Observe that the
@@ -3810,6 +3818,13 @@ private:
*/
virtual int do_exec_row(const Relay_log_info *const rli) = 0;
+
+ int hash_row(Relay_log_info const *rli);
+ int handle_idempotent_errors(Relay_log_info const *rli, int *err);
+ int do_apply_row(Relay_log_info const *rli);
+ int do_index_scan_and_update(Relay_log_info const *rli);
+ int do_hash_scan_and_update(Relay_log_info const *rli);
+ int do_table_scan_and_update(Relay_log_info const *rli);
#endif /* defined(MYSQL_SERVER) && defined(HAVE_REPLICATION) */
friend class Old_rows_log_event;
Attachment: [text/bzr-bundle] bzr/luis.soares@oracle.com-20101105003122-ed89mbefikon0om7.bundle
| Thread |
|---|
| • bzr commit into mysql-next-mr branch (luis.soares:3204) | Luis Soares | 5 Nov |