From: marko.makela
Date: May 29 2012 1:35pm
Subject: bzr push into mysql-trunk-wl6255 branch (marko.makela:3890 to 3892) WL#6255
 3892 Marko Mäkelä	2012-05-29
      WL#6255 DML logging. This does not yet enable online table rebuild.
      The functions for applying the log are part of a separate change.
      
      In online table rebuild, we will not copy any delete-marked rows or
      any history to the new table. Anything that is delete-marked will be
      purged immediately from the new table.
      
      Because the DML log will contain BLOB pointers of the original table,
      the purge thread must not free any BLOBs of a table that is being
      rebuilt.
      
      Because we cannot easily block the ROLLBACK of an INSERT, we will keep
      track of transactions that have been rolled back, and skip any rows
      written by transactions that have been rolled back. In this way, we
      will not dereference any BLOBs that would be freed as part of a
      rollback.
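      
      A minimal sketch of the intended skip test, assuming a simple
      set-based registry (the names row_log_rolled_back and
      row_log_skip_row are hypothetical; the actual bookkeeping is
      row_log_table_rollback(), which belongs to the separate change
      that applies the log):
      
          #include <set>
      
          /* Hypothetical registry of rolled-back transactions. */
          static std::set<trx_id_t>	row_log_rolled_back;
      
          /* Return whether a logged row must be skipped at apply
          time because its transaction was rolled back. */
          static bool
          row_log_skip_row(trx_id_t trx_id)
          {
          	return(row_log_rolled_back.count(trx_id) > 0);
          }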
      
      row_log_table_insert(): Logging function for inserting a new row,
      whether it is a fresh insert or an update of a delete-marked row.
      
      row_log_table_delete(): Logging function for delete-marking a row.
      The row will be identified by PRIMARY KEY (in the new table
      definition) and the DB_TRX_ID before the delete-marking (purging).
      
      row_log_table_update(): Logging function for an update of a row that
      was not delete-marked. The new value of the row will be logged, not
      dereferencing any BLOB pointers. If the PRIMARY KEY is redefined, the
      update record will also contain the new PRIMARY KEY and the DB_TRX_ID,
      DB_ROLL_PTR value of the old row.
      
      row_log_table_get_pk(): Constructs the old PRIMARY KEY and
      DB_TRX_ID,DB_ROLL_PTR for row_log_table_update(). If the PRIMARY KEY
      is not being redefined, returns NULL. This conversion has to be done
      at logging time while holding the clustered index leaf page latch, in
      case the new PRIMARY KEY includes column prefixes of externally stored
      columns.
      
      row_log_table_open(), row_log_table_close_func(): Common code for
      table DML logging.
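      
      A minimal sketch of the open/write/close pattern these helpers
      implement, assuming a hypothetical payload buffer payload of
      payload_len bytes (the real callers are in the row0log.cc hunks
      below); the caller must write exactly size bytes at the
      returned pointer:
      
          ulint	avail;
          ulint	size	= 1 + payload_len;	/* type byte + payload */
      
          if (byte* b = row_log_table_open(index->online_log,
                                           size, &avail)) {
          	*b++ = ROW_T_INSERT;
          	memcpy(b, payload, payload_len);
          	b += payload_len;
          	row_log_table_close(index->online_log, b, size, avail);
          }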
      
      row_ins_clust_index_entry_by_modify(), row_ins_clust_index_entry_low(),
      row_ins_index_entry_big_rec_func(): Add row_log_table_insert() logging.
      
      row_undo_ins_remove_clust_rec(): Add row_log_table_delete() logging.
      
      row_undo_mod_clust_low(): Add an output parameter rebuilt_old_pk for
      row_log_table_update() logging.
      
      row_undo_mod_clust(): Add insert, update, delete logging.
      
      row_upd_clust_rec_by_insert(): Note that no logging is necessary for
      BLOB ownership changes.
      
      row_upd_clust_rec(): Add the parameters offsets, offsets_heap.  Add
      row_log_table_update() logging.
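      
      The log record layouts implied by the hunks below, summarized
      (not authoritative):
      
          ROW_T_INSERT: type byte; extra_size (1 byte, or 2 bytes when
          it is 0x80 or more); the record header and data, omitting
          the fixed REC_N_NEW_EXTRA_BYTES/REC_N_OLD_EXTRA_BYTES prefix.
          ROW_T_UPDATE: type byte; when the PRIMARY KEY is redefined,
          a 1-byte old_pk_extra_size followed by the old PRIMARY KEY
          record; then extra_size and the record image as in
          ROW_T_INSERT.
          ROW_T_DELETE: type byte; 1-byte old_pk_extra_size; the old
          PRIMARY KEY record including DB_TRX_ID.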

    modified:
      storage/innobase/btr/btr0cur.cc
      storage/innobase/include/row0log.h
      storage/innobase/row/row0ins.cc
      storage/innobase/row/row0log.cc
      storage/innobase/row/row0uins.cc
      storage/innobase/row/row0umod.cc
      storage/innobase/row/row0upd.cc
 3891 Marko Mäkelä	2012-05-29
      WL#6255 and WL#5526 performance tweak:
      
      btr_cur_pessimistic_delete(): Skip locking when flags are set.

    modified:
      storage/innobase/btr/btr0cur.cc
 3890 Marko Mäkelä	2012-05-29
      WL#6255 refactoring:
      
      row_ins_clust_index_entry_low(): Move the log_free_check() call
      to the callers, because during online table rebuild, the clustered index
      tree of the new copy of the table could be X-latched while calling
      this function.

    modified:
      storage/innobase/row/row0ins.cc
=== modified file 'storage/innobase/btr/btr0cur.cc'
--- a/storage/innobase/btr/btr0cur.cc	revid:marko.makela@stripped
+++ b/storage/innobase/btr/btr0cur.cc	revid:marko.makela@strippedom-20120529133330-hvs4rzbzl68symnl
@@ -57,6 +57,7 @@ Created 10/16/1994 Heikki Tuuri
 #include "buf0lru.h"
 #include "btr0btr.h"
 #include "btr0sea.h"
+#include "row0log.h"
 #include "row0purge.h"
 #include "row0upd.h"
 #include "trx0rec.h"
@@ -2761,6 +2762,13 @@ btr_cur_del_mark_set_clust_rec(
 
 	trx = thr_get_trx(thr);
 
+	if (dict_index_is_online_ddl(index)) {
+		row_log_table_delete(
+			rec, index, offsets,
+			trx_read_trx_id(row_get_trx_id_offset(index, offsets)
+					+ rec));
+	}
+
 	row_upd_rec_sys_fields(rec, page_zip, index, offsets, trx, roll_ptr);
 
 	btr_cur_del_mark_set_clust_rec_log(rec, index, trx->id,
@@ -3144,13 +3152,15 @@ btr_cur_pessimistic_delete(
 
 		btr_discard_page(cursor, mtr);
 
-		*err = DB_SUCCESS;
 		ret = TRUE;
 
 		goto return_after_reservations;
 	}
 
-	lock_update_delete(block, rec);
+	if (UNIV_LIKELY(flags == 0)) {
+		lock_update_delete(block, rec);
+	}
+
 	level = btr_page_get_level(page, mtr);
 
 	if (level > 0
@@ -3197,9 +3207,9 @@ btr_cur_pessimistic_delete(
 
 	ut_ad(btr_check_node_ptr(index, block, mtr));
 
+return_after_reservations:
 	*err = DB_SUCCESS;
 
-return_after_reservations:
 	mem_heap_free(heap);
 
 	if (ret == FALSE) {

=== modified file 'storage/innobase/include/row0log.h'
--- a/storage/innobase/include/row0log.h	revid:marko.makela@oracle.com-20120529123953-pwllj1orynm7a6q9
+++ b/storage/innobase/include/row0log.h	revid:marko.makela@stripped120529133330-hvs4rzbzl68symnl
@@ -81,6 +81,71 @@ row_log_table_get_error(
 	__attribute__((nonnull, warn_unused_result));
 
 /******************************************************//**
+Logs a delete operation to a table that is being rebuilt.
+This will be merged in row_log_table_apply_delete(). */
+UNIV_INTERN
+void
+row_log_table_delete(
+/*=================*/
+	const rec_t*	rec,	/*!< in: clustered index leaf page record,
+				page X-latched */
+	dict_index_t*	index,	/*!< in/out: clustered index, S-latched
+				or X-latched */
+	const ulint*	offsets,/*!< in: rec_get_offsets(rec,index) */
+	trx_id_t	trx_id)	/*!< in: DB_TRX_ID of the record before
+				it was deleted */
+	UNIV_COLD __attribute__((nonnull));
+
+/******************************************************//**
+Logs an update operation to a table that is being rebuilt.
+This will be merged in row_log_table_apply_update(). */
+UNIV_INTERN
+void
+row_log_table_update(
+/*=================*/
+	const rec_t*	rec,	/*!< in: clustered index leaf page record,
+				page X-latched */
+	dict_index_t*	index,	/*!< in/out: clustered index, S-latched
+				or X-latched */
+	const ulint*	offsets,/*!< in: rec_get_offsets(rec,index) */
+	const dtuple_t*	old_pk)	/*!< in: row_log_table_get_pk()
+				before the update */
+	UNIV_COLD __attribute__((nonnull(1,2,3)));
+
+/******************************************************//**
+Constructs the old PRIMARY KEY and DB_TRX_ID,DB_ROLL_PTR
+of a table that is being rebuilt.
+@return tuple of PRIMARY KEY,DB_TRX_ID,DB_ROLL_PTR in the rebuilt table,
+or NULL if not being rebuilt online or the PRIMARY KEY definition
+does not change */
+UNIV_INTERN
+const dtuple_t*
+row_log_table_get_pk(
+/*=================*/
+	const rec_t*	rec,	/*!< in: clustered index leaf page record,
+				page X-latched */
+	dict_index_t*	index,	/*!< in/out: clustered index, S-latched
+				or X-latched */
+	const ulint*	offsets,/*!< in: rec_get_offsets(rec,index),
+				or NULL */
+	mem_heap_t**	heap)	/*!< in/out: memory heap where allocated */
+	UNIV_COLD __attribute__((nonnull(1,2,4), warn_unused_result));
+
+/******************************************************//**
+Logs an insert to a table that is being rebuilt.
+This will be merged in row_log_table_apply_insert(). */
+UNIV_INTERN
+void
+row_log_table_insert(
+/*=================*/
+	const rec_t*	rec,	/*!< in: clustered index leaf page record,
+				page X-latched */
+	dict_index_t*	index,	/*!< in/out: clustered index, S-latched
+				or X-latched */
+	const ulint*	offsets)/*!< in: rec_get_offsets(rec,index) */
+	UNIV_COLD __attribute__((nonnull));
+
+/******************************************************//**
 Notes that a transaction is being rolled back. */
 UNIV_INTERN
 void

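A minimal sketch of how a caller is expected to combine these
functions on the UPDATE path (hypothetical local names; compare the
row_upd_clust_rec() hunks in row0upd.cc below). The old PRIMARY KEY
must be constructed before the record is modified, and the update is
logged only after the modification succeeds:

	const dtuple_t*	old_pk	= NULL;

	if (dict_index_is_online_ddl(index)) {
		/* NULL unless the PRIMARY KEY is being redefined */
		old_pk = row_log_table_get_pk(rec, index, offsets, &heap);
	}

	/* ... modify the clustered index record, page X-latched ... */

	if (err == DB_SUCCESS && dict_index_is_online_ddl(index)) {
		row_log_table_update(rec, index, offsets, old_pk);
	}
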
=== modified file 'storage/innobase/row/row0ins.cc'
--- a/storage/innobase/row/row0ins.cc	revid:marko.makela@strippedm-20120529123953-pwllj1orynm7a6q9
+++ b/storage/innobase/row/row0ins.cc	revid:marko.makela@stripped30-hvs4rzbzl68symnl
@@ -42,6 +42,7 @@ Created 4/20/1996 Heikki Tuuri
 #include "row0upd.h"
 #include "row0sel.h"
 #include "row0row.h"
+#include "row0log.h"
 #include "rem0cmp.h"
 #include "lock0lock.h"
 #include "log0log.h"
@@ -345,7 +346,9 @@ row_ins_clust_index_entry_by_modify(
 	update = row_upd_build_difference_binary(
 		cursor->index, entry, rec, NULL, true,
 		thr_get_trx(thr), heap);
-	if (mode == BTR_MODIFY_LEAF) {
+	if (mode != BTR_MODIFY_TREE) {
+		ut_ad((mode & ~BTR_ALREADY_S_LATCHED) == BTR_MODIFY_LEAF);
+
 		/* Try optimistic updating of the record, keeping changes
 		within the page */
 
@@ -361,7 +364,6 @@ row_ins_clust_index_entry_by_modify(
 			break;
 		}
 	} else {
-		ut_a(mode == BTR_MODIFY_TREE);
 		if (buf_LRU_buf_pool_running_out()) {
 
 			return(DB_LOCK_TABLE_FULL);
@@ -2169,6 +2171,11 @@ row_ins_clust_index_entry_low(
 
 	mtr_start(&mtr);
 
+	if (mode == BTR_MODIFY_LEAF && dict_index_is_online_ddl(index)) {
+		mode = BTR_MODIFY_LEAF | BTR_ALREADY_S_LATCHED;
+		mtr_s_lock(dict_index_get_lock(index), &mtr);
+	}
+
 	cursor.thr = thr;
 
 	/* Note that we use PAGE_CUR_LE as the search mode, because then
@@ -2276,13 +2283,18 @@ err_exit:
 			dtuple_big_rec_free(big_rec);
 		}
 
+		if (dict_index_is_online_ddl(index)) {
+			row_log_table_insert(rec, index, offsets);
+		}
+
 		mtr_commit(&mtr);
 		mem_heap_free(entry_heap);
 	} else {
 		rec_t*	insert_rec;
 
 		if (mode != BTR_MODIFY_TREE) {
-			ut_ad(mode == BTR_MODIFY_LEAF);
+			ut_ad((mode & ~BTR_ALREADY_S_LATCHED)
+			      == BTR_MODIFY_LEAF);
 			err = btr_cur_optimistic_insert(
 				flags, &cursor, &offsets, &offsets_heap,
 				entry, &insert_rec, &big_rec,
@@ -2299,9 +2311,14 @@ err_exit:
 				n_ext, thr, &mtr);
 		}
 
-		mtr_commit(&mtr);
-
 		if (UNIV_LIKELY_NULL(big_rec)) {
+			mtr_commit(&mtr);
+
+			/* Online table rebuild could read (and
+			ignore) the incomplete record at this point.
+			If online rebuild is in progress, the
+			row_ins_index_entry_big_rec() will write log. */
+
 			DBUG_EXECUTE_IF(
 				"row_ins_extern_checkpoint",
 				log_make_checkpoint_at(
@@ -2311,6 +2328,14 @@ err_exit:
 				thr_get_trx(thr)->mysql_thd,
 				__FILE__, __LINE__);
 			dtuple_convert_back_big_rec(index, entry, big_rec);
+		} else {
+			if (err == DB_SUCCESS
+			    && dict_index_is_online_ddl(index)) {
+				row_log_table_insert(
+					insert_rec, index, offsets);
+			}
+
+			mtr_commit(&mtr);
 		}
 	}
 
@@ -2504,6 +2529,11 @@ row_ins_index_entry_big_rec_func(
 		rec, offsets, big_rec, &mtr, BTR_STORE_INSERT);
 	DEBUG_SYNC_C_IF_THD(thd, "after_row_ins_extern");
 
+	if (error == DB_SUCCESS
+	    && dict_index_is_online_ddl(index)) {
+		row_log_table_insert(rec, index, offsets);
+	}
+
 	mtr_commit(&mtr);
 
 	return(error);

=== modified file 'storage/innobase/row/row0log.cc'
--- a/storage/innobase/row/row0log.cc	revid:marko.makela@stripped
+++ b/storage/innobase/row/row0log.cc	revid:marko.makela@stripped0120529133330-hvs4rzbzl68symnl
@@ -281,6 +281,552 @@ row_log_table_get_error(
 }
 
 /******************************************************//**
+Starts logging an operation to a table that is being rebuilt.
+@return pointer to log, or NULL if no logging is necessary */
+static __attribute__((nonnull, warn_unused_result))
+byte*
+row_log_table_open(
+/*===============*/
+	row_log_t*	log,	/*!< in/out: online rebuild log */
+	ulint		size,	/*!< in: size of log record */
+	ulint*		avail)	/*!< out: available size for log record */
+{
+	mutex_enter(&log->mutex);
+
+	UNIV_MEM_INVALID(log->tail.buf, sizeof log->tail.buf);
+
+	if (log->error != DB_SUCCESS) {
+		mutex_exit(&log->mutex);
+		return(NULL);
+	}
+
+	ut_ad(log->tail.bytes < srv_sort_buf_size);
+	*avail = srv_sort_buf_size - log->tail.bytes;
+
+	if (size > *avail) {
+		return(log->tail.buf);
+	} else {
+		return(log->tail.block + log->tail.bytes);
+	}
+}
+
+/******************************************************//**
+Stops logging an operation to a table that is being rebuilt. */
+static __attribute__((nonnull))
+void
+row_log_table_close_func(
+/*=====================*/
+	row_log_t*	log,	/*!< in/out: online rebuild log */
+#ifdef UNIV_DEBUG
+	const byte*	b,	/*!< in: end of log record */
+#endif /* UNIV_DEBUG */
+	ulint		size,	/*!< in: size of log record */
+	ulint		avail)	/*!< in: available size for log record */
+{
+	ut_ad(mutex_own(&log->mutex));
+
+	if (size >= avail) {
+		const os_offset_t	byte_offset
+			= (os_offset_t) log->tail.blocks
+			* srv_sort_buf_size;
+		ibool			ret;
+
+		if (byte_offset + srv_sort_buf_size >= srv_online_max_size) {
+			goto write_failed;
+		}
+
+		if (size == avail) {
+			ut_ad(b == &log->tail.block[srv_sort_buf_size]);
+		} else {
+			ut_ad(b == log->tail.buf + size);
+			memcpy(log->tail.block + log->tail.bytes,
+			       log->tail.buf, avail);
+		}
+		UNIV_MEM_ASSERT_RW(log->tail.block, srv_sort_buf_size);
+		ret = os_file_write(
+			"(modification log)",
+			OS_FILE_FROM_FD(log->fd),
+			log->tail.block, byte_offset, srv_sort_buf_size);
+		log->tail.blocks++;
+		if (!ret) {
+write_failed:
+			log->error = DB_ONLINE_LOG_TOO_BIG;
+		}
+		UNIV_MEM_INVALID(log->tail.block, srv_sort_buf_size);
+		memcpy(log->tail.block, log->tail.buf + avail, size - avail);
+		log->tail.bytes = size - avail;
+	} else {
+		log->tail.bytes += size;
+		ut_ad(b == log->tail.block + log->tail.bytes);
+	}
+
+	UNIV_MEM_INVALID(log->tail.buf, sizeof log->tail.buf);
+	mutex_exit(&log->mutex);
+}
+
+#ifdef UNIV_DEBUG
+# define row_log_table_close(log, b, size, avail)	\
+	row_log_table_close_func(log, b, size, avail)
+#else /* UNIV_DEBUG */
+# define row_log_table_close(log, b, size, avail)	\
+	row_log_table_close_func(log, size, avail)
+#endif /* UNIV_DEBUG */
+
+/******************************************************//**
+Logs a delete operation to a table that is being rebuilt.
+This will be merged in row_log_table_apply_delete(). */
+UNIV_INTERN
+void
+row_log_table_delete(
+/*=================*/
+	const rec_t*	rec,	/*!< in: clustered index leaf page record,
+				page X-latched */
+	dict_index_t*	index,	/*!< in/out: clustered index, S-latched
+				or X-latched */
+	const ulint*	offsets,/*!< in: rec_get_offsets(rec,index) */
+	trx_id_t	trx_id)	/*!< in: DB_TRX_ID of the record before
+				it was deleted */
+{
+	ulint		old_pk_extra_size;
+	ulint		old_pk_size;
+	ulint		mrec_size;
+	ulint		avail_size;
+	mem_heap_t*	heap		= NULL;
+	const dtuple_t*	old_pk;
+
+	ut_ad(dict_index_is_clust(index));
+	ut_ad(rec_offs_validate(rec, index, offsets));
+	ut_ad(rec_offs_n_fields(offsets) == dict_index_get_n_fields(index));
+	ut_ad(rec_offs_size(offsets) <= sizeof index->online_log->tail.buf);
+#ifdef UNIV_SYNC_DEBUG
+	ut_ad(rw_lock_own(&index->lock, RW_LOCK_SHARED)
+	      || rw_lock_own(&index->lock, RW_LOCK_EX));
+#endif /* UNIV_SYNC_DEBUG */
+
+	if (dict_index_is_corrupted(index)
+	    || !dict_index_is_online_ddl(index)
+	    || index->online_log->error != DB_SUCCESS) {
+		return;
+	}
+
+	/* Create the tuple PRIMARY KEY, DB_TRX_ID in the new_table. */
+	if (index->online_log->same_pk) {
+		byte*		db_trx_id;
+		dtuple_t*	tuple;
+
+		/* The PRIMARY KEY and DB_TRX_ID are in the first
+		fields of the record. */
+		heap = mem_heap_create(DATA_TRX_ID_LEN
+				       + DTUPLE_EST_ALLOC(index->n_uniq + 1));
+		old_pk = tuple = dtuple_create(heap, index->n_uniq + 1);
+		dtuple_set_n_fields_cmp(tuple, index->n_uniq);
+		dict_index_copy_types(tuple, index, index->n_uniq + 1);
+
+		for (ulint i = 0; i < index->n_uniq; i++) {
+			ulint		len;
+			const void*	field	= rec_get_nth_field(
+				rec, offsets, i, &len);
+			dfield_t*	dfield	= dtuple_get_nth_field(
+				tuple, i);
+			ut_ad(len != UNIV_SQL_NULL);
+			ut_ad(!rec_offs_nth_extern(offsets, i));
+			dfield_set_data(dfield, field, len);
+		}
+
+		db_trx_id = static_cast<byte*>(
+			mem_heap_alloc(heap, DATA_TRX_ID_LEN));
+		trx_write_trx_id(db_trx_id, trx_id);
+
+		dfield_set_data(dtuple_get_nth_field(tuple, index->n_uniq),
+				db_trx_id, DATA_TRX_ID_LEN);
+	} else {
+		/* The PRIMARY KEY has changed. Translate the tuple. */
+		dict_table_t*	new_table
+			= index->online_log->table;
+		dict_index_t*	new_index
+			= dict_table_get_first_index(new_table);
+		dfield_t*	dfield;
+
+		old_pk = row_log_table_get_pk(rec, index, offsets, &heap);
+		/* Remove DB_ROLL_PTR. */
+		ut_ad(dtuple_get_n_fields_cmp(old_pk)
+		      == dict_index_get_n_unique(new_index));
+		ut_ad(dtuple_get_n_fields(old_pk)
+		      == dict_index_get_n_unique(new_index) + 2);
+		const_cast<ulint&>(old_pk->n_fields)--;
+
+		/* Overwrite DB_TRX_ID with the old trx_id. */
+		dfield = dtuple_get_nth_field(old_pk, new_index->n_uniq);
+		ut_ad(dfield_get_type(dfield)->mtype == DATA_SYS);
+		ut_ad(dfield_get_type(dfield)->prtype
+		      == (DATA_NOT_NULL | DATA_TRX_ID));
+		ut_ad(dfield_get_len(dfield) == DATA_TRX_ID_LEN);
+		trx_write_trx_id(static_cast<byte*>(dfield->data), trx_id);
+	}
+
+	ut_ad(dtuple_get_n_fields(old_pk) > 1);
+	ut_ad(DATA_TRX_ID_LEN == dtuple_get_nth_field(
+		      old_pk, old_pk->n_fields - 1)->len);
+	old_pk_size = rec_get_converted_size_comp_prefix(
+		index, old_pk->fields, old_pk->n_fields,
+		0, &old_pk_extra_size) - REC_N_NEW_EXTRA_BYTES;
+	ut_ad(old_pk_extra_size >= REC_N_NEW_EXTRA_BYTES);
+	old_pk_extra_size -= REC_N_NEW_EXTRA_BYTES;
+	ut_ad(old_pk_extra_size < 0x100);
+
+	mrec_size = 2 + old_pk_size;
+
+	if (byte* b = row_log_table_open(index->online_log,
+					 mrec_size, &avail_size)) {
+		*b++ = ROW_T_DELETE;
+		*b++ = old_pk_extra_size;
+
+		rec_convert_dtuple_to_rec_comp(
+			b + old_pk_extra_size, 0, index,
+			REC_STATUS_ORDINARY,
+			old_pk->fields, old_pk->n_fields, 0);
+
+		b += old_pk_size;
+
+		row_log_table_close(
+			index->online_log, b, mrec_size, avail_size);
+	}
+
+	mem_heap_free(heap);
+}
+
+/******************************************************//**
+Logs an update to a table that is being rebuilt.
+This will be merged in row_log_table_apply_update(). */
+UNIV_INTERN
+void
+row_log_table_update(
+/*=================*/
+	const rec_t*	rec,	/*!< in: clustered index leaf page record,
+				page X-latched */
+	dict_index_t*	index,	/*!< in/out: clustered index, S-latched
+				or X-latched */
+	const ulint*	offsets,/*!< in: rec_get_offsets(rec,index) */
+	const dtuple_t*	old_pk)	/*!< in: row_log_table_get_pk()
+				before the update */
+{
+	ulint	omit_size;
+	ulint	old_pk_size;
+	ulint	old_pk_extra_size;
+	ulint	extra_size;
+	ulint	mrec_size;
+	ulint	avail_size;
+
+	ut_ad(dict_index_is_clust(index));
+	ut_ad(rec_offs_validate(rec, index, offsets));
+	ut_ad(rec_offs_n_fields(offsets) == dict_index_get_n_fields(index));
+	ut_ad(rec_offs_size(offsets) <= sizeof index->online_log->tail.buf);
+#ifdef UNIV_SYNC_DEBUG
+	ut_ad(rw_lock_own(&index->lock, RW_LOCK_SHARED)
+	      || rw_lock_own(&index->lock, RW_LOCK_EX));
+#endif /* UNIV_SYNC_DEBUG */
+	ut_ad(!old_pk || DATA_TRX_ID_LEN == dtuple_get_nth_field(
+		      old_pk, old_pk->n_fields - 1)->len);
+
+	if (dict_index_is_corrupted(index)
+	    || !dict_index_is_online_ddl(index)
+	    || index->online_log->error != DB_SUCCESS) {
+		return;
+	}
+
+	if (rec_offs_comp(offsets)) {
+		ut_ad(rec_get_status(rec) == REC_STATUS_ORDINARY);
+
+		omit_size = REC_N_NEW_EXTRA_BYTES;
+	} else {
+		omit_size = REC_N_OLD_EXTRA_BYTES;
+	}
+
+	extra_size = rec_offs_extra_size(offsets) - omit_size;
+
+	mrec_size = rec_offs_size(offsets) - omit_size
+		+ (ROW_LOG_HEADER_SIZE + 1) + (extra_size >= 0x80);
+
+	if (index->online_log->same_pk) {
+		ut_ad(!old_pk);
+		old_pk_extra_size = old_pk_size = 0;
+	} else {
+		ut_ad(old_pk);
+		ut_ad(dtuple_get_n_fields(old_pk) > 1);
+		old_pk_size = rec_get_converted_size_comp_prefix(
+			index, old_pk->fields, old_pk->n_fields,
+			0, &old_pk_extra_size) - REC_N_NEW_EXTRA_BYTES;
+		ut_ad(old_pk_extra_size >= REC_N_NEW_EXTRA_BYTES);
+		old_pk_extra_size -= REC_N_NEW_EXTRA_BYTES;
+		ut_ad(old_pk_extra_size < 0x100);
+		mrec_size += 1/*old_pk_extra_size*/ + old_pk_size;
+	}
+
+	if (byte* b = row_log_table_open(index->online_log,
+					 mrec_size, &avail_size)) {
+		*b++ = ROW_T_UPDATE;
+
+		if (old_pk_extra_size) {
+			*b++ = old_pk_extra_size;
+
+			rec_convert_dtuple_to_rec_comp(
+				b + old_pk_extra_size, 0, index,
+				REC_STATUS_ORDINARY,
+				old_pk->fields, old_pk->n_fields, 0);
+			b += old_pk_size;
+		}
+
+		if (extra_size < 0x80) {
+			*b++ = (byte) extra_size;
+		} else {
+			ut_ad(extra_size < 0x8000);
+			*b++ = (byte) (0x80 | (extra_size >> 8));
+			*b++ = (byte) extra_size;
+		}
+
+		memcpy(b, rec - rec_offs_extra_size(offsets), extra_size);
+		b += extra_size;
+		memcpy(b, rec, rec_offs_data_size(offsets));
+		b += rec_offs_data_size(offsets);
+
+		row_log_table_close(
+			index->online_log, b, mrec_size, avail_size);
+	}
+}
+
+/******************************************************//**
+Constructs the old PRIMARY KEY and DB_TRX_ID,DB_ROLL_PTR
+of a table that is being rebuilt.
+@return tuple of PRIMARY KEY,DB_TRX_ID,DB_ROLL_PTR in the rebuilt table,
+or NULL if not being rebuilt online or the PRIMARY KEY definition
+does not change */
+UNIV_INTERN
+const dtuple_t*
+row_log_table_get_pk(
+/*=================*/
+	const rec_t*	rec,	/*!< in: clustered index leaf page record,
+				page X-latched */
+	dict_index_t*	index,	/*!< in/out: clustered index, S-latched
+				or X-latched */
+	const ulint*	offsets,/*!< in: rec_get_offsets(rec,index),
+				or NULL */
+	mem_heap_t**	heap)	/*!< in/out: memory heap where allocated */
+{
+	ut_ad(dict_index_is_clust(index));
+	ut_ad(dict_index_is_online_ddl(index));
+	ut_ad(!offsets || rec_offs_validate(rec, index, offsets));
+#ifdef UNIV_SYNC_DEBUG
+	ut_ad(rw_lock_own(&index->lock, RW_LOCK_SHARED)
+	      || rw_lock_own(&index->lock, RW_LOCK_EX));
+#endif /* UNIV_SYNC_DEBUG */
+
+	ut_ad(index->online_log);
+	ut_ad(index->online_log->table);
+
+	if (index->online_log->same_pk) {
+		/* The PRIMARY KEY columns are unchanged. */
+		return(NULL);
+	}
+
+	dtuple_t*	tuple;
+
+	mutex_enter(&index->online_log->mutex);
+
+	/* index->online_log->error is protected by the above mutex. */
+	if (index->online_log->error == DB_SUCCESS) {
+		dict_table_t*	new_table
+			= index->online_log->table;
+		dict_index_t*	new_index
+			= dict_table_get_first_index(new_table);
+		const ulint	new_n_uniq
+			= dict_index_get_n_unique(new_index);
+
+		if (!*heap) {
+			ulint	size = 0;
+
+			if (!offsets) {
+				size += (1 + REC_OFFS_HEADER_SIZE
+					 + index->n_fields)
+					* sizeof *offsets;
+			}
+
+			for (ulint i = 0; i < new_n_uniq; i++) {
+				size += dict_col_get_min_size(
+					dict_index_get_nth_col(new_index, i));
+			}
+
+			*heap = mem_heap_create(
+				DTUPLE_EST_ALLOC(new_n_uniq + 2) + size);
+		}
+
+		if (!offsets) {
+			offsets = rec_get_offsets(rec, index, NULL,
+						  ULINT_UNDEFINED, heap);
+		}
+
+		tuple = dtuple_create(*heap, new_n_uniq + 2);
+		dtuple_set_n_fields_cmp(tuple, new_n_uniq);
+		dict_index_copy_types(tuple, new_index, new_n_uniq + 2);
+
+		for (ulint new_i = 0; new_i < new_n_uniq; new_i++) {
+			dict_field_t*		ifield;
+			dfield_t*		dfield;
+			const dict_col_t*	new_col;
+			const dict_col_t*	col;
+			ulint			i;
+			ulint			len;
+			const byte*		field;
+
+			dfield = dtuple_get_nth_field(tuple, new_i);
+			ifield = dict_index_get_nth_field(new_index, new_i);
+			new_col = dict_field_get_col(ifield);
+
+			/* TODO: support ADD COLUMN, DROP COLUMN,
+			reordering columns */
+			col = dict_table_get_nth_col(
+				index->table, new_col->ind);
+
+			i = dict_col_get_clust_pos(col, index);
+
+			if (i == ULINT_UNDEFINED) {
+				ut_ad(0);
+				index->online_log->error = DB_CORRUPTION;
+				tuple = NULL;
+				goto func_exit;
+			}
+
+			field = rec_get_nth_field(rec, offsets, i, &len);
+
+			if (len == UNIV_SQL_NULL) {
+				index->online_log->error
+					= DB_PRIMARY_KEY_IS_NULL;
+				tuple = NULL;
+				goto func_exit;
+			}
+
+			if (rec_offs_nth_extern(offsets, i)) {
+				ulint		field_len = ifield->prefix_len;
+				byte*		blob_field;
+				const ulint	max_len =
+					DICT_MAX_FIELD_LEN_BY_FORMAT(
+						new_table);
+
+				if (!field_len) {
+					field_len = ifield->fixed_len;
+					if (!field_len) {
+						field_len = max_len + 1;
+					}
+				}
+
+				blob_field = static_cast<byte*>(
+					mem_heap_alloc(*heap, field_len));
+
+				len = btr_copy_externally_stored_field_prefix(
+					blob_field, field_len,
+					dict_table_zip_size(index->table),
+					field, len);
+				if (len == max_len + 1) {
+					index->online_log->error
+						= DB_TOO_BIG_INDEX_COL;
+					tuple = NULL;
+					goto func_exit;
+				}
+
+				dfield_set_data(dfield, blob_field, len);
+			} else {
+				if (ifield->prefix_len
+				    && ifield->prefix_len < len) {
+					len = ifield->prefix_len;
+				}
+
+				dfield_set_data(
+					dfield,
+					mem_heap_dup(*heap, field, len), len);
+			}
+		}
+
+		const byte* trx_roll = rec
+			+ row_get_trx_id_offset(index, offsets);
+
+		dfield_set_data(dtuple_get_nth_field(tuple, new_n_uniq),
+				trx_roll, DATA_TRX_ID_LEN);
+		dfield_set_data(dtuple_get_nth_field(tuple, new_n_uniq + 1),
+				trx_roll + DATA_TRX_ID_LEN, DATA_ROLL_PTR_LEN);
+	}
+
+func_exit:
+	mutex_exit(&index->online_log->mutex);
+	return(tuple);
+}
+
+/******************************************************//**
+Logs an insert to a table that is being rebuilt.
+This will be merged in row_log_table_apply_insert(). */
+UNIV_INTERN
+void
+row_log_table_insert(
+/*=================*/
+	const rec_t*	rec,	/*!< in: clustered index leaf page record,
+				page X-latched */
+	dict_index_t*	index,	/*!< in/out: clustered index, S-latched
+				or X-latched */
+	const ulint*	offsets)/*!< in: rec_get_offsets(rec,index) */
+{
+	ulint	omit_size;
+	ulint	extra_size;
+	ulint	mrec_size;
+	ulint	avail_size;
+
+	ut_ad(dict_index_is_clust(index));
+	ut_ad(rec_offs_validate(rec, index, offsets));
+	ut_ad(rec_offs_n_fields(offsets) == dict_index_get_n_fields(index));
+	ut_ad(rec_offs_size(offsets) <= sizeof index->online_log->tail.buf);
+#ifdef UNIV_SYNC_DEBUG
+	ut_ad(rw_lock_own(&index->lock, RW_LOCK_SHARED)
+	      || rw_lock_own(&index->lock, RW_LOCK_EX));
+#endif /* UNIV_SYNC_DEBUG */
+
+	if (dict_index_is_corrupted(index)
+	    || !dict_index_is_online_ddl(index)
+	    || index->online_log->error != DB_SUCCESS) {
+		return;
+	}
+
+	if (rec_offs_comp(offsets)) {
+		ut_ad(rec_get_status(rec) == REC_STATUS_ORDINARY);
+
+		omit_size = REC_N_NEW_EXTRA_BYTES;
+	} else {
+		omit_size = REC_N_OLD_EXTRA_BYTES;
+	}
+
+	extra_size = rec_offs_extra_size(offsets) - omit_size;
+
+	mrec_size = rec_offs_size(offsets) - omit_size
+		+ ROW_LOG_HEADER_SIZE + (extra_size >= 0x80);
+
+	if (byte* b = row_log_table_open(index->online_log,
+					 mrec_size, &avail_size)) {
+		*b++ = ROW_T_INSERT;
+
+		if (extra_size < 0x80) {
+			*b++ = (byte) extra_size;
+		} else {
+			ut_ad(extra_size < 0x8000);
+			*b++ = (byte) (0x80 | (extra_size >> 8));
+			*b++ = (byte) extra_size;
+		}
+
+		memcpy(b, rec - rec_offs_extra_size(offsets), extra_size);
+		b += extra_size;
+		memcpy(b, rec, rec_offs_data_size(offsets));
+		b += rec_offs_data_size(offsets);
+
+		row_log_table_close(
+			index->online_log, b, mrec_size, avail_size);
+	}
+}
+
+/******************************************************//**
 Notes that a transaction is being rolled back. */
 UNIV_INTERN
 void
@@ -969,10 +1515,9 @@ all_done:
 		ofs = (os_offset_t) index->online_log->head.blocks
 			* srv_sort_buf_size;
 
-		if (has_index_lock) {
-			has_index_lock = false;
-			rw_lock_x_unlock(dict_index_get_lock(index));
-		}
+		ut_ad(has_index_lock);
+		has_index_lock = false;
+		rw_lock_x_unlock(dict_index_get_lock(index));
 
 		log_free_check();
 
@@ -1083,7 +1628,7 @@ all_done:
 		if (!has_index_lock) {
 			/* We are applying operations from a different
 			block than the one that is being written to.
-			Release and reacquire index->lock in order to
+			We do not hold index->lock in order to
 			allow other threads to concurrently buffer
 			modifications. */
 			ut_ad(mrec >= index->online_log->head.block);

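The functions for applying these records are part of a separate
change; a minimal sketch of decoding the 1-or-2-byte extra_size field
written by row_log_table_insert() and row_log_table_update() above
(mrec is a hypothetical pointer just past the type byte):

	const byte*	b		= mrec;
	ulint		extra_size	= *b++;

	if (extra_size >= 0x80) {
		/* 15-bit size: the low 7 bits of the first byte are
		the high byte; the second byte is the low byte. */
		extra_size = ((extra_size & 0x7f) << 8) | *b++;
	}
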
=== modified file 'storage/innobase/row/row0uins.cc'
--- a/storage/innobase/row/row0uins.cc	revid:marko.makela@strippedlj1orynm7a6q9
+++ b/storage/innobase/row/row0uins.cc	revid:marko.makela@stripped
@@ -70,20 +70,55 @@ row_undo_ins_remove_clust_rec(
 	btr_cur_t*	btr_cur;
 	ibool		success;
 	dberr_t		err;
-	ulint		n_tries		= 0;
+	ulint		n_tries	= 0;
 	mtr_t		mtr;
+	dict_index_t*	index	= node->pcur.btr_cur.index;
+	bool		online;
+
+	ut_ad(dict_index_is_clust(index));
 
 	mtr_start(&mtr);
 
-	success = btr_pcur_restore_position(BTR_MODIFY_LEAF, &(node->pcur),
-					    &mtr);
+	/* This is similar to row_undo_mod_clust(). Even though we
+	call row_log_table_rollback() elsewhere, the DDL thread may
+	already have copied this row to the sort buffers or to the new
+	table. We must log the removal, so that the row will be
+	correctly purged. However, we can log the removal out of sync
+	with the B-tree modification. */
+
+	online = dict_index_is_online_ddl(index);
+	if (online) {
+		ut_ad(node->trx->dict_operation_lock_mode
+		      != RW_X_LATCH);
+		ut_ad(node->table->id != DICT_INDEXES_ID);
+		mtr_s_lock(dict_index_get_lock(index), &mtr);
+	}
+
+	success = btr_pcur_restore_position(
+		online
+		? BTR_MODIFY_LEAF | BTR_ALREADY_S_LATCHED
+		: BTR_MODIFY_LEAF, &node->pcur, &mtr);
 	ut_a(success);
 
-	ut_ad(rec_get_trx_id(btr_pcur_get_rec(&node->pcur),
-			     node->pcur.btr_cur.index)
+	btr_cur = btr_pcur_get_btr_cur(&node->pcur);
+
+	ut_ad(rec_get_trx_id(btr_cur_get_rec(btr_cur), btr_cur->index)
 	      == node->trx->id);
 
+	if (online && dict_index_is_online_ddl(index)) {
+		const rec_t*	rec	= btr_cur_get_rec(btr_cur);
+		mem_heap_t*	heap	= NULL;
+		const ulint*	offsets	= rec_get_offsets(
+			rec, index, NULL, ULINT_UNDEFINED, &heap);
+		row_log_table_delete(
+			rec, index, offsets,
+			trx_read_trx_id(row_get_trx_id_offset(index, offsets)
+					+ rec));
+		mem_heap_free(heap);
+	}
+
 	if (node->table->id == DICT_INDEXES_ID) {
+		ut_ad(!online);
 		ut_ad(node->trx->dict_operation_lock_mode == RW_X_LATCH);
 
 		/* Drop the index tree associated with the row in
@@ -95,13 +130,11 @@ row_undo_ins_remove_clust_rec(
 
 		mtr_start(&mtr);
 
-		success = btr_pcur_restore_position(BTR_MODIFY_LEAF,
-						    &(node->pcur), &mtr);
+		success = btr_pcur_restore_position(
+			BTR_MODIFY_LEAF, &node->pcur, &mtr);
 		ut_a(success);
 	}
 
-	btr_cur = btr_pcur_get_btr_cur(&(node->pcur));
-
 	if (btr_cur_optimistic_delete(btr_cur, 0, &mtr)) {
 		err = DB_SUCCESS;
 		goto func_exit;

=== modified file 'storage/innobase/row/row0umod.cc'
--- a/storage/innobase/row/row0umod.cc	revid:marko.makela@strippedlj1orynm7a6q9
+++ b/storage/innobase/row/row0umod.cc	revid:marko.makela@stripped
@@ -111,6 +111,10 @@ row_undo_mod_clust_low(
 	mem_heap_t**	offsets_heap,
 				/*!< in/out: memory heap that can be emptied */
 	mem_heap_t*	heap,	/*!< in/out: memory heap */
+	const dtuple_t**rebuilt_old_pk,
+				/*!< out: row_log_table_get_pk()
+				before the update, or NULL if
+				the table is not being rebuilt online */
 	que_thr_t*	thr,	/*!< in: query thread */
 	mtr_t*		mtr,	/*!< in: mtr; must be committed before
 				latching any further pages */
@@ -132,12 +136,22 @@ row_undo_mod_clust_low(
 	btr_pcur_restore_position(mode, pcur, mtr);
 
 	ut_ad(success);
-
 	ut_ad(rec_get_trx_id(btr_cur_get_rec(btr_cur),
 			     btr_cur_get_index(btr_cur))
 	      == thr_get_trx(thr)->id);
 
-	if (mode == BTR_MODIFY_LEAF) {
+	if (mode != BTR_MODIFY_LEAF
+	    && dict_index_is_online_ddl(btr_cur_get_index(btr_cur))) {
+		*rebuilt_old_pk = row_log_table_get_pk(
+			btr_cur_get_rec(btr_cur),
+			btr_cur_get_index(btr_cur), NULL, &heap);
+	} else {
+		*rebuilt_old_pk = NULL;
+	}
+
+	if (mode != BTR_MODIFY_TREE) {
+		ut_ad((mode & ~BTR_ALREADY_S_LATCHED) == BTR_MODIFY_LEAF);
+
 		err = btr_cur_optimistic_update(
 			BTR_NO_LOCKING_FLAG | BTR_NO_UNDO_LOG_FLAG
 			| BTR_KEEP_SYS_FLAG,
@@ -147,8 +161,6 @@ row_undo_mod_clust_low(
 	} else {
 		big_rec_t*	dummy_big_rec;
 
-		ut_ad(mode == BTR_MODIFY_TREE);
-
 		err = btr_cur_pessimistic_update(
 			BTR_NO_LOCKING_FLAG
 			| BTR_NO_UNDO_LOG_FLAG
@@ -238,11 +250,18 @@ row_undo_mod_clust(
 	btr_pcur_t*	pcur;
 	mtr_t		mtr;
 	dberr_t		err;
+	dict_index_t*	index;
+	bool		online;
 	ibool		success;
 	ibool		more_vers;
 	undo_no_t	new_undo_no;
 
-	ut_ad(node && thr);
+	ut_ad(thr_get_trx(thr) == node->trx);
+	ut_ad(node->trx->dict_operation_lock_mode);
+#ifdef UNIV_SYNC_DEBUG
+	ut_ad(rw_lock_own(&dict_operation_lock, RW_LOCK_SHARED)
+	      || rw_lock_own(&dict_operation_lock, RW_LOCK_EX));
+#endif /* UNIV_SYNC_DEBUG */
 
 	log_free_check();
 
@@ -251,19 +270,31 @@ row_undo_mod_clust(
 
 	more_vers = row_undo_mod_undo_also_prev_vers(node, &new_undo_no);
 
-	pcur = &(node->pcur);
+	pcur = &node->pcur;
+
+	index = btr_cur_get_index(btr_pcur_get_btr_cur(pcur));
 
 	mtr_start(&mtr);
 
+	online = dict_index_is_online_ddl(index);
+	if (online) {
+		ut_ad(node->trx->dict_operation_lock_mode != RW_X_LATCH);
+		mtr_s_lock(dict_index_get_lock(index), &mtr);
+	}
+
 	mem_heap_t*	heap		= mem_heap_create(1024);
 	mem_heap_t*	offsets_heap	= NULL;
 	ulint*		offsets		= NULL;
+	const dtuple_t*	rebuilt_old_pk;
 
 	/* Try optimistic processing of the record, keeping changes within
 	the index page */
 
 	err = row_undo_mod_clust_low(node, &offsets, &offsets_heap,
-				     heap, thr, &mtr, BTR_MODIFY_LEAF);
+				     heap, &rebuilt_old_pk,
+				     thr, &mtr, online
+				     ? BTR_MODIFY_LEAF | BTR_ALREADY_S_LATCHED
+				     : BTR_MODIFY_LEAF);
 
 	if (err != DB_SUCCESS) {
 		btr_pcur_commit_specify_mtr(pcur, &mtr);
@@ -274,17 +305,50 @@ row_undo_mod_clust(
 		mtr_start(&mtr);
 
 		err = row_undo_mod_clust_low(
-			node, &offsets, &offsets_heap, heap, thr, &mtr,
-			BTR_MODIFY_TREE);
+			node, &offsets, &offsets_heap, heap, &rebuilt_old_pk,
+			thr, &mtr, BTR_MODIFY_TREE);
 		ut_ad(err == DB_SUCCESS || err == DB_OUT_OF_FILE_SPACE);
 	}
 
+	/* Online rebuild cannot be initiated while we are holding
+	dict_operation_lock and index->lock. (It can be aborted.) */
+	ut_ad(online || !dict_index_is_online_ddl(index));
+
+	if (err == DB_SUCCESS && online) {
+#ifdef UNIV_SYNC_DEBUG
+		ut_ad(rw_lock_own(&index->lock, RW_LOCK_SHARED)
+		      || rw_lock_own(&index->lock, RW_LOCK_EX));
+#endif /* UNIV_SYNC_DEBUG */
+		switch (node->rec_type) {
+		case TRX_UNDO_UPD_DEL_REC:
+			row_log_table_insert(
+				btr_pcur_get_rec(pcur), index, offsets);
+			break;
+		case TRX_UNDO_UPD_EXIST_REC:
+			row_log_table_update(
+				btr_pcur_get_rec(pcur), index, offsets,
+				rebuilt_old_pk);
+			break;
+		case TRX_UNDO_DEL_MARK_REC:
+			row_log_table_delete(
+				btr_pcur_get_rec(pcur), index, offsets,
+				node->trx->id);
+			break;
+		default:
+			ut_ad(0);
+			break;
+		}
+	}
+
 	btr_pcur_commit_specify_mtr(pcur, &mtr);
 
 	if (err == DB_SUCCESS && node->rec_type == TRX_UNDO_UPD_DEL_REC) {
 
 		mtr_start(&mtr);
 
+		/* It is not necessary to call row_log_table,
+		because the record is delete-marked and would thus
+		be omitted from the rebuilt copy of the table. */
 		err = row_undo_mod_remove_clust_low(
 			node, thr, &mtr, BTR_MODIFY_LEAF);
 		if (err != DB_SUCCESS) {

=== modified file 'storage/innobase/row/row0upd.cc'
--- a/storage/innobase/row/row0upd.cc	revid:marko.makela@stripped-20120529123953-pwllj1orynm7a6q9
+++ b/storage/innobase/row/row0upd.cc	revid:marko.makela@stripped0-hvs4rzbzl68symnl
@@ -2048,6 +2048,11 @@ err_exit:
 			btr_cur_get_page_zip(btr_cur),
 			rec, index, offsets, node->update, mtr);
 
+		/* It is not necessary to call row_log_table for
+		this, because during online table rebuild, purge will
+		not free any BLOBs in the table, whether or not they
+		are owned by the clustered index record. */
+
 		mtr_commit(mtr);
 	}
 
@@ -2067,16 +2072,18 @@ row_upd_clust_rec(
 /*==============*/
 	upd_node_t*	node,	/*!< in: row update node */
 	dict_index_t*	index,	/*!< in: clustered index */
+	ulint*		offsets,/*!< in: rec_get_offsets() on node->pcur */
+	mem_heap_t**	offsets_heap,
+				/*!< in/out: memory heap, can be emptied */
 	que_thr_t*	thr,	/*!< in: query thread */
 	mtr_t*		mtr)	/*!< in: mtr; gets committed here */
 {
-	mem_heap_t*	offsets_heap;
-	mem_heap_t*	heap;
-	big_rec_t*	big_rec	= NULL;
+	mem_heap_t*	heap		= NULL;
+	big_rec_t*	big_rec		= NULL;
 	btr_pcur_t*	pcur;
 	btr_cur_t*	btr_cur;
 	dberr_t		err;
-	ulint*		offsets	= NULL;
+	const dtuple_t*	rebuilt_old_pk	= NULL;
 
 	ut_ad(node);
 	ut_ad(dict_index_is_clust(index));
@@ -2085,21 +2092,20 @@ row_upd_clust_rec(
 	btr_cur = btr_pcur_get_btr_cur(pcur);
 
 	ut_ad(btr_cur_get_index(btr_cur) == index);
-	ut_ad(!rec_get_deleted_flag(btr_pcur_get_rec(pcur),
+	ut_ad(!rec_get_deleted_flag(btr_cur_get_rec(btr_cur),
 				    dict_table_is_comp(index->table)));
+	ut_ad(rec_offs_validate(btr_cur_get_rec(btr_cur), index, offsets));
 
-	offsets_heap = NULL;
+	if (dict_index_is_online_ddl(index)) {
+		rebuilt_old_pk = row_log_table_get_pk(
+			btr_cur_get_rec(btr_cur), index, offsets, &heap);
+	}
 
 	/* Try optimistic updating of the record, keeping changes within
 	the page; we do not check locks because we assume the x-lock on the
 	record to update */
 
 	if (node->cmpl_info & UPD_NODE_NO_SIZE_CHANGE) {
-		/* TODO: reuse offsets from caller */
-		offsets = rec_get_offsets(
-			btr_cur_get_rec(btr_cur),
-			index, offsets, ULINT_UNDEFINED, &offsets_heap);
-
 		err = btr_cur_update_in_place(
 			BTR_NO_LOCKING_FLAG, btr_cur,
 			offsets, node->update,
@@ -2107,10 +2113,16 @@ row_upd_clust_rec(
 	} else {
 		err = btr_cur_optimistic_update(
 			BTR_NO_LOCKING_FLAG, btr_cur,
-			&offsets, &offsets_heap, node->update,
+			&offsets, offsets_heap, node->update,
 			node->cmpl_info, thr, thr_get_trx(thr)->id, mtr);
 	}
 
+	if (err == DB_SUCCESS && rebuilt_old_pk
+	    && dict_index_is_online_ddl(index)) {
+		row_log_table_update(btr_cur_get_rec(btr_cur),
+				     index, offsets, rebuilt_old_pk);
+	}
+
 	mtr_commit(mtr);
 
 	if (UNIV_LIKELY(err == DB_SUCCESS)) {
@@ -2139,11 +2151,13 @@ row_upd_clust_rec(
 	ut_ad(!rec_get_deleted_flag(btr_pcur_get_rec(pcur),
 				    dict_table_is_comp(index->table)));
 
-	heap = mem_heap_create(1024);
+	if (!heap) {
+		heap = mem_heap_create(1024);
+	}
 
 	err = btr_cur_pessimistic_update(
 		BTR_NO_LOCKING_FLAG | BTR_KEEP_POS_FLAG, btr_cur,
-		&offsets, &offsets_heap, heap, &big_rec,
+		&offsets, offsets_heap, heap, &big_rec,
 		node->update, node->cmpl_info,
 		thr, thr_get_trx(thr)->id, mtr);
 	if (big_rec) {
@@ -2191,11 +2205,16 @@ row_upd_clust_rec(
 		ut_a(err == DB_SUCCESS);
 	}
 
+	if (err == DB_SUCCESS && rebuilt_old_pk
+	    && dict_index_is_online_ddl(index)) {
+		row_log_table_update(btr_cur_get_rec(btr_cur),
+				     index, offsets, rebuilt_old_pk);
+	}
+
 	mtr_commit(mtr);
-	mem_heap_free(heap);
 func_exit:
-	if (offsets_heap) {
-		mem_heap_free(offsets_heap);
+	if (heap) {
+		mem_heap_free(heap);
 	}
 
 	if (big_rec) {
@@ -2273,7 +2292,7 @@ row_upd_clust_step(
 	dberr_t		err;
 	mtr_t		mtr;
 	rec_t*		rec;
-	mem_heap_t*	heap		= NULL;
+	mem_heap_t*	heap	= NULL;
 	ulint		offsets_[REC_OFFS_NORMAL_SIZE];
 	ulint*		offsets;
 	ibool		referenced;
@@ -2299,7 +2318,17 @@ row_upd_clust_step(
 
 	ut_a(pcur->rel_pos == BTR_PCUR_ON);
 
-	success = btr_pcur_restore_position(BTR_MODIFY_LEAF, pcur, &mtr);
+	ulint	mode;
+
+	if (dict_index_is_online_ddl(index)) {
+		ut_ad(node->table->id != DICT_INDEXES_ID);
+		mode = BTR_MODIFY_LEAF | BTR_ALREADY_S_LATCHED;
+		mtr_s_lock(dict_index_get_lock(index), &mtr);
+	} else {
+		mode = BTR_MODIFY_LEAF;
+	}
+
+	success = btr_pcur_restore_position(mode, pcur, &mtr);
 
 	if (!success) {
 		err = DB_RECORD_NOT_FOUND;
@@ -2315,6 +2344,8 @@ row_upd_clust_step(
 
 	if (node->is_delete && node->table->id == DICT_INDEXES_ID) {
 
+		ut_ad(!dict_index_is_online_ddl(index));
+
 		dict_drop_index_tree(btr_pcur_get_rec(pcur), &mtr);
 
 		mtr_commit(&mtr);
@@ -2356,11 +2387,8 @@ row_upd_clust_step(
 			node->state = UPD_NODE_UPDATE_ALL_SEC;
 			node->index = dict_table_get_next_index(index);
 		}
-exit_func:
-		if (UNIV_LIKELY_NULL(heap)) {
-			mem_heap_free(heap);
-		}
-		return(err);
+
+		goto exit_func;
 	}
 
 	/* If the update is made for MySQL, we already have the update vector
@@ -2374,13 +2402,11 @@ exit_func:
 		row_upd_eval_new_vals(node->update);
 	}
 
-	if (UNIV_LIKELY_NULL(heap)) {
-		mem_heap_free(heap);
-	}
-
 	if (node->cmpl_info & UPD_NODE_NO_ORD_CHANGE) {
 
-		return(row_upd_clust_rec(node, index, thr, &mtr));
+		err = row_upd_clust_rec(
+			node, index, offsets, &heap, thr, &mtr);
+		goto exit_func;
 	}
 
 	row_upd_store_row(node);
@@ -2404,16 +2430,17 @@ exit_func:
 
 		if (err != DB_SUCCESS) {
 
-			return(err);
+			goto exit_func;
 		}
 
 		node->state = UPD_NODE_UPDATE_ALL_SEC;
 	} else {
-		err = row_upd_clust_rec(node, index, thr, &mtr);
+		err = row_upd_clust_rec(
+			node, index, offsets, &heap, thr, &mtr);
 
 		if (err != DB_SUCCESS) {
 
-			return(err);
+			goto exit_func;
 		}
 
 		node->state = UPD_NODE_UPDATE_SOME_SEC;
@@ -2421,6 +2448,10 @@ exit_func:
 
 	node->index = dict_table_get_next_index(index);
 
+exit_func:
+	if (heap) {
+		mem_heap_free(heap);
+	}
 	return(err);
 }
 

No bundle (reason: useless for push emails).