List:Commits« Previous MessageNext Message »
From:marko.makela Date:June 2 2010 10:26am
Subject:bzr commit into mysql-5.1-innodb branch (marko.makela:3494) Bug#53674
View as plain text  
#At file:///home/marko/innobase/dev/mysql2a/5.1-innodb/ based on revid:marko.makela@stripped2xiivtqj9va1

 3494 Marko Mäkelä	2010-06-02
      Bug#53674: InnoDB: Error: unlock row could not find a 4 mode lock on the record
      
      In semi-consistent read, only unlock freshly locked non-matching records.
      
      Define DB_SUCCESS_LOCKED_REC for indicating a successful operation
      where a record lock was created.
      
      lock_rec_lock_fast(): Return LOCK_REC_SUCCESS,
      LOCK_REC_SUCCESS_CREATED, or LOCK_REC_FAIL instead of TRUE/FALSE.
      
      lock_sec_rec_read_check_and_lock(),
      lock_clust_rec_read_check_and_lock(), lock_rec_enqueue_waiting(),
      lock_rec_lock_slow(), lock_rec_lock(), row_ins_set_shared_rec_lock(),
      row_ins_set_exclusive_rec_lock(), sel_set_rec_lock(),
      row_sel_get_clust_rec_for_mysql(): Return DB_SUCCESS_LOCKED_REC if a
      new record lock was created. Adjust callers.
      
      row_unlock_for_mysql(): Correct the function documentation.
      
      row_prebuilt_t::new_rec_locks: Correct the documentation.

    added:
      mysql-test/suite/innodb/r/innodb_bug53674.result
      mysql-test/suite/innodb/t/innodb_bug53674-master.opt
      mysql-test/suite/innodb/t/innodb_bug53674.test
    modified:
      storage/innobase/include/db0err.h
      storage/innobase/include/lock0lock.h
      storage/innobase/include/row0mysql.h
      storage/innobase/lock/lock0lock.c
      storage/innobase/row/row0ins.c
      storage/innobase/row/row0mysql.c
      storage/innobase/row/row0sel.c
=== added file 'mysql-test/suite/innodb/r/innodb_bug53674.result'
--- a/mysql-test/suite/innodb/r/innodb_bug53674.result	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/innodb/r/innodb_bug53674.result	revid:marko.makela@stripped02637-2tdov7jgx613vrrs
@@ -0,0 +1,11 @@
+create table bug53674(a int)engine=innodb;
+insert into bug53674 values (1),(2);
+start transaction;
+select * from bug53674 for update;
+a
+1
+2
+select * from bug53674 where a=(select a from bug53674 where a > 1);
+a
+2
+drop table bug53674;

=== added file 'mysql-test/suite/innodb/t/innodb_bug53674-master.opt'
--- a/mysql-test/suite/innodb/t/innodb_bug53674-master.opt	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/innodb/t/innodb_bug53674-master.opt	revid:marko.makela@stripped
@@ -0,0 +1 @@
+--log-bin --innodb-locks-unsafe-for-binlog --binlog-format=mixed

=== added file 'mysql-test/suite/innodb/t/innodb_bug53674.test'
--- a/mysql-test/suite/innodb/t/innodb_bug53674.test	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/innodb/t/innodb_bug53674.test	revid:marko.makela@stripped
@@ -0,0 +1,8 @@
+-- source include/have_innodb.inc
+
+create table bug53674(a int)engine=innodb;
+insert into bug53674 values (1),(2);
+start transaction;
+select * from bug53674 for update;
+select * from bug53674 where a=(select a from bug53674 where a > 1);
+drop table bug53674;

=== modified file 'storage/innobase/include/db0err.h'
--- a/storage/innobase/include/db0err.h	revid:marko.makela@stripped100602101940-60x32xiivtqj9va1
+++ b/storage/innobase/include/db0err.h	revid:marko.makela@stripped-2tdov7jgx613vrrs
@@ -10,6 +10,8 @@ Created 5/24/1996 Heikki Tuuri
 #define db0err_h
 
 
+#define DB_SUCCESS_LOCKED_REC	9	/* like DB_SUCCESS, but a new
+					explicit record lock was created */
 #define DB_SUCCESS		10
 
 /* The following are error codes */

=== modified file 'storage/innobase/include/lock0lock.h'
--- a/storage/innobase/include/lock0lock.h	revid:marko.makela@stripped940-60x32xiivtqj9va1
+++ b/storage/innobase/include/lock0lock.h	revid:marko.makela@stripped7jgx613vrrs
@@ -292,14 +292,15 @@ lock_sec_rec_modify_check_and_lock(
 	dict_index_t*	index,	/* in: secondary index */
 	que_thr_t*	thr);	/* in: query thread */
 /*************************************************************************
-Like the counterpart for a clustered index below, but now we read a
+Like lock_clust_rec_read_check_and_lock(), but reads a
 secondary index record. */
 
 ulint
 lock_sec_rec_read_check_and_lock(
 /*=============================*/
-				/* out: DB_SUCCESS, DB_LOCK_WAIT,
-				DB_DEADLOCK, or DB_QUE_THR_SUSPENDED */
+				/* out: DB_SUCCESS, DB_SUCCESS_LOCKED_REC,
+				DB_LOCK_WAIT, DB_DEADLOCK,
+				or DB_QUE_THR_SUSPENDED */
 	ulint		flags,	/* in: if BTR_NO_LOCKING_FLAG bit is set,
 				does nothing */
 	rec_t*		rec,	/* in: user record or page supremum record
@@ -324,8 +325,9 @@ lock on the record. */
 ulint
 lock_clust_rec_read_check_and_lock(
 /*===============================*/
-				/* out: DB_SUCCESS, DB_LOCK_WAIT,
-				DB_DEADLOCK, or DB_QUE_THR_SUSPENDED */
+				/* out: DB_SUCCESS, DB_SUCCESS_LOCKED_REC,
+				DB_LOCK_WAIT, DB_DEADLOCK,
+				or DB_QUE_THR_SUSPENDED */
 	ulint		flags,	/* in: if BTR_NO_LOCKING_FLAG bit is set,
 				does nothing */
 	rec_t*		rec,	/* in: user record or page supremum record

=== modified file 'storage/innobase/include/row0mysql.h'
--- a/storage/innobase/include/row0mysql.h	revid:marko.makela@stripped
+++ b/storage/innobase/include/row0mysql.h	revid:marko.makela@stripped
@@ -246,22 +246,20 @@ row_update_for_mysql(
 	row_prebuilt_t*	prebuilt);	/* in: prebuilt struct in MySQL
 					handle */
 /*************************************************************************
-This can only be used when srv_locks_unsafe_for_binlog is TRUE or
-session is using a READ COMMITTED isolation level. Before
-calling this function we must use trx_reset_new_rec_lock_info() and
-trx_register_new_rec_lock() to store the information which new record locks
-really were set. This function removes a newly set lock under prebuilt->pcur,
-and also under prebuilt->clust_pcur. Currently, this is only used and tested
-in the case of an UPDATE or a DELETE statement, where the row lock is of the
-LOCK_X type.
-Thus, this implements a 'mini-rollback' that releases the latest record
-locks we set. */
+This can only be used when srv_locks_unsafe_for_binlog is TRUE or this
+session is using a READ COMMITTED or READ UNCOMMITTED isolation level.
+Before calling this function row_search_for_mysql() must have
+initialized prebuilt->new_rec_locks to store the information which new
+record locks really were set. This function removes a newly set
+clustered index record lock under prebuilt->pcur or
+prebuilt->clust_pcur.  Thus, this implements a 'mini-rollback' that
+releases the latest clustered index record lock we set. */
 
 int
 row_unlock_for_mysql(
 /*=================*/
 					/* out: error code or DB_SUCCESS */
-	row_prebuilt_t*	prebuilt,	/* in: prebuilt struct in MySQL
+	row_prebuilt_t*	prebuilt,	/* in/out: prebuilt struct in MySQL
 					handle */
 	ibool		has_latches_on_recs);/* TRUE if called so that we have
 					the latches on the records under pcur
@@ -660,18 +658,17 @@ struct row_prebuilt_struct {
 	ulint		new_rec_locks;	/* normally 0; if
 					srv_locks_unsafe_for_binlog is
 					TRUE or session is using READ
-					COMMITTED isolation level, in a
-					cursor search, if we set a new
-					record lock on an index, this is
-					incremented; this is used in
-					releasing the locks under the
-					cursors if we are performing an
-					UPDATE and we determine after
-					retrieving the row that it does
-					not need to be locked; thus,
-					these can be used to implement a
-					'mini-rollback' that releases
-					the latest record locks */
+					COMMITTED or READ UNCOMMITTED
+					isolation level, set in
+					row_search_for_mysql() if we set a new
+					record lock on the secondary
+					or clustered index; this is
+					used in row_unlock_for_mysql()
+					when releasing the lock under
+					the cursor if we determine
+					after retrieving the row that
+					it does not need to be locked
+					('mini-rollback') */
 	ulint		mysql_prefix_len;/* byte offset of the end of
 					the last requested column */
 	ulint		mysql_row_len;	/* length in bytes of a row in the

=== modified file 'storage/innobase/lock/lock0lock.c'
--- a/storage/innobase/lock/lock0lock.c	revid:marko.makela@stripped
+++ b/storage/innobase/lock/lock0lock.c	revid:marko.makela@oracle.com-20100602102637-2tdov7jgx613vrrs
@@ -1739,11 +1739,12 @@ ulint
 lock_rec_enqueue_waiting(
 /*=====================*/
 				/* out: DB_LOCK_WAIT, DB_DEADLOCK, or
-				DB_QUE_THR_SUSPENDED, or DB_SUCCESS;
-				DB_SUCCESS means that there was a deadlock,
-				but another transaction was chosen as a
-				victim, and we got the lock immediately:
-				no need to wait then */
+				DB_QUE_THR_SUSPENDED, or DB_SUCCESS_LOCKED_REC;
+				DB_SUCCESS_LOCKED_REC means that there
+				was a deadlock, but another
+				transaction was chosen as a victim,
+				and we got the lock immediately: no
+				need to wait then */
 	ulint		type_mode,/* in: lock mode this transaction is
 				requesting: LOCK_S or LOCK_X, possibly ORed
 				with LOCK_GAP or LOCK_REC_NOT_GAP, ORed
@@ -1804,7 +1805,7 @@ lock_rec_enqueue_waiting(
 
 	if (trx->wait_lock == NULL) {
 
-		return(DB_SUCCESS);
+		return(DB_SUCCESS_LOCKED_REC);
 	}
 
 	trx->que_state = TRX_QUE_LOCK_WAIT;
@@ -1903,6 +1904,16 @@ lock_rec_add_to_queue(
 	return(lock_rec_create(type_mode, rec, index, trx));
 }
 
+/** Record locking request status */
+enum lock_rec_req_status {
+	/** Failed to acquire a lock */
+	LOCK_REC_FAIL,
+	/** Succeeded in acquiring a lock (implicit or already acquired) */
+	LOCK_REC_SUCCESS,
+	/** Explicitly created a new lock */
+	LOCK_REC_SUCCESS_CREATED
+};
+
 /*************************************************************************
 This is a fast routine for locking a record in the most common cases:
 there are no explicit locks on the page, or there is just one lock, owned
@@ -1911,10 +1922,10 @@ which does NOT look at implicit locks! C
 explicit locks. This function sets a normal next-key lock, or in the case of
 a page supremum record, a gap type lock. */
 UNIV_INLINE
-ibool
+enum lock_rec_req_status
 lock_rec_lock_fast(
 /*===============*/
-				/* out: TRUE if locking succeeded */
+				/* out: whether the locking succeeded */
 	ibool		impl,	/* in: if TRUE, no lock is set if no wait
 				is necessary: we assume that the caller will
 				set an implicit lock */
@@ -1950,19 +1961,19 @@ lock_rec_lock_fast(
 			lock_rec_create(mode, rec, index, trx);
 		}
 
-		return(TRUE);
+		return(LOCK_REC_SUCCESS_CREATED);
 	}
 
 	if (lock_rec_get_next_on_page(lock)) {
 
-		return(FALSE);
+		return(LOCK_REC_FAIL);
 	}
 
 	if (lock->trx != trx
 	    || lock->type_mode != (mode | LOCK_REC)
 	    || lock_rec_get_n_bits(lock) <= heap_no) {
 
-		return(FALSE);
+		return(LOCK_REC_FAIL);
 	}
 
 	if (!impl) {
@@ -1971,10 +1982,11 @@ lock_rec_lock_fast(
 
 		if (!lock_rec_get_nth_bit(lock, heap_no)) {
 			lock_rec_set_nth_bit(lock, heap_no);
+			return(LOCK_REC_SUCCESS_CREATED);
 		}
 	}
 
-	return(TRUE);
+	return(LOCK_REC_SUCCESS);
 }
 
 /*************************************************************************
@@ -1986,8 +1998,9 @@ static
 ulint
 lock_rec_lock_slow(
 /*===============*/
-				/* out: DB_SUCCESS, DB_LOCK_WAIT, or error
-				code */
+				/* out: DB_SUCCESS, DB_SUCCESS_LOCKED_REC,
+				DB_LOCK_WAIT, DB_DEADLOCK,
+				or DB_QUE_THR_SUSPENDED */
 	ibool		impl,	/* in: if TRUE, no lock is set if no wait is
 				necessary: we assume that the caller will set
 				an implicit lock */
@@ -1998,7 +2011,6 @@ lock_rec_lock_slow(
 	que_thr_t*	thr)	/* in: query thread */
 {
 	trx_t*	trx;
-	ulint	err;
 
 	ut_ad(mutex_own(&kernel_mutex));
 	ut_ad((LOCK_MODE_MASK & mode) != LOCK_S
@@ -2017,26 +2029,21 @@ lock_rec_lock_slow(
 		/* The trx already has a strong enough lock on rec: do
 		nothing */
 
-		err = DB_SUCCESS;
 	} else if (lock_rec_other_has_conflicting(mode, rec, trx)) {
 
 		/* If another transaction has a non-gap conflicting request in
 		the queue, as this transaction does not have a lock strong
 		enough already granted on the record, we have to wait. */
 
-		err = lock_rec_enqueue_waiting(mode, rec, index, thr);
-	} else {
-		if (!impl) {
-			/* Set the requested lock on the record */
+		return(lock_rec_enqueue_waiting(mode, rec, index, thr));
+	} else if (!impl) {
+		/* Set the requested lock on the record */
 
-			lock_rec_add_to_queue(LOCK_REC | mode, rec, index,
-					      trx);
-		}
-
-		err = DB_SUCCESS;
+		lock_rec_add_to_queue(LOCK_REC | mode, rec, index, trx);
+		return(DB_SUCCESS_LOCKED_REC);
 	}
 
-	return(err);
+	return(DB_SUCCESS);
 }
 
 /*************************************************************************
@@ -2049,8 +2056,9 @@ static
 ulint
 lock_rec_lock(
 /*==========*/
-				/* out: DB_SUCCESS, DB_LOCK_WAIT, or error
-				code */
+				/* out: DB_SUCCESS, DB_SUCCESS_LOCKED_REC,
+				DB_LOCK_WAIT, DB_DEADLOCK, or
+				DB_QUE_THR_SUSPENDED */
 	ibool		impl,	/* in: if TRUE, no lock is set if no wait is
 				necessary: we assume that the caller will set
 				an implicit lock */
@@ -2060,8 +2068,6 @@ lock_rec_lock(
 	dict_index_t*	index,	/* in: index of record */
 	que_thr_t*	thr)	/* in: query thread */
 {
-	ulint	err;
-
 	ut_ad(mutex_own(&kernel_mutex));
 	ut_ad((LOCK_MODE_MASK & mode) != LOCK_S
 	      || lock_table_has(thr_get_trx(thr), index->table, LOCK_IS));
@@ -2073,17 +2079,19 @@ lock_rec_lock(
 	      || mode - (LOCK_MODE_MASK & mode) == LOCK_REC_NOT_GAP
 	      || mode - (LOCK_MODE_MASK & mode) == 0);
 
-	if (lock_rec_lock_fast(impl, mode, rec, index, thr)) {
-
-		/* We try a simplified and faster subroutine for the most
-		common cases */
-
-		err = DB_SUCCESS;
-	} else {
-		err = lock_rec_lock_slow(impl, mode, rec, index, thr);
+	/* We try a simplified and faster subroutine for the most
+	common cases */
+	switch (lock_rec_lock_fast(impl, mode, rec, index, thr)) {
+	case LOCK_REC_SUCCESS:
+		return(DB_SUCCESS);
+	case LOCK_REC_SUCCESS_CREATED:
+		return(DB_SUCCESS_LOCKED_REC);
+	case LOCK_REC_FAIL:
+		return(lock_rec_lock_slow(impl, mode, rec, index, thr));
 	}
 
-	return(err);
+	ut_error;
+	return(DB_ERROR);
 }
 
 /*************************************************************************
@@ -4832,7 +4840,7 @@ lock_rec_insert_check_and_lock(
 
 	lock = lock_rec_get_first(next_rec);
 
-	if (lock == NULL) {
+	if (UNIV_LIKELY(lock == NULL)) {
 		/* We optimize CPU time usage in the simplest case */
 
 		lock_mutex_exit_kernel();
@@ -4840,8 +4848,7 @@ lock_rec_insert_check_and_lock(
 		if (!(index->type & DICT_CLUSTERED)) {
 
 			/* Update the page max trx id field */
-			page_update_max_trx_id(buf_frame_align(rec),
-					       thr_get_trx(thr)->id);
+			page_update_max_trx_id(buf_frame_align(rec), trx->id);
 		}
 
 		return(DB_SUCCESS);
@@ -4873,11 +4880,16 @@ lock_rec_insert_check_and_lock(
 
 	lock_mutex_exit_kernel();
 
-	if (!(index->type & DICT_CLUSTERED) && (err == DB_SUCCESS)) {
-
+	switch (err) {
+	case DB_SUCCESS_LOCKED_REC:
+		err = DB_SUCCESS;
+		/* fall through */
+	case DB_SUCCESS:
+		if (index->type & DICT_CLUSTERED) {
+			break;
+		}
 		/* Update the page max trx id field */
-		page_update_max_trx_id(buf_frame_align(rec),
-				       thr_get_trx(thr)->id);
+		page_update_max_trx_id(buf_frame_align(rec), trx->id);
 	}
 
 #ifdef UNIV_DEBUG
@@ -4984,6 +4996,10 @@ lock_clust_rec_modify_check_and_lock(
 
 	ut_ad(lock_rec_queue_validate(rec, index, offsets));
 
+	if (UNIV_UNLIKELY(err == DB_SUCCESS_LOCKED_REC)) {
+		err = DB_SUCCESS;
+	}
+
 	return(err);
 }
 
@@ -5043,25 +5059,29 @@ lock_sec_rec_modify_check_and_lock(
 	}
 #endif /* UNIV_DEBUG */
 
-	if (err == DB_SUCCESS) {
+	if (err == DB_SUCCESS || err == DB_SUCCESS_LOCKED_REC) {
 		/* Update the page max trx id field */
-
+		/* It might not be necessary to do this if
+		err == DB_SUCCESS (no new lock created),
+		but it should not cost too much performance. */
 		page_update_max_trx_id(buf_frame_align(rec),
 				       thr_get_trx(thr)->id);
+		err = DB_SUCCESS;
 	}
 
 	return(err);
 }
 
 /*************************************************************************
-Like the counterpart for a clustered index below, but now we read a
+Like lock_clust_rec_read_check_and_lock(), but reads a
 secondary index record. */
 
 ulint
 lock_sec_rec_read_check_and_lock(
 /*=============================*/
-				/* out: DB_SUCCESS, DB_LOCK_WAIT,
-				DB_DEADLOCK, or DB_QUE_THR_SUSPENDED */
+				/* out: DB_SUCCESS, DB_SUCCESS_LOCKED_REC,
+				DB_LOCK_WAIT, DB_DEADLOCK,
+				or DB_QUE_THR_SUSPENDED */
 	ulint		flags,	/* in: if BTR_NO_LOCKING_FLAG bit is set,
 				does nothing */
 	rec_t*		rec,	/* in: user record or page supremum record
@@ -5126,8 +5146,9 @@ lock on the record. */
 ulint
 lock_clust_rec_read_check_and_lock(
 /*===============================*/
-				/* out: DB_SUCCESS, DB_LOCK_WAIT,
-				DB_DEADLOCK, or DB_QUE_THR_SUSPENDED */
+				/* out: DB_SUCCESS, DB_SUCCESS_LOCKED_REC,
+				DB_LOCK_WAIT, DB_DEADLOCK,
+				or DB_QUE_THR_SUSPENDED */
 	ulint		flags,	/* in: if BTR_NO_LOCKING_FLAG bit is set,
 				does nothing */
 	rec_t*		rec,	/* in: user record or page supremum record
@@ -5206,16 +5227,21 @@ lock_clust_rec_read_check_and_lock_alt(
 	mem_heap_t*	tmp_heap	= NULL;
 	ulint		offsets_[REC_OFFS_NORMAL_SIZE];
 	ulint*		offsets		= offsets_;
-	ulint		ret;
+	ulint		err;
 	*offsets_ = (sizeof offsets_) / sizeof *offsets_;
 
 	offsets = rec_get_offsets(rec, index, offsets,
 				  ULINT_UNDEFINED, &tmp_heap);
-	ret = lock_clust_rec_read_check_and_lock(flags, rec, index,
+	err = lock_clust_rec_read_check_and_lock(flags, rec, index,
 						 offsets, mode, gap_mode, thr);
 	if (tmp_heap) {
 		mem_heap_free(tmp_heap);
 	}
-	return(ret);
+
+	if (UNIV_UNLIKELY(err == DB_SUCCESS_LOCKED_REC)) {
+		err = DB_SUCCESS;
+	}
+
+	return(err);
 }
 

=== modified file 'storage/innobase/row/row0ins.c'
--- a/storage/innobase/row/row0ins.c	revid:marko.makela@stripped2xiivtqj9va1
+++ b/storage/innobase/row/row0ins.c	revid:marko.makela@stripped
@@ -1114,7 +1114,8 @@ static
 ulint
 row_ins_set_shared_rec_lock(
 /*========================*/
-				/* out: DB_SUCCESS or error code */
+				/* out: DB_SUCCESS, DB_SUCCESS_LOCKED_REC,
+				or error code */
 	ulint		type,	/* in: LOCK_ORDINARY, LOCK_GAP, or
 				LOCK_REC_NOT_GAP type lock */
 	rec_t*		rec,	/* in: record */
@@ -1145,7 +1146,8 @@ static
 ulint
 row_ins_set_exclusive_rec_lock(
 /*===========================*/
-				/* out: DB_SUCCESS or error code */
+				/* out: DB_SUCCESS, DB_SUCCESS_LOCKED_REC,
+				or error code */
 	ulint		type,	/* in: LOCK_ORDINARY, LOCK_GAP, or
 				LOCK_REC_NOT_GAP type lock */
 	rec_t*		rec,	/* in: record */
@@ -1195,9 +1197,7 @@ row_ins_check_foreign_constraint(
 	dict_table_t*	check_table;
 	dict_index_t*	check_index;
 	ulint		n_fields_cmp;
-	rec_t*		rec;
 	btr_pcur_t	pcur;
-	ibool		moved;
 	int		cmp;
 	ulint		err;
 	ulint		i;
@@ -1328,12 +1328,12 @@ run_again:
 
 	/* Scan index records and check if there is a matching record */
 
-	for (;;) {
-		rec = btr_pcur_get_rec(&pcur);
+	do {
+		rec_t*	rec = btr_pcur_get_rec(&pcur);
 
 		if (page_rec_is_infimum(rec)) {
 
-			goto next_rec;
+			continue;
 		}
 
 		offsets = rec_get_offsets(rec, check_index,
@@ -1343,12 +1343,13 @@ run_again:
 
 			err = row_ins_set_shared_rec_lock(
 				LOCK_ORDINARY, rec, check_index, offsets, thr);
-			if (err != DB_SUCCESS) {
-
-				break;
+			switch (err) {
+			case DB_SUCCESS_LOCKED_REC:
+			case DB_SUCCESS:
+				continue;
+			default:
+				goto end_scan;
 			}
-
-			goto next_rec;
 		}
 
 		cmp = cmp_dtuple_rec(entry, rec, offsets);
@@ -1359,9 +1360,12 @@ run_again:
 				err = row_ins_set_shared_rec_lock(
 					LOCK_ORDINARY, rec, check_index,
 					offsets, thr);
-				if (err != DB_SUCCESS) {
-
+				switch (err) {
+				case DB_SUCCESS_LOCKED_REC:
+				case DB_SUCCESS:
 					break;
+				default:
+					goto end_scan;
 				}
 			} else {
 				/* Found a matching record. Lock only
@@ -1372,15 +1376,18 @@ run_again:
 					LOCK_REC_NOT_GAP, rec, check_index,
 					offsets, thr);
 
-				if (err != DB_SUCCESS) {
-
+				switch (err) {
+				case DB_SUCCESS_LOCKED_REC:
+				case DB_SUCCESS:
 					break;
+				default:
+					goto end_scan;
 				}
 
 				if (check_ref) {
 					err = DB_SUCCESS;
 
-					break;
+					goto end_scan;
 				} else if (foreign->type != 0) {
 					/* There is an ON UPDATE or ON DELETE
 					condition: check them in a separate
@@ -1406,7 +1413,7 @@ run_again:
 							err = DB_FOREIGN_DUPLICATE_KEY;
 						}
 
-						break;
+						goto end_scan;
 					}
 				} else {
 					row_ins_foreign_report_err(
@@ -1414,48 +1421,39 @@ run_again:
 						thr, foreign, rec, entry);
 
 					err = DB_ROW_IS_REFERENCED;
-					break;
+					goto end_scan;
 				}
 			}
-		}
+		} else {
+			ut_a(cmp < 0);
 
-		if (cmp < 0) {
 			err = row_ins_set_shared_rec_lock(
 				LOCK_GAP, rec, check_index, offsets, thr);
-			if (err != DB_SUCCESS) {
-
-				break;
-			}
-
-			if (check_ref) {
-				err = DB_NO_REFERENCED_ROW;
-				row_ins_foreign_report_add_err(
-					trx, foreign, rec, entry);
-			} else {
-				err = DB_SUCCESS;
+			switch (err) {
+			case DB_SUCCESS_LOCKED_REC:
+			case DB_SUCCESS:
+				if (check_ref) {
+					err = DB_NO_REFERENCED_ROW;
+					row_ins_foreign_report_add_err(
+						trx, foreign, rec, entry);
+				} else {
+					err = DB_SUCCESS;
+				}
 			}
 
-			break;
+			goto end_scan;
 		}
+	} while (btr_pcur_move_to_next(&pcur, &mtr));
 
-		ut_a(cmp == 0);
-next_rec:
-		moved = btr_pcur_move_to_next(&pcur, &mtr);
-
-		if (!moved) {
-			if (check_ref) {
-				rec = btr_pcur_get_rec(&pcur);
-				row_ins_foreign_report_add_err(
-					trx, foreign, rec, entry);
-				err = DB_NO_REFERENCED_ROW;
-			} else {
-				err = DB_SUCCESS;
-			}
-
-			break;
-		}
+	if (check_ref) {
+		row_ins_foreign_report_add_err(
+			trx, foreign, btr_pcur_get_rec(&pcur), entry);
+		err = DB_NO_REFERENCED_ROW;
+	} else {
+		err = DB_SUCCESS;
 	}
 
+end_scan:
 	btr_pcur_close(&pcur);
 
 	mtr_commit(&mtr);
@@ -1641,10 +1639,8 @@ row_ins_scan_sec_index_for_duplicate(
 	ulint		i;
 	int		cmp;
 	ulint		n_fields_cmp;
-	rec_t*		rec;
 	btr_pcur_t	pcur;
 	ulint		err		= DB_SUCCESS;
-	ibool		moved;
 	unsigned	allow_duplicates;
 	mtr_t		mtr;
 	mem_heap_t*	heap		= NULL;
@@ -1680,12 +1676,12 @@ row_ins_scan_sec_index_for_duplicate(
 
 	/* Scan index records and check if there is a duplicate */
 
-	for (;;) {
-		rec = btr_pcur_get_rec(&pcur);
+	do {
+		rec_t*	rec = btr_pcur_get_rec(&pcur);
 
 		if (page_rec_is_infimum(rec)) {
 
-			goto next_rec;
+			continue;
 		}
 
 		offsets = rec_get_offsets(rec, index, offsets,
@@ -1706,14 +1702,18 @@ row_ins_scan_sec_index_for_duplicate(
 				LOCK_ORDINARY, rec, index, offsets, thr);
 		}
 
-		if (err != DB_SUCCESS) {
-
+		switch (err) {
+		case DB_SUCCESS_LOCKED_REC:
+			err = DB_SUCCESS;
+		case DB_SUCCESS:
 			break;
+		default:
+			goto end_scan;
 		}
 
 		if (page_rec_is_supremum(rec)) {
 
-			goto next_rec;
+			continue;
 		}
 
 		cmp = cmp_dtuple_rec(entry, rec, offsets);
@@ -1725,23 +1725,15 @@ row_ins_scan_sec_index_for_duplicate(
 
 				thr_get_trx(thr)->error_info = index;
 
-				break;
+				goto end_scan;
 			}
+		} else {
+			ut_a(cmp < 0);
+			goto end_scan;
 		}
+	} while (btr_pcur_move_to_next(&pcur, &mtr));
 
-		if (cmp < 0) {
-			break;
-		}
-
-		ut_a(cmp == 0);
-next_rec:
-		moved = btr_pcur_move_to_next(&pcur, &mtr);
-
-		if (!moved) {
-			break;
-		}
-	}
-
+end_scan:
 	if (UNIV_LIKELY_NULL(heap)) {
 		mem_heap_free(heap);
 	}
@@ -1837,7 +1829,11 @@ row_ins_duplicate_error_in_clust(
 					cursor->index, offsets, thr);
 			}
 
-			if (err != DB_SUCCESS) {
+			switch (err) {
+			case DB_SUCCESS_LOCKED_REC:
+			case DB_SUCCESS:
+				break;
+			default:
 				goto func_exit;
 			}
 
@@ -1875,7 +1871,11 @@ row_ins_duplicate_error_in_clust(
 					cursor->index, offsets, thr);
 			}
 
-			if (err != DB_SUCCESS) {
+			switch (err) {
+			case DB_SUCCESS_LOCKED_REC:
+			case DB_SUCCESS:
+				break;
+			default:
 				goto func_exit;
 			}
 

=== modified file 'storage/innobase/row/row0mysql.c'
--- a/storage/innobase/row/row0mysql.c	revid:marko.makela@stripped00602101940-60x32xiivtqj9va1
+++ b/storage/innobase/row/row0mysql.c	revid:marko.makela@strippedtdov7jgx613vrrs
@@ -1455,22 +1455,20 @@ run_again:
 }
 
 /*************************************************************************
-This can only be used when srv_locks_unsafe_for_binlog is TRUE or
-this session is using a READ COMMITTED isolation level. Before
-calling this function we must use trx_reset_new_rec_lock_info() and
-trx_register_new_rec_lock() to store the information which new record locks
-really were set. This function removes a newly set lock under prebuilt->pcur,
-and also under prebuilt->clust_pcur. Currently, this is only used and tested
-in the case of an UPDATE or a DELETE statement, where the row lock is of the
-LOCK_X type.
-Thus, this implements a 'mini-rollback' that releases the latest record
-locks we set. */
+This can only be used when srv_locks_unsafe_for_binlog is TRUE or this
+session is using a READ COMMITTED or READ UNCOMMITTED isolation level.
+Before calling this function row_search_for_mysql() must have
+initialized prebuilt->new_rec_locks to store the information which new
+record locks really were set. This function removes a newly set
+clustered index record lock under prebuilt->pcur or
+prebuilt->clust_pcur.  Thus, this implements a 'mini-rollback' that
+releases the latest clustered index record lock we set. */
 
 int
 row_unlock_for_mysql(
 /*=================*/
 					/* out: error code or DB_SUCCESS */
-	row_prebuilt_t*	prebuilt,	/* in: prebuilt struct in MySQL
+	row_prebuilt_t*	prebuilt,	/* in/out: prebuilt struct in MySQL
 					handle */
 	ibool		has_latches_on_recs)/* TRUE if called so that we have
 					the latches on the records under pcur

=== modified file 'storage/innobase/row/row0sel.c'
--- a/storage/innobase/row/row0sel.c	revid:marko.makela@stripped
+++ b/storage/innobase/row/row0sel.c	revid:marko.makela@oracle.com-20100602102637-2tdov7jgx613vrrs
@@ -754,8 +754,14 @@ row_sel_get_clust_rec(
 			0, clust_rec, index, offsets,
 			node->row_lock_mode, lock_type, thr);
 
-		if (err != DB_SUCCESS) {
-
+		switch (err) {
+		case DB_SUCCESS:
+		case DB_SUCCESS_LOCKED_REC:
+			/* Declare the variable uninitialized in Valgrind.
+			It should be set to DB_SUCCESS at func_exit. */
+			UNIV_MEM_INVALID(&err, sizeof err);
+			break;
+		default:
 			goto err_exit;
 		}
 	} else {
@@ -826,7 +832,8 @@ UNIV_INLINE
 ulint
 sel_set_rec_lock(
 /*=============*/
-				/* out: DB_SUCCESS or error code */
+				/* out: DB_SUCCESS, DB_SUCCESS_LOCKED_REC,
+				or error code */
 	rec_t*		rec,	/* in: record */
 	dict_index_t*	index,	/* in: index */
 	const ulint*	offsets,/* in: rec_get_offsets(rec, index) */
@@ -1374,11 +1381,15 @@ rec_loop:
 					       node->row_lock_mode,
 					       lock_type, thr);
 
-			if (err != DB_SUCCESS) {
+			switch (err) {
+			case DB_SUCCESS_LOCKED_REC:
+				err = DB_SUCCESS;
+			case DB_SUCCESS:
+				break;
+			default:
 				/* Note that in this case we will store in pcur
 				the PREDECESSOR of the record we are waiting
 				the lock for */
-
 				goto lock_wait_or_error;
 			}
 		}
@@ -1429,8 +1440,12 @@ skip_lock:
 		err = sel_set_rec_lock(rec, index, offsets,
 				       node->row_lock_mode, lock_type, thr);
 
-		if (err != DB_SUCCESS) {
-
+		switch (err) {
+		case DB_SUCCESS_LOCKED_REC:
+			err = DB_SUCCESS;
+		case DB_SUCCESS:
+			break;
+		default:
 			goto lock_wait_or_error;
 		}
 	}
@@ -2745,7 +2760,8 @@ static
 ulint
 row_sel_get_clust_rec_for_mysql(
 /*============================*/
-				/* out: DB_SUCCESS or error code */
+				/* out: DB_SUCCESS, DB_SUCCESS_LOCKED_REC,
+				or error code */
 	row_prebuilt_t*	prebuilt,/* in: prebuilt struct in the handle */
 	dict_index_t*	sec_index,/* in: secondary index where rec resides */
 	rec_t*		rec,	/* in: record in a non-clustered index; if
@@ -2826,6 +2842,7 @@ row_sel_get_clust_rec_for_mysql(
 
 		clust_rec = NULL;
 
+		err = DB_SUCCESS;
 		goto func_exit;
 	}
 
@@ -2840,8 +2857,11 @@ row_sel_get_clust_rec_for_mysql(
 		err = lock_clust_rec_read_check_and_lock(
 			0, clust_rec, clust_index, *offsets,
 			prebuilt->select_lock_type, LOCK_REC_NOT_GAP, thr);
-		if (err != DB_SUCCESS) {
-
+		switch (err) {
+		case DB_SUCCESS:
+		case DB_SUCCESS_LOCKED_REC:
+			break;
+		default:
 			goto err_exit;
 		}
 	} else {
@@ -2900,6 +2920,8 @@ row_sel_get_clust_rec_for_mysql(
 				     rec, sec_index, clust_rec, clust_index));
 #endif
 		}
+
+		err = DB_SUCCESS;
 	}
 
 func_exit:
@@ -2912,7 +2934,6 @@ func_exit:
 		btr_pcur_store_position(prebuilt->clust_pcur, mtr);
 	}
 
-	err = DB_SUCCESS;
 err_exit:
 	return(err);
 }
@@ -3626,8 +3647,12 @@ shortcut_fails_too_big_rec:
 					       prebuilt->select_lock_type,
 					       LOCK_GAP, thr);
 
-			if (err != DB_SUCCESS) {
-
+			switch (err) {
+			case DB_SUCCESS_LOCKED_REC:
+				err = DB_SUCCESS;
+			case DB_SUCCESS:
+				break;
+			default:
 				goto lock_wait_or_error;
 			}
 		}
@@ -3724,8 +3749,12 @@ rec_loop:
 					       prebuilt->select_lock_type,
 					       LOCK_ORDINARY, thr);
 
-			if (err != DB_SUCCESS) {
-
+			switch (err) {
+			case DB_SUCCESS_LOCKED_REC:
+				err = DB_SUCCESS;
+			case DB_SUCCESS:
+				break;
+			default:
 				goto lock_wait_or_error;
 			}
 		}
@@ -3856,8 +3885,11 @@ wrong_offs:
 					prebuilt->select_lock_type, LOCK_GAP,
 					thr);
 
-				if (err != DB_SUCCESS) {
-
+				switch (err) {
+				case DB_SUCCESS_LOCKED_REC:
+				case DB_SUCCESS:
+					break;
+				default:
 					goto lock_wait_or_error;
 				}
 			}
@@ -3891,8 +3923,11 @@ wrong_offs:
 					prebuilt->select_lock_type, LOCK_GAP,
 					thr);
 
-				if (err != DB_SUCCESS) {
-
+				switch (err) {
+				case DB_SUCCESS_LOCKED_REC:
+				case DB_SUCCESS:
+					break;
+				default:
 					goto lock_wait_or_error;
 				}
 			}
@@ -3961,15 +3996,21 @@ no_gap_lock:
 
 		switch (err) {
 			rec_t*	old_vers;
-		case DB_SUCCESS:
+		case DB_SUCCESS_LOCKED_REC:
 			if (srv_locks_unsafe_for_binlog
-			    || trx->isolation_level <= TRX_ISO_READ_COMMITTED) {
+			    || trx->isolation_level
+			    <= TRX_ISO_READ_COMMITTED) {
 				/* Note that a record of
 				prebuilt->index was locked. */
 				prebuilt->new_rec_locks = 1;
 			}
+			err = DB_SUCCESS;
+		case DB_SUCCESS:
 			break;
 		case DB_LOCK_WAIT:
+			/* Never unlock rows that were part of a conflict. */
+			prebuilt->new_rec_locks = 0;
+
 			if (UNIV_LIKELY(prebuilt->row_read_type
 					!= ROW_READ_TRY_SEMI_CONSISTENT)
 			    || unique_search
@@ -3999,7 +4040,6 @@ no_gap_lock:
 			if (UNIV_LIKELY(trx->wait_lock != NULL)) {
 				lock_cancel_waiting_and_release(
 					trx->wait_lock);
-				prebuilt->new_rec_locks = 0;
 			} else {
 				mutex_exit(&kernel_mutex);
 
@@ -4011,9 +4051,6 @@ no_gap_lock:
 							  ULINT_UNDEFINED,
 							  &heap);
 				err = DB_SUCCESS;
-				/* Note that a record of
-				prebuilt->index was locked. */
-				prebuilt->new_rec_locks = 1;
 				break;
 			}
 			mutex_exit(&kernel_mutex);
@@ -4151,27 +4188,30 @@ requires_clust_rec:
 		err = row_sel_get_clust_rec_for_mysql(prebuilt, index, rec,
 						      thr, &clust_rec,
 						      &offsets, &heap, &mtr);
-		if (err != DB_SUCCESS) {
+		switch (err) {
+		case DB_SUCCESS:
+			if (clust_rec == NULL) {
+				/* The record did not exist in the read view */
+				ut_ad(prebuilt->select_lock_type == LOCK_NONE);
 
+				goto next_rec;
+			}
+			break;
+		case DB_SUCCESS_LOCKED_REC:
+			ut_a(clust_rec != NULL);
+			if (srv_locks_unsafe_for_binlog
+			     || trx->isolation_level
+			    <= TRX_ISO_READ_COMMITTED) {
+				/* Note that the clustered index record
+				was locked. */
+				prebuilt->new_rec_locks = 2;
+			}
+			err = DB_SUCCESS;
+			break;
+		default:
 			goto lock_wait_or_error;
 		}
 
-		if (clust_rec == NULL) {
-			/* The record did not exist in the read view */
-			ut_ad(prebuilt->select_lock_type == LOCK_NONE);
-
-			goto next_rec;
-		}
-
-		if ((srv_locks_unsafe_for_binlog
-		     || trx->isolation_level <= TRX_ISO_READ_COMMITTED)
-		    && prebuilt->select_lock_type != LOCK_NONE) {
-			/* Note that both the secondary index record
-			and the clustered index record were locked. */
-			ut_ad(prebuilt->new_rec_locks == 1);
-			prebuilt->new_rec_locks = 2;
-		}
-
 		if (UNIV_UNLIKELY(rec_get_deleted_flag(clust_rec, comp))) {
 
 			/* The record is delete marked: we can skip it */

Attachment: [text/bzr-bundle] bzr/marko.makela@oracle.com-20100602102637-2tdov7jgx613vrrs.bundle
Thread
bzr commit into mysql-5.1-innodb branch (marko.makela:3494) Bug#53674marko.makela2 Jun