3893 Marko Mäkelä 2012-05-29
WL#6255 online table rebuild: Online log apply.
The online table rebuild will be enabled in the interface in a
subsequent commit.
prepare_inplace_alter_table_dict(): Allocate the online rebuild log if
needed.
ha_innobase::inplace_alter_table(): Apply the bulk of the online
rebuild log while not holding (m)any locks.
innobase_online_rebuild_log_free(): New function, for freeing the
online rebuild log. Called from rollback_inplace_alter_table()
and ha_innobase::commit_inplace_alter_table().
ha_innobase::commit_inplace_alter_table(): Apply and free the online
rebuild log. Prevent old transactions from accessing the rebuilt
table, because the history is missing. (This bug has been present
since the InnoDB Plugin in MySQL 5.1.)
row_log_table_apply_convert_mrec(): Auxiliary function for converting
a log record on the clustered index record of the old table into a row
tuple of the rebuilt table. (TODO: implement ADD COLUMN, DROP COLUMN
and others.)
row_log_table_apply(), row_log_table_apply_ops(), row_log_table_apply_op(),
row_log_table_apply_insert_low(), row_log_table_apply_insert(),
row_log_table_apply_delete_low(), row_log_table_apply_delete(),
row_log_table_apply_update(): Apply the online rebuild log.
row_merge_skip_rec(): Determine if a row should be skipped during
online table rebuild. Rolled-back records will be skipped.
row_merge_read_clustered_index(): Relax assertions. Check if an error
has occurred during online table rebuild.
row_merge_copy_blobs(): Non-functional changes.
row_merge_insert_index_tuples(): Replace zip_size with old_table,
so that row_merge_skip_rec() can be called.
row_merge_drop_indexes(): Add debug assertions. This function should
not be called during online table rebuild (the entire table should be
dropped instead).
row_merge_build_indexes(): Add the parameter altered_table, so that
rows can be converted for dropping, adding or reordering columns (not
implemented yet). Adjust assertions and conditions.
modified:
storage/innobase/handler/handler0alter.cc
storage/innobase/include/row0log.h
storage/innobase/include/row0merge.h
storage/innobase/row/row0log.cc
storage/innobase/row/row0merge.cc
3892 Marko Mäkelä 2012-05-29
WL#6255 DML logging. This does not yet enable online table rebuild.
The functions for applying the log are part of a separate change.
In online table rebuild, we will not copy any delete-marked rows or
any history to the new table. Anything that is delete-marked will be
purged immediately from the new table.
Because the DML log will contain BLOB pointers of the original table,
the purge thread must not free any BLOBs of a table that is being
rebuilt.
Because we cannot easily block the ROLLBACK of an INSERT, we will keep
track of transactions that have been rolled back, and skip any rows
written by transactions that have been rolled back. In this way, we
will not dereference any BLOBs that would be freed as part of a
rollback.
row_log_table_insert(): Logging function for inserting a new row,
whether or not a fresh insert or an update of a delete-marked row.
row_log_table_delete(): Logging function for delete-marking a row.
The row will be identified by PRIMARY KEY (in the new table
definition) and the DB_TRX_ID before the delete-marking (purging).
row_log_table_update(): Logging function for an update of a row that
was not delete-marked. The new value of the row will be logged, not
dereferencing any BLOB pointers. If the PRIMARY KEY is redefined, the
update record will also contain the new PRIMARY KEY and the DB_TRX_ID,
DB_ROLL_PTR value of the old row.
row_log_table_get_pk(): Constructs the old PRIMARY KEY and
DB_TRX_ID,DB_ROLL_PTR for row_log_table_update(). If the PRIMARY KEY
is not being redefined, returns NULL. This conversion has to be done
at logging time while holding the clustered index leaf page latch, in
case the new PRIMARY KEY includes column prefixes of externally stored
columns.
row_log_table_open(), row_log_table_close_func(): Common code for
table DML logging.
row_ins_clust_index_entry_by_modify(), row_ins_clust_index_entry_low(),
row_ins_index_entry_big_rec_func(): Add row_log_table_insert() logging.
row_undo_ins_remove_clust_rec(): Add row_log_table_delete() logging.
row_undo_mod_clust_low(): Add an output parameter rebuilt_old_pk for
row_log_table_update() logging.
row_undo_mod_clust(): Add insert, update, delete logging.
row_upd_clust_rec_by_insert(): Note that no logging is necessary for
BLOB ownership changes.
row_upd_clust_rec(): Add the parameters offsets, offsets_heap. Add
row_log_table_update() logging.
modified:
storage/innobase/btr/btr0cur.cc
storage/innobase/include/row0log.h
storage/innobase/row/row0ins.cc
storage/innobase/row/row0log.cc
storage/innobase/row/row0uins.cc
storage/innobase/row/row0umod.cc
storage/innobase/row/row0upd.cc
=== modified file 'storage/innobase/handler/handler0alter.cc'
--- a/storage/innobase/handler/handler0alter.cc revid:marko.makela@stripped29133330-hvs4rzbzl68symnl
+++ b/storage/innobase/handler/handler0alter.cc revid:marko.makela@stripped1608-cdeb9k9943oamce2
@@ -1456,10 +1456,6 @@ prepare_inplace_alter_table_dict(
column is to be added, and the primary index definition
is just copied from old table and stored in indexdefs[0] */
DBUG_ASSERT(!add_fts_doc_id || new_clustered);
-
- /* A primary key index cannot be created online. The table
- should be locked in this case. It should also be locked when a
- full-text index is being created. */
DBUG_ASSERT(!!new_clustered
== ((ha_alter_info->handler_flags
& INNOBASE_INPLACE_REBUILD)
@@ -1481,13 +1477,8 @@ prepare_inplace_alter_table_dict(
|| ha_alter_info->alter_info->requested_lock
== Alter_info::ALTER_TABLE_LOCK_SHARED;
- /* For now, rebuilding the clustered index requires a lock. */
- ut_ad(!new_clustered || locked
- || ha_alter_info->alter_info->requested_lock
- == Alter_info::ALTER_TABLE_LOCK_DEFAULT);
-
/* Acquire a lock on the table before creating any indexes. */
- if (new_clustered || locked) {
+ if (locked) {
error = row_merge_lock_table(
user_trx, indexed_table, LOCK_S);
@@ -1707,12 +1698,20 @@ col_fail:
/* If only online ALTER TABLE operations have been
requested, allocate a modification log. If the table
- will be locked anyway, the modification log is
- unnecessary. */
- if (!locked && !num_fts_index
- && !innobase_need_rebuild(ha_alter_info)
- && !user_table->ibd_file_missing
- && !user_table->tablespace_discarded) {
+ will be locked anyway, the modification
+ log is unnecessary. When rebuilding the table
+ (new_clustered), we will allocate the log for the
+ clustered index of the old table, later. */
+ if (new_clustered
+ || locked
+ || user_table->ibd_file_missing
+ || user_table->tablespace_discarded) {
+ /* No need to allocate a modification log. */
+ ut_ad(!add_index[num_created]->online_log);
+ } else if (add_index[num_created]->type & DICT_FTS) {
+ /* Fulltext indexes are not covered
+ by a modification log. */
+ } else {
DBUG_EXECUTE_IF("innodb_OOM_prepare_inplace_alter",
error = DB_OUT_OF_MEMORY;
goto error_handling;);
@@ -1728,6 +1727,29 @@ col_fail:
}
}
+ ut_ad(new_clustered == (indexed_table != user_table));
+
+ if (new_clustered && !locked) {
+ /* Allocate a log for online table rebuild. */
+ dict_index_t* clust_index = dict_table_get_first_index(
+ user_table);
+
+ DBUG_EXECUTE_IF("innodb_OOM_prepare_inplace_alter_rebuild",
+ error = DB_OUT_OF_MEMORY;
+ goto error_handling;);
+ rw_lock_x_lock(&clust_index->lock);
+ bool ok = row_log_allocate(
+ clust_index, indexed_table,
+ !!(ha_alter_info->handler_flags
+ & Alter_inplace_info::ADD_PK_INDEX));
+ rw_lock_x_unlock(&clust_index->lock);
+
+ if (!ok) {
+ error = DB_OUT_OF_MEMORY;
+ goto error_handling;
+ }
+ }
+
if (fts_index) {
#ifdef UNIV_DEBUG
/* Ensure that the dictionary operation mode will
@@ -1817,8 +1839,7 @@ error_handling:
user_trx, add_index, add_key_nums, n_add_index,
drop_index, n_drop_index,
drop_foreign, n_drop_foreign,
- !locked && !new_clustered && !num_fts_index,
- heap, trx, indexed_table);
+ !locked, heap, trx, indexed_table);
DBUG_RETURN(false);
case DB_TABLESPACE_ALREADY_EXISTS:
my_error(ER_TABLE_EXISTS_ERROR, MYF(0), "(unknown)");
@@ -1841,13 +1862,30 @@ error_handling:
if (indexed_table != user_table) {
dict_table_close(indexed_table, TRUE, FALSE);
row_merge_drop_table(trx, indexed_table);
+
+ /* Free the log for online table rebuild, if
+ one was allocated. */
+
+ dict_index_t* clust_index = dict_table_get_first_index(
+ user_table);
+
+ rw_lock_x_lock(&clust_index->lock);
+
+ if (clust_index->online_log) {
+ ut_ad(!locked);
+ row_log_free(clust_index);
+ clust_index->online_status
+ = ONLINE_INDEX_COMPLETE;
+ }
+
+ rw_lock_x_unlock(&clust_index->lock);
}
trx_commit_for_mysql(trx);
/* n_ref_count must be 1, because purge cannot
be executing on this very table as we are
holding dict_operation_lock X-latch. */
- DBUG_ASSERT(user_table->n_ref_count == 1);
+ DBUG_ASSERT(user_table->n_ref_count == 1 || !locked);
online_retry_drop_indexes_with_trx(user_table, trx);
} else {
@@ -2460,10 +2498,17 @@ ok_exit:
prebuilt->trx,
prebuilt->table, ctx->indexed_table,
ctx->online,
- ctx->add, ctx->add_key_numbers, ctx->num_to_add, table);
+ ctx->add, ctx->add_key_numbers, ctx->num_to_add,
+ table, altered_table);
#ifndef DBUG_OFF
oom:
#endif /* !DBUG_OFF */
+ if (error == DB_SUCCESS && ctx->online
+ && ctx->indexed_table != prebuilt->table) {
+ DEBUG_SYNC_C("row_log_table_apply1_before");
+ error = row_log_table_apply(
+ ctx->thr, prebuilt->table, table, altered_table);
+ }
/* After an error, remove all those index definitions
from the dictionary which were defined. */
@@ -2524,6 +2569,31 @@ oom:
DBUG_RETURN(true);
}
+/** Free the modification log for online table rebuild.
+@param table table that was being rebuilt online */
+static
+void
+innobase_online_rebuild_log_free(
+/*=============================*/
+ dict_table_t* table)
+{
+ dict_index_t* clust_index = dict_table_get_first_index(table);
+
+ rw_lock_x_lock(&clust_index->lock);
+
+ if (clust_index->online_log) {
+ ut_ad(dict_index_get_online_status(clust_index)
+ == ONLINE_INDEX_CREATION);
+ clust_index->online_status = ONLINE_INDEX_COMPLETE;
+ row_log_free(clust_index);
+ clust_index->online_status = ONLINE_INDEX_COMPLETE;
+ }
+
+ DBUG_ASSERT(dict_index_get_online_status(clust_index)
+ == ONLINE_INDEX_COMPLETE);
+ rw_lock_x_unlock(&clust_index->lock);
+}
+
/** Roll back the changes made during prepare_inplace_alter_table()
and inplace_alter_table() inside the storage engine. Note that the
allowed level of concurrency during this operation will be the same as
@@ -2576,6 +2646,8 @@ rollback_inplace_alter_table(
prebuilt->table->flags);
fail = true;
}
+
+ innobase_online_rebuild_log_free(prebuilt->table);
} else {
DBUG_ASSERT(!(ha_alter_info->handler_flags
& Alter_inplace_info::ADD_PK_INDEX));
@@ -2587,6 +2659,14 @@ rollback_inplace_alter_table(
trx_free_for_mysql(ctx->trx);
func_exit:
+#ifndef DBUG_OFF
+ dict_index_t* clust_index = dict_table_get_first_index(
+ prebuilt->table);
+ DBUG_ASSERT(!clust_index->online_log);
+ DBUG_ASSERT(dict_index_get_online_status(clust_index)
+ == ONLINE_INDEX_COMPLETE);
+#endif /* !DBUG_OFF */
+
if (ctx && prebuilt->table == ctx->indexed_table) {
/* Clear the to_be_dropped flag in the data dictionary. */
for (ulint i = 0; i < ctx->num_to_drop; i++) {
@@ -2872,6 +2952,8 @@ ha_innobase::commit_inplace_alter_table(
DBUG_ENTER("commit_inplace_alter_table");
if (!(ha_alter_info->handler_flags & ~INNOBASE_INPLACE_IGNORE)) {
+ DBUG_ASSERT(!ctx);
+
/* Nothing to do */
if (!commit) {
goto ret;
@@ -2916,7 +2998,61 @@ ha_innobase::commit_inplace_alter_table(
/* We copied the table. Any indexes that were
requested to be dropped were not created in the copy
- of the table. */
+ of the table. Apply any last bit of the rebuild log
+ and then rename the tables. */
+
+ if (ctx->online) {
+ DEBUG_SYNC_C("row_log_table_apply2_before");
+ error = row_log_table_apply(
+ ctx->thr, prebuilt->table,
+ table, altered_table);
+
+ switch (error) {
+ KEY* dup_key;
+ case DB_SUCCESS:
+ break;
+ case DB_DUPLICATE_KEY:
+ if (prebuilt->trx->error_key_num
+ == ULINT_UNDEFINED) {
+ /* This should be the hidden index on
+ FTS_DOC_ID. */
+ dup_key = NULL;
+ } else {
+ DBUG_ASSERT(
+ prebuilt->trx->error_key_num
+ < ha_alter_info->key_count);
+ dup_key = &ha_alter_info
+ ->key_info_buffer[
+ prebuilt->trx
+ ->error_key_num];
+ }
+ print_keydup_error(dup_key);
+ break;
+ case DB_ONLINE_LOG_TOO_BIG:
+ my_error(ER_INNODB_ONLINE_LOG_TOO_BIG, MYF(0),
+ ha_alter_info->key_info_buffer[0]
+ .name);
+ break;
+ case DB_INDEX_CORRUPT:
+ my_error(ER_INDEX_CORRUPT, MYF(0),
+ (prebuilt->trx->error_key_num
+ == ULINT_UNDEFINED)
+ ? FTS_DOC_ID_INDEX_NAME
+ : ha_alter_info->key_info_buffer[
+ prebuilt->trx->error_key_num]
+ .name);
+ break;
+ default:
+ my_error_innodb(error,
+ table_share->table_name.str,
+ prebuilt->table->flags);
+ }
+
+ if (error != DB_SUCCESS) {
+ err = -1;
+ goto drop_new_clustered;
+ }
+ }
/* A new clustered index was defined for the table
and there was no error at this point. We can
@@ -2935,17 +3071,24 @@ ha_innobase::commit_inplace_alter_table(
holding dict_operation_lock X-latch. */
ut_a(prebuilt->table->n_ref_count == 1);
- if (error == DB_SUCCESS) {
- dict_table_t* old_table = prebuilt->table;
+ switch (error) {
+ dict_table_t* old_table;
+ trx_id_t trx_id;
+ case DB_SUCCESS:
+ old_table = prebuilt->table;
+ trx_id = prebuilt->trx->id;
trx_commit_for_mysql(prebuilt->trx);
row_prebuilt_free(prebuilt, TRUE);
error = row_merge_drop_table(trx, old_table);
prebuilt = row_create_prebuilt(
ctx->indexed_table, table->s->reclength);
- }
-
- switch (error) {
- case DB_SUCCESS:
+ /* Prevent old transactions from accessing the
+ rebuilt table, because the history is missing. */
+ for (dict_index_t* index = dict_table_get_first_index(
+ ctx->indexed_table);
+ index; index = dict_table_get_next_index(index)) {
+ index->trx_id = trx_id;
+ }
err = 0;
break;
case DB_TABLESPACE_ALREADY_EXISTS:
@@ -3067,6 +3210,10 @@ trx_commit:
trx_rollback_for_mysql(trx);
}
+ if (new_clustered) {
+ innobase_online_rebuild_log_free(prebuilt->table);
+ }
+
if (err == 0 && ctx) {
/* The changes were successfully performed. */
bool add_fts = false;
@@ -3247,8 +3394,17 @@ func_exit:
}
ret:
+#ifndef DBUG_OFF
+ dict_index_t* clust_index = dict_table_get_first_index(
+ prebuilt->table);
+ DBUG_ASSERT(!clust_index->online_log);
+ DBUG_ASSERT(dict_index_get_online_status(clust_index)
+ == ONLINE_INDEX_COMPLETE);
+#endif /* !DBUG_OFF */
+
if (err == 0) {
MONITOR_ATOMIC_DEC(MONITOR_PENDING_ALTER_TABLE);
}
+
DBUG_RETURN(err != 0);
}
=== modified file 'storage/innobase/include/row0log.h'
--- a/storage/innobase/include/row0log.h revid:marko.makela@strippedymnl
+++ b/storage/innobase/include/row0log.h revid:marko.makela@stripped
@@ -167,6 +167,22 @@ row_log_table_is_rollback(
__attribute__((nonnull));
/******************************************************//**
+Apply the row_log_table log to a table upon completing rebuild.
+@return DB_SUCCESS, or error code on failure */
+UNIV_INTERN
+dberr_t
+row_log_table_apply(
+/*================*/
+ que_thr_t* thr, /*!< in: query graph */
+ dict_table_t* old_table,
+ /*!< in: old table */
+ struct TABLE* table, /*!< in/out: MySQL table
+ (for reporting duplicates) */
+ struct TABLE* altered_table)
+ /*!< in: new MySQL table definition */
+ __attribute__((nonnull, warn_unused_result));
+
+/******************************************************//**
Get the latest transaction ID that has invoked row_log_online_op()
during online creation.
@return latest transaction ID, or 0 if nothing was logged */
=== modified file 'storage/innobase/include/row0merge.h'
--- a/storage/innobase/include/row0merge.h revid:marko.makela@stripped529133330-hvs4rzbzl68symnl
+++ b/storage/innobase/include/row0merge.h revid:marko.makela@stripped-cdeb9k9943oamce2
@@ -290,9 +290,11 @@ row_merge_build_indexes(
dict_index_t** indexes, /*!< in: indexes to be created */
const ulint* key_numbers, /*!< in: MySQL key numbers */
ulint n_indexes, /*!< in: size of indexes[] */
- struct TABLE* table) /*!< in/out: MySQL table, for
+ struct TABLE* table, /*!< in/out: MySQL table, for
reporting erroneous key value
if applicable */
+ struct TABLE* altered_table) /*!< in/out: new MySQL table
+ definition */
__attribute__((nonnull, warn_unused_result));
/********************************************************************//**
Write a buffer to a block. */
@@ -302,7 +304,8 @@ row_merge_buf_write(
/*================*/
const row_merge_buf_t* buf, /*!< in: sorted buffer */
const merge_file_t* of, /*!< in: output file */
- row_merge_block_t* block); /*!< out: buffer for writing to file */
+ row_merge_block_t* block) /*!< out: buffer for writing to file */
+ __attribute__((nonnull));
/********************************************************************//**
Sort a buffer. */
UNIV_INTERN
=== modified file 'storage/innobase/row/row0log.cc'
--- a/storage/innobase/row/row0log.cc revid:marko.makela@stripped120529133330-hvs4rzbzl68symnl
+++ b/storage/innobase/row/row0log.cc revid:marko.makela@strippeddeb9k9943oamce2
@@ -890,6 +890,1212 @@ row_log_table_is_rollback(
}
/******************************************************//**
+Converts a log record to a table row.
+@return converted row */
+static __attribute__((nonnull, warn_unused_result))
+const dtuple_t*
+row_log_table_apply_convert_mrec(
+/*=============================*/
+ const mrec_t* mrec, /*!< in: merge record */
+ const ulint* offsets, /*!< in: offsets of mrec */
+ mem_heap_t* heap, /*!< in/out: memory heap */
+ dict_table_t* new_table, /*!< in/out: table
+ being rebuilt */
+ const struct TABLE* altered_table, /*!< in: new MySQL
+ table definition */
+ const row_merge_dup_t* dup) /*!< in: old index and table */
+{
+ dtuple_t* row;
+
+ /* This is based on row_build(). */
+ row = dtuple_create(heap, dict_table_get_n_cols(dup->index->table));
+ dict_table_copy_types(row, dup->index->table);
+
+ for (ulint i = 0; i < rec_offs_n_fields(offsets); i++) {
+ const dict_field_t* ind_field
+ = dict_index_get_nth_field(dup->index, i);
+ const dict_col_t* col
+ = dict_field_get_col(ind_field);
+ ulint col_no
+ = dict_col_get_no(col);
+ dfield_t* dfield
+ = dtuple_get_nth_field(row, col_no);
+ ulint len;
+ const void* data;
+
+ if (rec_offs_nth_extern(offsets, i)) {
+ data = btr_rec_copy_externally_stored_field(
+ mrec, offsets,
+ dict_table_zip_size(dup->index->table),
+ i, &len, heap);
+ ut_a(data);
+ dfield_set_data(dfield, data, len);
+ } else if (ind_field->prefix_len == 0) {
+ data = rec_get_nth_field(mrec, offsets, i, &len);
+ dfield_set_data(dfield, data, len);
+ }
+ }
+
+ /* TODO: convert row to new_table->cols, in case columns are
+ added or dropped or reordered */
+
+ return(row);
+}
+
+/******************************************************//**
+Replays an insert operation on a table that was rebuilt.
+@return DB_SUCCESS or error code */
+static __attribute__((nonnull, warn_unused_result))
+dberr_t
+row_log_table_apply_insert_low(
+/*===========================*/
+ que_thr_t* thr, /*!< in: query graph */
+ const dtuple_t* row, /*!< in: table row
+ in the old table definition */
+ mem_heap_t* offsets_heap, /*!< in/out: memory heap
+ that can be emptied */
+ mem_heap_t* heap, /*!< in/out: memory heap */
+ dict_table_t* new_table, /*!< in/out: table
+ being rebuilt */
+ const struct TABLE* altered_table, /*!< in: new MySQL
+ table definition */
+ row_merge_dup_t* dup) /*!< in/out: for reporting
+ duplicate key errors */
+{
+ dberr_t error;
+ dtuple_t* entry;
+ dict_index_t* index = dict_table_get_first_index(new_table);
+
+ ut_ad(dtuple_validate(row));
+
+#ifdef ROW_LOG_APPLY_PRINT
+ if (row_log_apply_print) {
+ fprintf(stderr, "table apply insert " IB_ID_FMT,
+ dup->index->id);
+ dtuple_print(stderr, row);
+ }
+#endif /* ROW_LOG_APPLY_PRINT */
+
+ static const ulint flags
+ = BTR_CREATE_FLAG
+ | BTR_NO_LOCKING_FLAG
+ | BTR_NO_UNDO_LOG_FLAG
+ | BTR_KEEP_SYS_FLAG;
+
+ entry = row_build_index_entry(row, NULL, index, heap);
+
+ error = row_ins_clust_index_entry_low(
+ flags, BTR_MODIFY_TREE, index, entry, 0, thr);
+
+ if (error == DB_DUPLICATE_KEY) {
+ /* TODO: report the duplicate key unless the record is
+ a full match of what we tried to insert */
+ return(error);
+ }
+
+ while (error == DB_SUCCESS) {
+ if (!(index = dict_table_get_next_index(index))) {
+ break;
+ }
+
+ entry = row_build_index_entry(row, NULL, index, heap);
+ error = row_ins_sec_index_entry_low(
+ flags, BTR_MODIFY_TREE,
+ index, offsets_heap, heap, entry, thr);
+ }
+
+ if (error == DB_DUPLICATE_KEY) {
+ /* TODO: report dup */
+ }
+
+ return(error);
+}
+
+/******************************************************//**
+Replays an insert operation on a table that was rebuilt.
+@return DB_SUCCESS or error code */
+static __attribute__((nonnull, warn_unused_result))
+dberr_t
+row_log_table_apply_insert(
+/*=======================*/
+ que_thr_t* thr, /*!< in: query graph */
+ const mrec_t* mrec, /*!< in: merge record */
+ const ulint* offsets, /*!< in: offsets of mrec */
+ mem_heap_t* offsets_heap, /*!< in/out: memory heap
+ that can be emptied */
+ mem_heap_t* heap, /*!< in/out: memory heap */
+ dict_table_t* new_table, /*!< in/out: table
+ being rebuilt */
+ const struct TABLE* altered_table, /*!< in: new MySQL
+ table definition */
+ row_merge_dup_t* dup) /*!< in/out: for reporting
+ duplicate key errors */
+{
+ const dtuple_t* row = row_log_table_apply_convert_mrec(
+ mrec, offsets, heap, new_table, altered_table, dup);
+ return(row_log_table_apply_insert_low(thr, row, offsets_heap, heap,
+ new_table, altered_table, dup));
+}
+
+/******************************************************//**
+Deletes a record from a table that is being rebuilt.
+@return DB_SUCCESS or error code */
+static __attribute__((nonnull, warn_unused_result))
+dberr_t
+row_log_table_apply_delete_low(
+/*===========================*/
+ btr_pcur_t* pcur, /*!< in/out: B-tree cursor, will be trashed */
+ const ulint* offsets,/*!< in: offsets on pcur */
+ mem_heap_t* heap, /*!< in/out: memory heap */
+ mtr_t* mtr) /*!< in/out: mini-transaction,
+ will be committed */
+{
+ dberr_t error;
+ row_ext_t* ext;
+ dtuple_t* row;
+ dict_index_t* index = btr_pcur_get_btr_cur(pcur)->index;
+
+ ut_ad(dict_index_is_clust(index));
+
+ if (dict_table_get_next_index(index)) {
+ /* Build a row template for purging secondary index entries. */
+ row = row_build(
+ ROW_COPY_DATA, index, btr_pcur_get_rec(pcur),
+ offsets, NULL, &ext, heap);
+ } else {
+ row = NULL;
+ }
+
+ btr_cur_pessimistic_delete(&error, FALSE, btr_pcur_get_btr_cur(pcur),
+ BTR_CREATE_FLAG, RB_NONE, mtr);
+ mtr_commit(mtr);
+
+ if (error != DB_SUCCESS) {
+ return(error);
+ }
+
+ while ((index = dict_table_get_next_index(index)) != NULL) {
+ const dtuple_t* entry = row_build_index_entry(
+ row, ext, index, heap);
+ mtr_start(mtr);
+ btr_pcur_open(index, entry, PAGE_CUR_LE,
+ BTR_MODIFY_TREE, pcur, mtr);
+#ifdef UNIV_DEBUG
+ switch (btr_pcur_get_btr_cur(pcur)->flag) {
+ case BTR_CUR_DELETE_REF:
+ case BTR_CUR_DEL_MARK_IBUF:
+ case BTR_CUR_DELETE_IBUF:
+ case BTR_CUR_INSERT_TO_IBUF:
+ /* We did not request buffering. */
+ break;
+ case BTR_CUR_HASH:
+ case BTR_CUR_HASH_FAIL:
+ case BTR_CUR_BINARY:
+ goto flag_ok;
+ }
+ ut_ad(0);
+flag_ok:
+#endif /* UNIV_DEBUG */
+
+ if (page_rec_is_infimum(btr_pcur_get_rec(pcur))
+ || btr_pcur_get_low_match(pcur) < index->n_uniq) {
+ /* All secondary index entries should be
+ found, because new_table is being modified by
+ this thread only, and all indexes should be
+ updated in sync. */
+ mtr_commit(mtr);
+ return(DB_INDEX_CORRUPT);
+ }
+
+ btr_cur_pessimistic_delete(&error, FALSE,
+ btr_pcur_get_btr_cur(pcur),
+ BTR_CREATE_FLAG, RB_NONE, mtr);
+ mtr_commit(mtr);
+ }
+
+ return(error);
+}
+
+/******************************************************//**
+Replays a delete operation on a table that was rebuilt.
+@return DB_SUCCESS or error code */
+static __attribute__((nonnull, warn_unused_result))
+dberr_t
+row_log_table_apply_delete(
+/*=======================*/
+ que_thr_t* thr, /*!< in: query graph */
+ ulint trx_id_col, /*!< in: position of
+ DB_TRX_ID in the
+ clustered index */
+ const mrec_t* mrec, /*!< in: merge record */
+ const ulint* moffsets, /*!< in: offsets of mrec */
+ mem_heap_t* offsets_heap, /*!< in/out: memory heap
+ that can be emptied */
+ mem_heap_t* heap, /*!< in/out: memory heap */
+ dict_table_t* new_table)
+{
+ dict_index_t* index = dict_table_get_first_index(new_table);
+ dtuple_t* old_pk;
+ mtr_t mtr;
+ btr_pcur_t pcur;
+ ulint* offsets;
+
+ ut_ad(rec_offs_n_fields(moffsets)
+ == dict_index_get_n_unique(index) + 1);
+ ut_ad(!rec_offs_any_extern(moffsets));
+
+ /* Convert the row to a search tuple. */
+ old_pk = dtuple_create(heap, index->n_uniq + 1);
+ dict_table_copy_types(old_pk, new_table);
+ dtuple_set_n_fields_cmp(old_pk, index->n_uniq);
+
+ for (ulint i = 0; i <= index->n_uniq; i++) {
+ ulint len;
+ const void* field;
+ field = rec_get_nth_field(mrec, moffsets, i, &len);
+ ut_ad(len != UNIV_SQL_NULL);
+ dfield_set_data(dtuple_get_nth_field(old_pk, i),
+ field, len);
+ }
+
+ mtr_start(&mtr);
+ btr_pcur_open(index, old_pk, PAGE_CUR_LE,
+ BTR_MODIFY_TREE, &pcur, &mtr);
+#ifdef UNIV_DEBUG
+ switch (btr_pcur_get_btr_cur(&pcur)->flag) {
+ case BTR_CUR_DELETE_REF:
+ case BTR_CUR_DEL_MARK_IBUF:
+ case BTR_CUR_DELETE_IBUF:
+ case BTR_CUR_INSERT_TO_IBUF:
+ /* We did not request buffering. */
+ break;
+ case BTR_CUR_HASH:
+ case BTR_CUR_HASH_FAIL:
+ case BTR_CUR_BINARY:
+ goto flag_ok;
+ }
+ ut_ad(0);
+flag_ok:
+#endif /* UNIV_DEBUG */
+
+ if (page_rec_is_infimum(btr_pcur_get_rec(&pcur))
+ || btr_pcur_get_low_match(&pcur) < index->n_uniq) {
+all_done:
+ mtr_commit(&mtr);
+ /* The record was not found. All done. */
+ return(DB_SUCCESS);
+ }
+
+ offsets = rec_get_offsets(btr_pcur_get_rec(&pcur), index, NULL,
+ ULINT_UNDEFINED, &offsets_heap);
+#if defined UNIV_DEBUG || defined UNIV_BLOB_LIGHT_DEBUG
+ ut_a(!rec_offs_any_null_extern(btr_pcur_get_rec(&pcur), offsets));
+#endif /* UNIV_DEBUG || UNIV_BLOB_LIGHT_DEBUG */
+
+ /* Only remove the record if DB_TRX_ID matches what was
+ buffered. */
+
+ {
+ ulint len;
+ const void* mrec_trx_id
+ = rec_get_nth_field(mrec, moffsets, trx_id_col, &len);
+ ut_ad(len == DATA_TRX_ID_LEN);
+ const void* rec_trx_id
+ = rec_get_nth_field(btr_pcur_get_rec(&pcur), offsets,
+ trx_id_col, &len);
+ ut_ad(len == DATA_TRX_ID_LEN);
+ if (memcmp(mrec_trx_id, rec_trx_id, DATA_TRX_ID_LEN)) {
+ goto all_done;
+ }
+ }
+
+ return(row_log_table_apply_delete_low(&pcur, offsets, heap, &mtr));
+}
+
+/******************************************************//**
+Replays an update operation on a table that was rebuilt.
+@return DB_SUCCESS or error code */
+static __attribute__((nonnull(1,3,4,5,6,7,8), warn_unused_result))
+dberr_t
+row_log_table_apply_update(
+/*=======================*/
+ que_thr_t* thr, /*!< in: query graph */
+ ulint trx_id_col, /*!< in: position of
+ DB_TRX_ID in the
+ clustered index */
+ const mrec_t* mrec, /*!< in: merge record */
+ const ulint* offsets, /*!< in: offsets of mrec */
+ mem_heap_t* offsets_heap, /*!< in/out: memory heap
+ that can be emptied */
+ mem_heap_t* heap, /*!< in/out: memory heap */
+ dict_table_t* new_table, /*!< in/out: table
+ being rebuilt */
+ const struct TABLE* altered_table, /*!< in: new MySQL
+ table definition */
+ row_merge_dup_t* dup, /*!< in/out: for reporting
+ duplicate key errors */
+ const dtuple_t* old_pk) /*!< in: PRIMARY KEY and
+ DB_TRX_ID,DB_ROLL_PTR
+ of the old value,
+ or NULL if same_pk */
+{
+ const dtuple_t* row;
+ dict_index_t* index = dict_table_get_first_index(new_table);
+ mtr_t mtr;
+ btr_pcur_t pcur;
+
+ ut_ad(dtuple_get_n_fields_cmp(old_pk)
+ == dict_index_get_n_unique(index));
+ ut_ad(dtuple_get_n_fields(old_pk)
+ == dict_index_get_n_unique(index) + 1);
+ ut_ad(!!old_pk == dup->index->online_log->same_pk);
+
+ row = row_log_table_apply_convert_mrec(
+ mrec, offsets, heap, new_table, altered_table, dup);
+
+ mtr_start(&mtr);
+ btr_pcur_open(index, old_pk, PAGE_CUR_LE,
+ BTR_MODIFY_TREE, &pcur, &mtr);
+#ifdef UNIV_DEBUG
+ switch (btr_pcur_get_btr_cur(&pcur)->flag) {
+ case BTR_CUR_DELETE_REF:
+ case BTR_CUR_DEL_MARK_IBUF:
+ case BTR_CUR_DELETE_IBUF:
+ case BTR_CUR_INSERT_TO_IBUF:
+ ut_ad(0);/* We did not request buffering. */
+ case BTR_CUR_HASH:
+ case BTR_CUR_HASH_FAIL:
+ case BTR_CUR_BINARY:
+ break;
+ }
+#endif /* UNIV_DEBUG */
+
+ if (page_rec_is_infimum(btr_pcur_get_rec(&pcur))
+ || btr_pcur_get_low_match(&pcur) < index->n_uniq) {
+ mtr_commit(&mtr);
+insert:
+ /* The row was not found. Insert it. */
+ return(row_log_table_apply_insert_low(
+ thr, row, offsets_heap, heap,
+ new_table, altered_table, dup));
+ }
+
+ /* Update the record. */
+ ulint* cur_offsets = rec_get_offsets(
+ btr_pcur_get_rec(&pcur),
+ index, NULL, ULINT_UNDEFINED, &offsets_heap);
+
+ dtuple_t* entry = row_build_index_entry(
+ row, NULL, index, heap);
+ const upd_t* update = row_upd_build_difference_binary(
+ index, entry, btr_pcur_get_rec(&pcur), cur_offsets,
+ false, NULL, heap);
+ dberr_t error = DB_SUCCESS;
+
+ if (!update->n_fields) {
+ /* Nothing to do. */
+ goto func_exit;
+ }
+
+ if (rec_offs_any_extern(cur_offsets)) {
+ /* If the record contains any externally stored
+ columns, perform the update by delete and insert,
+ because we will not write any undo log that would
+ allow purge to free any orphaned externally stored
+ columns. */
+delete_insert:
+ error = row_log_table_apply_delete_low(
+ &pcur, cur_offsets, heap, &mtr);
+ ut_ad(mtr.state == MTR_COMMITTED);
+
+ if (error != DB_SUCCESS) {
+ return(error);
+ }
+
+ goto insert;
+ }
+
+ if (upd_get_nth_field(update, 0)->field_no < trx_id_col) {
+ if (!dup->index->online_log->same_pk) {
+ ut_ad(0);
+ error = DB_CORRUPTION;
+ goto func_exit;
+ }
+
+ /* The PRIMARY KEY columns have changed.
+ Delete the record with the old PRIMARY KEY value,
+ provided that it carries the same
+ DB_TRX_ID,DB_ROLL_PTR. Then, insert the new row. */
+ ulint len;
+ const byte* cur_trx_roll = rec_get_nth_field(
+ mrec, offsets, trx_id_col, &len);
+ ut_ad(len == DATA_TRX_ID_LEN);
+ const dfield_t* new_trx_roll = dtuple_get_nth_field(
+ old_pk, trx_id_col);
+ /* We assume that DB_TRX_ID,DB_ROLL_PTR are stored
+ in one contiguous block. */
+ ut_ad(rec_get_nth_field(mrec, offsets, trx_id_col + 1, &len)
+ == cur_trx_roll + DATA_TRX_ID_LEN);
+ ut_ad(len == DATA_ROLL_PTR_LEN);
+ ut_ad(new_trx_roll->len == DATA_TRX_ID_LEN);
+ ut_ad(dtuple_get_nth_field(old_pk, trx_id_col + 1)
+ -> len == DATA_ROLL_PTR_LEN);
+ ut_ad(static_cast<const byte*>(
+ dtuple_get_nth_field(old_pk, trx_id_col + 1)
+ ->data)
+ == static_cast<const byte*>(new_trx_roll->data)
+ + DATA_TRX_ID_LEN);
+
+ if (!memcmp(cur_trx_roll, new_trx_roll->data,
+ DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN)) {
+ /* The old row exists. Remove it. */
+ goto delete_insert;
+ }
+
+ /* Unless we called row_log_table_apply_delete_low(),
+ this will likely cause a duplicate key error. */
+ goto insert;
+ }
+
+ dtuple_t* old_row;
+ row_ext_t* old_ext;
+
+ if (dict_table_get_next_index(index)) {
+ /* Construct the row corresponding to the old value of
+ the record. */
+ old_row = row_build(ROW_COPY_DATA, index,
+ btr_pcur_get_rec(&pcur),
+ cur_offsets, NULL, &old_ext, heap);
+ ut_ad(old_row);
+ } else {
+ old_row = NULL;
+ old_ext = NULL;
+ }
+
+ big_rec_t* big_rec;
+
+ error = btr_cur_pessimistic_update(
+ BTR_CREATE_FLAG | BTR_NO_LOCKING_FLAG
+ | BTR_NO_UNDO_LOG_FLAG | BTR_KEEP_SYS_FLAG
+ | BTR_KEEP_POS_FLAG,
+ btr_pcur_get_btr_cur(&pcur),
+ &cur_offsets, &offsets_heap, heap, &big_rec,
+ update, 0, NULL, 0, &mtr);
+
+ if (big_rec) {
+ if (error == DB_SUCCESS) {
+ error = btr_store_big_rec_extern_fields(
+ index, btr_pcur_get_block(&pcur),
+ btr_pcur_get_rec(&pcur), cur_offsets,
+ big_rec, &mtr, BTR_STORE_UPDATE);
+ }
+
+ dtuple_big_rec_free(big_rec);
+ }
+
+ while ((index = dict_table_get_next_index(index)) != NULL) {
+ if (error != DB_SUCCESS) {
+ break;
+ }
+
+ if (!row_upd_changes_ord_field_binary(
+ index, update, thr, row, NULL)) {
+ continue;
+ }
+
+ mtr_commit(&mtr);
+
+ entry = row_build_index_entry(old_row, old_ext, index, heap);
+ if (!entry) {
+ ut_ad(0);
+ return(DB_CORRUPTION);
+ }
+
+ mtr_start(&mtr);
+
+ if (ROW_FOUND != row_search_index_entry(
+ index, entry, BTR_MODIFY_TREE, &pcur, &mtr)) {
+ ut_ad(0);
+ error = DB_CORRUPTION;
+ break;
+ }
+
+ btr_cur_pessimistic_delete(
+ &error, FALSE, btr_pcur_get_btr_cur(&pcur),
+ BTR_CREATE_FLAG | BTR_NO_LOCKING_FLAG
+ | BTR_NO_UNDO_LOG_FLAG | BTR_KEEP_SYS_FLAG,
+ RB_NONE, &mtr);
+
+ if (error != DB_SUCCESS) {
+ break;
+ }
+
+ mtr_commit(&mtr);
+
+ entry = row_build_index_entry(row, NULL, index, heap);
+ error = row_ins_sec_index_entry_low(
+ BTR_CREATE_FLAG | BTR_NO_LOCKING_FLAG
+ | BTR_NO_UNDO_LOG_FLAG | BTR_KEEP_SYS_FLAG,
+ BTR_MODIFY_TREE, index, offsets_heap, heap,
+ entry, thr);
+
+ mtr_start(&mtr);
+ }
+
+func_exit:
+ mtr_commit(&mtr);
+ return(error);
+}
+
+/******************************************************//**
+Applies an operation to a table that was rebuilt.
+@return NULL on failure (mrec corruption) or when out of data;
+pointer to next record on success */
+static __attribute__((nonnull, warn_unused_result))
+const mrec_t*
+row_log_table_apply_op(
+/*===================*/
+ que_thr_t* thr, /*!< in: query graph */
+ ulint trx_id_col, /*!< in: position of
+ DB_TRX_ID in index */
+ dict_table_t* new_table, /*!< in/out: table
+ being rebuilt */
+ const struct TABLE* altered_table, /*!< in: new MySQL
+ table definition */
+ row_merge_dup_t* dup, /*!< in/out: for reporting
+ duplicate key errors */
+ dberr_t* error, /*!< out: DB_SUCCESS
+ or error code */
+ mem_heap_t* offsets_heap, /*!< in/out: memory heap
+ that can be emptied */
+ mem_heap_t* heap, /*!< in/out: memory heap */
+ const mrec_t* mrec, /*!< in: merge record */
+ const mrec_t* mrec_end, /*!< in: end of buffer */
+ ulint* offsets) /*!< in/out: work area
+ for parsing mrec */
+{
+ dict_index_t* new_index = dict_table_get_first_index(new_table);
+ ulint extra_size;
+ const mrec_t* next_mrec;
+ dtuple_t* old_pk;
+
+ ut_ad(dict_index_is_clust(dup->index));
+ ut_ad(dup->index->table != new_table);
+
+ *error = DB_SUCCESS;
+
+ if (mrec + 3 >= mrec_end) {
+ return(NULL);
+ }
+
+ switch (*mrec++) {
+ default:
+ ut_ad(0);
+ *error = DB_CORRUPTION;
+ return(NULL);
+ case ROW_T_INSERT:
+ extra_size = *mrec++;
+
+ if (extra_size >= 0x80) {
+ /* Read another byte of extra_size. */
+
+ extra_size = (extra_size & 0x7f) << 8;
+ extra_size |= *mrec++;
+ }
+
+ mrec += extra_size;
+
+ if (mrec > mrec_end) {
+ return(NULL);
+ }
+
+ rec_init_offsets_comp_ordinary(
+ mrec, 0, dup->index, dup->index->n_nullable, offsets);
+
+ next_mrec = mrec + rec_offs_data_size(offsets);
+
+ if (next_mrec > mrec_end) {
+ return(NULL);
+ } else {
+ ulint len;
+ const byte* db_trx_id
+ = rec_get_nth_field(
+ mrec, offsets, trx_id_col, &len);
+ ut_ad(len == DATA_TRX_ID_LEN);
+ if (!row_log_table_is_rollback(
+ dup->index,
+ trx_read_trx_id(db_trx_id))) {
+ *error = row_log_table_apply_insert(
+ thr, mrec, offsets, offsets_heap,
+ heap,
+ new_table, altered_table, dup);
+ }
+ }
+ break;
+
+ case ROW_T_DELETE:
+ extra_size = *mrec++;
+ ut_ad(mrec < mrec_end);
+
+ /* We assume extra_size < 0x100 for the PRIMARY KEY prefix.
+ For fixed-length PRIMARY key columns, it is 0. */
+ mrec += extra_size;
+
+ if (mrec > mrec_end) {
+ return(NULL);
+ }
+
+ rec_offs_set_n_fields(offsets, new_index->n_uniq + 1);
+ rec_init_offsets_comp_ordinary(mrec, 0, new_index, 0, offsets);
+ next_mrec = mrec + rec_offs_data_size(offsets);
+
+ *error = row_log_table_apply_delete(
+ thr, trx_id_col, mrec, offsets, offsets_heap, heap,
+ new_table);
+ break;
+ case ROW_T_UPDATE:
+ /* Logically, the log entry consists of the
+ (PRIMARY KEY,DB_TRX_ID) of the old value (converted
+ to the new primary key definition) followed by
+ the new value in the old table definition. If the
+ definition of the columns belonging to PRIMARY KEY
+ is not changed, the log will only contain
+ DB_TRX_ID,new_row. */
+
+ old_pk = dtuple_create(heap, new_index->n_uniq + 1);
+
+ if (dup->index->online_log->same_pk) {
+ ut_ad(new_index->n_uniq == dup->index->n_uniq);
+
+ if (DATA_TRX_ID_LEN + 2 + mrec > mrec_end) {
+ return(NULL);
+ }
+
+ /* Set DB_TRX_ID */
+ dfield_set_data(
+ dtuple_get_nth_field(old_pk,
+ new_index->n_uniq),
+ mrec, DATA_TRX_ID_LEN);
+
+ mrec += DATA_TRX_ID_LEN;
+
+ extra_size = *mrec++;
+
+ if (extra_size >= 0x80) {
+ /* Read another byte of extra_size. */
+
+ extra_size = (extra_size & 0x7f) << 8;
+ extra_size |= *mrec++;
+ }
+
+ mrec += extra_size;
+
+ if (mrec > mrec_end) {
+ return(NULL);
+ }
+
+ rec_offs_set_n_fields(offsets, dup->index->n_fields);
+ rec_init_offsets_comp_ordinary(
+ mrec, 0, dup->index, dup->index->n_nullable,
+ offsets);
+
+ next_mrec = mrec + rec_offs_data_size(offsets);
+
+ if (next_mrec > mrec_end) {
+ return(NULL);
+ }
+
+ /* Copy the PRIMARY KEY fields from mrec to old_pk. */
+ for (ulint i = 0; i < new_index->n_uniq; i++) {
+ const void* field;
+ ulint len;
+ dfield_t* dfield;
+
+ ut_ad(!rec_offs_nth_extern(offsets, i));
+
+ field = rec_get_nth_field(
+ mrec, offsets, i, &len);
+ ut_ad(len != UNIV_SQL_NULL);
+
+ dfield = dtuple_get_nth_field(old_pk, i);
+ dfield_set_data(dfield, field, len);
+ }
+ } else {
+ /* We assume extra_size < 0x100
+ for the PRIMARY KEY prefix. */
+ mrec += *mrec + 1;
+
+ if (mrec > mrec_end) {
+ return(NULL);
+ }
+
+ rec_offs_set_n_fields(offsets, new_index->n_uniq + 1);
+ rec_init_offsets_comp_ordinary(
+ mrec, 0, new_index, 0, offsets);
+
+ next_mrec = mrec + rec_offs_data_size(offsets);
+ if (next_mrec + 2 > mrec_end) {
+ return(NULL);
+ }
+
+ /* Copy the PRIMARY KEY fields and DB_TRX_ID
+ from mrec to old_pk. */
+ for (ulint i = 0; i <= new_index->n_uniq; i++) {
+ const void* field;
+ ulint len;
+ dfield_t* dfield;
+
+ ut_ad(!rec_offs_nth_extern(offsets, i));
+
+ field = rec_get_nth_field(
+ mrec, offsets, i, &len);
+ ut_ad(len != UNIV_SQL_NULL);
+
+ dfield = dtuple_get_nth_field(old_pk, i);
+ dfield_set_data(dfield, field, len);
+ }
+
+ /* Fetch the new value of the row as it was
+ in the old table definition. */
+ extra_size = *mrec++;
+
+ if (extra_size >= 0x80) {
+ /* Read another byte of extra_size. */
+
+ extra_size = (extra_size & 0x7f) << 8;
+ extra_size |= *mrec++;
+ }
+
+ mrec += extra_size;
+
+ if (mrec > mrec_end) {
+ return(NULL);
+ }
+
+ rec_offs_set_n_fields(offsets, dup->index->n_fields);
+ rec_init_offsets_comp_ordinary(
+ mrec, 0, dup->index, dup->index->n_nullable,
+ offsets);
+
+ next_mrec = mrec + rec_offs_data_size(offsets);
+
+ if (next_mrec > mrec_end) {
+ return(NULL);
+ }
+ }
+
+ ut_ad(next_mrec <= mrec_end);
+
+ {
+ ulint len;
+ const byte* db_trx_id
+ = rec_get_nth_field(
+ mrec, offsets, trx_id_col, &len);
+ ut_ad(len == DATA_TRX_ID_LEN);
+ if (!row_log_table_is_rollback(
+ dup->index,
+ trx_read_trx_id(db_trx_id))) {
+ *error = row_log_table_apply_update(
+ thr, trx_id_col,
+ mrec, offsets, offsets_heap, heap,
+ new_table, altered_table, dup, old_pk);
+ }
+ }
+
+ break;
+ }
+
+ mem_heap_empty(offsets_heap);
+ mem_heap_empty(heap);
+ return(next_mrec);
+}
+
+/******************************************************//**
+Applies operations to a table was rebuilt.
+@return DB_SUCCESS, or error code on failure */
+static __attribute__((nonnull))
+dberr_t
+row_log_table_apply_ops(
+/*====================*/
+ que_thr_t* thr, /*!< in: query graph */
+ struct TABLE* altered_table,
+ /*!< in: new MySQL table definition */
+ row_merge_dup_t*dup) /*!< in/out: for reporting duplicate key
+ errors */
+{
+ dberr_t error;
+ const mrec_t* mrec = NULL;
+ const mrec_t* next_mrec;
+ const mrec_t* mrec_end = NULL; /* silence bogus warning */
+ const mrec_t* next_mrec_end;
+ mem_heap_t* heap;
+ mem_heap_t* offsets_heap;
+ ulint* offsets;
+ bool has_index_lock;
+ dict_index_t* index = const_cast<dict_index_t*>(
+ dup->index);
+ const ulint i = 1 + REC_OFFS_HEADER_SIZE
+ + dict_index_get_n_fields(index);
+ const ulint trx_id_col = dict_col_get_clust_pos(
+ dict_table_get_sys_col(index->table, DATA_TRX_ID),
+ index);
+ trx_t* trx = thr_get_trx(thr);
+
+ ut_ad(dict_index_is_clust(index));
+ ut_ad(dict_index_is_online_ddl(index));
+ ut_ad(trx->mysql_thd);
+#ifdef UNIV_SYNC_DEBUG
+ ut_ad(rw_lock_own(dict_index_get_lock(index), RW_LOCK_EX));
+#endif /* UNIV_SYNC_DEBUG */
+ ut_ad(index->online_log);
+ ut_ad(index->online_log->table);
+ ut_ad(!dict_index_is_online_ddl(
+ dict_table_get_first_index(index->online_log->table)));
+ ut_ad(trx_id_col > 0);
+ ut_ad(trx_id_col != ULINT_UNDEFINED);
+
+ UNIV_MEM_INVALID(&mrec_end, sizeof mrec_end);
+
+ offsets = static_cast<ulint*>(ut_malloc(i * sizeof *offsets));
+ offsets[0] = i;
+ offsets[1] = dict_index_get_n_fields(index);
+
+ heap = mem_heap_create(UNIV_PAGE_SIZE);
+ offsets_heap = mem_heap_create(UNIV_PAGE_SIZE);
+ has_index_lock = true;
+
+next_block:
+ ut_ad(has_index_lock);
+#ifdef UNIV_SYNC_DEBUG
+ ut_ad(rw_lock_own(dict_index_get_lock(index), RW_LOCK_EX));
+#endif /* UNIV_SYNC_DEBUG */
+ ut_ad(index->online_log->head.bytes == 0);
+
+ if (trx_is_interrupted(trx)) {
+ goto interrupted;
+ }
+
+ if (dict_index_is_corrupted(index)) {
+ error = DB_INDEX_CORRUPT;
+ goto func_exit;
+ }
+
+ ut_ad(dict_index_is_online_ddl(index));
+
+ error = index->online_log->error;
+
+ if (error != DB_SUCCESS) {
+ goto func_exit;
+ }
+
+ if (UNIV_UNLIKELY(index->online_log->head.blocks
+ > index->online_log->tail.blocks)) {
+unexpected_eof:
+ fprintf(stderr, "InnoDB: unexpected end of temporary file"
+ " for table %s\n", index->table_name);
+corruption:
+ error = DB_CORRUPTION;
+ goto func_exit;
+ }
+
+ if (index->online_log->head.blocks
+ == index->online_log->tail.blocks) {
+ if (index->online_log->head.blocks) {
+#ifdef HAVE_FTRUNCATE
+ /* Truncate the file in order to save space. */
+ ftruncate(index->online_log->fd, 0);
+#endif /* HAVE_FTRUNCATE */
+ index->online_log->head.blocks
+ = index->online_log->tail.blocks = 0;
+ }
+
+ next_mrec = index->online_log->tail.block;
+ next_mrec_end = next_mrec + index->online_log->tail.bytes;
+
+ if (next_mrec_end == next_mrec) {
+ /* End of log reached. */
+all_done:
+ ut_ad(has_index_lock);
+ ut_ad(index->online_log->head.blocks == 0);
+ ut_ad(index->online_log->tail.blocks == 0);
+ index->online_log->head.bytes = 0;
+ index->online_log->tail.bytes = 0;
+ error = DB_SUCCESS;
+ goto func_exit;
+ }
+ } else {
+ os_offset_t ofs;
+ ibool success;
+
+ ofs = (os_offset_t) index->online_log->head.blocks
+ * srv_sort_buf_size;
+
+ ut_ad(has_index_lock);
+ has_index_lock = false;
+ rw_lock_x_unlock(dict_index_get_lock(index));
+
+ log_free_check();
+
+ ut_ad(dict_index_is_online_ddl(index));
+
+ success = os_file_read_no_error_handling(
+ OS_FILE_FROM_FD(index->online_log->fd),
+ index->online_log->head.block, ofs,
+ srv_sort_buf_size);
+
+ if (!success) {
+ fprintf(stderr, "InnoDB: unable to read temporary file"
+ " for table %s\n", index->table_name);
+ goto corruption;
+ }
+
+#ifdef POSIX_FADV_DONTNEED
+ /* Each block is read exactly once. Free up the file cache. */
+ posix_fadvise(index->online_log->fd,
+ ofs, srv_sort_buf_size, POSIX_FADV_DONTNEED);
+#endif /* POSIX_FADV_DONTNEED */
+#ifdef FALLOC_FL_PUNCH_HOLE
+ /* Try to deallocate the space for the file on disk.
+ This should work on ext4 on Linux 2.6.39 and later,
+ and be ignored when the operation is unsupported. */
+ fallocate(index->online_log->fd,
+ FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE,
+ ofs, srv_buf_size);
+#endif /* FALLOC_FL_PUNCH_HOLE */
+
+ next_mrec = index->online_log->head.block;
+ next_mrec_end = next_mrec + srv_sort_buf_size;
+ }
+
+ /* This read is not protected by index->online_log->mutex for
+ performance reasons. We will eventually notice any error that
+ was flagged by a DML thread. */
+ error = index->online_log->error;
+
+ if (error != DB_SUCCESS) {
+ goto func_exit;
+ }
+
+ if (mrec) {
+ /* A partial record was read from the previous block.
+ Copy the temporary buffer full, as we do not know the
+ length of the record. Parse subsequent records from
+ the bigger buffer index->online_log->head.block
+ or index->online_log->tail.block. */
+
+ ut_ad(mrec == index->online_log->head.buf);
+ ut_ad(mrec_end > mrec);
+ ut_ad(mrec_end < (&index->online_log->head.buf)[1]);
+
+ memcpy((mrec_t*) mrec_end, next_mrec,
+ (&index->online_log->head.buf)[1] - mrec_end);
+ mrec = row_log_table_apply_op(
+ thr, trx_id_col,
+ index->online_log->table, altered_table,
+ dup, &error, offsets_heap, heap,
+ index->online_log->head.buf,
+ (&index->online_log->head.buf)[1], offsets);
+ if (error != DB_SUCCESS) {
+ goto func_exit;
+ } else if (UNIV_UNLIKELY(mrec == NULL)) {
+ /* The record was not reassembled properly. */
+ goto corruption;
+ }
+ /* The record was previously found out to be
+ truncated. Now that the parse buffer was extended,
+ it should proceed beyond the old end of the buffer. */
+ ut_a(mrec > mrec_end);
+
+ index->online_log->head.bytes = mrec - mrec_end;
+ next_mrec += index->online_log->head.bytes;
+ }
+
+ ut_ad(next_mrec <= next_mrec_end);
+ /* The following loop must not be parsing the temporary
+ buffer, but head.block or tail.block. */
+
+ /* mrec!=NULL means that the next record starts from the
+ middle of the block */
+ ut_ad((mrec == NULL) == (index->online_log->head.bytes == 0));
+
+#ifdef UNIV_DEBUG
+ if (next_mrec_end == index->online_log->head.block
+ + srv_sort_buf_size) {
+ /* If tail.bytes == 0, next_mrec_end can also be at
+ the end of tail.block. */
+ if (index->online_log->tail.bytes == 0) {
+ ut_ad(next_mrec == next_mrec_end);
+ ut_ad(index->online_log->tail.blocks == 0);
+ ut_ad(index->online_log->head.blocks == 0);
+ ut_ad(index->online_log->head.bytes == 0);
+ } else {
+ ut_ad(next_mrec == index->online_log->head.block
+ + index->online_log->head.bytes);
+ ut_ad(index->online_log->tail.blocks
+ > index->online_log->head.blocks);
+ }
+ } else if (next_mrec_end == index->online_log->tail.block
+ + index->online_log->tail.bytes) {
+ ut_ad(next_mrec == index->online_log->tail.block
+ + index->online_log->head.bytes);
+ ut_ad(index->online_log->tail.blocks == 0);
+ ut_ad(index->online_log->head.blocks == 0);
+ ut_ad(index->online_log->head.bytes
+ <= index->online_log->tail.bytes);
+ } else {
+ ut_error;
+ }
+#endif /* UNIV_DEBUG */
+
+ mrec_end = next_mrec_end;
+
+ while (!trx_is_interrupted(trx)) {
+ mrec = next_mrec;
+ ut_ad(mrec < mrec_end);
+
+ if (!has_index_lock) {
+ /* We are applying operations from a different
+ block than the one that is being written to.
+ We do not hold index->lock in order to
+ allow other threads to concurrently buffer
+ modifications. */
+ ut_ad(mrec >= index->online_log->head.block);
+ ut_ad(mrec_end == index->online_log->head.block
+ + srv_sort_buf_size);
+ ut_ad(index->online_log->head.bytes
+ < srv_sort_buf_size);
+
+ /* Take the opportunity to do a redo log
+ checkpoint if needed. */
+ log_free_check();
+ } else {
+ /* We are applying operations from the last block.
+ Do not allow other threads to buffer anything,
+ so that we can finally catch up and synchronize. */
+ ut_ad(index->online_log->head.blocks == 0);
+ ut_ad(index->online_log->tail.blocks == 0);
+ ut_ad(mrec_end == index->online_log->tail.block
+ + index->online_log->tail.bytes);
+ ut_ad(mrec >= index->online_log->tail.block);
+ }
+
+ /* This read is not protected by index->online_log->mutex
+ for performance reasons. We will eventually notice any
+ error that was flagged by a DML thread. */
+ error = index->online_log->error;
+
+ if (error != DB_SUCCESS) {
+ goto func_exit;
+ }
+
+ next_mrec = row_log_table_apply_op(
+ thr, trx_id_col,
+ index->online_log->table, altered_table,
+ dup, &error, offsets_heap, heap,
+ mrec, mrec_end, offsets);
+
+ if (error != DB_SUCCESS) {
+ goto func_exit;
+ } else if (next_mrec == next_mrec_end) {
+ /* The record happened to end on a block boundary.
+ Do we have more blocks left? */
+ if (has_index_lock) {
+ /* The index will be locked while
+ applying the last block. */
+ goto all_done;
+ }
+
+ mrec = NULL;
+process_next_block:
+ rw_lock_x_lock(dict_index_get_lock(index));
+ has_index_lock = true;
+
+ index->online_log->head.bytes = 0;
+ index->online_log->head.blocks++;
+ goto next_block;
+ } else if (next_mrec != NULL) {
+ ut_ad(next_mrec < next_mrec_end);
+ index->online_log->head.bytes += next_mrec - mrec;
+ } else if (has_index_lock) {
+ /* When mrec is within tail.block, it should
+ be a complete record, because we are holding
+ index->lock and thus excluding the writer. */
+ ut_ad(index->online_log->tail.blocks == 0);
+ ut_ad(mrec_end == index->online_log->tail.block
+ + index->online_log->tail.bytes);
+ ut_ad(0);
+ goto unexpected_eof;
+ } else {
+ memcpy(index->online_log->head.buf, mrec,
+ mrec_end - mrec);
+ mrec_end += index->online_log->head.buf - mrec;
+ mrec = index->online_log->head.buf;
+ goto process_next_block;
+ }
+ }
+
+interrupted:
+ error = DB_INTERRUPTED;
+func_exit:
+ if (!has_index_lock) {
+ rw_lock_x_lock(dict_index_get_lock(index));
+ }
+
+ mem_heap_free(offsets_heap);
+ mem_heap_free(heap);
+ ut_free(offsets);
+ return(error);
+}
+
+/******************************************************//**
+Apply the row_log_table log to a table upon completing rebuild.
+@return DB_SUCCESS, or error code on failure */
+UNIV_INTERN
+dberr_t
+row_log_table_apply(
+/*================*/
+ que_thr_t* thr, /*!< in: query graph */
+ dict_table_t* old_table,
+ /*!< in: old table */
+ struct TABLE* table, /*!< in/out: MySQL table
+ (for reporting duplicates) */
+ struct TABLE* altered_table)
+ /*!< in: new MySQL table definition */
+{
+ dberr_t error;
+ dict_index_t* clust_index;
+ row_merge_dup_t dup;
+
+ dup.index = clust_index = dict_table_get_first_index(old_table);
+ dup.table = table;
+ dup.n_dup = 0;
+
+ thr_get_trx(thr)->error_key_num = 0;
+
+#ifdef UNIV_SYNC_DEBUG
+ ut_ad(!rw_lock_own(&dict_operation_lock, RW_LOCK_SHARED));
+#endif /* UNIV_SYNC_DEBUG */
+
+ rw_lock_x_lock(dict_index_get_lock(clust_index));
+
+ if (!clust_index->online_log) {
+ ut_ad(dict_index_get_online_status(clust_index)
+ == ONLINE_INDEX_COMPLETE);
+ /* This function should not be called unless
+ rebuilding a table online. Build in some fault
+ tolerance. */
+ ut_ad(0);
+ error = DB_ERROR;
+ } else {
+ error = row_log_table_apply_ops(
+ thr, altered_table, &dup);
+ }
+
+ rw_lock_x_unlock(dict_index_get_lock(clust_index));
+ return(error);
+}
+
+/******************************************************//**
Allocate the row log for an index and flag the index
for online creation.
@retval true if success, false if not */
=== modified file 'storage/innobase/row/row0merge.cc'
--- a/storage/innobase/row/row0merge.cc revid:marko.makela@stripped0-hvs4rzbzl68symnl
+++ b/storage/innobase/row/row0merge.cc revid:marko.makela@strippedoamce2
@@ -289,7 +289,6 @@ row_merge_buf_add(
column prefixes, or NULL */
doc_id_t* doc_id) /*!< in/out: Doc ID if we are
creating FTS index */
-
{
ulint i;
const dict_index_t* index;
@@ -1196,6 +1195,37 @@ row_merge_write_eof(
return(&block[0]);
}
+/*************************************************************//**
+Check if a row should be skipped during online table rebuild.
+@return true if the record should be skipped */
+static __attribute__((nonnull, warn_unused_result))
+bool
+row_merge_skip_rec(
+/*===============*/
+ const rec_t* rec, /*!< in: clustered index record
+ or merge record */
+ const dict_index_t* index, /*!< in: clustered index of rec */
+ const dict_index_t* oindex, /*!< in: clustered index
+ in the old table */
+ const ulint* offsets)/*!< in: rec_get_offsets(rec) */
+{
+ ulint trx_id_offset;
+ const byte* trx_id;
+
+ ut_ad(dict_index_is_clust(index));
+ ut_ad(dict_index_is_clust(oindex));
+ ut_ad(dict_index_is_online_ddl(oindex));
+
+ trx_id_offset = index->trx_id_offset;
+ if (!trx_id_offset) {
+ trx_id_offset = row_get_trx_id_offset(index, offsets);
+ }
+
+ trx_id = rec + trx_id_offset;
+
+ return(row_log_table_is_rollback(oindex, trx_read_trx_id(trx_id)));
+}
+
/********************************************************************//**
Reads clustered index of the table and create temporary files
containing the index entries for the indexes to be built.
@@ -1245,8 +1275,6 @@ row_merge_read_clustered_index(
trx->op_info = "reading clustered index";
- ut_ad(!online || old_table == new_table);
-
#ifdef FTS_INTERNAL_DIAG_PRINT
DEBUG_FTS_SORT_PRINT("FTS_SORT: Start Create Index\n");
#endif
@@ -1357,6 +1385,14 @@ row_merge_read_clustered_index(
goto func_exit;
}
+ if (online && old_table != new_table) {
+ err = row_log_table_get_error(clust_index);
+ if (err != DB_SUCCESS) {
+ trx->error_key_num = 0;
+ goto func_exit;
+ }
+ }
+
if (rw_lock_get_waiters(
dict_index_get_lock(clust_index))) {
ibool on_user_rec;
@@ -1426,6 +1462,7 @@ row_merge_read_clustered_index(
if (rec_get_deleted_flag(rec, dict_table_is_comp(old_table))
&& (!online
+ || old_table != new_table
|| !trx_rw_is_active(
row_get_rec_trx_id(rec, clust_index, offsets),
NULL))) {
@@ -1444,6 +1481,12 @@ row_merge_read_clustered_index(
continue;
}
+ if (online && new_table != old_table
+ && row_merge_skip_rec(
+ rec, clust_index, clust_index, offsets)) {
+ continue;
+ }
+
/* This is essentially a READ UNCOMMITTED to fetch the
most recent version of the record. */
@@ -1568,7 +1611,7 @@ write_buffers:
} else {
row_merge_buf_sort(buf, NULL);
}
- } else if (online) {
+ } else if (online && new_table == old_table) {
/* Note the newest transaction that
modified this index when the scan was
completed. We prevent older readers
@@ -2088,7 +2131,7 @@ row_merge_sort(
/*************************************************************//**
Copy externally stored columns to the data tuple. */
-static
+static __attribute__((nonnull))
void
row_merge_copy_blobs(
/*=================*/
@@ -2098,10 +2141,9 @@ row_merge_copy_blobs(
dtuple_t* tuple, /*!< in/out: data tuple */
mem_heap_t* heap) /*!< in/out: memory heap */
{
- ulint i;
- ulint n_fields = dtuple_get_n_fields(tuple);
+ ut_ad(rec_offs_any_extern(offsets));
- for (i = 0; i < n_fields; i++) {
+ for (ulint i = 0; i < dtuple_get_n_fields(tuple); i++) {
ulint len;
const void* data;
dfield_t* field = dtuple_get_nth_field(tuple, i);
@@ -2143,8 +2185,7 @@ row_merge_insert_index_tuples(
bool del_marks,
/*!< in: whether some tuples may
be delete-marked */
- ulint zip_size,/*!< in: compressed page size of
- the old table, or 0 if uncompressed */
+ const dict_table_t* old_table,/*!< in: old table */
int fd, /*!< in: file descriptor */
row_merge_block_t* block) /*!< in/out: file buffer */
{
@@ -2203,6 +2244,22 @@ row_merge_insert_index_tuples(
break;
}
+ if (dict_index_is_clust(index)
+ && dict_table_is_online_rebuild(old_table)) {
+ const dict_index_t* old_index
+ = dict_table_get_first_index(
+ old_table);
+ error = row_log_table_get_error(old_index);
+ if (error != DB_SUCCESS) {
+ break;
+ }
+
+ if (row_merge_skip_rec(mrec, index, old_index,
+ offsets)) {
+ continue;
+ }
+ }
+
dtuple = row_rec_to_index_entry_low(
mrec, index, offsets, &n_ext, tuple_heap);
@@ -2215,9 +2272,11 @@ row_merge_insert_index_tuples(
REC_INFO_DELETED_FLAG);
}
- if (UNIV_UNLIKELY(n_ext)) {
- row_merge_copy_blobs(mrec, offsets, zip_size,
- dtuple, tuple_heap);
+ if (n_ext) {
+ row_merge_copy_blobs(
+ mrec, offsets,
+ dict_table_zip_size(old_table),
+ dtuple, tuple_heap);
}
ut_ad(dtuple_validate(dtuple));
@@ -2543,6 +2602,8 @@ row_merge_drop_indexes(
#endif /* UNIV_SYNC_DEBUG */
index = dict_table_get_first_index(table);
+ ut_ad(dict_index_is_clust(index));
+ ut_ad(dict_index_get_online_status(index) == ONLINE_INDEX_COMPLETE);
/* the caller should have an open handle to the table */
ut_ad(table->n_ref_count >= 1);
@@ -2564,6 +2625,8 @@ row_merge_drop_indexes(
the indexes. */
while ((index = dict_table_get_next_index(index)) != NULL) {
+ ut_ad(!dict_index_is_clust(index));
+
switch (dict_index_get_online_status(index)) {
case ONLINE_INDEX_ABORTED_DROPPED:
continue;
@@ -3139,9 +3202,11 @@ row_merge_build_indexes(
dict_index_t** indexes, /*!< in: indexes to be created */
const ulint* key_numbers, /*!< in: MySQL key numbers */
ulint n_indexes, /*!< in: size of indexes[] */
- struct TABLE* table) /*!< in/out: MySQL table, for
+ struct TABLE* table, /*!< in/out: MySQL table, for
reporting erroneous key value
if applicable */
+ struct TABLE* altered_table) /*!< in/out: new MySQL table
+ definition */
{
merge_file_t* merge_files;
row_merge_block_t* block;
@@ -3155,8 +3220,6 @@ row_merge_build_indexes(
fts_psort_t* merge_info = NULL;
ib_int64_t sig_count = 0;
- ut_ad(!online || old_table == new_table);
-
/* Allocate memory for merge file data structure and initialize
fields */
@@ -3260,8 +3323,9 @@ wait_again:
if (error == DB_SUCCESS) {
error = row_merge_insert_index_tuples(
- trx->id, sort_idx, online,
- dict_table_zip_size(old_table),
+ trx->id, sort_idx,
+ online && old_table == new_table,
+ old_table,
merge_files[i].fd, block);
}
}
@@ -3271,7 +3335,13 @@ wait_again:
if (indexes[i]->type & DICT_FTS) {
row_fts_psort_info_destroy(psort_info, merge_info);
- } else if (error == DB_SUCCESS && online) {
+ } else if (error != DB_SUCCESS || !online) {
+ /* Do not apply any online log. */
+ } else if (old_table != new_table) {
+ ut_ad(!sort_idx->online_log);
+ ut_ad(sort_idx->online_status
+ == ONLINE_INDEX_COMPLETE);
+ } else {
DEBUG_SYNC_C("row_log_apply_before");
error = row_log_apply(trx, sort_idx, table);
DEBUG_SYNC_C("row_log_apply_after");
@@ -3316,11 +3386,13 @@ func_exit:
DICT_TF2_FLAG_UNSET(new_table, DICT_TF2_FTS_ADD_DOC_ID);
- if (online && error != DB_SUCCESS) {
- /* On error, flag all online index creation as aborted. */
+ if (online && old_table == new_table && error != DB_SUCCESS) {
+ /* On error, flag all online secondary index creation
+ as aborted. */
for (i = 0; i < n_indexes; i++) {
ut_ad(!(indexes[i]->type & DICT_FTS));
ut_ad(*indexes[i]->name == TEMP_INDEX_PREFIX);
+ ut_ad(!dict_index_is_clust(indexes[i]));
/* Completed indexes should be dropped as
well, and indexes whose creation was aborted
No bundle (reason: useless for push emails).| Thread |
|---|
| • bzr push into mysql-trunk-wl6255 branch (marko.makela:3892 to 3893) WL#6255 | marko.makela | 29 May |