List: Commits
From: marko.makela  Date: May 29 2012 2:19pm
Subject: bzr push into mysql-trunk-wl6255 branch (marko.makela:3892 to 3893) WL#6255
 3893 Marko Mäkelä	2012-05-29
      WL#6255 online table rebuild: Online log apply.
      
      The online table rebuild will be enabled in the interface in a
      subsequent commit.
      
      prepare_inplace_alter_table_dict(): Allocate the online rebuild log if
      needed.
      
      ha_innobase::inplace_alter_table(): Apply the bulk of the online
      rebuild log while not holding (m)any locks.
      
      innobase_online_rebuild_log_free(): New function, for freeing the
      online rebuild log. Called from rollback_inplace_alter_table()
      and ha_innobase::commit_inplace_alter_table().
      
      ha_innobase::commit_inplace_alter_table(): Apply and free the online
      rebuild log. Prevent old transactions from accessing the rebuilt
      table, because the history is missing. (This bug has been present
      since the InnoDB Plugin in MySQL 5.1.)
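      The fencing of old transactions can be sketched as follows; the
      structures and names below are simplified illustrative stand-ins,
      not InnoDB's actual read-view code:
      
      ```cpp
      #include <cstdint>
      
      /* Simplified stand-ins for InnoDB structures (illustrative only). */
      struct read_view_t {
      	uint64_t low_limit_id;	/* transactions with id >= this are
      				invisible to the view */
      };
      
      struct dict_index_sketch_t {
      	uint64_t trx_id;	/* id of the transaction that rebuilt
      				the table (set on commit) */
      };
      
      /* The rebuilt table carries no version history, so a read view
      that cannot see the rebuilding transaction as committed must be
      refused access to the rebuilt index. */
      inline bool
      index_usable_by_view(const read_view_t& view,
      		     const dict_index_sketch_t& index)
      {
      	return(index.trx_id < view.low_limit_id);
      }
      ```
      
      In this sketch, tagging every index of the rebuilt table with the
      committing transaction's id is what makes the check possible.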
      
      row_log_table_apply_convert_mrec(): Auxiliary function for converting
      a logged clustered index record of the old table into a row
      tuple of the rebuilt table. (TODO: implement ADD COLUMN, DROP COLUMN
      and others.)
      
      row_log_table_apply(), row_log_table_apply_ops(), row_log_table_apply_op(),
      row_log_table_apply_insert_low(), row_log_table_apply_insert(),
      row_log_table_apply_delete_low(), row_log_table_apply_delete(),
      row_log_table_apply_update(): Apply the online rebuild log.
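      The division of labour among these functions can be illustrated
      with a toy dispatch loop; the opcodes and names below are
      hypothetical, since the real log records are binary-encoded:
      
      ```cpp
      #include <vector>
      
      /* Hypothetical opcodes standing in for the binary log format. */
      enum row_op_t { ROW_OP_INSERT, ROW_OP_UPDATE, ROW_OP_DELETE };
      
      struct apply_counts_t {
      	int inserts = 0;
      	int updates = 0;
      	int deletes = 0;
      };
      
      /* Toy version of row_log_table_apply_ops(): iterate over the
      buffered operations and dispatch each one, as the real code does
      via row_log_table_apply_op(). */
      inline apply_counts_t
      apply_ops_sketch(const std::vector<row_op_t>& log)
      {
      	apply_counts_t	c;
      
      	for (row_op_t op : log) {
      		switch (op) {
      		case ROW_OP_INSERT:	/* row_log_table_apply_insert() */
      			c.inserts++;
      			break;
      		case ROW_OP_UPDATE:	/* row_log_table_apply_update() */
      			c.updates++;
      			break;
      		case ROW_OP_DELETE:	/* row_log_table_apply_delete() */
      			c.deletes++;
      			break;
      		}
      	}
      
      	return(c);
      }
      ```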
      
      row_merge_skip_rec(): Determine if a row should be skipped during
      online table rebuild. Rolled-back records will be skipped.
      
      row_merge_read_clustered_index(): Relax assertions. Check if an error
      has occurred during online table rebuild.
      
      row_merge_copy_blobs(): Non-functional changes.
      
      row_merge_insert_index_tuples(): Replace zip_size with old_table,
      so that row_merge_skip_rec() can be called.
      
      row_merge_drop_indexes(): Add debug assertions. This function should
      not be called during online table rebuild (the entire table should be
      dropped instead).
      
      row_merge_build_indexes(): Add the parameter altered_table, so that
      rows can be converted for dropping, adding or reordering columns (not
      implemented yet). Adjust assertions and conditions.

    modified:
      storage/innobase/handler/handler0alter.cc
      storage/innobase/include/row0log.h
      storage/innobase/include/row0merge.h
      storage/innobase/row/row0log.cc
      storage/innobase/row/row0merge.cc
 3892 Marko Mäkelä	2012-05-29
      WL#6255 DML logging. This does not yet enable online table rebuild.
      The functions for applying the log are part of a separate change.
      
      In online table rebuild, we will not copy any delete-marked rows or
      any history to the new table. Anything that is delete-marked will be
      purged immediately from the new table.
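      In other words, the copy phase applies a simple filter; a minimal
      sketch (names hypothetical):
      
      ```cpp
      /* Minimal sketch: a delete-marked record has no committed history
      in the rebuilt table, so it is simply never copied (equivalently,
      it is "purged immediately"). */
      struct rec_sketch_t {
      	bool deleted;	/* delete-mark flag */
      };
      
      inline bool
      copy_rec_to_new_table(const rec_sketch_t& rec)
      {
      	return(!rec.deleted);
      }
      ```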
      
      Because the DML log will contain BLOB pointers of the original table,
      the purge thread must not free any BLOBs of a table that is being
      rebuilt.
      
      Because we cannot easily block the ROLLBACK of an INSERT, we will keep
      track of transactions that have been rolled back, and skip any rows
      written by transactions that have been rolled back. In this way, we
      will not dereference any BLOBs that would be freed as part of a
      rollback.
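      The rollback bookkeeping described above can be sketched as a set
      of transaction ids consulted while applying logged rows; these
      names are illustrative, not the actual row0log.cc interface:
      
      ```cpp
      #include <cstdint>
      #include <set>
      
      /* Illustrative bookkeeping: ids of transactions whose changes
      were rolled back while the table rebuild was in progress. */
      struct rollback_log_sketch_t {
      	std::set<uint64_t> rolled_back;
      };
      
      /* Record that a transaction was rolled back (cf. the bookkeeping
      behind the real row_log_table_is_rollback()). */
      inline void
      note_rollback(rollback_log_sketch_t& log, uint64_t trx_id)
      {
      	log.rolled_back.insert(trx_id);
      }
      
      /* Skip any row written by a rolled-back transaction, so that the
      rebuild never dereferences a BLOB that the rollback may have
      freed. */
      inline bool
      skip_row(const rollback_log_sketch_t& log, uint64_t row_trx_id)
      {
      	return(log.rolled_back.count(row_trx_id) > 0);
      }
      ```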
      
      row_log_table_insert(): Logging function for inserting a new row,
      whether it is a fresh insert or an update of a delete-marked row.
      
      row_log_table_delete(): Logging function for delete-marking a row.
      The row will be identified by PRIMARY KEY (in the new table
      definition) and the DB_TRX_ID before the delete-marking (purging).
      
      row_log_table_update(): Logging function for an update of a row that
      was not delete-marked. The new value of the row will be logged, not
      dereferencing any BLOB pointers. If the PRIMARY KEY is redefined, the
      update record will also contain the new PRIMARY KEY and the DB_TRX_ID,
      DB_ROLL_PTR value of the old row.
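      When such an update record is later applied, the old row is only
      removed if its system columns still match what was logged. Since
      DB_TRX_ID and DB_ROLL_PTR are stored contiguously in the record,
      one memcmp over both suffices; a sketch under that assumption:
      
      ```cpp
      #include <cstring>
      
      /* On-page sizes of the InnoDB system columns. */
      enum { DATA_TRX_ID_LEN = 6, DATA_ROLL_PTR_LEN = 7 };
      
      /* The logged DB_TRX_ID,DB_ROLL_PTR and the current record's copy
      are each a contiguous 13-byte block, so a single memcmp decides
      whether the old row version still exists. */
      inline bool
      same_row_version(const unsigned char* logged,
      		 const unsigned char* current)
      {
      	return(std::memcmp(logged, current,
      			   DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN) == 0);
      }
      ```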
      
      row_log_table_get_pk(): Constructs the old PRIMARY KEY and
      DB_TRX_ID,DB_ROLL_PTR for row_log_table_update(). If the PRIMARY KEY
      is not being redefined, returns NULL. This conversion has to be done
      at logging time while holding the clustered index leaf page latch, in
      case the new PRIMARY KEY includes column prefixes of externally stored
      columns.
      
      row_log_table_open(), row_log_table_close_func(): Common code for
      table DML logging.
      
      row_ins_clust_index_entry_by_modify(), row_ins_clust_index_entry_low(),
      row_ins_index_entry_big_rec_func(): Add row_log_table_insert() logging.
      
      row_undo_ins_remove_clust_rec(): Add row_log_table_delete() logging.
      
      row_undo_mod_clust_low(): Add an output parameter rebuilt_old_pk for
      row_log_table_update() logging.
      
      row_undo_mod_clust(): Add insert, update, delete logging.
      
      row_upd_clust_rec_by_insert(): Note that no logging is necessary for
      BLOB ownership changes.
      
      row_upd_clust_rec(): Add the parameters offsets, offsets_heap.  Add
      row_log_table_update() logging.

    modified:
      storage/innobase/btr/btr0cur.cc
      storage/innobase/include/row0log.h
      storage/innobase/row/row0ins.cc
      storage/innobase/row/row0log.cc
      storage/innobase/row/row0uins.cc
      storage/innobase/row/row0umod.cc
      storage/innobase/row/row0upd.cc
=== modified file 'storage/innobase/handler/handler0alter.cc'
--- a/storage/innobase/handler/handler0alter.cc	revid:marko.makela@stripped29133330-hvs4rzbzl68symnl
+++ b/storage/innobase/handler/handler0alter.cc	revid:marko.makela@stripped1608-cdeb9k9943oamce2
@@ -1456,10 +1456,6 @@ prepare_inplace_alter_table_dict(
 	column is to be added, and the primary index definition
 	is just copied from old table and stored in indexdefs[0] */
 	DBUG_ASSERT(!add_fts_doc_id || new_clustered);
-
-	/* A primary key index cannot be created online. The table
-	should be locked in this case. It should also be locked when a
-	full-text index is being created. */
 	DBUG_ASSERT(!!new_clustered
 		    == ((ha_alter_info->handler_flags
 			 & INNOBASE_INPLACE_REBUILD)
@@ -1481,13 +1477,8 @@ prepare_inplace_alter_table_dict(
 		|| ha_alter_info->alter_info->requested_lock
 		== Alter_info::ALTER_TABLE_LOCK_SHARED;
 
-	/* For now, rebuilding the clustered index requires a lock. */
-	ut_ad(!new_clustered || locked
-	      || ha_alter_info->alter_info->requested_lock
-	      == Alter_info::ALTER_TABLE_LOCK_DEFAULT);
-
 	/* Acquire a lock on the table before creating any indexes. */
-	if (new_clustered || locked) {
+	if (locked) {
 		error = row_merge_lock_table(
 			user_trx, indexed_table, LOCK_S);
 
@@ -1707,12 +1698,20 @@ col_fail:
 
 		/* If only online ALTER TABLE operations have been
 		requested, allocate a modification log. If the table
-		will be locked anyway, the modification log is
-		unnecessary. */
-		if (!locked && !num_fts_index
-		    && !innobase_need_rebuild(ha_alter_info)
-		    && !user_table->ibd_file_missing
-		    && !user_table->tablespace_discarded) {
+		will be locked anyway, the modification
+		log is unnecessary. When rebuilding the table
+		(new_clustered), we will allocate the log for the
+		clustered index of the old table, later. */
+		if (new_clustered
+		    || locked
+		    || user_table->ibd_file_missing
+		    || user_table->tablespace_discarded) {
+			/* No need to allocate a modification log. */
+			ut_ad(!add_index[num_created]->online_log);
+		} else if (add_index[num_created]->type & DICT_FTS) {
+			/* Fulltext indexes are not covered
+			by a modification log. */
+		} else {
 			DBUG_EXECUTE_IF("innodb_OOM_prepare_inplace_alter",
 					error = DB_OUT_OF_MEMORY;
 					goto error_handling;);
@@ -1728,6 +1727,29 @@ col_fail:
 		}
 	}
 
+	ut_ad(new_clustered == (indexed_table != user_table));
+
+	if (new_clustered && !locked) {
+		/* Allocate a log for online table rebuild. */
+		dict_index_t* clust_index = dict_table_get_first_index(
+			user_table);
+
+		DBUG_EXECUTE_IF("innodb_OOM_prepare_inplace_alter_rebuild",
+				error = DB_OUT_OF_MEMORY;
+				goto error_handling;);
+		rw_lock_x_lock(&clust_index->lock);
+		bool ok = row_log_allocate(
+			clust_index, indexed_table,
+			!!(ha_alter_info->handler_flags
+			   & Alter_inplace_info::ADD_PK_INDEX));
+		rw_lock_x_unlock(&clust_index->lock);
+
+		if (!ok) {
+			error = DB_OUT_OF_MEMORY;
+			goto error_handling;
+		}
+	}
+
 	if (fts_index) {
 #ifdef UNIV_DEBUG
 		/* Ensure that the dictionary operation mode will
@@ -1817,8 +1839,7 @@ error_handling:
 			user_trx, add_index, add_key_nums, n_add_index,
 			drop_index, n_drop_index,
 			drop_foreign, n_drop_foreign,
-			!locked && !new_clustered && !num_fts_index,
-			heap, trx, indexed_table);
+			!locked, heap, trx, indexed_table);
 		DBUG_RETURN(false);
 	case DB_TABLESPACE_ALREADY_EXISTS:
 		my_error(ER_TABLE_EXISTS_ERROR, MYF(0), "(unknown)");
@@ -1841,13 +1862,30 @@ error_handling:
 		if (indexed_table != user_table) {
 			dict_table_close(indexed_table, TRUE, FALSE);
 			row_merge_drop_table(trx, indexed_table);
+
+			/* Free the log for online table rebuild, if
+			one was allocated. */
+
+			dict_index_t* clust_index = dict_table_get_first_index(
+				user_table);
+
+			rw_lock_x_lock(&clust_index->lock);
+
+			if (clust_index->online_log) {
+				ut_ad(!locked);
+				row_log_free(clust_index);
+				clust_index->online_status
+					= ONLINE_INDEX_COMPLETE;
+			}
+
+			rw_lock_x_unlock(&clust_index->lock);
 		}
 
 		trx_commit_for_mysql(trx);
 		/* n_ref_count must be 1, because purge cannot
 		be executing on this very table as we are
 		holding dict_operation_lock X-latch. */
-		DBUG_ASSERT(user_table->n_ref_count == 1);
+		DBUG_ASSERT(user_table->n_ref_count == 1 || !locked);
 
 		online_retry_drop_indexes_with_trx(user_table, trx);
 	} else {
@@ -2460,10 +2498,17 @@ ok_exit:
 		prebuilt->trx,
 		prebuilt->table, ctx->indexed_table,
 		ctx->online,
-		ctx->add, ctx->add_key_numbers, ctx->num_to_add, table);
+		ctx->add, ctx->add_key_numbers, ctx->num_to_add,
+		table, altered_table);
 #ifndef DBUG_OFF
 oom:
 #endif /* !DBUG_OFF */
+	if (error == DB_SUCCESS && ctx->online
+	    && ctx->indexed_table != prebuilt->table) {
+		DEBUG_SYNC_C("row_log_table_apply1_before");
+		error = row_log_table_apply(
+			ctx->thr, prebuilt->table, table, altered_table);
+	}
 
 	/* After an error, remove all those index definitions
 	from the dictionary which were defined. */
@@ -2524,6 +2569,31 @@ oom:
 	DBUG_RETURN(true);
 }
 
+/** Free the modification log for online table rebuild.
+@param table	table that was being rebuilt online */
+static
+void
+innobase_online_rebuild_log_free(
+/*=============================*/
+	dict_table_t*	table)
+{
+	dict_index_t* clust_index = dict_table_get_first_index(table);
+
+	rw_lock_x_lock(&clust_index->lock);
+
+	if (clust_index->online_log) {
+		ut_ad(dict_index_get_online_status(clust_index)
+		      == ONLINE_INDEX_CREATION);
+		row_log_free(clust_index);
+		clust_index->online_status = ONLINE_INDEX_COMPLETE;
+	}
+
+	DBUG_ASSERT(dict_index_get_online_status(clust_index)
+		    == ONLINE_INDEX_COMPLETE);
+	rw_lock_x_unlock(&clust_index->lock);
+}
+
 /** Roll back the changes made during prepare_inplace_alter_table()
 and inplace_alter_table() inside the storage engine. Note that the
 allowed level of concurrency during this operation will be the same as
@@ -2576,6 +2646,8 @@ rollback_inplace_alter_table(
 					prebuilt->table->flags);
 			fail = true;
 		}
+
+		innobase_online_rebuild_log_free(prebuilt->table);
 	} else {
 		DBUG_ASSERT(!(ha_alter_info->handler_flags
 			      & Alter_inplace_info::ADD_PK_INDEX));
@@ -2587,6 +2659,14 @@ rollback_inplace_alter_table(
 	trx_free_for_mysql(ctx->trx);
 
 func_exit:
+#ifndef DBUG_OFF
+	dict_index_t* clust_index = dict_table_get_first_index(
+		prebuilt->table);
+	DBUG_ASSERT(!clust_index->online_log);
+	DBUG_ASSERT(dict_index_get_online_status(clust_index)
+		    == ONLINE_INDEX_COMPLETE);
+#endif /* !DBUG_OFF */
+
 	if (ctx && prebuilt->table == ctx->indexed_table) {
 		/* Clear the to_be_dropped flag in the data dictionary. */
 		for (ulint i = 0; i < ctx->num_to_drop; i++) {
@@ -2872,6 +2952,8 @@ ha_innobase::commit_inplace_alter_table(
 	DBUG_ENTER("commit_inplace_alter_table");
 
 	if (!(ha_alter_info->handler_flags & ~INNOBASE_INPLACE_IGNORE)) {
+		DBUG_ASSERT(!ctx);
+
 		/* Nothing to do */
 		if (!commit) {
 			goto ret;
@@ -2916,7 +2998,61 @@ ha_innobase::commit_inplace_alter_table(
 
 		/* We copied the table. Any indexes that were
 		requested to be dropped were not created in the copy
-		of the table. */
+		of the table. Apply any last bit of the rebuild log
+		and then rename the tables. */
+
+		if (ctx->online) {
+			DEBUG_SYNC_C("row_log_table_apply2_before");
+			error = row_log_table_apply(
+				ctx->thr, prebuilt->table,
+				table, altered_table);
+
+			switch (error) {
+				KEY*	dup_key;
+			case DB_SUCCESS:
+				break;
+			case DB_DUPLICATE_KEY:
+				if (prebuilt->trx->error_key_num
+				    == ULINT_UNDEFINED) {
+					/* This should be the hidden index on
+					FTS_DOC_ID. */
+					dup_key = NULL;
+				} else {
+					DBUG_ASSERT(
+						prebuilt->trx->error_key_num
+						< ha_alter_info->key_count);
+					dup_key = &ha_alter_info
+						->key_info_buffer[
+							prebuilt->trx
+							->error_key_num];
+				}
+				print_keydup_error(dup_key);
+				break;
+			case DB_ONLINE_LOG_TOO_BIG:
+				my_error(ER_INNODB_ONLINE_LOG_TOO_BIG, MYF(0),
+					 ha_alter_info->key_info_buffer[0]
+					 .name);
+				break;
+			case DB_INDEX_CORRUPT:
+				my_error(ER_INDEX_CORRUPT, MYF(0),
+					 (prebuilt->trx->error_key_num
+					  == ULINT_UNDEFINED)
+					 ? FTS_DOC_ID_INDEX_NAME
+					 : ha_alter_info->key_info_buffer[
+						 prebuilt->trx->error_key_num]
+					 .name);
+				break;
+			default:
+				my_error_innodb(error,
+						table_share->table_name.str,
+						prebuilt->table->flags);
+			}
+
+			if (error != DB_SUCCESS) {
+				err = -1;
+				goto drop_new_clustered;
+			}
+		}
 
 		/* A new clustered index was defined for the table
 		and there was no error at this point. We can
@@ -2935,17 +3071,24 @@ ha_innobase::commit_inplace_alter_table(
 		holding dict_operation_lock X-latch. */
 		ut_a(prebuilt->table->n_ref_count == 1);
 
-		if (error == DB_SUCCESS) {
-			dict_table_t*	old_table = prebuilt->table;
+		switch (error) {
+			dict_table_t*	old_table;
+			trx_id_t	trx_id;
+		case DB_SUCCESS:
+			old_table = prebuilt->table;
+			trx_id = prebuilt->trx->id;
 			trx_commit_for_mysql(prebuilt->trx);
 			row_prebuilt_free(prebuilt, TRUE);
 			error = row_merge_drop_table(trx, old_table);
 			prebuilt = row_create_prebuilt(
 				ctx->indexed_table, table->s->reclength);
-		}
-
-		switch (error) {
-		case DB_SUCCESS:
+			/* Prevent old transactions from accessing the
+			rebuilt table, because the history is missing. */
+			for (dict_index_t* index = dict_table_get_first_index(
+				     ctx->indexed_table);
+			     index; index = dict_table_get_next_index(index)) {
+				index->trx_id = trx_id;
+			}
 			err = 0;
 			break;
 		case DB_TABLESPACE_ALREADY_EXISTS:
@@ -3067,6 +3210,10 @@ trx_commit:
 		trx_rollback_for_mysql(trx);
 	}
 
+	if (new_clustered) {
+		innobase_online_rebuild_log_free(prebuilt->table);
+	}
+
 	if (err == 0 && ctx) {
 		/* The changes were successfully performed. */
 		bool	add_fts	= false;
@@ -3247,8 +3394,17 @@ func_exit:
 	}
 
 ret:
+#ifndef DBUG_OFF
+	dict_index_t* clust_index = dict_table_get_first_index(
+		prebuilt->table);
+	DBUG_ASSERT(!clust_index->online_log);
+	DBUG_ASSERT(dict_index_get_online_status(clust_index)
+		    == ONLINE_INDEX_COMPLETE);
+#endif /* !DBUG_OFF */
+
 	if (err == 0) {
 		MONITOR_ATOMIC_DEC(MONITOR_PENDING_ALTER_TABLE);
 	}
+
 	DBUG_RETURN(err != 0);
 }

=== modified file 'storage/innobase/include/row0log.h'
--- a/storage/innobase/include/row0log.h	revid:marko.makela@strippedymnl
+++ b/storage/innobase/include/row0log.h	revid:marko.makela@stripped
@@ -167,6 +167,22 @@ row_log_table_is_rollback(
 	__attribute__((nonnull));
 
 /******************************************************//**
+Apply the row_log_table log to a table upon completing rebuild.
+@return DB_SUCCESS, or error code on failure */
+UNIV_INTERN
+dberr_t
+row_log_table_apply(
+/*================*/
+	que_thr_t*	thr,	/*!< in: query graph */
+	dict_table_t*	old_table,
+				/*!< in: old table */
+	struct TABLE*	table,	/*!< in/out: MySQL table
+				(for reporting duplicates) */
+	struct TABLE*	altered_table)
+				/*!< in: new MySQL table definition */
+	__attribute__((nonnull, warn_unused_result));
+
+/******************************************************//**
 Get the latest transaction ID that has invoked row_log_online_op()
 during online creation.
 @return latest transaction ID, or 0 if nothing was logged */

=== modified file 'storage/innobase/include/row0merge.h'
--- a/storage/innobase/include/row0merge.h	revid:marko.makela@stripped529133330-hvs4rzbzl68symnl
+++ b/storage/innobase/include/row0merge.h	revid:marko.makela@stripped-cdeb9k9943oamce2
@@ -290,9 +290,11 @@ row_merge_build_indexes(
 	dict_index_t**	indexes,	/*!< in: indexes to be created */
 	const ulint*	key_numbers,	/*!< in: MySQL key numbers */
 	ulint		n_indexes,	/*!< in: size of indexes[] */
-	struct TABLE*	table)		/*!< in/out: MySQL table, for
+	struct TABLE*	table,		/*!< in/out: MySQL table, for
 					reporting erroneous key value
 					if applicable */
+	struct TABLE*	altered_table)	/*!< in/out: new MySQL table
+					definition */
 	__attribute__((nonnull, warn_unused_result));
 /********************************************************************//**
 Write a buffer to a block. */
@@ -302,7 +304,8 @@ row_merge_buf_write(
 /*================*/
 	const row_merge_buf_t*	buf,	/*!< in: sorted buffer */
 	const merge_file_t*	of,	/*!< in: output file */
-	row_merge_block_t*	block);	/*!< out: buffer for writing to file */
+	row_merge_block_t*	block)	/*!< out: buffer for writing to file */
+	__attribute__((nonnull));
 /********************************************************************//**
 Sort a buffer. */
 UNIV_INTERN

=== modified file 'storage/innobase/row/row0log.cc'
--- a/storage/innobase/row/row0log.cc	revid:marko.makela@stripped120529133330-hvs4rzbzl68symnl
+++ b/storage/innobase/row/row0log.cc	revid:marko.makela@strippeddeb9k9943oamce2
@@ -890,6 +890,1212 @@ row_log_table_is_rollback(
 }
 
 /******************************************************//**
+Converts a log record to a table row.
+@return converted row */
+static __attribute__((nonnull, warn_unused_result))
+const dtuple_t*
+row_log_table_apply_convert_mrec(
+/*=============================*/
+	const mrec_t*		mrec,		/*!< in: merge record */
+	const ulint*		offsets,	/*!< in: offsets of mrec */
+	mem_heap_t*		heap,		/*!< in/out: memory heap */
+	dict_table_t*		new_table,	/*!< in/out: table
+						being rebuilt */
+	const struct TABLE*	altered_table,	/*!< in: new MySQL
+						table definition */
+	const row_merge_dup_t*	dup)		/*!< in: old index and table */
+{
+	dtuple_t*	row;
+
+	/* This is based on row_build(). */
+	row = dtuple_create(heap, dict_table_get_n_cols(dup->index->table));
+	dict_table_copy_types(row, dup->index->table);
+
+	for (ulint i = 0; i < rec_offs_n_fields(offsets); i++) {
+		const dict_field_t*	ind_field
+			= dict_index_get_nth_field(dup->index, i);
+		const dict_col_t*	col
+			= dict_field_get_col(ind_field);
+		ulint			col_no
+			= dict_col_get_no(col);
+		dfield_t*		dfield
+			= dtuple_get_nth_field(row, col_no);
+		ulint			len;
+		const void*		data;
+
+		if (rec_offs_nth_extern(offsets, i)) {
+			data = btr_rec_copy_externally_stored_field(
+				mrec, offsets,
+				dict_table_zip_size(dup->index->table),
+				i, &len, heap);
+			ut_a(data);
+			dfield_set_data(dfield, data, len);
+		} else if (ind_field->prefix_len == 0) {
+			data = rec_get_nth_field(mrec, offsets, i, &len);
+			dfield_set_data(dfield, data, len);
+		}
+	}
+
+	/* TODO: convert row to new_table->cols, in case columns are
+	added or dropped or reordered */
+
+	return(row);
+}
+
+/******************************************************//**
+Replays an insert operation on a table that was rebuilt.
+@return DB_SUCCESS or error code */
+static __attribute__((nonnull, warn_unused_result))
+dberr_t
+row_log_table_apply_insert_low(
+/*===========================*/
+	que_thr_t*		thr,		/*!< in: query graph */
+	const dtuple_t*		row,		/*!< in: table row
+						in the old table definition */
+	mem_heap_t*		offsets_heap,	/*!< in/out: memory heap
+						that can be emptied */
+	mem_heap_t*		heap,		/*!< in/out: memory heap */
+	dict_table_t*		new_table,	/*!< in/out: table
+						being rebuilt */
+	const struct TABLE*	altered_table,	/*!< in: new MySQL
+						table definition */
+	row_merge_dup_t*	dup)		/*!< in/out: for reporting
+						duplicate key errors */
+{
+	dberr_t		error;
+	dtuple_t*	entry;
+	dict_index_t*	index	= dict_table_get_first_index(new_table);
+
+	ut_ad(dtuple_validate(row));
+
+#ifdef ROW_LOG_APPLY_PRINT
+	if (row_log_apply_print) {
+		fprintf(stderr, "table apply insert " IB_ID_FMT,
+			dup->index->id);
+		dtuple_print(stderr, row);
+	}
+#endif /* ROW_LOG_APPLY_PRINT */
+
+	static const ulint	flags
+		= BTR_CREATE_FLAG
+		| BTR_NO_LOCKING_FLAG
+		| BTR_NO_UNDO_LOG_FLAG
+		| BTR_KEEP_SYS_FLAG;
+
+	entry = row_build_index_entry(row, NULL, index, heap);
+
+	error = row_ins_clust_index_entry_low(
+		flags, BTR_MODIFY_TREE, index, entry, 0, thr);
+
+	if (error == DB_DUPLICATE_KEY) {
+		/* TODO: report the duplicate key unless the record is
+		a full match of what we tried to insert */
+		return(error);
+	}
+
+	while (error == DB_SUCCESS) {
+		if (!(index = dict_table_get_next_index(index))) {
+			break;
+		}
+
+		entry = row_build_index_entry(row, NULL, index, heap);
+		error = row_ins_sec_index_entry_low(
+			flags, BTR_MODIFY_TREE,
+			index, offsets_heap, heap, entry, thr);
+	}
+
+	if (error == DB_DUPLICATE_KEY) {
+		/* TODO: report dup */
+	}
+
+	return(error);
+}
+
+/******************************************************//**
+Replays an insert operation on a table that was rebuilt.
+@return DB_SUCCESS or error code */
+static __attribute__((nonnull, warn_unused_result))
+dberr_t
+row_log_table_apply_insert(
+/*=======================*/
+	que_thr_t*		thr,		/*!< in: query graph */
+	const mrec_t*		mrec,		/*!< in: merge record */
+	const ulint*		offsets,	/*!< in: offsets of mrec */
+	mem_heap_t*		offsets_heap,	/*!< in/out: memory heap
+						that can be emptied */
+	mem_heap_t*		heap,		/*!< in/out: memory heap */
+	dict_table_t*		new_table,	/*!< in/out: table
+						being rebuilt */
+	const struct TABLE*	altered_table,	/*!< in: new MySQL
+						table definition */
+	row_merge_dup_t*	dup)		/*!< in/out: for reporting
+						duplicate key errors */
+{
+	const dtuple_t*	row = row_log_table_apply_convert_mrec(
+		mrec, offsets, heap, new_table, altered_table, dup);
+	return(row_log_table_apply_insert_low(thr, row, offsets_heap, heap,
+					      new_table, altered_table, dup));
+}
+
+/******************************************************//**
+Deletes a record from a table that is being rebuilt.
+@return DB_SUCCESS or error code */
+static __attribute__((nonnull, warn_unused_result))
+dberr_t
+row_log_table_apply_delete_low(
+/*===========================*/
+	btr_pcur_t*	pcur,	/*!< in/out: B-tree cursor, will be trashed */
+	const ulint*	offsets,/*!< in: offsets on pcur */
+	mem_heap_t*	heap,	/*!< in/out: memory heap */
+	mtr_t*		mtr)	/*!< in/out: mini-transaction,
+				will be committed */
+{
+	dberr_t		error;
+	row_ext_t*	ext;
+	dtuple_t*	row;
+	dict_index_t*	index	= btr_pcur_get_btr_cur(pcur)->index;
+
+	ut_ad(dict_index_is_clust(index));
+
+	if (dict_table_get_next_index(index)) {
+		/* Build a row template for purging secondary index entries. */
+		row = row_build(
+			ROW_COPY_DATA, index, btr_pcur_get_rec(pcur),
+			offsets, NULL, &ext, heap);
+	} else {
+		row = NULL;
+	}
+
+	btr_cur_pessimistic_delete(&error, FALSE, btr_pcur_get_btr_cur(pcur),
+				   BTR_CREATE_FLAG, RB_NONE, mtr);
+	mtr_commit(mtr);
+
+	if (error != DB_SUCCESS) {
+		return(error);
+	}
+
+	while ((index = dict_table_get_next_index(index)) != NULL) {
+		const dtuple_t*	entry = row_build_index_entry(
+			row, ext, index, heap);
+		mtr_start(mtr);
+		btr_pcur_open(index, entry, PAGE_CUR_LE,
+			      BTR_MODIFY_TREE, pcur, mtr);
+#ifdef UNIV_DEBUG
+		switch (btr_pcur_get_btr_cur(pcur)->flag) {
+		case BTR_CUR_DELETE_REF:
+		case BTR_CUR_DEL_MARK_IBUF:
+		case BTR_CUR_DELETE_IBUF:
+		case BTR_CUR_INSERT_TO_IBUF:
+			/* We did not request buffering. */
+			break;
+		case BTR_CUR_HASH:
+		case BTR_CUR_HASH_FAIL:
+		case BTR_CUR_BINARY:
+			goto flag_ok;
+		}
+		ut_ad(0);
+flag_ok:
+#endif /* UNIV_DEBUG */
+
+		if (page_rec_is_infimum(btr_pcur_get_rec(pcur))
+		    || btr_pcur_get_low_match(pcur) < index->n_uniq) {
+			/* All secondary index entries should be
+			found, because new_table is being modified by
+			this thread only, and all indexes should be
+			updated in sync. */
+			mtr_commit(mtr);
+			return(DB_INDEX_CORRUPT);
+		}
+
+		btr_cur_pessimistic_delete(&error, FALSE,
+					   btr_pcur_get_btr_cur(pcur),
+					   BTR_CREATE_FLAG, RB_NONE, mtr);
+		mtr_commit(mtr);
+	}
+
+	return(error);
+}
+
+/******************************************************//**
+Replays a delete operation on a table that was rebuilt.
+@return DB_SUCCESS or error code */
+static __attribute__((nonnull, warn_unused_result))
+dberr_t
+row_log_table_apply_delete(
+/*=======================*/
+	que_thr_t*		thr,		/*!< in: query graph */
+	ulint			trx_id_col,	/*!< in: position of
+						DB_TRX_ID in the
+						clustered index */
+	const mrec_t*		mrec,		/*!< in: merge record */
+	const ulint*		moffsets,	/*!< in: offsets of mrec */
+	mem_heap_t*		offsets_heap,	/*!< in/out: memory heap
+						that can be emptied */
+	mem_heap_t*		heap,		/*!< in/out: memory heap */
+	dict_table_t*		new_table)	/*!< in/out: table
+						being rebuilt */
+{
+	dict_index_t*	index = dict_table_get_first_index(new_table);
+	dtuple_t*	old_pk;
+	mtr_t		mtr;
+	btr_pcur_t	pcur;
+	ulint*		offsets;
+
+	ut_ad(rec_offs_n_fields(moffsets)
+	      == dict_index_get_n_unique(index) + 1);
+	ut_ad(!rec_offs_any_extern(moffsets));
+
+	/* Convert the row to a search tuple. */
+	old_pk = dtuple_create(heap, index->n_uniq + 1);
+	dict_table_copy_types(old_pk, new_table);
+	dtuple_set_n_fields_cmp(old_pk, index->n_uniq);
+
+	for (ulint i = 0; i <= index->n_uniq; i++) {
+		ulint		len;
+		const void*	field;
+		field = rec_get_nth_field(mrec, moffsets, i, &len);
+		ut_ad(len != UNIV_SQL_NULL);
+		dfield_set_data(dtuple_get_nth_field(old_pk, i),
+				field, len);
+	}
+
+	mtr_start(&mtr);
+	btr_pcur_open(index, old_pk, PAGE_CUR_LE,
+		      BTR_MODIFY_TREE, &pcur, &mtr);
+#ifdef UNIV_DEBUG
+	switch (btr_pcur_get_btr_cur(&pcur)->flag) {
+	case BTR_CUR_DELETE_REF:
+	case BTR_CUR_DEL_MARK_IBUF:
+	case BTR_CUR_DELETE_IBUF:
+	case BTR_CUR_INSERT_TO_IBUF:
+		/* We did not request buffering. */
+		break;
+	case BTR_CUR_HASH:
+	case BTR_CUR_HASH_FAIL:
+	case BTR_CUR_BINARY:
+		goto flag_ok;
+	}
+	ut_ad(0);
+flag_ok:
+#endif /* UNIV_DEBUG */
+
+	if (page_rec_is_infimum(btr_pcur_get_rec(&pcur))
+	    || btr_pcur_get_low_match(&pcur) < index->n_uniq) {
+all_done:
+		mtr_commit(&mtr);
+		/* The record was not found. All done. */
+		return(DB_SUCCESS);
+	}
+
+	offsets = rec_get_offsets(btr_pcur_get_rec(&pcur), index, NULL,
+				  ULINT_UNDEFINED, &offsets_heap);
+#if defined UNIV_DEBUG || defined UNIV_BLOB_LIGHT_DEBUG
+	ut_a(!rec_offs_any_null_extern(btr_pcur_get_rec(&pcur), offsets));
+#endif /* UNIV_DEBUG || UNIV_BLOB_LIGHT_DEBUG */
+
+	/* Only remove the record if DB_TRX_ID matches what was
+	buffered. */
+
+	{
+		ulint		len;
+		const void*	mrec_trx_id
+			= rec_get_nth_field(mrec, moffsets, trx_id_col, &len);
+		ut_ad(len == DATA_TRX_ID_LEN);
+		const void*	rec_trx_id
+			= rec_get_nth_field(btr_pcur_get_rec(&pcur), offsets,
+					    trx_id_col, &len);
+		ut_ad(len == DATA_TRX_ID_LEN);
+		if (memcmp(mrec_trx_id, rec_trx_id, DATA_TRX_ID_LEN)) {
+			goto all_done;
+		}
+	}
+
+	return(row_log_table_apply_delete_low(&pcur, offsets, heap, &mtr));
+}
+
+/******************************************************//**
+Replays an update operation on a table that was rebuilt.
+@return DB_SUCCESS or error code */
+static __attribute__((nonnull(1,3,4,5,6,7,8), warn_unused_result))
+dberr_t
+row_log_table_apply_update(
+/*=======================*/
+	que_thr_t*		thr,		/*!< in: query graph */
+	ulint			trx_id_col,	/*!< in: position of
+						DB_TRX_ID in the
+						clustered index */
+	const mrec_t*		mrec,		/*!< in: merge record */
+	const ulint*		offsets,	/*!< in: offsets of mrec */
+	mem_heap_t*		offsets_heap,	/*!< in/out: memory heap
+						that can be emptied */
+	mem_heap_t*		heap,		/*!< in/out: memory heap */
+	dict_table_t*		new_table,	/*!< in/out: table
+						being rebuilt */
+	const struct TABLE*	altered_table,	/*!< in: new MySQL
+						table definition */
+	row_merge_dup_t*	dup,		/*!< in/out: for reporting
+						duplicate key errors */
+	const dtuple_t*		old_pk)		/*!< in: PRIMARY KEY and
+						DB_TRX_ID,DB_ROLL_PTR
+						of the old value,
+						or NULL if same_pk */
+{
+	const dtuple_t*	row;
+	dict_index_t*	index = dict_table_get_first_index(new_table);
+	mtr_t		mtr;
+	btr_pcur_t	pcur;
+
+	ut_ad(dtuple_get_n_fields_cmp(old_pk)
+	      == dict_index_get_n_unique(index));
+	ut_ad(dtuple_get_n_fields(old_pk)
+	      == dict_index_get_n_unique(index) + 1);
+	ut_ad(!!old_pk == dup->index->online_log->same_pk);
+
+	row = row_log_table_apply_convert_mrec(
+		mrec, offsets, heap, new_table, altered_table, dup);
+
+	mtr_start(&mtr);
+	btr_pcur_open(index, old_pk, PAGE_CUR_LE,
+		      BTR_MODIFY_TREE, &pcur, &mtr);
+#ifdef UNIV_DEBUG
+	switch (btr_pcur_get_btr_cur(&pcur)->flag) {
+	case BTR_CUR_DELETE_REF:
+	case BTR_CUR_DEL_MARK_IBUF:
+	case BTR_CUR_DELETE_IBUF:
+	case BTR_CUR_INSERT_TO_IBUF:
+		ut_ad(0);/* We did not request buffering. */
+	case BTR_CUR_HASH:
+	case BTR_CUR_HASH_FAIL:
+	case BTR_CUR_BINARY:
+		break;
+	}
+#endif /* UNIV_DEBUG */
+
+	if (page_rec_is_infimum(btr_pcur_get_rec(&pcur))
+	    || btr_pcur_get_low_match(&pcur) < index->n_uniq) {
+		mtr_commit(&mtr);
+insert:
+		/* The row was not found. Insert it. */
+		return(row_log_table_apply_insert_low(
+			       thr, row, offsets_heap, heap,
+			       new_table, altered_table, dup));
+	}
+
+	/* Update the record. */
+	ulint*		cur_offsets	= rec_get_offsets(
+		btr_pcur_get_rec(&pcur),
+		index, NULL, ULINT_UNDEFINED, &offsets_heap);
+
+	dtuple_t*	entry	= row_build_index_entry(
+		row, NULL, index, heap);
+	const upd_t*	update	= row_upd_build_difference_binary(
+		index, entry, btr_pcur_get_rec(&pcur), cur_offsets,
+		false, NULL, heap);
+	dberr_t		error	= DB_SUCCESS;
+
+	if (!update->n_fields) {
+		/* Nothing to do. */
+		goto func_exit;
+	}
+
+	if (rec_offs_any_extern(cur_offsets)) {
+		/* If the record contains any externally stored
+		columns, perform the update by delete and insert,
+		because we will not write any undo log that would
+		allow purge to free any orphaned externally stored
+		columns. */
+delete_insert:
+		error = row_log_table_apply_delete_low(
+			&pcur, cur_offsets, heap, &mtr);
+		ut_ad(mtr.state == MTR_COMMITTED);
+
+		if (error != DB_SUCCESS) {
+			return(error);
+		}
+
+		goto insert;
+	}
+
+	if (upd_get_nth_field(update, 0)->field_no < trx_id_col) {
+		if (!dup->index->online_log->same_pk) {
+			ut_ad(0);
+			error = DB_CORRUPTION;
+			goto func_exit;
+		}
+
+		/* The PRIMARY KEY columns have changed.
+		Delete the record with the old PRIMARY KEY value,
+		provided that it carries the same
+		DB_TRX_ID,DB_ROLL_PTR. Then, insert the new row. */
+		ulint		len;
+		const byte*	cur_trx_roll	= rec_get_nth_field(
+			mrec, offsets, trx_id_col, &len);
+		ut_ad(len == DATA_TRX_ID_LEN);
+		const dfield_t*	new_trx_roll	= dtuple_get_nth_field(
+			old_pk, trx_id_col);
+		/* We assume that DB_TRX_ID,DB_ROLL_PTR are stored
+		in one contiguous block. */
+		ut_ad(rec_get_nth_field(mrec, offsets, trx_id_col + 1, &len)
+		      == cur_trx_roll + DATA_TRX_ID_LEN);
+		ut_ad(len == DATA_ROLL_PTR_LEN);
+		ut_ad(new_trx_roll->len == DATA_TRX_ID_LEN);
+		ut_ad(dtuple_get_nth_field(old_pk, trx_id_col + 1)
+		      -> len == DATA_ROLL_PTR_LEN);
+		ut_ad(static_cast<const byte*>(
+			      dtuple_get_nth_field(old_pk, trx_id_col + 1)
+			      ->data)
+		      == static_cast<const byte*>(new_trx_roll->data)
+		      + DATA_TRX_ID_LEN);
+
+		if (!memcmp(cur_trx_roll, new_trx_roll->data,
+			    DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN)) {
+			/* The old row exists. Remove it. */
+			goto delete_insert;
+		}
+
+		/* Unless we called row_log_table_apply_delete_low(),
+		this will likely cause a duplicate key error. */
+		goto insert;
+	}
+
+	dtuple_t*	old_row;
+	row_ext_t*	old_ext;
+
+	if (dict_table_get_next_index(index)) {
+		/* Construct the row corresponding to the old value of
+		the record. */
+		old_row = row_build(ROW_COPY_DATA, index,
+				    btr_pcur_get_rec(&pcur),
+				    cur_offsets, NULL, &old_ext, heap);
+		ut_ad(old_row);
+	} else {
+		old_row = NULL;
+		old_ext = NULL;
+	}
+
+	big_rec_t*	big_rec;
+
+	error = btr_cur_pessimistic_update(
+		BTR_CREATE_FLAG | BTR_NO_LOCKING_FLAG
+		| BTR_NO_UNDO_LOG_FLAG | BTR_KEEP_SYS_FLAG
+		| BTR_KEEP_POS_FLAG,
+		btr_pcur_get_btr_cur(&pcur),
+		&cur_offsets, &offsets_heap, heap, &big_rec,
+		update, 0, NULL, 0, &mtr);
+
+	if (big_rec) {
+		if (error == DB_SUCCESS) {
+			error = btr_store_big_rec_extern_fields(
+				index, btr_pcur_get_block(&pcur),
+				btr_pcur_get_rec(&pcur), cur_offsets,
+				big_rec, &mtr, BTR_STORE_UPDATE);
+		}
+
+		dtuple_big_rec_free(big_rec);
+	}
+
+	while ((index = dict_table_get_next_index(index)) != NULL) {
+		if (error != DB_SUCCESS) {
+			break;
+		}
+
+		if (!row_upd_changes_ord_field_binary(
+			    index, update, thr, row, NULL)) {
+			continue;
+		}
+
+		mtr_commit(&mtr);
+
+		entry = row_build_index_entry(old_row, old_ext, index, heap);
+		if (!entry) {
+			ut_ad(0);
+			return(DB_CORRUPTION);
+		}
+
+		mtr_start(&mtr);
+
+		if (ROW_FOUND != row_search_index_entry(
+			    index, entry, BTR_MODIFY_TREE, &pcur, &mtr)) {
+			ut_ad(0);
+			error = DB_CORRUPTION;
+			break;
+		}
+
+		btr_cur_pessimistic_delete(
+			&error, FALSE, btr_pcur_get_btr_cur(&pcur),
+			BTR_CREATE_FLAG | BTR_NO_LOCKING_FLAG
+			| BTR_NO_UNDO_LOG_FLAG | BTR_KEEP_SYS_FLAG,
+			RB_NONE, &mtr);
+
+		if (error != DB_SUCCESS) {
+			break;
+		}
+
+		mtr_commit(&mtr);
+
+		entry = row_build_index_entry(row, NULL, index, heap);
+		error = row_ins_sec_index_entry_low(
+			BTR_CREATE_FLAG | BTR_NO_LOCKING_FLAG
+			| BTR_NO_UNDO_LOG_FLAG | BTR_KEEP_SYS_FLAG,
+			BTR_MODIFY_TREE, index, offsets_heap, heap,
+			entry, thr);
+
+		mtr_start(&mtr);
+	}
+
+func_exit:
+	mtr_commit(&mtr);
+	return(error);
+}
+
+/******************************************************//**
+Applies an operation to a table that was rebuilt.
+@return NULL on failure (mrec corruption) or when out of data;
+pointer to next record on success */
+static __attribute__((nonnull, warn_unused_result))
+const mrec_t*
+row_log_table_apply_op(
+/*===================*/
+	que_thr_t*		thr,		/*!< in: query graph */
+	ulint			trx_id_col,	/*!< in: position of
+						DB_TRX_ID in index */
+	dict_table_t*		new_table,	/*!< in/out: table
+						being rebuilt */
+	const struct TABLE*	altered_table,	/*!< in: new MySQL
+						table definition */
+	row_merge_dup_t*	dup,		/*!< in/out: for reporting
+						duplicate key errors */
+	dberr_t*		error,		/*!< out: DB_SUCCESS
+						or error code */
+	mem_heap_t*		offsets_heap,	/*!< in/out: memory heap
+						that can be emptied */
+	mem_heap_t*		heap,		/*!< in/out: memory heap */
+	const mrec_t*		mrec,		/*!< in: merge record */
+	const mrec_t*		mrec_end,	/*!< in: end of buffer */
+	ulint*			offsets)	/*!< in/out: work area
+						for parsing mrec */
+{
+	dict_index_t*	new_index = dict_table_get_first_index(new_table);
+	ulint		extra_size;
+	const mrec_t*	next_mrec;
+	dtuple_t*	old_pk;
+
+	ut_ad(dict_index_is_clust(dup->index));
+	ut_ad(dup->index->table != new_table);
+
+	*error = DB_SUCCESS;
+
+	if (mrec + 3 >= mrec_end) {
+		return(NULL);
+	}
+
+	switch (*mrec++) {
+	default:
+		ut_ad(0);
+		*error = DB_CORRUPTION;
+		return(NULL);
+	case ROW_T_INSERT:
+		extra_size = *mrec++;
+
+		if (extra_size >= 0x80) {
+			/* Read another byte of extra_size. */
+
+			extra_size = (extra_size & 0x7f) << 8;
+			extra_size |= *mrec++;
+		}
+
+		mrec += extra_size;
+
+		if (mrec > mrec_end) {
+			return(NULL);
+		}
+
+		rec_init_offsets_comp_ordinary(
+			mrec, 0, dup->index, dup->index->n_nullable, offsets);
+
+		next_mrec = mrec + rec_offs_data_size(offsets);
+
+		if (next_mrec > mrec_end) {
+			return(NULL);
+		} else {
+			ulint		len;
+			const byte*	db_trx_id
+				= rec_get_nth_field(
+					mrec, offsets, trx_id_col, &len);
+			ut_ad(len == DATA_TRX_ID_LEN);
+			if (!row_log_table_is_rollback(
+				    dup->index,
+				    trx_read_trx_id(db_trx_id))) {
+				*error = row_log_table_apply_insert(
+					thr, mrec, offsets, offsets_heap,
+					heap,
+					new_table, altered_table, dup);
+			}
+		}
+		break;
+
+	case ROW_T_DELETE:
+		extra_size = *mrec++;
+		ut_ad(mrec < mrec_end);
+
+		/* We assume extra_size < 0x100 for the PRIMARY KEY prefix.
+		For fixed-length PRIMARY KEY columns, it is 0. */
+		mrec += extra_size;
+
+		if (mrec > mrec_end) {
+			return(NULL);
+		}
+
+		rec_offs_set_n_fields(offsets, new_index->n_uniq + 1);
+		rec_init_offsets_comp_ordinary(mrec, 0, new_index, 0, offsets);
+		next_mrec = mrec + rec_offs_data_size(offsets);
+
+		if (next_mrec > mrec_end) {
+			return(NULL);
+		}
+
+		*error = row_log_table_apply_delete(
+			thr, trx_id_col, mrec, offsets, offsets_heap, heap,
+			new_table);
+		break;
+	case ROW_T_UPDATE:
+		/* Logically, the log entry consists of the
+		(PRIMARY KEY,DB_TRX_ID) of the old value (converted
+		to the new primary key definition) followed by
+		the new value in the old table definition. If the
+		definition of the columns belonging to PRIMARY KEY
+		is not changed, the log will only contain
+		DB_TRX_ID,new_row. */
+
+		old_pk = dtuple_create(heap, new_index->n_uniq + 1);
+
+		if (dup->index->online_log->same_pk) {
+			ut_ad(new_index->n_uniq == dup->index->n_uniq);
+
+			if (DATA_TRX_ID_LEN + 2 + mrec > mrec_end) {
+				return(NULL);
+			}
+
+			/* Set DB_TRX_ID */
+			dfield_set_data(
+				dtuple_get_nth_field(old_pk,
+						     new_index->n_uniq),
+				mrec, DATA_TRX_ID_LEN);
+
+			mrec += DATA_TRX_ID_LEN;
+
+			extra_size = *mrec++;
+
+			if (extra_size >= 0x80) {
+				/* Read another byte of extra_size. */
+
+				extra_size = (extra_size & 0x7f) << 8;
+				extra_size |= *mrec++;
+			}
+
+			mrec += extra_size;
+
+			if (mrec > mrec_end) {
+				return(NULL);
+			}
+
+			rec_offs_set_n_fields(offsets, dup->index->n_fields);
+			rec_init_offsets_comp_ordinary(
+				mrec, 0, dup->index, dup->index->n_nullable,
+				offsets);
+
+			next_mrec = mrec + rec_offs_data_size(offsets);
+
+			if (next_mrec > mrec_end) {
+				return(NULL);
+			}
+
+			/* Copy the PRIMARY KEY fields from mrec to old_pk. */
+			for (ulint i = 0; i < new_index->n_uniq; i++) {
+				const void*	field;
+				ulint		len;
+				dfield_t*	dfield;
+
+				ut_ad(!rec_offs_nth_extern(offsets, i));
+
+				field = rec_get_nth_field(
+					mrec, offsets, i, &len);
+				ut_ad(len != UNIV_SQL_NULL);
+
+				dfield = dtuple_get_nth_field(old_pk, i);
+				dfield_set_data(dfield, field, len);
+			}
+		} else {
+			/* We assume extra_size < 0x100
+			for the PRIMARY KEY prefix. */
+			mrec += *mrec + 1;
+
+			if (mrec > mrec_end) {
+				return(NULL);
+			}
+
+			rec_offs_set_n_fields(offsets, new_index->n_uniq + 1);
+			rec_init_offsets_comp_ordinary(
+				mrec, 0, new_index, 0, offsets);
+
+			next_mrec = mrec + rec_offs_data_size(offsets);
+			if (next_mrec + 2 > mrec_end) {
+				return(NULL);
+			}
+
+			/* Copy the PRIMARY KEY fields and DB_TRX_ID
+			from mrec to old_pk. */
+			for (ulint i = 0; i <= new_index->n_uniq; i++) {
+				const void*	field;
+				ulint		len;
+				dfield_t*	dfield;
+
+				ut_ad(!rec_offs_nth_extern(offsets, i));
+
+				field = rec_get_nth_field(
+					mrec, offsets, i, &len);
+				ut_ad(len != UNIV_SQL_NULL);
+
+				dfield = dtuple_get_nth_field(old_pk, i);
+				dfield_set_data(dfield, field, len);
+			}
+
+			/* Fetch the new value of the row as it was
+			in the old table definition. */
+			extra_size = *mrec++;
+
+			if (extra_size >= 0x80) {
+				/* Read another byte of extra_size. */
+
+				extra_size = (extra_size & 0x7f) << 8;
+				extra_size |= *mrec++;
+			}
+
+			mrec += extra_size;
+
+			if (mrec > mrec_end) {
+				return(NULL);
+			}
+
+			rec_offs_set_n_fields(offsets, dup->index->n_fields);
+			rec_init_offsets_comp_ordinary(
+				mrec, 0, dup->index, dup->index->n_nullable,
+				offsets);
+
+			next_mrec = mrec + rec_offs_data_size(offsets);
+
+			if (next_mrec > mrec_end) {
+				return(NULL);
+			}
+		}
+
+		ut_ad(next_mrec <= mrec_end);
+
+		{
+			ulint		len;
+			const byte*	db_trx_id
+				= rec_get_nth_field(
+					mrec, offsets, trx_id_col, &len);
+			ut_ad(len == DATA_TRX_ID_LEN);
+			if (!row_log_table_is_rollback(
+				    dup->index,
+				    trx_read_trx_id(db_trx_id))) {
+				*error = row_log_table_apply_update(
+					thr, trx_id_col,
+					mrec, offsets, offsets_heap, heap,
+					new_table, altered_table, dup, old_pk);
+			}
+		}
+
+		break;
+	}
+
+	mem_heap_empty(offsets_heap);
+	mem_heap_empty(heap);
+	return(next_mrec);
+}
+
+/******************************************************//**
+Applies operations to a table that was rebuilt.
+@return DB_SUCCESS, or error code on failure */
+static __attribute__((nonnull))
+dberr_t
+row_log_table_apply_ops(
+/*====================*/
+	que_thr_t*	thr,	/*!< in: query graph */
+	struct TABLE*	altered_table,
+				/*!< in: new MySQL table definition */
+	row_merge_dup_t*dup)	/*!< in/out: for reporting duplicate key
+				errors */
+{
+	dberr_t		error;
+	const mrec_t*	mrec		= NULL;
+	const mrec_t*	next_mrec;
+	const mrec_t*	mrec_end	= NULL; /* silence bogus warning */
+	const mrec_t*	next_mrec_end;
+	mem_heap_t*	heap;
+	mem_heap_t*	offsets_heap;
+	ulint*		offsets;
+	bool		has_index_lock;
+	dict_index_t*	index		= const_cast<dict_index_t*>(
+		dup->index);
+	const ulint	i		= 1 + REC_OFFS_HEADER_SIZE
+		+ dict_index_get_n_fields(index);
+	const ulint	trx_id_col	= dict_col_get_clust_pos(
+		dict_table_get_sys_col(index->table, DATA_TRX_ID),
+		index);
+	trx_t*		trx		= thr_get_trx(thr);
+
+	ut_ad(dict_index_is_clust(index));
+	ut_ad(dict_index_is_online_ddl(index));
+	ut_ad(trx->mysql_thd);
+#ifdef UNIV_SYNC_DEBUG
+	ut_ad(rw_lock_own(dict_index_get_lock(index), RW_LOCK_EX));
+#endif /* UNIV_SYNC_DEBUG */
+	ut_ad(index->online_log);
+	ut_ad(index->online_log->table);
+	ut_ad(!dict_index_is_online_ddl(
+		      dict_table_get_first_index(index->online_log->table)));
+	ut_ad(trx_id_col > 0);
+	ut_ad(trx_id_col != ULINT_UNDEFINED);
+
+	UNIV_MEM_INVALID(&mrec_end, sizeof mrec_end);
+
+	offsets = static_cast<ulint*>(ut_malloc(i * sizeof *offsets));
+	offsets[0] = i;
+	offsets[1] = dict_index_get_n_fields(index);
+
+	heap = mem_heap_create(UNIV_PAGE_SIZE);
+	offsets_heap = mem_heap_create(UNIV_PAGE_SIZE);
+	has_index_lock = true;
+
+next_block:
+	ut_ad(has_index_lock);
+#ifdef UNIV_SYNC_DEBUG
+	ut_ad(rw_lock_own(dict_index_get_lock(index), RW_LOCK_EX));
+#endif /* UNIV_SYNC_DEBUG */
+	ut_ad(index->online_log->head.bytes == 0);
+
+	if (trx_is_interrupted(trx)) {
+		goto interrupted;
+	}
+
+	if (dict_index_is_corrupted(index)) {
+		error = DB_INDEX_CORRUPT;
+		goto func_exit;
+	}
+
+	ut_ad(dict_index_is_online_ddl(index));
+
+	error = index->online_log->error;
+
+	if (error != DB_SUCCESS) {
+		goto func_exit;
+	}
+
+	if (UNIV_UNLIKELY(index->online_log->head.blocks
+			  > index->online_log->tail.blocks)) {
+unexpected_eof:
+		fprintf(stderr, "InnoDB: unexpected end of temporary file"
+			" for table %s\n", index->table_name);
+corruption:
+		error = DB_CORRUPTION;
+		goto func_exit;
+	}
+
+	if (index->online_log->head.blocks
+	    == index->online_log->tail.blocks) {
+		if (index->online_log->head.blocks) {
+#ifdef HAVE_FTRUNCATE
+			/* Truncate the file in order to save space. */
+			ftruncate(index->online_log->fd, 0);
+#endif /* HAVE_FTRUNCATE */
+			index->online_log->head.blocks
+				= index->online_log->tail.blocks = 0;
+		}
+
+		next_mrec = index->online_log->tail.block;
+		next_mrec_end = next_mrec + index->online_log->tail.bytes;
+
+		if (next_mrec_end == next_mrec) {
+			/* End of log reached. */
+all_done:
+			ut_ad(has_index_lock);
+			ut_ad(index->online_log->head.blocks == 0);
+			ut_ad(index->online_log->tail.blocks == 0);
+			index->online_log->head.bytes = 0;
+			index->online_log->tail.bytes = 0;
+			error = DB_SUCCESS;
+			goto func_exit;
+		}
+	} else {
+		os_offset_t	ofs;
+		ibool		success;
+
+		ofs = (os_offset_t) index->online_log->head.blocks
+			* srv_sort_buf_size;
+
+		ut_ad(has_index_lock);
+		has_index_lock = false;
+		rw_lock_x_unlock(dict_index_get_lock(index));
+
+		log_free_check();
+
+		ut_ad(dict_index_is_online_ddl(index));
+
+		success = os_file_read_no_error_handling(
+			OS_FILE_FROM_FD(index->online_log->fd),
+			index->online_log->head.block, ofs,
+			srv_sort_buf_size);
+
+		if (!success) {
+			fprintf(stderr, "InnoDB: unable to read temporary file"
+				" for table %s\n", index->table_name);
+			goto corruption;
+		}
+
+#ifdef POSIX_FADV_DONTNEED
+		/* Each block is read exactly once.  Free up the file cache. */
+		posix_fadvise(index->online_log->fd,
+			      ofs, srv_sort_buf_size, POSIX_FADV_DONTNEED);
+#endif /* POSIX_FADV_DONTNEED */
+#ifdef FALLOC_FL_PUNCH_HOLE
+		/* Try to deallocate the space for the file on disk.
+		This should work on ext4 on Linux 2.6.39 and later,
+		and be ignored when the operation is unsupported. */
+		fallocate(index->online_log->fd,
+			  FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE,
+			  ofs, srv_sort_buf_size);
+#endif /* FALLOC_FL_PUNCH_HOLE */
+
+		next_mrec = index->online_log->head.block;
+		next_mrec_end = next_mrec + srv_sort_buf_size;
+	}
+
+	/* This read is not protected by index->online_log->mutex for
+	performance reasons. We will eventually notice any error that
+	was flagged by a DML thread. */
+	error = index->online_log->error;
+
+	if (error != DB_SUCCESS) {
+		goto func_exit;
+	}
+
+	if (mrec) {
+		/* A partial record was read from the previous block.
+		Copy the temporary buffer full, as we do not know the
+		length of the record. Parse subsequent records from
+		the bigger buffer index->online_log->head.block
+		or index->online_log->tail.block. */
+
+		ut_ad(mrec == index->online_log->head.buf);
+		ut_ad(mrec_end > mrec);
+		ut_ad(mrec_end < (&index->online_log->head.buf)[1]);
+
+		memcpy((mrec_t*) mrec_end, next_mrec,
+		       (&index->online_log->head.buf)[1] - mrec_end);
+		mrec = row_log_table_apply_op(
+			thr, trx_id_col,
+			index->online_log->table, altered_table,
+			dup, &error, offsets_heap, heap,
+			index->online_log->head.buf,
+			(&index->online_log->head.buf)[1], offsets);
+		if (error != DB_SUCCESS) {
+			goto func_exit;
+		} else if (UNIV_UNLIKELY(mrec == NULL)) {
+			/* The record was not reassembled properly. */
+			goto corruption;
+		}
+		/* The record was previously found to be
+		truncated. Now that the parse buffer was extended,
+		parsing should proceed beyond the old end of the buffer. */
+		ut_a(mrec > mrec_end);
+
+		index->online_log->head.bytes = mrec - mrec_end;
+		next_mrec += index->online_log->head.bytes;
+	}
+
+	ut_ad(next_mrec <= next_mrec_end);
+	/* The following loop must not parse the temporary
+	buffer, but head.block or tail.block. */
+
+	/* mrec!=NULL means that the next record starts from the
+	middle of the block */
+	ut_ad((mrec == NULL) == (index->online_log->head.bytes == 0));
+
+#ifdef UNIV_DEBUG
+	if (next_mrec_end == index->online_log->head.block
+	    + srv_sort_buf_size) {
+		/* If tail.bytes == 0, next_mrec_end can also be at
+		the end of tail.block. */
+		if (index->online_log->tail.bytes == 0) {
+			ut_ad(next_mrec == next_mrec_end);
+			ut_ad(index->online_log->tail.blocks == 0);
+			ut_ad(index->online_log->head.blocks == 0);
+			ut_ad(index->online_log->head.bytes == 0);
+		} else {
+			ut_ad(next_mrec == index->online_log->head.block
+			      + index->online_log->head.bytes);
+			ut_ad(index->online_log->tail.blocks
+			      > index->online_log->head.blocks);
+		}
+	} else if (next_mrec_end == index->online_log->tail.block
+		   + index->online_log->tail.bytes) {
+		ut_ad(next_mrec == index->online_log->tail.block
+		      + index->online_log->head.bytes);
+		ut_ad(index->online_log->tail.blocks == 0);
+		ut_ad(index->online_log->head.blocks == 0);
+		ut_ad(index->online_log->head.bytes
+		      <= index->online_log->tail.bytes);
+	} else {
+		ut_error;
+	}
+#endif /* UNIV_DEBUG */
+
+	mrec_end = next_mrec_end;
+
+	while (!trx_is_interrupted(trx)) {
+		mrec = next_mrec;
+		ut_ad(mrec < mrec_end);
+
+		if (!has_index_lock) {
+			/* We are applying operations from a different
+			block than the one that is being written to.
+			We do not hold index->lock in order to
+			allow other threads to concurrently buffer
+			modifications. */
+			ut_ad(mrec >= index->online_log->head.block);
+			ut_ad(mrec_end == index->online_log->head.block
+			      + srv_sort_buf_size);
+			ut_ad(index->online_log->head.bytes
+			      < srv_sort_buf_size);
+
+			/* Take the opportunity to do a redo log
+			checkpoint if needed. */
+			log_free_check();
+		} else {
+			/* We are applying operations from the last block.
+			Do not allow other threads to buffer anything,
+			so that we can finally catch up and synchronize. */
+			ut_ad(index->online_log->head.blocks == 0);
+			ut_ad(index->online_log->tail.blocks == 0);
+			ut_ad(mrec_end == index->online_log->tail.block
+			      + index->online_log->tail.bytes);
+			ut_ad(mrec >= index->online_log->tail.block);
+		}
+
+		/* This read is not protected by index->online_log->mutex
+		for performance reasons. We will eventually notice any
+		error that was flagged by a DML thread. */
+		error = index->online_log->error;
+
+		if (error != DB_SUCCESS) {
+			goto func_exit;
+		}
+
+		next_mrec = row_log_table_apply_op(
+			thr, trx_id_col,
+			index->online_log->table, altered_table,
+			dup, &error, offsets_heap, heap,
+			mrec, mrec_end, offsets);
+
+		if (error != DB_SUCCESS) {
+			goto func_exit;
+		} else if (next_mrec == next_mrec_end) {
+			/* The record happened to end on a block boundary.
+			Do we have more blocks left? */
+			if (has_index_lock) {
+				/* The index will be locked while
+				applying the last block. */
+				goto all_done;
+			}
+
+			mrec = NULL;
+process_next_block:
+			rw_lock_x_lock(dict_index_get_lock(index));
+			has_index_lock = true;
+
+			index->online_log->head.bytes = 0;
+			index->online_log->head.blocks++;
+			goto next_block;
+		} else if (next_mrec != NULL) {
+			ut_ad(next_mrec < next_mrec_end);
+			index->online_log->head.bytes += next_mrec - mrec;
+		} else if (has_index_lock) {
+			/* When mrec is within tail.block, it should
+			be a complete record, because we are holding
+			index->lock and thus excluding the writer. */
+			ut_ad(index->online_log->tail.blocks == 0);
+			ut_ad(mrec_end == index->online_log->tail.block
+			      + index->online_log->tail.bytes);
+			ut_ad(0);
+			goto unexpected_eof;
+		} else {
+			memcpy(index->online_log->head.buf, mrec,
+			       mrec_end - mrec);
+			mrec_end += index->online_log->head.buf - mrec;
+			mrec = index->online_log->head.buf;
+			goto process_next_block;
+		}
+	}
+
+interrupted:
+	error = DB_INTERRUPTED;
+func_exit:
+	if (!has_index_lock) {
+		rw_lock_x_lock(dict_index_get_lock(index));
+	}
+
+	mem_heap_free(offsets_heap);
+	mem_heap_free(heap);
+	ut_free(offsets);
+	return(error);
+}
+
+/******************************************************//**
+Apply the row_log_table log to a table upon completing rebuild.
+@return DB_SUCCESS, or error code on failure */
+UNIV_INTERN
+dberr_t
+row_log_table_apply(
+/*================*/
+	que_thr_t*	thr,	/*!< in: query graph */
+	dict_table_t*	old_table,
+				/*!< in: old table */
+	struct TABLE*	table,	/*!< in/out: MySQL table
+				(for reporting duplicates) */
+	struct TABLE*	altered_table)
+				/*!< in: new MySQL table definition */
+{
+	dberr_t		error;
+	dict_index_t*	clust_index;
+	row_merge_dup_t dup;
+
+	dup.index = clust_index = dict_table_get_first_index(old_table);
+	dup.table = table;
+	dup.n_dup = 0;
+
+	thr_get_trx(thr)->error_key_num = 0;
+
+#ifdef UNIV_SYNC_DEBUG
+	ut_ad(!rw_lock_own(&dict_operation_lock, RW_LOCK_SHARED));
+#endif /* UNIV_SYNC_DEBUG */
+
+	rw_lock_x_lock(dict_index_get_lock(clust_index));
+
+	if (!clust_index->online_log) {
+		ut_ad(dict_index_get_online_status(clust_index)
+		      == ONLINE_INDEX_COMPLETE);
+		/* This function should not be called unless
+		rebuilding a table online. Build in some fault
+		tolerance. */
+		ut_ad(0);
+		error = DB_ERROR;
+	} else {
+		error = row_log_table_apply_ops(
+			thr, altered_table, &dup);
+	}
+
+	rw_lock_x_unlock(dict_index_get_lock(clust_index));
+	return(error);
+}
+
+/******************************************************//**
 Allocate the row log for an index and flag the index
 for online creation.
 @retval true if success, false if not */

=== modified file 'storage/innobase/row/row0merge.cc'
--- a/storage/innobase/row/row0merge.cc	revid:marko.makela@stripped0-hvs4rzbzl68symnl
+++ b/storage/innobase/row/row0merge.cc	revid:marko.makela@strippedoamce2
@@ -289,7 +289,6 @@ row_merge_buf_add(
 					column prefixes, or NULL */
 	doc_id_t*		doc_id)	/*!< in/out: Doc ID if we are
 					creating FTS index */
-
 {
 	ulint			i;
 	const dict_index_t*	index;
@@ -1196,6 +1195,37 @@ row_merge_write_eof(
 	return(&block[0]);
 }
 
+/*************************************************************//**
+Check if a row should be skipped during online table rebuild.
+@return true if the record should be skipped */
+static __attribute__((nonnull, warn_unused_result))
+bool
+row_merge_skip_rec(
+/*===============*/
+	const rec_t*		rec,	/*!< in: clustered index record
+					or merge record */
+	const dict_index_t*	index,	/*!< in: clustered index of rec */
+	const dict_index_t*	oindex,	/*!< in: clustered index
+					in the old table */
+	const ulint*		offsets)/*!< in: rec_get_offsets(rec) */
+{
+	ulint		trx_id_offset;
+	const byte*	trx_id;
+
+	ut_ad(dict_index_is_clust(index));
+	ut_ad(dict_index_is_clust(oindex));
+	ut_ad(dict_index_is_online_ddl(oindex));
+
+	trx_id_offset = index->trx_id_offset;
+	if (!trx_id_offset) {
+		trx_id_offset = row_get_trx_id_offset(index, offsets);
+	}
+
+	trx_id = rec + trx_id_offset;
+
+	return(row_log_table_is_rollback(oindex, trx_read_trx_id(trx_id)));
+}
+
 /********************************************************************//**
 Reads clustered index of the table and create temporary files
 containing the index entries for the indexes to be built.
@@ -1245,8 +1275,6 @@ row_merge_read_clustered_index(
 
 	trx->op_info = "reading clustered index";
 
-	ut_ad(!online || old_table == new_table);
-
 #ifdef FTS_INTERNAL_DIAG_PRINT
 	DEBUG_FTS_SORT_PRINT("FTS_SORT: Start Create Index\n");
 #endif
@@ -1357,6 +1385,14 @@ row_merge_read_clustered_index(
 				goto func_exit;
 			}
 
+			if (online && old_table != new_table) {
+				err = row_log_table_get_error(clust_index);
+				if (err != DB_SUCCESS) {
+					trx->error_key_num = 0;
+					goto func_exit;
+				}
+			}
+
 			if (rw_lock_get_waiters(
 				    dict_index_get_lock(clust_index))) {
 				ibool	on_user_rec;
@@ -1426,6 +1462,7 @@ row_merge_read_clustered_index(
 
 		if (rec_get_deleted_flag(rec, dict_table_is_comp(old_table))
 		    && (!online
+			|| old_table != new_table
 			|| !trx_rw_is_active(
 				row_get_rec_trx_id(rec, clust_index, offsets),
 				NULL))) {
@@ -1444,6 +1481,12 @@ row_merge_read_clustered_index(
 			continue;
 		}
 
+		if (online && new_table != old_table
+		    && row_merge_skip_rec(
+			    rec, clust_index, clust_index, offsets)) {
+			continue;
+		}
+
 		/* This is essentially a READ UNCOMMITTED to fetch the
 		most recent version of the record. */
 
@@ -1568,7 +1611,7 @@ write_buffers:
 				} else {
 					row_merge_buf_sort(buf, NULL);
 				}
-			} else if (online) {
+			} else if (online && new_table == old_table) {
 				/* Note the newest transaction that
 				modified this index when the scan was
 				completed. We prevent older readers
@@ -2088,7 +2131,7 @@ row_merge_sort(
 
 /*************************************************************//**
 Copy externally stored columns to the data tuple. */
-static
+static __attribute__((nonnull))
 void
 row_merge_copy_blobs(
 /*=================*/
@@ -2098,10 +2141,9 @@ row_merge_copy_blobs(
 	dtuple_t*	tuple,	/*!< in/out: data tuple */
 	mem_heap_t*	heap)	/*!< in/out: memory heap */
 {
-	ulint	i;
-	ulint	n_fields = dtuple_get_n_fields(tuple);
+	ut_ad(rec_offs_any_extern(offsets));
 
-	for (i = 0; i < n_fields; i++) {
+	for (ulint i = 0; i < dtuple_get_n_fields(tuple); i++) {
 		ulint		len;
 		const void*	data;
 		dfield_t*	field = dtuple_get_nth_field(tuple, i);
@@ -2143,8 +2185,7 @@ row_merge_insert_index_tuples(
 	bool			del_marks,
 					/*!< in: whether some tuples may
 					be delete-marked */
-	ulint			zip_size,/*!< in: compressed page size of
-					 the old table, or 0 if uncompressed */
+	const dict_table_t*	old_table,/*!< in: old table */
 	int			fd,	/*!< in: file descriptor */
 	row_merge_block_t*	block)	/*!< in/out: file buffer */
 {
@@ -2203,6 +2244,22 @@ row_merge_insert_index_tuples(
 				break;
 			}
 
+			if (dict_index_is_clust(index)
+			    && dict_table_is_online_rebuild(old_table)) {
+				const dict_index_t*	old_index
+					= dict_table_get_first_index(
+						old_table);
+				error = row_log_table_get_error(old_index);
+				if (error != DB_SUCCESS) {
+					break;
+				}
+
+				if (row_merge_skip_rec(mrec, index, old_index,
+						       offsets)) {
+					continue;
+				}
+			}
+
 			dtuple = row_rec_to_index_entry_low(
 				mrec, index, offsets, &n_ext, tuple_heap);
 
@@ -2215,9 +2272,11 @@ row_merge_insert_index_tuples(
 						     REC_INFO_DELETED_FLAG);
 			}
 
-			if (UNIV_UNLIKELY(n_ext)) {
-				row_merge_copy_blobs(mrec, offsets, zip_size,
-						     dtuple, tuple_heap);
+			if (n_ext) {
+				row_merge_copy_blobs(
+					mrec, offsets,
+					dict_table_zip_size(old_table),
+					dtuple, tuple_heap);
 			}
 
 			ut_ad(dtuple_validate(dtuple));
@@ -2543,6 +2602,8 @@ row_merge_drop_indexes(
 #endif /* UNIV_SYNC_DEBUG */
 
 	index = dict_table_get_first_index(table);
+	ut_ad(dict_index_is_clust(index));
+	ut_ad(dict_index_get_online_status(index) == ONLINE_INDEX_COMPLETE);
 
 	/* the caller should have an open handle to the table */
 	ut_ad(table->n_ref_count >= 1);
@@ -2564,6 +2625,8 @@ row_merge_drop_indexes(
 		the indexes. */
 
 		while ((index = dict_table_get_next_index(index)) != NULL) {
+			ut_ad(!dict_index_is_clust(index));
+
 			switch (dict_index_get_online_status(index)) {
 			case ONLINE_INDEX_ABORTED_DROPPED:
 				continue;
@@ -3139,9 +3202,11 @@ row_merge_build_indexes(
 	dict_index_t**	indexes,	/*!< in: indexes to be created */
 	const ulint*	key_numbers,	/*!< in: MySQL key numbers */
 	ulint		n_indexes,	/*!< in: size of indexes[] */
-	struct TABLE*	table)		/*!< in/out: MySQL table, for
+	struct TABLE*	table,		/*!< in/out: MySQL table, for
 					reporting erroneous key value
 					if applicable */
+	struct TABLE*	altered_table)	/*!< in/out: new MySQL table
+					definition */
 {
 	merge_file_t*		merge_files;
 	row_merge_block_t*	block;
@@ -3155,8 +3220,6 @@ row_merge_build_indexes(
 	fts_psort_t*		merge_info = NULL;
 	ib_int64_t		sig_count = 0;
 
-	ut_ad(!online || old_table == new_table);
-
 	/* Allocate memory for merge file data structure and initialize
 	fields */
 
@@ -3260,8 +3323,9 @@ wait_again:
 
 			if (error == DB_SUCCESS) {
 				error = row_merge_insert_index_tuples(
-					trx->id, sort_idx, online,
-					dict_table_zip_size(old_table),
+					trx->id, sort_idx,
+					online && old_table == new_table,
+					old_table,
 					merge_files[i].fd, block);
 			}
 		}
@@ -3271,7 +3335,13 @@ wait_again:
 
 		if (indexes[i]->type & DICT_FTS) {
 			row_fts_psort_info_destroy(psort_info, merge_info);
-		} else if (error == DB_SUCCESS && online) {
+		} else if (error != DB_SUCCESS || !online) {
+			/* Do not apply any online log. */
+		} else if (old_table != new_table) {
+			ut_ad(!sort_idx->online_log);
+			ut_ad(sort_idx->online_status
+			      == ONLINE_INDEX_COMPLETE);
+		} else {
 			DEBUG_SYNC_C("row_log_apply_before");
 			error = row_log_apply(trx, sort_idx, table);
 			DEBUG_SYNC_C("row_log_apply_after");
@@ -3316,11 +3386,13 @@ func_exit:
 
 	DICT_TF2_FLAG_UNSET(new_table, DICT_TF2_FTS_ADD_DOC_ID);
 
-	if (online && error != DB_SUCCESS) {
-		/* On error, flag all online index creation as aborted. */
+	if (online && old_table == new_table && error != DB_SUCCESS) {
+		/* On error, flag all online secondary index creation
+		as aborted. */
 		for (i = 0; i < n_indexes; i++) {
 			ut_ad(!(indexes[i]->type & DICT_FTS));
 			ut_ad(*indexes[i]->name == TEMP_INDEX_PREFIX);
+			ut_ad(!dict_index_is_clust(indexes[i]));
 
 			/* Completed indexes should be dropped as
 			well, and indexes whose creation was aborted
