#At file:///home/lsoares/Workspace/bzr/work/features/wl5597/mysql-next-mr/ based on revid:luis.soares@stripped
3206 Luis Soares 2010-11-19
WL#5597
Work-in-progress!
Improvements on this cset:
- fixes to idempotency handling.
- fixes to m_curr_row pointer in exec row main loop.
- refactory on main loop: we check what apply method to use only
once keep the pointer and use it inside the exec row main loop
- added some DBUG macros
- fixes to bugs inside do_index_scan_and_update
modified:
sql/log_event.cc
sql/log_event.h
=== modified file 'sql/log_event.cc'
--- a/sql/log_event.cc 2010-11-12 13:59:15 +0000
+++ b/sql/log_event.cc 2010-11-19 11:33:10 +0000
@@ -7592,6 +7592,7 @@ int Rows_log_event::do_add_row_data(ucha
static
my_bool is_any_column_signaled_for_table(TABLE *table, MY_BITMAP *cols)
{
+ DBUG_ENTER("is_any_column_signaled_for_table");
int nfields_set= 0;
for (Field **ptr=table->field ;
@@ -7602,7 +7603,7 @@ my_bool is_any_column_signaled_for_table
nfields_set++;
}
- return (nfields_set != 0);
+ DBUG_RETURN (nfields_set != 0);
}
/**
@@ -7639,15 +7640,17 @@ my_bool is_any_column_signaled_for_table
static
my_bool are_all_columns_signaled_for_key(KEY *keyinfo, MY_BITMAP *cols)
{
+ DBUG_ENTER("are_all_columns_signaled_for_key");
+
for (uint i=0 ; i < keyinfo->key_parts ;i++)
{
uint fieldnr= keyinfo->key_part[i].fieldnr - 1;
if (fieldnr >= cols->n_bits ||
!bitmap_is_set(cols, fieldnr))
- return FALSE;
+ DBUG_RETURN(FALSE);
}
- return TRUE;
+ DBUG_RETURN(TRUE);
}
/**
@@ -7693,6 +7696,8 @@ static
uint
search_key_in_table(TABLE *table, MY_BITMAP *bi_cols, uint key_type)
{
+ DBUG_ENTER("search_key_in_table");
+
KEY *keyinfo;
uint res= MAX_KEY;
uint key;
@@ -7701,7 +7706,7 @@ search_key_in_table(TABLE *table, MY_BIT
{
keyinfo= table->s->key_info + (uint) table->s->primary_key;
if (are_all_columns_signaled_for_key(keyinfo, bi_cols))
- return table->s->primary_key;
+ DBUG_RETURN(table->s->primary_key);
}
if (key_type & UNIQUE_KEY_FLAG && table->s->uniques)
@@ -7722,7 +7727,7 @@ search_key_in_table(TABLE *table, MY_BIT
key : MAX_KEY;
if (res < MAX_KEY)
- return res;
+ DBUG_RETURN(res);
}
}
@@ -7746,11 +7751,11 @@ search_key_in_table(TABLE *table, MY_BIT
key : MAX_KEY;
if (res < MAX_KEY)
- return res;
+ DBUG_RETURN(res);
}
}
- return res;
+ DBUG_RETURN(res);
}
static uint decide_row_lookup_method(TABLE* table, MY_BITMAP *cols, uint event_type)
@@ -7782,6 +7787,8 @@ static uint decide_row_lookup_method(TAB
*/
static bool record_compare(TABLE *table, MY_BITMAP *cols)
{
+ DBUG_ENTER("record_compare");
+
/*
Need to set the X bit and the filler bits in both records since
there are engines that do not set it correctly.
@@ -7880,7 +7887,7 @@ record_compare_exit:
}
}
- return result;
+ DBUG_RETURN(result);
}
/**
@@ -7904,6 +7911,7 @@ struct row_entry
place).
*/
const uchar *m_curr_row;
+
} typedef row_entry;
extern "C" uchar *rows_log_event_get_key(const uchar *record, size_t *length,
@@ -7926,6 +7934,9 @@ static void rows_log_event_free_entry(ro
int remove_blobs_from_record(TABLE* table)
{
+
+ DBUG_ENTER("remove_blobs_from_record");
+
int res= 0;
if (table->s->blob_fields)
@@ -7942,7 +7953,7 @@ int remove_blobs_from_record(TABLE* tabl
}
}
- return res;
+ DBUG_RETURN(res);
}
bool
@@ -7971,10 +7982,7 @@ int Rows_log_event::hash_row(Relay_log_i
/**
Unpack the row to be hashed.
*/
- if ((error= unpack_current_row(rli, &m_cols)))
- goto err;
-
- else
+ if (!(error= unpack_current_row(rli, &m_cols)))
{
/**
Save a copy of the original record unpacked.
@@ -8008,14 +8016,42 @@ int Rows_log_event::hash_row(Relay_log_i
m_curr_row=m_curr_row_end;
error= unpack_current_row(rli, &m_cols_ai);
}
-
- m_curr_row=m_curr_row_end;
}
-err:
return error;
}
+void Rows_log_event::do_post_row_operations(Relay_log_info const *rli, int error)
+{
+
+ /*
+ If m_curr_row_end was not set during event execution (e.g., because
+ of errors) we can't proceed to the next row. If the error is transient
+ (i.e., error==0 at this point) we must call unpack_current_row() to set
+ m_curr_row_end.
+ */
+
+ DBUG_PRINT("info", ("curr_row: 0x%lu; curr_row_end: 0x%lu; rows_end: 0x%lu",
+ (ulong) m_curr_row, (ulong) m_curr_row_end, (ulong) m_rows_end));
+
+ if (!m_curr_row_end && !error)
+ {
+ error= unpack_current_row(rli, &m_cols);
+ }
+
+ // at this moment m_curr_row_end should be set
+ DBUG_ASSERT(error || m_curr_row_end != NULL);
+ DBUG_ASSERT(error || m_curr_row <= m_curr_row_end);
+ DBUG_ASSERT(error || m_curr_row_end <= m_rows_end);
+
+ m_curr_row= m_curr_row_end;
+
+ if (error == 0 && !m_table->file->has_transactions())
+ thd->transaction.all.modified_non_trans_table=
+ thd->transaction.stmt.modified_non_trans_table= TRUE;
+
+}
+
int Rows_log_event::handle_idempotent_errors(Relay_log_info const *rli, int *err)
{
int error= *err;
@@ -8037,57 +8073,32 @@ int Rows_log_event::handle_idempotent_er
clear_all_errors(thd, const_cast<Relay_log_info*>(rli));
*err= 0;
if (idempotent_error == 0)
- return true;
+ return *err;
}
}
- return false;
+ return *err;
}
int Rows_log_event::do_apply_row(Relay_log_info const *rli)
{
- int error= 0;
-
- /* in_use can have been set to NULL in close_tables_for_reopen */
- THD* old_thd= m_table->in_use;
- if (!m_table->in_use)
- m_table->in_use= thd;
+ DBUG_ENTER("Rows_log_event::do_apply_row");
- error= do_exec_row(rli);
-
- DBUG_PRINT("info", ("error: %s", HA_ERR(error)));
- DBUG_ASSERT(error != HA_ERR_RECORD_DELETED);
-
- m_table->in_use = old_thd;
-
- if (handle_idempotent_errors(rli, &error))
- return error;
-
- /*
- If m_curr_row_end was not set during event execution (e.g., because
- of errors) we can't proceed to the next row. If the error is transient
- (i.e., error==0 at this point) we must call unpack_current_row() to set
- m_curr_row_end.
- */
-
- DBUG_PRINT("info", ("curr_row: 0x%lu; curr_row_end: 0x%lu; rows_end: 0x%lu",
- (ulong) m_curr_row, (ulong) m_curr_row_end, (ulong) m_rows_end));
-
- if (!m_curr_row_end && !error)
- error= unpack_current_row(rli, &m_cols);
-
- // at this moment m_curr_row_end should be set
- DBUG_ASSERT(error || m_curr_row_end != NULL);
- DBUG_ASSERT(error || m_curr_row <= m_curr_row_end);
- DBUG_ASSERT(error || m_curr_row_end <= m_rows_end);
-
- m_curr_row= m_curr_row_end;
-
- if (error == 0 && !m_table->file->has_transactions())
- thd->transaction.all.modified_non_trans_table=
- thd->transaction.stmt.modified_non_trans_table= TRUE;
+ int error= 0;
+
+ /* in_use can have been set to NULL in close_tables_for_reopen */
+ THD* old_thd= m_table->in_use;
+ if (!m_table->in_use)
+ m_table->in_use= thd;
+
+ error= do_exec_row(rli);
+
+ DBUG_PRINT("info", ("error: %s", HA_ERR(error)));
+ DBUG_ASSERT(error != HA_ERR_RECORD_DELETED);
+
+ m_table->in_use = old_thd;
- return error;
+ DBUG_RETURN(error);
}
@@ -8100,6 +8111,7 @@ int Rows_log_event::do_index_scan_and_up
int error= 0;
KEY *keyinfo;
uint key;
+ const uchar *saved_m_curr_row= m_curr_row;
/*
rpl_row_tabledefs.test specifies that
@@ -8109,7 +8121,10 @@ int Rows_log_event::do_index_scan_and_up
*/
prepare_record(table, &m_cols, FALSE);
- error= unpack_current_row(rli, &m_cols);
+ if ((error= unpack_current_row(rli, &m_cols)))
+ goto end;
+
+ saved_m_curr_row= m_curr_row;
// Temporary fix to find out why it fails [/Matz]
memcpy(m_table->read_set->bitmap, m_cols.bitmap, (m_table->read_set->n_bits + 7) / 8);
@@ -8117,7 +8132,7 @@ int Rows_log_event::do_index_scan_and_up
if (!is_any_column_signaled_for_table(table, &m_cols))
{
error= HA_ERR_END_OF_FILE;
- goto err;
+ goto end;
}
#ifndef DBUG_OFF
@@ -8149,12 +8164,13 @@ int Rows_log_event::do_index_scan_and_up
table->s->reclength) == 0);
*/
+
DBUG_PRINT("info",("locating record using primary key (position)"));
- int error;
if (table->file->inited && (error= table->file->ha_index_end()))
- DBUG_RETURN(error);
+ goto end;
+
if ((error= table->file->ha_rnd_init(FALSE)))
- DBUG_RETURN(error);
+ goto end;
error= table->file->rnd_pos_by_record(table->record[0]);
@@ -8166,7 +8182,8 @@ int Rows_log_event::do_index_scan_and_up
error= HA_ERR_KEY_NOT_FOUND;
table->file->print_error(error, MYF(0));
}
- DBUG_RETURN(error);
+
+ goto end;
}
// We can't use position() - try other methods.
@@ -8185,7 +8202,7 @@ INDEX_SCAN:
/* we dont have a key, or no key is suitable for the BI values */
{
error= HA_ERR_KEY_NOT_FOUND;
- goto err;
+ goto end;
}
{
@@ -8199,7 +8216,7 @@ INDEX_SCAN:
{
DBUG_PRINT("info",("ha_index_init returns error %d",error));
table->file->print_error(error, MYF(0));
- goto err;
+ goto end;
}
/* Fill key data for the row */
@@ -8233,8 +8250,7 @@ INDEX_SCAN:
if (error == HA_ERR_RECORD_DELETED)
error= HA_ERR_KEY_NOT_FOUND;
table->file->print_error(error, MYF(0));
- table->file->ha_index_end();
- goto err;
+ goto end;
}
/*
@@ -8262,11 +8278,8 @@ INDEX_SCAN:
if (keyinfo->flags & HA_NOSAME || key == table->s->primary_key)
{
/* Unique does not have non nullable part */
- if (!(table->key_info->flags & (HA_NULL_PART_KEY)))
- {
- table->file->ha_index_end();
- goto record_found;
- }
+ if (!(table->key_info->flags & (HA_NULL_PART_KEY)))
+ goto end; // record found
else
{
KEY *keyinfo= table->key_info;
@@ -8283,10 +8296,7 @@ INDEX_SCAN:
}
if (!null_found)
- {
- table->file->ha_index_end();
- goto record_found;
- }
+ goto end; // record found
/* else fall through to index scan */
}
@@ -8323,21 +8333,25 @@ INDEX_SCAN:
continue;
DBUG_PRINT("info",("no record matching the given row found"));
table->file->print_error(error, MYF(0));
- table->file->ha_index_end();
- goto err;
+ goto end;
}
}
+ }
- /*
- Have to restart the scan to be able to fetch the next row.
- */
+end:
+ if (!error)
+ error= do_apply_row(rli);
+
+ if (table->file->inited)
table->file->ha_index_end();
- }
-record_found:
- error= do_apply_row(rli);
+ if ((get_type_code() == UPDATE_ROWS_EVENT) &&
+ (saved_m_curr_row== m_curr_row)) // we need to unpack the AI
+ {
+ m_curr_row= m_curr_row_end;
+ unpack_current_row(rli, &m_cols);
+ }
-err:
table->default_column_bitmaps();
DBUG_RETURN(error);
@@ -8353,14 +8367,12 @@ int Rows_log_event::do_hash_scan_and_upd
goto err;
/**
- Last row hashed.
-
- Now do the table scan and update according to the hash table
- matches.
+ Last row hashed. We are handling the last (pair of) row(s). So
+ now we do the table scan and match against the entries in the hash
+ table.
*/
- if (m_curr_row == m_rows_end)
+ if (m_curr_row_end == m_rows_end)
{
-
TABLE* table= m_table;
MY_BITMAP* read_set= &m_cols;
@@ -8375,6 +8387,7 @@ int Rows_log_event::do_hash_scan_and_upd
/* Continue until we find the right record or reached the end of the table */
do
{
+ /* get the first record from the table */
error= table->file->ha_rnd_next(table->record[0]);
DBUG_PRINT("info", ("error: %s", HA_ERR(error)));
@@ -8415,22 +8428,26 @@ int Rows_log_event::do_hash_scan_and_upd
both table->record[0] and table->record[1] should have
approximately the same contents (except maybe for blobs
- thence we need to do full comparison below).
- */
- m_curr_row= entry->m_curr_row;
- if ((error= unpack_current_row(rli, &m_cols)))
- goto close_table_and_err;
-
- /*
- compare the row_entry with row taking into account
- blobs, if there is any. If there is a match do the
- operation and remove the entry from the hash table.
*/
- if (!record_compare(table, read_set))
- {
- found_in_hash= true;
-
- my_hash_delete(&m_hash, (uchar *)entry);
- break;
+ m_curr_row= entry->m_curr_row;
+ if ((error= unpack_current_row(rli, &m_cols)))
+ /* unpack errors are not elegible for idempotency */
+ goto close_table;
+
+ else
+ {
+ /*
+ compare the row_entry with row taking into account
+ blobs, if there is any. If there is a match do the
+ operation and remove the entry from the hash table.
+ */
+ if (!record_compare(table, read_set))
+ {
+ found_in_hash= true;
+
+ my_hash_delete(&m_hash, (uchar *)entry);
+ break;
+ }
}
/* Next row for the loop... */
@@ -8441,8 +8458,15 @@ int Rows_log_event::do_hash_scan_and_upd
}
if (found_in_hash)
+ {
if ((error= do_apply_row(rli)))
- goto err;
+ {
+ if (handle_idempotent_errors(rli, &error) || error)
+ goto close_table;
+
+ do_post_row_operations(rli, error);
+ }
+ }
}
break;
@@ -8453,25 +8477,29 @@ int Rows_log_event::do_hash_scan_and_upd
case HA_ERR_RECORD_DELETED:
break;
- case HA_ERR_END_OF_FILE: // to make it clear
+ case HA_ERR_END_OF_FILE:
default:
DBUG_PRINT("info", ("Failed to get next record"
" (ha_rnd_next returns %d)",error));
- goto close_table_and_err;
+ goto close_table;
}
}
- while ((m_hash.records > 0) && (!error || (error == HA_ERR_RECORD_DELETED)));
- }
+ while ((m_hash.records > 0) &&
+ (!error || (error == HA_ERR_RECORD_DELETED)));
-err:
- DBUG_RETURN(error);
+close_table:
+ m_table->file->print_error(error, MYF(0));
+ m_table->file->ha_rnd_end();
-close_table_and_err:
- m_table->file->print_error(error, MYF(0));
- m_table->file->ha_rnd_end();
- DBUG_RETURN (error);
+ /* set back the pointer to the end of the rows */
+// if (!error || (error == HA_ERR_RECORD_DELETED))
+// m_curr_row= m_curr_row_end= m_rows_end;
+ }
+err:
+ DBUG_ASSERT((m_hash.records == 0) ? (error == 0) : (m_hash.records > 0));
+ DBUG_RETURN(error);
}
int Rows_log_event::do_table_scan_and_update(Relay_log_info const *rli)
@@ -8809,35 +8837,42 @@ int Rows_log_event::do_apply_event(Relay
is_any_column_signaled_for_table(table, &m_cols_ai))
{
uint row_lookup_method= decide_row_lookup_method(table, &m_cols, get_type_code());
+
+ int (Rows_log_event::*do_apply_row_ptr)(Relay_log_info const *)= NULL;
+ switch (row_lookup_method)
+ {
+ case ROW_LOOKUP_HASH_SCAN:
+ do_apply_row_ptr= &Rows_log_event::do_hash_scan_and_update;
+ break;
+
+ case ROW_LOOKUP_INDEX_SCAN:
+ do_apply_row_ptr= &Rows_log_event::do_index_scan_and_update;
+ break;
+
+ case ROW_LOOKUP_TABLE_SCAN:
+ do_apply_row_ptr= &Rows_log_event::do_table_scan_and_update;
+ break;
+
+ case ROW_LOOKUP_NOT_NEEDED:
+ DBUG_ASSERT(get_type_code() == WRITE_ROWS_EVENT);
+
+ /* No need to scan for rows, just apply it */
+ do_apply_row_ptr= &Rows_log_event::do_apply_row;
+ break;
+ }
// row processing loop
while (!error && (m_curr_row != m_rows_end))
{
- switch (row_lookup_method)
- {
- case ROW_LOOKUP_HASH_SCAN:
- /**
- scan the table and for each entry in the table, if
- it exists in the hash, execute the row.
- */
- error= do_hash_scan_and_update(rli);
- break;
-
- case ROW_LOOKUP_INDEX_SCAN:
- error= do_index_scan_and_update(rli);
- break;
-
- case ROW_LOOKUP_TABLE_SCAN:
- error= do_table_scan_and_update(rli);
- break;
-
- case ROW_LOOKUP_NOT_NEEDED:
- DBUG_ASSERT(get_type_code() == WRITE_ROWS_EVENT);
-
- /* No need to scan for rows, just apply it */
- error= do_apply_row(rli);
- break;
- }
+
+ error= (this->*do_apply_row_ptr)(rli);
+
+ if (handle_idempotent_errors(rli, &error))
+ break;
+
+ /* this advances m_curr_row */
+ do_post_row_operations(rli, error);
+
}
}
=== modified file 'sql/log_event.h'
--- a/sql/log_event.h 2010-11-05 00:31:22 +0000
+++ b/sql/log_event.h 2010-11-19 11:33:10 +0000
@@ -3821,6 +3821,7 @@ private:
int hash_row(Relay_log_info const *rli);
int handle_idempotent_errors(Relay_log_info const *rli, int *err);
+ void do_post_row_operations(Relay_log_info const *rli, int err);
int do_apply_row(Relay_log_info const *rli);
int do_index_scan_and_update(Relay_log_info const *rli);
int do_hash_scan_and_update(Relay_log_info const *rli);
Attachment: [text/bzr-bundle] bzr/luis.soares@oracle.com-20101119113310-suzckexehkbhyev0.bundle
| Thread |
|---|
| • bzr commit into mysql-next-mr branch (luis.soares:3206) WL#5597 | Luis Soares | 19 Nov |