List: Commits « Previous Message | Next Message »
From: Inaam Rana Date: August 9 2011 8:44pm
Subject:bzr commit into mysql-trunk branch (inaam.rana:3232)
View as plain text  
#At file:///home/inaam/w/mysql-innodb-pb2/ based on revid:vasil.dimov@stripped

 3232 Inaam Rana	2011-08-09
      Revert changes that are forked from mysql-trunk and are causing merge conflicts

    added:
      mysql-test/suite/sys_vars/r/innodb_undo_directory_basic.result
      mysql-test/suite/sys_vars/r/innodb_undo_logs_basic.result
      mysql-test/suite/sys_vars/r/innodb_undo_tablespaces_basic.result
      mysql-test/suite/sys_vars/t/innodb_undo_directory_basic.test
      mysql-test/suite/sys_vars/t/innodb_undo_logs_basic.test
      mysql-test/suite/sys_vars/t/innodb_undo_tablespaces_basic.test
    modified:
      mysql-test/suite/sys_vars/r/all_vars.result
      storage/innobase/dict/dict0crea.c
      storage/innobase/fil/fil0fil.c
      storage/innobase/handler/ha_innodb.cc
      storage/innobase/include/os0file.h
      storage/innobase/include/os0file.ic
      storage/innobase/include/srv0srv.h
      storage/innobase/include/trx0rseg.h
      storage/innobase/include/trx0sys.h
      storage/innobase/include/trx0sys.ic
      storage/innobase/include/ut0ut.h
      storage/innobase/lock/lock0lock.c
      storage/innobase/log/log0recv.c
      storage/innobase/os/os0file.c
      storage/innobase/row/row0purge.c
      storage/innobase/srv/srv0mon.c
      storage/innobase/srv/srv0srv.c
      storage/innobase/srv/srv0start.c
      storage/innobase/trx/trx0purge.c
      storage/innobase/trx/trx0rseg.c
      storage/innobase/trx/trx0sys.c
      storage/innobase/trx/trx0trx.c
      storage/innobase/trx/trx0undo.c
      storage/innobase/ut/ut0ut.c
=== modified file 'mysql-test/suite/sys_vars/r/all_vars.result'
--- a/mysql-test/suite/sys_vars/r/all_vars.result	revid:vasil.dimov@stripped
+++ b/mysql-test/suite/sys_vars/r/all_vars.result	revid:inaam.rana@stripped
@@ -13,7 +13,6 @@ select variable_name as `There should be
 left join t1 on variable_name=test_name where test_name is null;
 There should be *no* variables listed below:
 INNODB_STATS_TRANSIENT_SAMPLE_PAGES
-INNODB_ROLLBACK_SEGMENTS
 INNODB_STATS_PERSISTENT_SAMPLE_PAGES
 RELAY_LOG_BASENAME
 LOG_BIN_BASENAME
@@ -29,7 +28,6 @@ INNODB_FILE_FORMAT_MAX
 INNODB_MONITOR_ENABLE
 INNODB_LARGE_PREFIX
 INNODB_STATS_TRANSIENT_SAMPLE_PAGES
-INNODB_ROLLBACK_SEGMENTS
 INNODB_STATS_PERSISTENT_SAMPLE_PAGES
 RELAY_LOG_BASENAME
 LOG_BIN_BASENAME

=== added file 'mysql-test/suite/sys_vars/r/innodb_undo_directory_basic.result'
--- a/mysql-test/suite/sys_vars/r/innodb_undo_directory_basic.result	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/sys_vars/r/innodb_undo_directory_basic.result	revid:inaam.rana@stripped
@@ -0,0 +1,48 @@
+SELECT @@GLOBAL.innodb_undo_directory;
+@@GLOBAL.innodb_undo_directory
+.
+. Expected
+SET @@GLOBAL.innodb_undo_directory="/tmp";
+ERROR HY000: Variable 'innodb_undo_directory' is a read only variable
+Expected error 'Read only variable'
+SELECT COUNT(@@GLOBAL.innodb_undo_directory);
+COUNT(@@GLOBAL.innodb_undo_directory)
+1
+1 Expected
+SELECT VARIABLE_VALUE
+FROM INFORMATION_SCHEMA.GLOBAL_VARIABLES
+WHERE VARIABLE_NAME='innodb_undo_directory';
+VARIABLE_VALUE
+.
+. Expected
+SELECT COUNT(@@GLOBAL.innodb_undo_directory);
+COUNT(@@GLOBAL.innodb_undo_directory)
+1
+1 Expected
+SELECT COUNT(VARIABLE_VALUE)
+FROM INFORMATION_SCHEMA.GLOBAL_VARIABLES 
+WHERE VARIABLE_NAME='innodb_undo_directory';
+COUNT(VARIABLE_VALUE)
+1
+1 Expected
+SELECT @@innodb_undo_directory = @@GLOBAL.innodb_undo_directory;
+@@innodb_undo_directory = @@GLOBAL.innodb_undo_directory
+1
+1 Expected
+SELECT COUNT(@@innodb_undo_directory);
+COUNT(@@innodb_undo_directory)
+1
+1 Expected
+SELECT COUNT(@@local.innodb_undo_directory);
+ERROR HY000: Variable 'innodb_undo_directory' is a GLOBAL variable
+Expected error 'Variable is a GLOBAL variable'
+SELECT COUNT(@@SESSION.innodb_undo_directory);
+ERROR HY000: Variable 'innodb_undo_directory' is a GLOBAL variable
+Expected error 'Variable is a GLOBAL variable'
+SELECT COUNT(@@GLOBAL.innodb_undo_directory);
+COUNT(@@GLOBAL.innodb_undo_directory)
+1
+1 Expected
+SELECT innodb_undo_directory = @@SESSION.innodb_undo_directory;
+ERROR 42S22: Unknown column 'innodb_undo_directory' in 'field list'
+Expected error 'Readonly variable'

=== added file 'mysql-test/suite/sys_vars/r/innodb_undo_logs_basic.result'
--- a/mysql-test/suite/sys_vars/r/innodb_undo_logs_basic.result	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/sys_vars/r/innodb_undo_logs_basic.result	revid:inaam.rana@stripped
@@ -0,0 +1,31 @@
+SELECT @@GLOBAL.innodb_undo_logs;
+@@GLOBAL.innodb_undo_logs
+128
+128 Expected
+SET @@GLOBAL.innodb_undo_logs=128;
+SELECT COUNT(@@GLOBAL.innodb_undo_logs);
+COUNT(@@GLOBAL.innodb_undo_logs)
+1
+1 Expected
+SELECT VARIABLE_VALUE
+FROM INFORMATION_SCHEMA.GLOBAL_VARIABLES
+WHERE VARIABLE_NAME='innodb_undo_logs';
+VARIABLE_VALUE
+128
+128 Expected
+SELECT @@innodb_undo_logs = @@GLOBAL.innodb_undo_logs;
+@@innodb_undo_logs = @@GLOBAL.innodb_undo_logs
+1
+1 Expected
+SELECT COUNT(@@innodb_undo_logs);
+COUNT(@@innodb_undo_logs)
+1
+1 Expected
+SELECT COUNT(@@local.innodb_undo_logs);
+ERROR HY000: Variable 'innodb_undo_logs' is a GLOBAL variable
+Expected error 'Variable is a GLOBAL variable'
+SELECT COUNT(@@SESSION.innodb_undo_logs);
+ERROR HY000: Variable 'innodb_undo_logs' is a GLOBAL variable
+Expected error 'Variable is a GLOBAL variable'
+SELECT innodb_undo_logs = @@SESSION.innodb_undo_logs;
+ERROR 42S22: Unknown column 'innodb_undo_logs' in 'field list'

=== added file 'mysql-test/suite/sys_vars/r/innodb_undo_tablespaces_basic.result'
--- a/mysql-test/suite/sys_vars/r/innodb_undo_tablespaces_basic.result	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/sys_vars/r/innodb_undo_tablespaces_basic.result	revid:inaam.rana@stripped
@@ -0,0 +1,38 @@
+SELECT @@GLOBAL.innodb_undo_tablespaces;
+@@GLOBAL.innodb_undo_tablespaces
+0
+0 Expected
+SET @@GLOBAL.innodb_undo_tablespaces=128;
+ERROR HY000: Variable 'innodb_undo_tablespaces' is a read only variable
+Expected error 'Read only variable'
+SELECT COUNT(@@GLOBAL.innodb_undo_tablespaces);
+COUNT(@@GLOBAL.innodb_undo_tablespaces)
+1
+1 Expected
+SELECT VARIABLE_VALUE
+FROM INFORMATION_SCHEMA.GLOBAL_VARIABLES
+WHERE VARIABLE_NAME='innodb_undo_tablespaces';
+VARIABLE_VALUE
+0
+0 Expected
+SELECT @@innodb_undo_tablespaces = @@GLOBAL.innodb_undo_tablespaces;
+@@innodb_undo_tablespaces = @@GLOBAL.innodb_undo_tablespaces
+1
+1 Expected
+SELECT COUNT(@@innodb_undo_tablespaces);
+COUNT(@@innodb_undo_tablespaces)
+1
+1 Expected
+SELECT COUNT(@@local.innodb_undo_tablespaces);
+ERROR HY000: Variable 'innodb_undo_tablespaces' is a GLOBAL variable
+Expected error 'Variable is a GLOBAL variable'
+SELECT COUNT(@@SESSION.innodb_undo_tablespaces);
+ERROR HY000: Variable 'innodb_undo_tablespaces' is a GLOBAL variable
+Expected error 'Variable is a GLOBAL variable'
+SELECT COUNT(@@GLOBAL.innodb_undo_tablespaces);
+COUNT(@@GLOBAL.innodb_undo_tablespaces)
+1
+1 Expected
+SELECT innodb_undo_tablespaces = @@SESSION.innodb_undo_tablespaces;
+ERROR 42S22: Unknown column 'innodb_undo_tablespaces' in 'field list'
+Expected error 'Readonly variable'

=== added file 'mysql-test/suite/sys_vars/t/innodb_undo_directory_basic.test'
--- a/mysql-test/suite/sys_vars/t/innodb_undo_directory_basic.test	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/sys_vars/t/innodb_undo_directory_basic.test	revid:inaam.rana@stripped
@@ -0,0 +1,85 @@
+################## mysql-test/t/innodb_undo_directory_basic.test ##############
+#                                                                             #
+# Variable Name: innodb_undo_directory                                        #
+# Scope: Global                                                               #
+# Access Type: Static                                                         #
+# Data Type: string                                                           #
+#                                                                             #
+#                                                                             #
+# Creation Date: 2011-07-05                                                   #
+# Author : Sunny Bains                                                        #
+#                                                                             #
+#                                                                             #
+# Description: Read-only config global variable innodb_undo_directory         #
+#              * Value check                                                  #
+#              * Scope check                                                  #
+#                                                                             #
+###############################################################################
+
+--source include/have_innodb.inc
+
+####################################################################
+#   Display the default value                                      #
+####################################################################
+SELECT @@GLOBAL.innodb_undo_directory;
+--echo . Expected
+
+
+####################################################################
+#   Check if value can be set                                      #
+####################################################################
+
+--error ER_INCORRECT_GLOBAL_LOCAL_VAR
+SET @@GLOBAL.innodb_undo_directory="/tmp";
+--echo Expected error 'Read only variable'
+
+SELECT COUNT(@@GLOBAL.innodb_undo_directory);
+--echo 1 Expected
+
+
+################################################################################
+# Check if the value in GLOBAL table matches value in variable                 #
+################################################################################
+
+SELECT VARIABLE_VALUE
+FROM INFORMATION_SCHEMA.GLOBAL_VARIABLES
+WHERE VARIABLE_NAME='innodb_undo_directory';
+--echo . Expected
+
+SELECT COUNT(@@GLOBAL.innodb_undo_directory);
+--echo 1 Expected
+
+SELECT COUNT(VARIABLE_VALUE)
+FROM INFORMATION_SCHEMA.GLOBAL_VARIABLES 
+WHERE VARIABLE_NAME='innodb_undo_directory';
+--echo 1 Expected
+
+
+################################################################################
+#  Check if accessing variable with and without GLOBAL point to same variable  #
+################################################################################
+SELECT @@innodb_undo_directory = @@GLOBAL.innodb_undo_directory;
+--echo 1 Expected
+
+
+################################################################################
+#   Check if innodb_undo_directory can be accessed with and without @@ sign    #
+################################################################################
+
+SELECT COUNT(@@innodb_undo_directory);
+--echo 1 Expected
+
+--Error ER_INCORRECT_GLOBAL_LOCAL_VAR
+SELECT COUNT(@@local.innodb_undo_directory);
+--echo Expected error 'Variable is a GLOBAL variable'
+
+--Error ER_INCORRECT_GLOBAL_LOCAL_VAR
+SELECT COUNT(@@SESSION.innodb_undo_directory);
+--echo Expected error 'Variable is a GLOBAL variable'
+
+SELECT COUNT(@@GLOBAL.innodb_undo_directory);
+--echo 1 Expected
+
+--Error ER_BAD_FIELD_ERROR
+SELECT innodb_undo_directory = @@SESSION.innodb_undo_directory;
+--echo Expected error 'Readonly variable'

=== added file 'mysql-test/suite/sys_vars/t/innodb_undo_logs_basic.test'
--- a/mysql-test/suite/sys_vars/t/innodb_undo_logs_basic.test	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/sys_vars/t/innodb_undo_logs_basic.test	revid:inaam.rana@stripped
@@ -0,0 +1,71 @@
+################## mysql-test/t/innodb_undo_logs_basic.test ############
+#                                                                             #
+# Variable Name: innodb_undo_logs                                             #
+# Scope: Global                                                               #
+# Access Type: Dynamic                                                        #
+# Data Type: numeric                                                          #
+#                                                                             #
+#                                                                             #
+# Creation Date: 2011-07-05                                                   #
+# Author : Sunny Bains                                                        #
+#                                                                             #
+#                                                                             #
+# Description: Dynamic config global variable innodb_undo_logs                #
+#              * Value check                                                  #
+#              * Scope check                                                  #
+#                                                                             #
+###############################################################################
+
+--source include/have_innodb.inc
+
+####################################################################
+#   Display default value                                          #
+####################################################################
+SELECT @@GLOBAL.innodb_undo_logs;
+--echo 128 Expected
+
+
+####################################################################
+#   Check if value can be set                                      #
+####################################################################
+
+SET @@GLOBAL.innodb_undo_logs=128;
+
+SELECT COUNT(@@GLOBAL.innodb_undo_logs);
+--echo 1 Expected
+
+
+################################################################################
+# Check if the value in GLOBAL table matches value in variable                 #
+################################################################################
+
+SELECT VARIABLE_VALUE
+FROM INFORMATION_SCHEMA.GLOBAL_VARIABLES
+WHERE VARIABLE_NAME='innodb_undo_logs';
+--echo 128 Expected
+
+
+################################################################################
+#  Check if accessing variable with and without GLOBAL point to same variable  #
+################################################################################
+SELECT @@innodb_undo_logs = @@GLOBAL.innodb_undo_logs;
+--echo 1 Expected
+
+
+################################################################################
+#   Check if innodb_undo_logs can be accessed with and without @@ sign         #
+################################################################################
+
+SELECT COUNT(@@innodb_undo_logs);
+--echo 1 Expected
+
+--Error ER_INCORRECT_GLOBAL_LOCAL_VAR
+SELECT COUNT(@@local.innodb_undo_logs);
+--echo Expected error 'Variable is a GLOBAL variable'
+
+--Error ER_INCORRECT_GLOBAL_LOCAL_VAR
+SELECT COUNT(@@SESSION.innodb_undo_logs);
+--echo Expected error 'Variable is a GLOBAL variable'
+
+--Error ER_BAD_FIELD_ERROR
+SELECT innodb_undo_logs = @@SESSION.innodb_undo_logs;

=== added file 'mysql-test/suite/sys_vars/t/innodb_undo_tablespaces_basic.test'
--- a/mysql-test/suite/sys_vars/t/innodb_undo_tablespaces_basic.test	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/sys_vars/t/innodb_undo_tablespaces_basic.test	revid:inaam.rana@stripped
@@ -0,0 +1,77 @@
+################## mysql-test/t/innodb_undo_tablespaces_basic.test ############
+#                                                                             #
+# Variable Name: innodb_undo_tablespaces                                      #
+# Scope: Global                                                               #
+# Access Type: Static                                                         #
+# Data Type: numeric                                                          #
+#                                                                             #
+#                                                                             #
+# Creation Date: 2011-07-05                                                   #
+# Author : Sunny Bains                                                        #
+#                                                                             #
+#                                                                             #
+# Description: Read-only config global variable innodb_undo_tablespaces       #
+#              * Value check                                                  #
+#              * Scope check                                                  #
+#                                                                             #
+###############################################################################
+
+--source include/have_innodb.inc
+
+####################################################################
+#   Display default value                                          #
+####################################################################
+SELECT @@GLOBAL.innodb_undo_tablespaces;
+--echo 0 Expected
+
+
+####################################################################
+#   Check if value can be set                                      #
+####################################################################
+
+--error ER_INCORRECT_GLOBAL_LOCAL_VAR
+SET @@GLOBAL.innodb_undo_tablespaces=128;
+--echo Expected error 'Read only variable'
+
+SELECT COUNT(@@GLOBAL.innodb_undo_tablespaces);
+--echo 1 Expected
+
+
+################################################################################
+# Check if the value in GLOBAL table matches value in variable                 #
+################################################################################
+
+SELECT VARIABLE_VALUE
+FROM INFORMATION_SCHEMA.GLOBAL_VARIABLES
+WHERE VARIABLE_NAME='innodb_undo_tablespaces';
+--echo 0 Expected
+
+
+################################################################################
+#  Check if accessing variable with and without GLOBAL point to same variable  #
+################################################################################
+SELECT @@innodb_undo_tablespaces = @@GLOBAL.innodb_undo_tablespaces;
+--echo 1 Expected
+
+
+################################################################################
+#   Check if innodb_undo_tablespaces can be accessed with and without @@ sign  #
+################################################################################
+
+SELECT COUNT(@@innodb_undo_tablespaces);
+--echo 1 Expected
+
+--Error ER_INCORRECT_GLOBAL_LOCAL_VAR
+SELECT COUNT(@@local.innodb_undo_tablespaces);
+--echo Expected error 'Variable is a GLOBAL variable'
+
+--Error ER_INCORRECT_GLOBAL_LOCAL_VAR
+SELECT COUNT(@@SESSION.innodb_undo_tablespaces);
+--echo Expected error 'Variable is a GLOBAL variable'
+
+SELECT COUNT(@@GLOBAL.innodb_undo_tablespaces);
+--echo 1 Expected
+
+--Error ER_BAD_FIELD_ERROR
+SELECT innodb_undo_tablespaces = @@SESSION.innodb_undo_tablespaces;
+--echo Expected error 'Readonly variable'

=== modified file 'storage/innobase/dict/dict0crea.c'
--- a/storage/innobase/dict/dict0crea.c	revid:vasil.dimov@stripped
+++ b/storage/innobase/dict/dict0crea.c	revid:inaam.rana@stripped
@@ -1253,6 +1253,7 @@ dict_create_or_check_foreign_constraint_
 	trx_t*		trx;
 	ulint		error;
 	ibool		success;
+	ibool		srv_file_per_table_backup;
 
 	ut_a(srv_get_active_thread_type() == SRV_NONE);
 
@@ -1298,6 +1299,13 @@ dict_create_or_check_foreign_constraint_
 	VARBINARY, like in other InnoDB system tables, to get a clean
 	design. */
 
+	srv_file_per_table_backup = (ibool) srv_file_per_table;
+
+	/* We always want SYSTEM tables to be created inside the system
+	tablespace. */
+
+	srv_file_per_table = 0;
+
 	error = que_eval_sql(NULL,
 			     "PROCEDURE CREATE_FOREIGN_SYS_TABLES_PROC () IS\n"
 			     "BEGIN\n"
@@ -1355,6 +1363,8 @@ dict_create_or_check_foreign_constraint_
 	success = dict_check_sys_foreign_tables_exist();
 	ut_a(success);
 
+	srv_file_per_table = (my_bool) srv_file_per_table_backup;
+
 	return(error);
 }
 

=== modified file 'storage/innobase/fil/fil0fil.c'
--- a/storage/innobase/fil/fil0fil.c	revid:vasil.dimov@stripped
+++ b/storage/innobase/fil/fil0fil.c	revid:inaam.rana@stripped
@@ -40,6 +40,7 @@ Created 10/25/1995 Heikki Tuuri
 #include "dict0dict.h"
 #include "page0page.h"
 #include "page0zip.h"
+#include "trx0sys.h"
 #ifndef UNIV_HOTBACKUP
 # include "buf0lru.h"
 # include "ibuf0ibuf.h"
@@ -305,6 +306,9 @@ struct fil_system_struct {
 initialized. */
 static fil_system_t*	fil_system	= NULL;
 
+/** Determine if (i) is a user tablespace id or not. */
+# define fil_is_user_tablespace_id(i) ((i) > srv_undo_tablespaces)
+
 #ifdef UNIV_DEBUG
 /** Try fil_validate() every this many times */
 # define FIL_VALIDATE_SKIP	17
@@ -335,6 +339,19 @@ fil_validate_skip(void)
 #endif /* UNIV_DEBUG */
 
 /********************************************************************//**
+Determines if a file node belongs to the least-recently-used list.
+@return TRUE if the file belongs to the fil_system->LRU list. */
+UNIV_INLINE
+ibool
+fil_space_belongs_in_lru(
+/*=====================*/
+	const fil_space_t*	space)	/*!< in: file space */
+{
+	return(space->purpose == FIL_TABLESPACE
+	       && fil_is_user_tablespace_id(space->id));
+}
+
+/********************************************************************//**
 NOTE: you must call fil_mutex_enter_and_prepare_for_io() first!
 
 Prepares a file node for i/o. Opens the file if it is closed. Updates the
@@ -658,7 +675,7 @@ fil_node_create(
 }
 
 /********************************************************************//**
-Opens a the file of a node of a tablespace. The caller must own the fil_system
+Opens a file of a node of a tablespace. The caller must own the fil_system
 mutex. */
 static
 void
@@ -716,7 +733,7 @@ fil_node_open_file(
 		}
 #endif /* UNIV_HOTBACKUP */
 		ut_a(space->purpose != FIL_LOG);
-		ut_a(space->id != 0);
+		ut_a(fil_is_user_tablespace_id(space->id));
 
 		if (size_bytes < FIL_IBD_FILE_INITIAL_SIZE * UNIV_PAGE_SIZE) {
 			fprintf(stderr,
@@ -829,7 +846,8 @@ add_size:
 	system->n_open++;
 	fil_n_file_opened++;
 
-	if (space->purpose == FIL_TABLESPACE && space->id != 0) {
+	if (fil_space_belongs_in_lru(space)) {
+
 		/* Put the node to the LRU list */
 		UT_LIST_ADD_FIRST(LRU, system->LRU, node);
 	}
@@ -852,8 +870,10 @@ fil_node_close_file(
 	ut_a(node->n_pending == 0);
 	ut_a(node->n_pending_flushes == 0);
 	ut_a(!node->being_extended);
+#ifndef UNIV_HOTBACKUP
 	ut_a(node->modification_counter == node->flush_counter
 	     || srv_fast_shutdown == 2);
+#endif /* !UNIV_HOTBACKUP */
 
 	ret = os_file_close(node->handle);
 	ut_a(ret);
@@ -865,7 +885,8 @@ fil_node_close_file(
 	system->n_open--;
 	fil_n_file_opened--;
 
-	if (node->space->purpose == FIL_TABLESPACE && node->space->id != 0) {
+	if (fil_space_belongs_in_lru(node->space)) {
+
 		ut_a(UT_LIST_GET_LEN(system->LRU) > 0);
 
 		/* The node is in the LRU list, remove it */
@@ -1242,7 +1263,7 @@ try_again:
 		return(FALSE);
 	}
 
-	space = mem_alloc(sizeof(fil_space_t));
+	space = mem_zalloc(sizeof(*space));
 
 	space->name = mem_strdup(name);
 	space->id = id;
@@ -1267,19 +1288,9 @@ try_again:
 		fil_system->max_assigned_id = id;
 	}
 
-	space->stop_ios = FALSE;
-	space->stop_ibuf_merges = FALSE;
-	space->is_being_deleted = FALSE;
 	space->purpose = purpose;
-	space->size = 0;
 	space->flags = flags;
 
-	space->n_reserved_extents = 0;
-
-	space->n_pending_flushes = 0;
-	space->n_pending_ibuf_merges = 0;
-
-	UT_LIST_INIT(space->chain);
 	space->magic_n = FIL_SPACE_MAGIC_N;
 
 	rw_lock_create(fil_space_latch_key, &space->latch, SYNC_FSP);
@@ -1612,47 +1623,51 @@ fil_open_log_and_system_tablespace_files
 /*==========================================*/
 {
 	fil_space_t*	space;
-	fil_node_t*	node;
 
 	mutex_enter(&fil_system->mutex);
 
-	space = UT_LIST_GET_FIRST(fil_system->space_list);
+	for (space = UT_LIST_GET_FIRST(fil_system->space_list);
+	     space != NULL;
+	     space = UT_LIST_GET_NEXT(space_list, space)) {
 
-	while (space != NULL) {
-		if (space->purpose != FIL_TABLESPACE || space->id == 0) {
-			node = UT_LIST_GET_FIRST(space->chain);
+		fil_node_t*	node;
 
-			while (node != NULL) {
-				if (!node->open) {
-					fil_node_open_file(node, fil_system,
-							   space);
-				}
-				if (fil_system->max_n_open
-				    < 10 + fil_system->n_open) {
-					fprintf(stderr,
-						"InnoDB: Warning: you must"
-						" raise the value of"
-						" innodb_open_files in\n"
-						"InnoDB: my.cnf! Remember that"
-						" InnoDB keeps all log files"
-						" and all system\n"
-						"InnoDB: tablespace files open"
-						" for the whole time mysqld is"
-						" running, and\n"
-						"InnoDB: needs to open also"
-						" some .ibd files if the"
-						" file-per-table storage\n"
-						"InnoDB: model is used."
-						" Current open files %lu,"
-						" max allowed"
-						" open files %lu.\n",
-						(ulong) fil_system->n_open,
-						(ulong) fil_system->max_n_open);
-				}
-				node = UT_LIST_GET_NEXT(chain, node);
+		if (fil_space_belongs_in_lru(space)) {
+
+			continue;
+		}
+
+		for (node = UT_LIST_GET_FIRST(space->chain);
+		     node != NULL;
+		     node = UT_LIST_GET_NEXT(chain, node)) {
+
+			if (!node->open) {
+				fil_node_open_file(node, fil_system, space);
+			}
+
+			if (fil_system->max_n_open < 10 + fil_system->n_open) {
+
+				fprintf(stderr,
+					"InnoDB: Warning: you must"
+					" raise the value of"
+					" innodb_open_files in\n"
+					"InnoDB: my.cnf! Remember that"
+					" InnoDB keeps all log files"
+					" and all system\n"
+					"InnoDB: tablespace files open"
+					" for the whole time mysqld is"
+					" running, and\n"
+					"InnoDB: needs to open also"
+					" some .ibd files if the"
+					" file-per-table storage\n"
+					"InnoDB: model is used."
+					" Current open files %lu,"
+					" max allowed"
+					" open files %lu.\n",
+					(ulong) fil_system->n_open,
+					(ulong) fil_system->max_n_open);
 			}
 		}
-		space = UT_LIST_GET_NEXT(space_list, space);
 	}
 
 	mutex_exit(&fil_system->mutex);
@@ -1727,6 +1742,7 @@ static
 ulint
 fil_write_lsn_and_arch_no_to_file(
 /*==============================*/
+	ulint	space,		/*!< in: space to write to */
 	ulint	sum_of_sizes,	/*!< in: combined size of previous files
 				in space, in database pages */
 	lsn_t	lsn,		/*!< in: lsn to write */
@@ -1739,11 +1755,11 @@ fil_write_lsn_and_arch_no_to_file(
 	buf1 = mem_alloc(2 * UNIV_PAGE_SIZE);
 	buf = ut_align(buf1, UNIV_PAGE_SIZE);
 
-	fil_read(TRUE, 0, 0, sum_of_sizes, 0, UNIV_PAGE_SIZE, buf, NULL);
+	fil_read(TRUE, space, 0, sum_of_sizes, 0, UNIV_PAGE_SIZE, buf, NULL);
 
 	mach_write_to_8(buf + FIL_PAGE_FILE_FLUSH_LSN, lsn);
 
-	fil_write(TRUE, 0, 0, sum_of_sizes, 0, UNIV_PAGE_SIZE, buf, NULL);
+	fil_write(TRUE, space, 0, sum_of_sizes, 0, UNIV_PAGE_SIZE, buf, NULL);
 
 	mem_free(buf1);
 
@@ -1763,30 +1779,35 @@ fil_write_flushed_lsn_to_data_files(
 {
 	fil_space_t*	space;
 	fil_node_t*	node;
-	ulint		sum_of_sizes;
 	ulint		err;
 
 	mutex_enter(&fil_system->mutex);
 
-	space = UT_LIST_GET_FIRST(fil_system->space_list);
+	for (space = UT_LIST_GET_FIRST(fil_system->space_list);
+	     space != NULL;
+	     space = UT_LIST_GET_NEXT(space_list, space)) {
 
-	while (space) {
 		/* We only write the lsn to all existing data files which have
 		been open during the lifetime of the mysqld process; they are
 		represented by the space objects in the tablespace memory
-		cache. Note that all data files in the system tablespace 0 are
-		always open. */
+		cache. Note that all data files in the system tablespace 0
+		and the UNDO log tablespaces (if separate) are always open. */
 
 		if (space->purpose == FIL_TABLESPACE
-		    && space->id == 0) {
-			sum_of_sizes = 0;
+		    && !fil_is_user_tablespace_id(space->id)) {
+
+			ulint	sum_of_sizes = 0;
+
+			for (node = UT_LIST_GET_FIRST(space->chain);
+			     node != NULL;
+			     node = UT_LIST_GET_NEXT(chain, node)) {
 
-			node = UT_LIST_GET_FIRST(space->chain);
-			while (node) {
 				mutex_exit(&fil_system->mutex);
 
 				err = fil_write_lsn_and_arch_no_to_file(
-					sum_of_sizes, lsn, arch_log_no);
+					space->id, sum_of_sizes, lsn,
+					arch_log_no);
+
 				if (err != DB_SUCCESS) {
 
 					return(err);
@@ -1795,10 +1816,8 @@ fil_write_flushed_lsn_to_data_files(
 				mutex_enter(&fil_system->mutex);
 
 				sum_of_sizes += node->size;
-				node = UT_LIST_GET_NEXT(chain, node);
 			}
 		}
-		space = UT_LIST_GET_NEXT(space_list, space);
 	}
 
 	mutex_exit(&fil_system->mutex);
@@ -2690,13 +2709,15 @@ fil_create_new_single_table_tablespace(
 					tablespace file in pages,
 					must be >= FIL_IBD_FILE_INITIAL_SIZE */
 {
-	os_file_t	file;
-	ibool		ret;
-	ulint		err;
-	byte*		buf2;
-	byte*		page;
-	ibool		success;
-	char*		path;
+	os_file_t		file;
+	ibool			ret;
+	ulint			err;
+	byte*			buf2;
+	byte*			page;
+	char*			path;
+	ibool			success;
+	os_file_create_t	create_mode;
+	os_file_create_t	error_ignore;
 
 	ut_a(space_id > 0);
 	ut_a(space_id < SRV_LOG_SPACE_FIRST_ID);
@@ -2710,9 +2731,28 @@ fil_create_new_single_table_tablespace(
 
 	path = fil_make_ibd_name(tablename, is_temp);
 
-	file = os_file_create(innodb_file_data_key, path,
-			      OS_FILE_CREATE, OS_FILE_NORMAL,
-			      OS_DATA_FILE, &ret);
+	/* When srv_file_per_table is on, file creation failure may not
+	be critical to the whole instance. Do not crash the server in
+	case of unknown errors.
+
+	Note "srv_file_per_table" is a global variable with no explicit
+	synchronization protection. It could be changed during this execution
+	path. It might not have the same value as the one when building the
+	table definition */
+
+	error_ignore = srv_file_per_table ? OS_FILE_ON_ERROR_NO_EXIT : 0;
+
+	create_mode = srv_file_per_table
+		    ? OS_FILE_CREATE | OS_FILE_ON_ERROR_NO_EXIT
+		    : OS_FILE_CREATE;
+
+	file = os_file_create(
+		innodb_file_data_key, path,
+		create_mode,
+		OS_FILE_NORMAL,
+		OS_DATA_FILE,
+		&ret);
+
 	if (ret == FALSE) {
 		ut_print_timestamp(stderr);
 		fputs("  InnoDB: Error creating file ", stderr);
@@ -4218,8 +4258,7 @@ fil_node_prepare_for_io(
 		fil_node_open_file(node, system, space);
 	}
 
-	if (node->n_pending == 0 && space->purpose == FIL_TABLESPACE
-	    && space->id != 0) {
+	if (node->n_pending == 0 && fil_space_belongs_in_lru(space)) {
 		/* The node is in the LRU list, remove it */
 
 		ut_a(UT_LIST_GET_LEN(system->LRU) > 0);
@@ -4264,8 +4303,8 @@ fil_node_complete_io(
 		}
 	}
 
-	if (node->n_pending == 0 && node->space->purpose == FIL_TABLESPACE
-	    && node->space->id != 0) {
+	if (node->n_pending == 0 && fil_space_belongs_in_lru(node->space)) {
+
 		/* The node must be put back to the LRU list */
 		UT_LIST_ADD_FIRST(LRU, system->LRU, node);
 	}
@@ -4430,7 +4469,7 @@ fil_io(
 			ut_error;
 		}
 
-		if (space->id != 0 && node->size == 0) {
+		if (fil_is_user_tablespace_id(space->id) && node->size == 0) {
 			/* We do not know the size of a single-table tablespace
 			before we open the file */
 
@@ -4450,7 +4489,7 @@ fil_io(
 	fil_node_prepare_for_io(node, fil_system, space);
 
 	/* Check that at least the start offset is within the bounds of a
-	single-table tablespace */
+	single-table tablespace, including rollback tablespaces. */
 	if (UNIV_UNLIKELY(node->size <= block_offset)
 	    && space->id != 0 && space->purpose == FIL_TABLESPACE) {
 
@@ -4816,8 +4855,7 @@ fil_validate(void)
 		ut_a(fil_node->n_pending == 0);
 		ut_a(!fil_node->being_extended);
 		ut_a(fil_node->open);
-		ut_a(fil_node->space->purpose == FIL_TABLESPACE);
-		ut_a(fil_node->space->id != 0);
+		ut_a(fil_space_belongs_in_lru(fil_node->space));
 
 		fil_node = UT_LIST_GET_NEXT(LRU, fil_node);
 	}

=== modified file 'storage/innobase/handler/ha_innodb.cc'
--- a/storage/innobase/handler/ha_innodb.cc	revid:vasil.dimov@stripped
+++ b/storage/innobase/handler/ha_innodb.cc	revid:inaam.rana@stripped
@@ -514,7 +514,7 @@ static
 int
 innobase_close_connection(
 /*======================*/
-	handlerton*	hton,		/*!< in: Innodb handlerton */
+	handlerton*	hton,		/*!< in/out: Innodb handlerton */
 	THD*		thd);		/*!< in: MySQL thread handle for
 					which to close the connection */
 
@@ -526,7 +526,7 @@ static
 int
 innobase_commit(
 /*============*/
-	handlerton*	hton,		/*!< in: Innodb handlerton */
+	handlerton*	hton,		/*!< in/out: Innodb handlerton */
 	THD*		thd,		/*!< in: MySQL thread handle of the
 					user for whom the transaction should
 					be committed */
@@ -542,7 +542,7 @@ static
 int
 innobase_rollback(
 /*==============*/
-	handlerton*	hton,		/*!< in: Innodb handlerton */ 
+	handlerton*	hton,		/*!< in/out: Innodb handlerton */
 	THD*		thd,		/*!< in: handle to the MySQL thread
 					of the user whose transaction should
 					be rolled back */
@@ -558,7 +558,7 @@ static
 int
 innobase_rollback_to_savepoint(
 /*===========================*/
-	handlerton*	hton,		/*!< in: InnoDB handlerton */
+	handlerton*	hton,		/*!< in/out: InnoDB handlerton */
 	THD*		thd,		/*!< in: handle to the MySQL thread of
 					the user whose XA transaction should
 					be rolled back to savepoint */
@@ -571,7 +571,7 @@ static
 int
 innobase_savepoint(
 /*===============*/
-	handlerton*	hton,		/*!< in: InnoDB handlerton */
+	handlerton*	hton,		/*!< in/out: InnoDB handlerton */
 	THD*		thd,		/*!< in: handle to the MySQL thread of
 					the user's XA transaction for which
 					we need to take a savepoint */
@@ -585,7 +585,7 @@ static
 int
 innobase_release_savepoint(
 /*=======================*/
-	handlerton*	hton,		/*!< in: handlerton for Innodb */
+	handlerton*	hton,		/*!< in/out: handlerton for Innodb */
 	THD*		thd,		/*!< in: handle to the MySQL thread
 					of the user whose transaction's
 					savepoint should be released */
@@ -597,7 +597,7 @@ static
 handler*
 innobase_create_handler(
 /*====================*/
-	handlerton*	hton,		/*!< in: handlerton for Innodb */
+	handlerton*	hton,		/*!< in/out: handlerton for Innodb */
 	TABLE_SHARE*	table,
 	MEM_ROOT*	mem_root);
 
@@ -2515,7 +2515,7 @@ mem_free_and_error:
 		goto error;
 	}
 
-	/* -------------- Log files ---------------------------*/
+	/* -------------- All log files ---------------------------*/
 
 	/* The default dir for log files is the datadir of MySQL */
 
@@ -3065,7 +3065,7 @@ static
 int
 innobase_rollback(
 /*==============*/
-	handlerton*	hton,		/*!< in: Innodb handlerton */ 
+	handlerton*	hton,		/*!< in: Innodb handlerton */
 	THD*		thd,		/*!< in: handle to the MySQL thread
 					of the user whose transaction should
 					be rolled back */
@@ -3147,7 +3147,7 @@ static
 int
 innobase_rollback_to_savepoint(
 /*===========================*/
-	handlerton*	hton,		/*!< in: Innodb handlerton */ 
+	handlerton*	hton,		/*!< in: Innodb handlerton */
 	THD*		thd,		/*!< in: handle to the MySQL thread
 					of the user whose transaction should
 					be rolled back to savepoint */
@@ -12183,14 +12183,6 @@ static MYSQL_SYSVAR_ULONG(purge_batch_si
   1,			/* Minimum value */
   5000, 0);		/* Maximum value */
 
-static MYSQL_SYSVAR_ULONG(rollback_segments, srv_rollback_segments,
-  PLUGIN_VAR_OPCMDARG,
-  "Number of UNDO logs to use.",
-  NULL, NULL,
-  128,			/* Default setting */
-  1,			/* Minimum value */
-  TRX_SYS_N_RSEGS, 0);	/* Maximum value */
-
 static MYSQL_SYSVAR_ULONG(purge_threads, srv_n_purge_threads,
   PLUGIN_VAR_OPCMDARG | PLUGIN_VAR_READONLY,
   "Purge threads can be from 0 to 32. Default is 0.",
@@ -12491,6 +12483,27 @@ static MYSQL_SYSVAR_STR(data_file_path,
   "Path to individual files and their sizes.",
   NULL, NULL, NULL);
 
+static MYSQL_SYSVAR_STR(undo_directory, srv_undo_dir,
+  PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_READONLY,
+  "Directory where undo tablespace files live, this path can be absolute.",
+  NULL, NULL, ".");
+
+static MYSQL_SYSVAR_ULONG(undo_tablespaces, srv_undo_tablespaces,
+  PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_READONLY,
+  "Number of undo tablespaces to use. ",
+  NULL, NULL,
+  0L,			/* Default setting */
+  0L,			/* Minimum value */
+  126L, 0);		/* Maximum value */
+
+static MYSQL_SYSVAR_ULONG(undo_logs, srv_undo_logs,
+  PLUGIN_VAR_OPCMDARG,
+  "Number of undo logs to use.",
+  NULL, NULL,
+  TRX_SYS_N_RSEGS,	/* Default setting */
+  1,			/* Minimum value */
+  TRX_SYS_N_RSEGS, 0);	/* Maximum value */
+
 static MYSQL_SYSVAR_LONG(autoinc_lock_mode, innobase_autoinc_lock_mode,
   PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_READONLY,
   "The AUTOINC lock modes supported by InnoDB:               "
@@ -12665,7 +12678,9 @@ static struct st_mysql_sys_var* innobase
   MYSQL_SYSVAR(page_hash_locks),
 #endif /* defined UNIV_DEBUG || defined UNIV_PERF_DEBUG */
   MYSQL_SYSVAR(print_all_deadlocks),
-  MYSQL_SYSVAR(rollback_segments),
+  MYSQL_SYSVAR(undo_logs),
+  MYSQL_SYSVAR(undo_directory),
+  MYSQL_SYSVAR(undo_tablespaces),
   MYSQL_SYSVAR(sync_array_size),
   NULL
 };

=== modified file 'storage/innobase/include/os0file.h'
--- a/storage/innobase/include/os0file.h	revid:vasil.dimov@stripped
+++ b/storage/innobase/include/os0file.h	revid:inaam.rana@stripped
@@ -104,14 +104,28 @@ log. */
 
 #define OS_FILE_LOG_BLOCK_SIZE		512
 
-/** Options for file_create @{ */
-#define	OS_FILE_OPEN			51
-#define	OS_FILE_CREATE			52
-#define OS_FILE_OVERWRITE		53
-#define OS_FILE_OPEN_RAW		54
-#define	OS_FILE_CREATE_PATH		55
-#define	OS_FILE_OPEN_RETRY		56	/* for os_file_create() on
-						the first ibdata file */
+/** Options for os_file_create_func @{ */
+typedef enum os_file_create_enum {
+	OS_FILE_OPEN = 51,		/*!< to open an existing file (if
+					doesn't exist, error) */
+	OS_FILE_CREATE,			/*!< to create new file (if
+					exists, error) */
+	OS_FILE_OVERWRITE,		/*!< to create a new file, if it exists
+					then overwrite the old file */
+	OS_FILE_OPEN_RAW,		/*!< to open a raw device or disk
+					partition */
+	OS_FILE_CREATE_PATH,		/*!< to create the directories */
+	OS_FILE_OPEN_RETRY,		/*!< open with retry */
+
+	/** Flags that can be combined with the above values. Please ensure
+	that the above values stay below 128. */
+
+	OS_FILE_ON_ERROR_NO_EXIT = 128,	/*!< do not exit on unknown errors */
+	OS_FILE_ON_ERROR_SILENT = 256	/*!< don't print diagnostic messages to
+					the log unless it is a fatal error,
+					this flag is only used if
+					ON_ERROR_NO_EXIT is set */
+} os_file_create_t;
 
 #define OS_FILE_READ_ONLY		333
 #define	OS_FILE_READ_WRITE		444
@@ -457,13 +471,8 @@ os_file_create_simple_func(
 /*=======================*/
 	const char*	name,	/*!< in: name of the file or path as a
 				null-terminated string */
-	ulint		create_mode,/*!< in: OS_FILE_OPEN if an existing file is
-				opened (if does not exist, error), or
-				OS_FILE_CREATE if a new file is created
-				(if exists, error), or
-				OS_FILE_CREATE_PATH if new file
-				(if exists, error) and subdirectories along
-				its path are created (if needed)*/
+	os_file_create_t
+			create_mode,/*!< in: create mode */
 	ulint		access_type,/*!< in: OS_FILE_READ_ONLY or
 				OS_FILE_READ_WRITE */
 	ibool*		success);/*!< out: TRUE if succeed, FALSE if error */
@@ -479,15 +488,14 @@ os_file_create_simple_no_error_handling_
 /*=========================================*/
 	const char*	name,	/*!< in: name of the file or path as a
 				null-terminated string */
-	ulint		create_mode,/*!< in: OS_FILE_OPEN if an existing file
-				is opened (if does not exist, error), or
-				OS_FILE_CREATE if a new file is created
-				(if exists, error) */
+	os_file_create_t
+			create_mode,/*!< in: create mode */
 	ulint		access_type,/*!< in: OS_FILE_READ_ONLY,
 				OS_FILE_READ_WRITE, or
 				OS_FILE_READ_ALLOW_DELETE; the last option is
 				used by a backup program reading the file */
-	ibool*		success);/*!< out: TRUE if succeed, FALSE if error */
+	ibool*		success)/*!< out: TRUE if succeed, FALSE if error */
+	__attribute__((nonnull, warn_unused_result));
 /****************************************************************//**
 Tries to disable OS caching on an opened file descriptor. */
 UNIV_INTERN
@@ -511,14 +519,8 @@ os_file_create_func(
 /*================*/
 	const char*	name,	/*!< in: name of the file or path as a
 				null-terminated string */
-	ulint		create_mode,/*!< in: OS_FILE_OPEN if an existing file
-				is opened (if does not exist, error), or
-				OS_FILE_CREATE if a new file is created
-				(if exists, error),
-				OS_FILE_OVERWRITE if a new file is created
-				or an old overwritten;
-				OS_FILE_OPEN_RAW, if a raw device or disk
-				partition should be opened */
+	os_file_create_t
+			create_mode,/*!< in: create mode */
 	ulint		purpose,/*!< in: OS_FILE_AIO, if asynchronous,
 				non-buffered i/o is desired,
 				OS_FILE_NORMAL, if any normal file;
@@ -527,7 +529,8 @@ os_file_create_func(
 				async i/o or unbuffered i/o: look in the
 				function source code for the exact rules */
 	ulint		type,	/*!< in: OS_DATA_FILE or OS_LOG_FILE */
-	ibool*		success);/*!< out: TRUE if succeed, FALSE if error */
+	ibool*		success)/*!< out: TRUE if succeed, FALSE if error */
+	__attribute__((nonnull, warn_unused_result));
 /***********************************************************************//**
 Deletes a file. The file has to be closed before calling this.
 @return	TRUE if success */
@@ -535,7 +538,8 @@ UNIV_INTERN
 ibool
 os_file_delete(
 /*===========*/
-	const char*	name);	/*!< in: file path as a null-terminated string */
+	const char*	name);	/*!< in: file path as a null-terminated
+				string */
 
 /***********************************************************************//**
 Deletes a file if it exists. The file has to be closed before calling this.
@@ -544,7 +548,8 @@ UNIV_INTERN
 ibool
 os_file_delete_if_exists(
 /*=====================*/
-	const char*	name);	/*!< in: file path as a null-terminated string */
+	const char*	name);	/*!< in: file path as a null-terminated
+				string */
 /***********************************************************************//**
 NOTE! Use the corresponding macro os_file_rename(), not directly
 this function!
@@ -585,18 +590,14 @@ pfs_os_file_create_simple_func(
 	mysql_pfs_key_t key,	/*!< in: Performance Schema Key */
 	const char*	name,	/*!< in: name of the file or path as a
 				null-terminated string */
-	ulint		create_mode,/*!< in: OS_FILE_OPEN if an existing file is
-				opened (if does not exist, error), or
-				OS_FILE_CREATE if a new file is created
-				(if exists, error), or
-				OS_FILE_CREATE_PATH if new file
-				(if exists, error) and subdirectories along
-				its path are created (if needed)*/
+	os_file_create_t
+			create_mode,/*!< in: create mode */
 	ulint		access_type,/*!< in: OS_FILE_READ_ONLY or
 				OS_FILE_READ_WRITE */
 	ibool*		success,/*!< out: TRUE if succeed, FALSE if error */
 	const char*	src_file,/*!< in: file name where func invoked */
-	ulint		src_line);/*!< in: line where the func invoked */
+	ulint		src_line)/*!< in: line where the func invoked */
+	__attribute__((nonnull, warn_unused_result));
 
 /****************************************************************//**
 NOTE! Please use the corresponding macro
@@ -613,17 +614,16 @@ pfs_os_file_create_simple_no_error_handl
 	mysql_pfs_key_t key,	/*!< in: Performance Schema Key */
 	const char*	name,	/*!< in: name of the file or path as a
 				null-terminated string */
-	ulint		create_mode,/*!< in: OS_FILE_OPEN if an existing file
-				is opened (if does not exist, error), or
-				OS_FILE_CREATE if a new file is created
-				(if exists, error) */
+	os_file_create_t
+			create_mode, /*!< in: file create mode */
 	ulint		access_type,/*!< in: OS_FILE_READ_ONLY,
 				OS_FILE_READ_WRITE, or
 				OS_FILE_READ_ALLOW_DELETE; the last option is
 				used by a backup program reading the file */
 	ibool*		success,/*!< out: TRUE if succeed, FALSE if error */
 	const char*	src_file,/*!< in: file name where func invoked */
-	ulint		src_line);/*!< in: line where the func invoked */
+	ulint		src_line)/*!< in: line where the func invoked */
+	__attribute__((nonnull, warn_unused_result));
 
 /****************************************************************//**
 NOTE! Please use the corresponding macro os_file_create(), not directly
@@ -639,14 +639,8 @@ pfs_os_file_create_func(
 	mysql_pfs_key_t key,	/*!< in: Performance Schema Key */
 	const char*	name,	/*!< in: name of the file or path as a
 				null-terminated string */
-	ulint		create_mode,/*!< in: OS_FILE_OPEN if an existing file
-				is opened (if does not exist, error), or
-				OS_FILE_CREATE if a new file is created
-				(if exists, error),
-				OS_FILE_OVERWRITE if a new file is created
-				or an old overwritten;
-				OS_FILE_OPEN_RAW, if a raw device or disk
-				partition should be opened */
+	os_file_create_t
+			create_mode,/*!< in: file create mode */
 	ulint		purpose,/*!< in: OS_FILE_AIO, if asynchronous,
 				non-buffered i/o is desired,
 				OS_FILE_NORMAL, if any normal file;
@@ -657,7 +651,8 @@ pfs_os_file_create_func(
 	ulint		type,	/*!< in: OS_DATA_FILE or OS_LOG_FILE */
 	ibool*		success,/*!< out: TRUE if succeed, FALSE if error */
 	const char*	src_file,/*!< in: file name where func invoked */
-	ulint		src_line);/*!< in: line where the func invoked */
+	ulint		src_line)/*!< in: line where the func invoked */
+	__attribute__((nonnull, warn_unused_result));
 
 /***********************************************************************//**
 NOTE! Please use the corresponding macro os_file_close(), not directly

=== modified file 'storage/innobase/include/os0file.ic'
--- a/storage/innobase/include/os0file.ic	revid:vasil.dimov@stripped
+++ b/storage/innobase/include/os0file.ic	revid:inaam.rana@stripped
@@ -40,13 +40,8 @@ pfs_os_file_create_simple_func(
 	mysql_pfs_key_t key,	/*!< in: Performance Schema Key */
 	const char*	name,	/*!< in: name of the file or path as a
 				null-terminated string */
-	ulint		create_mode,/*!< in: OS_FILE_OPEN if an existing file is
-				opened (if does not exist, error), or
-				OS_FILE_CREATE if a new file is created
-				(if exists, error), or
-				OS_FILE_CREATE_PATH if new file
-				(if exists, error) and subdirectories along
-				its path are created (if needed)*/
+	os_file_create_t
+			create_mode,/*!< in: create mode */
 	ulint		access_type,/*!< in: OS_FILE_READ_ONLY or
 				OS_FILE_READ_WRITE */
 	ibool*		success,/*!< out: TRUE if succeed, FALSE if error */
@@ -88,10 +83,8 @@ pfs_os_file_create_simple_no_error_handl
 	mysql_pfs_key_t key,	/*!< in: Performance Schema Key */
 	const char*	name,	/*!< in: name of the file or path as a
 				null-terminated string */
-	ulint		create_mode,/*!< in: OS_FILE_OPEN if an existing file
-				is opened (if does not exist, error), or
-				OS_FILE_CREATE if a new file is created
-				(if exists, error) */
+	os_file_create_t
+			create_mode, /*!< in: file create mode */
 	ulint		access_type,/*!< in: OS_FILE_READ_ONLY,
 				OS_FILE_READ_WRITE, or
 				OS_FILE_READ_ALLOW_DELETE; the last option is
@@ -133,14 +126,8 @@ pfs_os_file_create_func(
 	mysql_pfs_key_t key,	/*!< in: Performance Schema Key */
 	const char*	name,	/*!< in: name of the file or path as a
 				null-terminated string */
-	ulint		create_mode,/*!< in: OS_FILE_OPEN if an existing file
-				is opened (if does not exist, error), or
-				OS_FILE_CREATE if a new file is created
-				(if exists, error),
-				OS_FILE_OVERWRITE if a new file is created
-				or an old overwritten;
-				OS_FILE_OPEN_RAW, if a raw device or disk
-				partition should be opened */
+	os_file_create_t
+			create_mode,/*!< in: file create mode */
 	ulint		purpose,/*!< in: OS_FILE_AIO, if asynchronous,
 				non-buffered i/o is desired,
 				OS_FILE_NORMAL, if any normal file;

=== modified file 'storage/innobase/include/srv0srv.h'
--- a/storage/innobase/include/srv0srv.h	revid:vasil.dimov@stripped
+++ b/storage/innobase/include/srv0srv.h	revid:inaam.rana@stripped
@@ -105,6 +105,16 @@ extern FILE*	srv_misc_tmpfile;
 /* Server parameters which are read from the initfile */
 
 extern char*	srv_data_home;
+
+/** Server undo tablespaces directory, can be absolute path. */
+extern char*	srv_undo_dir;
+
+/** Number of undo tablespaces to use. */
+extern	ulint	srv_undo_tablespaces;
+
+/* The number of undo segments to use */
+extern ulong	srv_undo_logs;
+
 #ifdef UNIV_LOG_ARCHIVE
 extern char*	srv_arch_dir;
 #endif /* UNIV_LOG_ARCHIVE */
@@ -309,9 +319,6 @@ extern ulong srv_n_purge_threads;
 /* the number of pages to purge in one batch */
 extern ulong srv_purge_batch_size;
 
-/* the number of rollback segments to use */
-extern ulong srv_rollback_segments;
-
 /* the number of sync wait arrays */
 extern ulong srv_sync_array_size;
 

=== modified file 'storage/innobase/include/trx0rseg.h'
--- a/storage/innobase/include/trx0rseg.h	revid:vasil.dimov@stripped
+++ b/storage/innobase/include/trx0rseg.h	revid:inaam.rana@stripped
@@ -29,6 +29,7 @@ Created 3/26/1996 Heikki Tuuri
 #include "univ.i"
 #include "trx0types.h"
 #include "trx0sys.h"
+#include "ut0bh.h"
 
 /******************************************************************//**
 Gets a rollback segment header.
@@ -121,15 +122,28 @@ UNIV_INTERN
 void
 trx_rseg_mem_free(
 /*==============*/
-	trx_rseg_t*	rseg);		/* in, own: instance to free */
+	trx_rseg_t*	rseg);		/*!< in, own: instance to free */
 
 /*********************************************************************
 Creates a rollback segment. */
 UNIV_INTERN
 trx_rseg_t*
-trx_rseg_create(void);
-/*==================*/
+trx_rseg_create(
+/*============*/
+	ulint	space);			/*!< in: id of UNDO tablespace */
 
+/********************************************************************
+Get the number of unique rollback tablespaces in use except space id 0.
+The last space id will be the sentinel value ULINT_UNDEFINED. The array
+will be sorted on space id. Note: space_ids should have space for
+TRX_SYS_N_RSEGS + 1 elements.
+@return number of unique rollback tablespaces in use. */
+UNIV_INTERN
+ulint
+trx_rseg_get_n_undo_tablespaces(
+/*============================*/
+	ulint*		space_ids);	/*!< out: array of space ids of
+					UNDO tablespaces */
 /* Number of undo log slots in a rollback segment file copy */
 #define TRX_RSEG_N_SLOTS	(UNIV_PAGE_SIZE / 16)
 

=== modified file 'storage/innobase/include/trx0sys.h'
--- a/storage/innobase/include/trx0sys.h	revid:vasil.dimov@stripped
+++ b/storage/innobase/include/trx0sys.h	revid:inaam.rana@stripped
@@ -161,16 +161,6 @@ trx_sys_get_nth_rseg(
 /*=================*/
 	trx_sys_t*	sys,	/*!< in: trx system */
 	ulint		n);	/*!< in: index of slot */
-/***************************************************************//**
-Sets the pointer in the nth slot of the rseg array. */
-UNIV_INLINE
-void
-trx_sys_set_nth_rseg(
-/*=================*/
-	trx_sys_t*	sys,	/*!< in/out: trx system */
-	ulint		n,	/*!< in: index of slot */
-	trx_rseg_t*	rseg);	/*!< in: pointer to rseg object, NULL if slot
-				not in use */
 /**********************************************************************//**
 Gets a pointer to the transaction system file copy and x-locks its page.
 @return	pointer to system file copy, page x-locked */
@@ -405,11 +395,13 @@ trx_sys_file_format_max_set(
 	const char**	name);		/*!< out: max file format name or
 					NULL if not needed. */
 /*********************************************************************
-Creates the rollback segments */
+Creates the rollback segments
+@return number of rollback segments that are active. */
 UNIV_INTERN
-void
+ulint
 trx_sys_create_rsegs(
 /*=================*/
+	ulint	n_spaces,	/*!< number of tablespaces for UNDO logs */
 	ulint	n_rsegs);	/*!< number of rollback segments to create */
 /*****************************************************************//**
 Get the number of transaction in the system, independent of their state.
@@ -687,7 +679,7 @@ struct trx_sys_struct{
 	UT_LIST_BASE_NODE_T(trx_t) mysql_trx_list;
 					/*!< List of transactions created
 					for MySQL */
-	trx_rseg_t*	rseg_array[TRX_SYS_N_RSEGS];
+	trx_rseg_t*	const rseg_array[TRX_SYS_N_RSEGS];
 					/*!< Pointer array to rollback
 					segments; NULL if slot not in use;
 					created and destroyed in

=== modified file 'storage/innobase/include/trx0sys.ic'
--- a/storage/innobase/include/trx0sys.ic	revid:vasil.dimov@stripped
+++ b/storage/innobase/include/trx0sys.ic	revid:inaam.rana@stripped
@@ -85,22 +85,6 @@ trx_sys_get_nth_rseg(
 	return(sys->rseg_array[n]);
 }
 
-/***************************************************************//**
-Sets the pointer in the nth slot of the rseg array. */
-UNIV_INLINE
-void
-trx_sys_set_nth_rseg(
-/*=================*/
-	trx_sys_t*	sys,	/*!< in/out: trx system */
-	ulint		n,	/*!< in: index of slot */
-	trx_rseg_t*	rseg)	/*!< in: pointer to rseg object, NULL if slot
-				not in use */
-{
-	ut_ad(n < TRX_SYS_N_RSEGS);
-
-	sys->rseg_array[n] = rseg;
-}
-
 /**********************************************************************//**
 Gets a pointer to the transaction system header and x-latches its page.
 @return	pointer to system header, page x-latched. */

=== modified file 'storage/innobase/include/ut0ut.h'
--- a/storage/innobase/include/ut0ut.h	revid:vasil.dimov@stripped
+++ b/storage/innobase/include/ut0ut.h	revid:inaam.rana@stripped
@@ -429,6 +429,18 @@ ut_strerr(
 /*======*/
 	enum db_err	num);	/*!< in: error number */
 
+/****************************************************************
+Sort function for ulint arrays. */
+UNIV_INTERN
+void
+ut_ulint_sort(
+/*==========*/
+	ulint*	arr,		/*!< in/out: array to sort */
+	ulint*	aux_arr,	/*!< in/out: aux array to use in sort */
+	ulint	low,		/*!< in: lower bound */
+	ulint	high)		/*!< in: upper bound */
+	__attribute__((nonnull));
+
 #ifndef UNIV_NONINL
 #include "ut0ut.ic"
 #endif

=== modified file 'storage/innobase/lock/lock0lock.c'
--- a/storage/innobase/lock/lock0lock.c	revid:vasil.dimov@stripped
+++ b/storage/innobase/lock/lock0lock.c	revid:inaam.rana@stripped
@@ -49,8 +49,8 @@ Created 5/7/1996 Heikki Tuuri
 graph of transactions */
 #define LOCK_MAX_N_STEPS_IN_DEADLOCK_CHECK 1000000
 
-/* Restricts the recursion depth of the search we will do in the waits-for
-graph of transactions */
+/* Restricts the search depth we will do in the waits-for graph of
+transactions */
 #define LOCK_MAX_DEPTH_IN_DEADLOCK_CHECK 200
 
 /* When releasing transaction locks, this specifies how often we release
@@ -298,6 +298,8 @@ waiting, in its lock queue. Solution: We
 locks, so that also the waiting locks are transformed to granted gap type
 locks on the inserted record. */
 
+#define LOCK_STACK_SIZE		OS_THREAD_MAX_N
+
 /* LOCK COMPATIBILITY MATRIX
  *    IS IX S  X  AI
  * IS +	 +  +  -  +
@@ -340,6 +342,42 @@ static const byte lock_strength_matrix[5
  /* AI */ {  FALSE, FALSE, FALSE, FALSE,  TRUE}
 };
 
+/** Deadlock check context. */
+typedef struct lock_deadlock_ctx_struct lock_deadlock_ctx_t;
+
+/** Deadlock check context. */
+struct lock_deadlock_ctx_struct {
+	const trx_t*	start;		/*!< Joining transaction that is
+					requesting a lock in an incompatible
+					mode */
+
+	const lock_t*	wait_lock;	/*!< Lock that trx wants */
+
+	ib_uint64_t	mark_start;	/*!< Value of lock_mark_counter at
+					the start of the deadlock check. */
+
+	ulint		depth;		/*!< Stack depth */
+
+	ulint		cost;		/*!< Calculation steps thus far */
+
+	ibool		too_deep;	/*!< TRUE if search was too deep and
+					was aborted */
+};
+
+typedef struct lock_stack_struct lock_stack_t;
+
+/** DFS visited node information used during deadlock checking. */
+struct lock_stack_struct {
+	const lock_t*	lock;			/*!< Current lock */
+	const lock_t*	wait_lock;		/*!< Waiting for lock */
+	unsigned	heap_no:16;		/*!< heap number if rec lock */
+};
+
+/** Stack to use during DFS search. Currently only a single stack is required
+because there is no parallel deadlock check. This stack is protected by
+the lock_sys_t::mutex. */
+static lock_stack_t*	lock_stack;
+
 /** The count of the types of locks. */
 static const ulint	lock_types = UT_ARR_SIZE(lock_compatibility_matrix);
 
@@ -380,51 +418,20 @@ Monitor will then fetch it and print */
 UNIV_INTERN ibool	lock_deadlock_found = FALSE;
 static FILE*		lock_latest_err_file;
 
-/* Flags for recursive deadlock search */
-enum lock_victim_enum {
-	LOCK_VICTIM_NONE,
-	LOCK_VICTIM_IS_START,
-	LOCK_VICTIM_IS_OTHER,
-	LOCK_VICTIM_EXCEED_MAX_DEPTH
-};
-
-typedef enum lock_victim_enum lock_victim_t;
-
-/********************************************************************//**
-Checks if a lock request results in a deadlock.
-@return TRUE if a deadlock was detected and we chose trx as a victim;
-FALSE if no deadlock, or there was a deadlock, but we chose other
-transaction(s) as victim(s) */
-static
-ibool
-lock_deadlock_occurs(
-/*=================*/
-	lock_t*	lock,	/*!< in: lock the transaction is requesting */
-	trx_t*	trx);	/*!< in: transaction */
 /********************************************************************//**
-Looks recursively for a deadlock.
-@return LOCK_VICTIM_NONE if no deadlock found, LOCK_VICTIM_IS_START
-if there was a deadlock and we chose 'start' as the victim,
-LOCK_VICTIM_IS_OTHER if a deadlock was found and we chose some other
-trx as a victim: we must do the search again in this last case because
-there may be another deadlock!  LOCK_EXCEED_MAX_DEPTH if the lock search
-exceeds max steps or max depth. */
+Checks if a joining lock request results in a deadlock. If a deadlock is
+found this function will resolve the deadlock by choosing a victim transaction
+and rolling it back. It will attempt to resolve all deadlocks. The returned
+transaction id will be the joining transaction id or 0 if some other
+transaction was chosen as a victim and rolled back or no deadlock found.
+
+@return id of transaction chosen as victim or 0 */
 static
-lock_victim_t
-lock_deadlock_recursive(
-/*====================*/
-	trx_t*	start,		/*!< in: recursion starting point */
-	trx_t*	trx,		/*!< in: a transaction waiting for a lock */
-	lock_t*	wait_lock,	/*!< in:  lock that is waiting to be granted */
-	ulint*	cost,		/*!< in/out: number of calculation steps thus
-				far: if this exceeds LOCK_MAX_N_STEPS_...
-				we return LOCK_VICTIM_EXCEED_MAX_DEPTH */
-	ulint	depth,		/*!< in: recursion depth: if this exceeds
-				LOCK_MAX_DEPTH_IN_DEADLOCK_CHECK, we
-				return LOCK_VICTIM_EXCEED_MAX_DEPTH */
-	const ib_uint64_t
-		mark_start);	/*!< in: Value of lock_mark_count at the start
-				of the deadlock check. */
+trx_id_t
+lock_deadlock_check_and_resolve(
+/*===========================*/
+	const lock_t*	lock,	/*!< in: lock the transaction is requesting */
+	const trx_t*	trx);	/*!< in: transaction */
 
 /*********************************************************************//**
 Gets the nth bit of a record lock.
@@ -591,6 +598,8 @@ lock_sys_create(
 
 	lock_sys = mem_zalloc(lock_sys_sz);
 
+	lock_stack = mem_alloc(sizeof(*lock_stack) * LOCK_STACK_SIZE);
+
 	lock_sys->waiting_threads = (srv_slot_t*) &lock_sys[1];
 
 	lock_sys->last_slot = lock_sys->waiting_threads;
@@ -625,9 +634,11 @@ lock_sys_close(void)
 	mutex_free(&lock_sys->mutex);
 	mutex_free(&lock_sys->wait_mutex);
 
+	mem_free(lock_stack);
 	mem_free(lock_sys);
 
 	lock_sys = NULL;
+	lock_stack = NULL;
 }
 
 /*********************************************************************//**
@@ -1827,8 +1838,9 @@ lock_rec_enqueue_waiting(
 	dict_index_t*		index,	/*!< in: index of record */
 	que_thr_t*		thr)	/*!< in: query thread */
 {
-	lock_t*	lock;
-	trx_t*	trx;
+	trx_t*			trx;
+	lock_t*			lock;
+	trx_id_t		victim_trx_id;
 
 	ut_ad(lock_mutex_own());
 
@@ -1867,20 +1879,34 @@ lock_rec_enqueue_waiting(
 	lock = lock_rec_create(
 		type_mode | LOCK_WAIT, block, heap_no, index, trx, TRUE);
 
-	/* Check if a deadlock occurs: if yes, remove the lock request and
-	return an error code */
-	if (UNIV_UNLIKELY(lock_deadlock_occurs(lock, trx))) {
+	/* Release the mutex to obey the latching order.
+	This is safe, because lock_deadlock_check_and_resolve()
+	is invoked when a lock wait is enqueued for the currently
+	running transaction. Because trx is a running transaction
+	(it is not currently suspended because of a lock wait),
+	its state can only be changed by this thread, which is
+	currently associated with the transaction. */
+
+	trx_mutex_exit(trx);
+
+	victim_trx_id = lock_deadlock_check_and_resolve(lock, trx);
+
+	trx_mutex_enter(trx);
+
+	if (victim_trx_id != 0) {
+
+		ut_ad(victim_trx_id == trx->id);
 
 		lock_reset_lock_and_trx_wait(lock);
 		lock_rec_reset_nth_bit(lock, heap_no);
 
 		return(DB_DEADLOCK);
-	}
 
-	/* If there was a deadlock but we chose another transaction as a
-	victim, it is possible that we already have the lock now granted! */
+	} else if (trx->lock.wait_lock == NULL) {
 
-	if (trx->lock.wait_lock == NULL) {
+		/* If there was a deadlock but we chose another
+		transaction as a victim, it is possible that we
+		already have the lock now granted! */
 
 		return(DB_SUCCESS_LOCKED_REC);
 	}
@@ -2140,16 +2166,17 @@ lock_rec_lock_slow(
 		nothing */
 
 	} else if (lock_rec_other_has_conflicting(mode, block, heap_no, trx)) {
+		/* If another transaction has a non-gap conflicting
+		request in the queue, as this transaction does not
+		have a lock strong enough already granted on the
+		record, we have to wait. */
 
-		/* If another transaction has a non-gap conflicting request in
-		the queue, as this transaction does not have a lock strong
-		enough already granted on the record, we have to wait. */
+		err = lock_rec_enqueue_waiting(
+			mode, block, heap_no, index, thr);
 
-		err = lock_rec_enqueue_waiting(mode, block, heap_no,
-						index, thr);
 	} else if (!impl) {
-		/* Set the requested lock on the record, note that we already
-		own the transaction mutex. */
+		/* Set the requested lock on the record, note that
+		we already own the transaction mutex. */
 
 		lock_rec_add_to_queue(
 			LOCK_REC | mode, block, heap_no, index, trx, TRUE);
@@ -2216,9 +2243,9 @@ lock_rec_lock(
 
 /*********************************************************************//**
 Checks if a waiting record lock request still has to wait in a queue.
-@return	TRUE if still has to wait */
+@return	lock that is causing the wait */
 static
-ibool
+const lock_t*
 lock_rec_has_to_wait_in_queue(
 /*==========================*/
 	const lock_t*	wait_lock)	/*!< in: waiting record lock */
@@ -2251,11 +2278,11 @@ lock_rec_has_to_wait_in_queue(
 		    && (p[bit_offset] & bit_mask)
 		    && lock_has_to_wait(wait_lock, lock)) {
 
-			return(TRUE);
+			return(lock);
 		}
 	}
 
-	return(FALSE);
+	return(NULL);
 }
 
 /*************************************************************//**
@@ -2270,6 +2297,7 @@ lock_grant(
 	ut_ad(lock_mutex_own());
 
 	lock_reset_lock_and_trx_wait(lock);
+
 	trx_mutex_enter(lock->trx);
 
 	if (lock_get_mode(lock) == LOCK_AUTO_INC) {
@@ -2386,7 +2414,8 @@ lock_rec_dequeue_from_page(
 	MONITOR_DEC(MONITOR_NUM_RECLOCK);
 
 	/* Check if waiting locks in the queue can now be granted: grant
-	locks if there are no conflicting locks ahead. */
+	locks if there are no conflicting locks ahead. Stop at the first
+	X lock that is waiting or has been granted. */
 
 	for (lock = lock_rec_get_first_on_page_addr(space, page_no);
 	     lock != NULL;
@@ -3455,6 +3484,8 @@ lock_deadlock_lock_print(
 /*=====================*/
 	const lock_t*	lock)	/*!< in: record or table type lock */
 {
+	ut_ad(lock_mutex_own());
+
 	if (lock_get_type_low(lock) == LOCK_REC) {
 		lock_rec_print(lock_latest_err_file, lock);
 
@@ -3473,307 +3504,475 @@ lock_deadlock_lock_print(
 /** Used in deadlock tracking. Protected by lock_sys->mutex. */
 static ib_uint64_t	lock_mark_counter = 0;
 
+/** Check if the search is too deep. */
+#define lock_deadlock_too_deep(c)				\
+	(c->depth > LOCK_MAX_DEPTH_IN_DEADLOCK_CHECK		\
+	 || c->cost > LOCK_MAX_N_STEPS_IN_DEADLOCK_CHECK)
+
 /********************************************************************//**
-Checks if a lock request results in a deadlock.
-@return TRUE if a deadlock was detected and we chose trx as a victim;
-FALSE if no deadlock, or there was a deadlock, but we chose other
-transaction(s) as victim(s) */
+Get the next lock in the queue that is owned by a transaction whose
+sub-tree has not already been searched.
+@return next lock or NULL if at end of queue */
 static
-ibool
-lock_deadlock_occurs(
-/*=================*/
-	lock_t*	lock,	/*!< in: lock the transaction is requesting */
-	trx_t*	trx)	/*!< in/out: transaction */
+const lock_t*
+lock_get_next_lock(
+/*===============*/
+	const lock_deadlock_ctx_t*
+				ctx,	/*!< in: deadlock context */
+	const lock_t*		lock,	/*!< in: lock in the queue */
+	ulint			heap_no)/*!< in: heap no if rec lock else
+					ULINT_UNDEFINED */
 {
-	ulint		cost	= 0;
-
-	ut_ad(trx);
-	ut_ad(lock);
 	ut_ad(lock_mutex_own());
-	ut_ad(trx_mutex_own(trx));
-	ut_ad(trx->in_trx_list);
 
-retry:
-	/* We check that adding this trx to the waits-for graph
-	does not produce a cycle. First mark all active transactions
-	with 0: */
-
-	switch (lock_deadlock_recursive(
-		trx, trx, lock, &cost, 0, lock_mark_counter)) {
-
-	case LOCK_VICTIM_IS_OTHER:
-		/* We chose some other trx as a victim: retry if there still
-		is a deadlock */
-		goto retry;
-
-	case LOCK_VICTIM_EXCEED_MAX_DEPTH:
-		/* Release the mutex to obey the latching order.
-		This is safe, because lock_deadlock_occurs() is invoked
-		when a lock wait is enqueued for the currently running
-		transaction. Because trx is a running transaction
-		(it is not currently suspended because of a lock wait),
-		its state can only be changed by this thread, which is
-		currently associated with the transaction. */
-		trx_mutex_exit(trx);
+	do {
+		if (lock_get_type_low(lock) == LOCK_REC) {
+			ut_ad(heap_no != ULINT_UNDEFINED);
+			lock = lock_rec_get_next_const(heap_no, lock);
+		} else {
+			ut_ad(heap_no == ULINT_UNDEFINED);
+			ut_ad(lock_get_type_low(lock) == LOCK_TABLE);
+			lock = UT_LIST_GET_PREV(un_member.tab_lock.locks, lock);
+		}
 
-		/* If the lock search exceeds the max step
-		or the max depth, the current trx will be
-		the victim. Print its information. */
-		lock_deadlock_start_print();
-
-		lock_deadlock_fputs(
-			"TOO DEEP OR LONG SEARCH IN THE LOCK TABLE"
-			" WAITS-FOR GRAPH, WE WILL ROLL BACK"
-			" FOLLOWING TRANSACTION \n\n"
-			"*** TRANSACTION:\n");
+		if (lock == NULL) {
+			return(NULL);
+		}
 
-		lock_deadlock_trx_print(trx, 3000);
+	} while (lock->trx->lock.deadlock_mark > ctx->mark_start);
 
-		lock_deadlock_fputs(
-			"*** WAITING FOR THIS LOCK TO BE GRANTED:\n");
+	ut_ad(lock_get_type_low(lock) == lock_get_type_low(ctx->wait_lock));
 
-		lock_deadlock_lock_print(lock);
+	return(lock);
+}
 
-		trx_mutex_enter(trx);
-		break;
+/********************************************************************//**
+Get the first lock to search. The search starts from the current
+wait_lock. What we are really interested in is an edge from the
+current wait_lock's owning transaction to another transaction that has
+a lock ahead in the queue. We skip locks where the owning transaction's
+sub-tree has already been searched.
+@return first lock or NULL */
+static
+const lock_t*
+lock_get_first_lock(
+/*================*/
+	const lock_deadlock_ctx_t*
+				ctx,	/*!< in: deadlock context */
+	ulint*			heap_no)/*!< out: heap no if rec lock,
+					else ULINT_UNDEFINED */
+{
+	const lock_t*		lock;
 
-	case LOCK_VICTIM_IS_START:
-		lock_deadlock_fputs("*** WE ROLL BACK TRANSACTION (2)\n");
-		break;
+	ut_ad(lock_mutex_own());
 
-	case LOCK_VICTIM_NONE:
-		/* No deadlock detected */
-		ut_ad(trx_mutex_own(trx));
-		ut_ad(lock_mutex_own());
-		return(FALSE);
+	lock = ctx->wait_lock;
+
+	if (lock_get_type_low(lock) == LOCK_REC) {
+
+		*heap_no = lock_rec_find_set_bit(lock);
+		ut_ad(*heap_no != ULINT_UNDEFINED);
+
+		lock = lock_rec_get_first_on_page_addr(
+			lock->un_member.rec_lock.space,
+			lock->un_member.rec_lock.page_no);
+	} else {
+		*heap_no = ULINT_UNDEFINED;
+		ut_ad(lock_get_type_low(lock) == LOCK_TABLE);
+		lock = UT_LIST_GET_PREV(un_member.tab_lock.locks, lock);
 	}
 
-	lock_deadlock_found = TRUE;
+	ut_ad(lock != NULL);
 
-	ut_ad(trx_mutex_own(trx));
-	ut_ad(lock_mutex_own());
+	/* Skip sub-trees that have already been searched. */
 
-	return(TRUE);
+	if (lock->trx->lock.deadlock_mark > ctx->mark_start) {
+		return(lock_get_next_lock(ctx, lock, *heap_no));
+	}
+
+	ut_ad(lock_get_type_low(lock) == lock_get_type_low(ctx->wait_lock));
+
+	return(lock);
 }
 
 /********************************************************************//**
-Looks recursively for a deadlock.
-@return LOCK_VICTIM_NONE if no deadlock found, LOCK_VICTIM_IS_START
-if there was a deadlock and we chose 'start' as the victim,
-LOCK_VICTIM_IS_OTHER if a deadlock was found and we chose some other
-trx as a victim: we must do the search again in this last case because
-there may be another deadlock!  LOCK_EXCEED_MAX_DEPTH if the lock search
-exceeds max steps or max depth. */
+Notify that a deadlock has been detected and print the conflicting
+transaction info. */
 static
-lock_victim_t
-lock_deadlock_recursive(
-/*====================*/
-	trx_t*	start,		/*!< in: recursion starting point */
-	trx_t*	trx,		/*!< in: a transaction waiting for a lock */
-	lock_t*	wait_lock,	/*!< in: lock that is waiting to be granted */
-	ulint*	cost,		/*!< in/out: number of calculation steps thus
-				far: if this exceeds LOCK_MAX_N_STEPS_...
-				we return LOCK_VICTIM_EXCEED_MAX_DEPTH */
-	ulint	depth,		/*!< in: recursion depth: if this exceeds
-				LOCK_MAX_DEPTH_IN_DEADLOCK_CHECK, we
-				return LOCK_VICTIM_EXCEED_MAX_DEPTH */
-	const ib_uint64_t
-		mark_start)	/*!< in: Value of lock_mark_count at the start
-				of the deadlock check. */
+void
+lock_deadlock_notify(
+/*=================*/
+	const lock_deadlock_ctx_t*	ctx,	/*!< in: deadlock context */
+	const lock_t*			lock)	/*!< in: lock causing
+						deadlock */
 {
-	lock_victim_t	ret;
-	lock_t*		lock;
-	ulint		heap_no		= ULINT_UNDEFINED;
+	ut_ad(lock_mutex_own());
+
+	lock_deadlock_start_print();
+
+	lock_deadlock_fputs("\n*** (1) TRANSACTION:\n");
+
+	lock_deadlock_trx_print(ctx->wait_lock->trx, 3000);
+
+	lock_deadlock_fputs("*** (1) WAITING FOR THIS LOCK TO BE GRANTED:\n");
+
+	lock_deadlock_lock_print(ctx->wait_lock);
+
+	lock_deadlock_fputs("*** (2) TRANSACTION:\n");
+
+	lock_deadlock_trx_print(lock->trx, 3000);
+
+	lock_deadlock_fputs("*** (2) HOLDS THE LOCK(S):\n");
+
+	lock_deadlock_lock_print(lock);
+
+	lock_deadlock_fputs("*** (2) WAITING FOR THIS LOCK TO BE GRANTED:\n");
 
-	ut_a(trx);
-	ut_a(start);
-	ut_a(wait_lock);
+	lock_deadlock_lock_print(ctx->start->lock.wait_lock);
+
+#ifdef UNIV_DEBUG
+	if (lock_print_waits) {
+		fputs("Deadlock detected\n", stderr);
+	}
+#endif /* UNIV_DEBUG */
+}
+
+/********************************************************************//**
+Select the victim transaction that should be rolled back.
+@return victim transaction */
+static
+const trx_t*
+lock_deadlock_select_victim(
+/*========================*/
+	const lock_deadlock_ctx_t*	ctx)	/*!< in: deadlock context */
+{
 	ut_ad(lock_mutex_own());
-	ut_ad(trx->in_trx_list);
-	ut_ad(mark_start <= lock_mark_counter);
+	ut_ad(ctx->wait_lock->trx != ctx->start);
 
-	if (trx->lock.deadlock_mark > mark_start) {
-		/* We have already exhaustively searched the subtree starting
-		from this trx */
+	if (trx_weight_ge(ctx->wait_lock->trx, ctx->start)) {
+		/* The joining transaction is 'smaller',
+		choose it as the victim and roll it back. */
 
-		return(LOCK_VICTIM_NONE);
+		return(ctx->start);
 	}
 
-	(*cost)++;
+	return(ctx->wait_lock->trx);
+}
 
-	if (lock_get_type_low(wait_lock) == LOCK_REC) {
-		ulint		space;
-		ulint		page_no;
+/********************************************************************//**
+Check whether the current waiting lock in the context has to wait for
+the given lock that is ahead in the queue.
+@return lock instance that could cause potential deadlock. */
+static
+const lock_t*
+lock_deadlock_check(
+/*================*/
+	const lock_deadlock_ctx_t*	ctx,	/*!< in: deadlock context */
+	const lock_t*			lock)	/*!< in: lock to check */
+{
+	ut_ad(lock_mutex_own());
 
-		heap_no = lock_rec_find_set_bit(wait_lock);
-		ut_a(heap_no != ULINT_UNDEFINED);
+	/* If it is the joining transaction wait lock. */
+	if (lock == ctx->start->lock.wait_lock) {
+		; /* Skip */
+	} else if (lock == ctx->wait_lock) {
 
-		space = wait_lock->un_member.rec_lock.space;
-		page_no = wait_lock->un_member.rec_lock.page_no;
+		/* We can mark this subtree as searched */
+		ut_ad(lock->trx->lock.deadlock_mark <= ctx->mark_start);
+		lock->trx->lock.deadlock_mark = ++lock_mark_counter;
 
-		lock = lock_rec_get_first_on_page_addr(space, page_no);
+		/* We are not prepared for an overflow. This 64-bit
+		counter should never wrap around. At 10^9 increments
+		per second, it would take 10^3 years of uptime. */
 
-		ut_ad(lock != NULL);
+		ut_ad(lock_mark_counter > 0);
 
-	} else {
-		lock = wait_lock;
+	} else if (lock_has_to_wait(ctx->wait_lock, lock)) {
+
+		return(lock);
 	}
 
-	/* Look at the locks ahead of wait_lock in the lock queue */
+	return(NULL);
+}
 
-	for (;;) {
-		/* Get previous table lock. */
-		if (heap_no == ULINT_UNDEFINED) {
+/********************************************************************//**
+Pop the deadlock search state from the stack.
+@return stack slot instance that was on top of the stack. */
+static
+const lock_stack_t const*
+lock_deadlock_pop(
+/*==============*/
+	lock_deadlock_ctx_t*	ctx)		/*!< in/out: context */
+{
+	const lock_stack_t const*	stack;
+	const trx_lock_t const*		trx_lock;
 
-			lock = UT_LIST_GET_PREV(
-				un_member.tab_lock.locks, lock);
-		}
+	ut_ad(lock_mutex_own());
 
-		if (lock == NULL || lock == wait_lock) {
-			/* We can mark this subtree as searched */
-			ut_a(trx->lock.deadlock_mark <= mark_start);
-			trx->lock.deadlock_mark = ++lock_mark_counter;
-
-			/* We are not prepared for an overflow. This 64-bit
-			counter should never wrap around. At 10^9 increments
-			per second, it would take 10^3 years of uptime. */
+	ut_ad(ctx->depth > 0);
 
-			ut_a(lock_mark_counter > 0);
+	do {
+		/* Restore search state. */
 
-			return(LOCK_VICTIM_NONE);
-		}
+		stack = &lock_stack[--ctx->depth];
+		trx_lock = &stack->lock->trx->lock;
 
-		if (lock_has_to_wait(wait_lock, lock)) {
+		/* Skip sub-trees that have already been searched. */
+	} while (ctx->depth > 0 && trx_lock->deadlock_mark > ctx->mark_start);
 
-			trx_t*	lock_trx;
+	return(ctx->depth == 0) ? NULL : stack;
+}
 
-			ibool	too_far
-				= depth > LOCK_MAX_DEPTH_IN_DEADLOCK_CHECK
-				|| *cost > LOCK_MAX_N_STEPS_IN_DEADLOCK_CHECK;
+/********************************************************************//**
+Push the deadlock search state onto the stack.
+@return slot that was used in the stack */
+static
+lock_stack_t*
+lock_deadlock_push(
+/*===============*/
+	lock_deadlock_ctx_t*	ctx,		/*!< in/out: context */
+	const lock_t*		lock,		/*!< in: current lock */
+	ulint			heap_no)	/*!< in: heap number */
+{
+	ut_ad(lock_mutex_own());
 
-			lock_trx = lock->trx;
+	/* Save current search state. */
 
-			if (lock_trx == start) {
+	if (LOCK_STACK_SIZE > ctx->depth) {
+		lock_stack_t*	stack;
 
-				trx_mutex_exit(start);
+		stack = &lock_stack[ctx->depth++];
 
-				/* We came back to the recursion starting
-				point: a deadlock detected; or we have
-				searched the waits-for graph too long */
+		stack->lock = lock;
+		stack->heap_no = heap_no;
+		stack->wait_lock = ctx->wait_lock;
 
-				lock_deadlock_start_print();
+		return(stack);
+	}
+
+	return(NULL);
+}
 
-				lock_deadlock_fputs("\n*** (1) TRANSACTION:\n");
+/********************************************************************//**
+Looks iteratively for a deadlock.
+@return 0 if no deadlock else the victim transaction id.*/
+static
+trx_id_t
+lock_deadlock_search(
+/*=================*/
+	lock_deadlock_ctx_t*	ctx)	/*!< in/out: deadlock context */
+{
+	const lock_t*	lock;
+	ulint		heap_no;
 
-				lock_deadlock_trx_print(wait_lock->trx, 3000);
+	ut_ad(lock_mutex_own());
+	ut_ad(!trx_mutex_own(ctx->start));
 
-				lock_deadlock_fputs(
-					"*** (1) WAITING FOR THIS LOCK"
-					" TO BE GRANTED:\n");
+	ut_ad(ctx->start != NULL);
+	ut_ad(ctx->wait_lock != NULL);
+	ut_ad(ctx->wait_lock->trx->in_trx_list);
+	ut_ad(ctx->mark_start <= lock_mark_counter);
 
-				lock_deadlock_lock_print(wait_lock);
+	/* Look at the locks ahead of wait_lock in the lock queue. */
+	lock = lock_get_first_lock(ctx, &heap_no);
+	do {
+		/* We should never visit the same sub-tree more than once. */
+		ut_ad(lock->trx->lock.deadlock_mark <= ctx->mark_start);
 
-				lock_deadlock_fputs("*** (2) TRANSACTION:\n");
+		++ctx->cost;
 
-				lock_deadlock_trx_print(lock->trx, 3000);
+		if (lock_deadlock_check(ctx, lock) == NULL) {
 
-				lock_deadlock_fputs(
-					"*** (2) HOLDS THE LOCK(S):\n");
+			/* No conflict found, skip this lock. */
 
-				lock_deadlock_lock_print(lock);
+		} else if (lock->trx == ctx->start) {
 
-				lock_deadlock_fputs(
-					"*** (2) WAITING FOR THIS LOCK"
-					" TO BE GRANTED:\n");
+			/* Found a cycle. */
 
-				lock_deadlock_lock_print(start->lock.wait_lock);
-#ifdef UNIV_DEBUG
-				if (lock_print_waits) {
-					fputs("Deadlock detected\n",
-					      stderr);
-				}
-#endif /* UNIV_DEBUG */
-				MONITOR_INC(MONITOR_DEADLOCK);
+			lock_deadlock_notify(ctx, lock);
 
-				if (trx_weight_ge(wait_lock->trx, start)) {
-					/* Our recursion starting point
-					transaction is 'smaller', let us
-					choose 'start' as the victim and roll
-					back it */
+			return(lock_deadlock_select_victim(ctx)->id);
 
-					trx_mutex_enter(start);
+		} else if (lock_deadlock_too_deep(ctx)) {
 
-					return(LOCK_VICTIM_IS_START);
-				}
+			/* Search too deep to continue. */
 
-				lock_deadlock_found = TRUE;
+			ctx->too_deep = TRUE;
 
-				/* Let us choose the transaction of wait_lock
-				as a victim to try to avoid deadlocking our
-				recursion starting point transaction */
+			/* Select the joining transaction as the victim. */
+			return(ctx->start->id);
 
-				lock_deadlock_fputs(
-					"*** WE ROLL BACK TRANSACTION (1)\n");
+		} else if (lock->trx->lock.que_state == TRX_QUE_LOCK_WAIT) {
 
-				trx_mutex_enter(wait_lock->trx);
-				wait_lock->trx->lock
-					.was_chosen_as_deadlock_victim = TRUE;
+			/* Another trx ahead has requested a lock in an
+			incompatible mode, and is itself waiting for a lock. */
 
-				lock_cancel_waiting_and_release(wait_lock);
-				trx_mutex_exit(wait_lock->trx);
+			/* Save current search state. */
+			if (!lock_deadlock_push(ctx, lock, heap_no)) {
 
-				trx_mutex_enter(start);
+				/* Unable to save current search state, stack
+				size not big enough. */
 
-				/* Since trx and wait_lock are no longer
-				in the waits-for graph, we can return FALSE;
-				note that our selective algorithm can choose
-				several transactions as victims, but still
-				we may end up rolling back also the recursion
-				starting point transaction! */
+				ctx->too_deep = TRUE;
 
-				return(LOCK_VICTIM_IS_OTHER);
+				return(ctx->start->id);
 			}
 
-			if (too_far) {
+			ctx->wait_lock = lock->trx->lock.wait_lock;
+			lock = lock_get_first_lock(ctx, &heap_no);
 
-#ifdef UNIV_DEBUG
-				if (lock_print_waits) {
-					fputs("Deadlock search exceeds"
-					      " max steps or depth.\n",
-					      stderr);
-				}
-#endif /* UNIV_DEBUG */
-				/* The information about transaction/lock
-				to be rolled back is available in the top
-				level. Do not print anything here. */
-				return(LOCK_VICTIM_EXCEED_MAX_DEPTH);
+			if (lock != NULL) {
+				continue;
 			}
+		}
 
-			if (lock_trx->lock.que_state == TRX_QUE_LOCK_WAIT) {
+		if (lock != NULL) {
+			lock = lock_get_next_lock(ctx, lock, heap_no);
+		}
 
-				/* Another trx ahead has requested lock	in an
-				incompatible mode, and is itself waiting for
-				a lock */
+		if (lock == NULL && ctx->depth > 0) {
+			const lock_stack_t const*	stack;
 
-				ret = lock_deadlock_recursive(
-					start, lock_trx,
-					lock_trx->lock.wait_lock,
-					cost, depth + 1, mark_start);
+			/* Restore previous search state. */
 
-				if (ret != LOCK_VICTIM_NONE) {
+			stack = lock_deadlock_pop(ctx);
 
-					return(ret);
-				}
+			if (stack != NULL) {
+				lock = stack->lock;
+				heap_no = stack->heap_no;
+				ctx->wait_lock = stack->wait_lock;
 			}
 		}
-		/* Get the next record lock to check. */
-		if (heap_no != ULINT_UNDEFINED) {
 
-			ut_a(lock != NULL);
+	} while (lock != NULL || ctx->depth > 0);
+
+	/* No deadlock found. */
+	return(0);
+}
+
+/********************************************************************//**
+Print info about transaction that was rolled back. */
+static
+void
+lock_deadlock_joining_trx_print(
+/*============================*/
+	const trx_t*	trx,		/*!< in: transaction rolled back */
+	const lock_t*	lock)		/*!< in: lock trx wants */
+{
+	ut_ad(lock_mutex_own());
+
+	/* If the lock search exceeds the max step
+	or the max depth, the current trx will be
+	the victim. Print its information. */
+	lock_deadlock_start_print();
+
+	lock_deadlock_fputs(
+		"TOO DEEP OR LONG SEARCH IN THE LOCK TABLE"
+		" WAITS-FOR GRAPH, WE WILL ROLL BACK"
+		" FOLLOWING TRANSACTION \n\n"
+		"*** TRANSACTION:\n");
+
+	lock_deadlock_trx_print(trx, 3000);
 
-			lock = lock_rec_get_next(heap_no, lock);
+	lock_deadlock_fputs("*** WAITING FOR THIS LOCK TO BE GRANTED:\n");
+
+	lock_deadlock_lock_print(lock);
+}
+
+/********************************************************************//**
+Rollback transaction selected as the victim. */
+static
+void
+lock_deadlock_trx_rollback(
+/*=======================*/
+	lock_deadlock_ctx_t*	ctx,		/*!< in: deadlock context */
+	trx_id_t		victim_trx_id)	/*!< in: transaction id */
+{
+	trx_t*			trx;
+
+	ut_ad(lock_mutex_own());
+	ut_ad(victim_trx_id == ctx->wait_lock->trx->id);
+
+	trx = ctx->wait_lock->trx;
+
+	lock_deadlock_fputs("*** WE ROLL BACK TRANSACTION (1)\n");
+
+	trx_mutex_enter(trx);
+
+	trx->lock.was_chosen_as_deadlock_victim = TRUE;
+
+	lock_cancel_waiting_and_release(trx->lock.wait_lock);
+
+	trx_mutex_exit(trx);
+}
+
+/********************************************************************//**
+Checks if a joining lock request results in a deadlock. If a deadlock is
+found this function will resolve the deadlock by choosing a victim transaction
+and rolling it back. It will attempt to resolve all deadlocks. The returned
+transaction id will be the joining transaction id or 0 if some other
+transaction was chosen as a victim and rolled back or no deadlock found.
+
+@return id of transaction chosen as victim or 0 */
+static
+trx_id_t
+lock_deadlock_check_and_resolve(
+/*============================*/
+	const lock_t*	lock,	/*!< in: lock the transaction is requesting */
+	const trx_t*	trx)	/*!< in: transaction */
+{
+	trx_id_t	victim_trx_id;
+
+	ut_ad(trx != NULL);
+	ut_ad(lock != NULL);
+	ut_ad(lock_mutex_own());
+	ut_ad(trx->in_trx_list);
+
+	/* Try and resolve as many deadlocks as possible. */
+	do {
+		lock_deadlock_ctx_t	ctx;
+
+		/* Reset the context. */
+		ctx.cost = 0;
+		ctx.depth = 0;
+		ctx.start = trx;
+		ctx.too_deep = FALSE;
+		ctx.wait_lock = lock;
+		ctx.mark_start = lock_mark_counter;
+
+		victim_trx_id = lock_deadlock_search(&ctx);
+
+		/* Search too deep, we rollback the joining transaction. */
+		if (ctx.too_deep) {
+
+			ut_a(trx == ctx.start);
+			ut_a(victim_trx_id == trx->id);
+
+			lock_deadlock_joining_trx_print(trx, lock);
+
+			MONITOR_INC(MONITOR_DEADLOCK);
+
+		} else if (victim_trx_id != 0 && victim_trx_id != trx->id) {
+
+			lock_deadlock_trx_rollback(&ctx, victim_trx_id);
+
+			lock_deadlock_found = TRUE;
+
+			MONITOR_INC(MONITOR_DEADLOCK);
 		}
-	}/* end of the 'for (;;)'-loop */
+
+	} while (victim_trx_id != 0 && victim_trx_id != trx->id);
+
+	/* If the joining transaction was selected as the victim. */
+	if (victim_trx_id != 0) {
+		ut_a(victim_trx_id == trx->id);
+
+		lock_deadlock_fputs("*** WE ROLL BACK TRANSACTION (2)\n");
+
+		lock_deadlock_found = TRUE;
+	}
+
+	return(victim_trx_id);
 }
 
 /*========================= TABLE LOCKS ==============================*/
@@ -3983,8 +4182,9 @@ lock_table_enqueue_waiting(
 	dict_table_t*	table,	/*!< in/out: table */
 	que_thr_t*	thr)	/*!< in: query thread */
 {
-	lock_t*	lock;
-	trx_t*	trx;
+	trx_t*		trx;
+	lock_t*		lock;
+	trx_id_t	victim_trx_id;
 
 	ut_ad(lock_mutex_own());
 
@@ -4021,10 +4221,22 @@ lock_table_enqueue_waiting(
 
 	lock = lock_table_create(table, mode | LOCK_WAIT, trx);
 
-	/* Check if a deadlock occurs: if yes, remove the lock request and
-	return an error code */
+	/* Release the mutex to obey the latching order.
+	This is safe, because lock_deadlock_check_and_resolve()
+	is invoked when a lock wait is enqueued for the currently
+	running transaction. Because trx is a running transaction
+	(it is not currently suspended because of a lock wait),
+	its state can only be changed by this thread, which is
+	currently associated with the transaction. */
+
+	trx_mutex_exit(trx);
 
-	if (lock_deadlock_occurs(lock, trx)) {
+	victim_trx_id = lock_deadlock_check_and_resolve(lock, trx);
+
+	trx_mutex_enter(trx);
+
+	if (victim_trx_id != 0) {
+		ut_ad(victim_trx_id == trx->id);
 
 		/* The order here is important, we don't want to
 		lose the state of the lock before calling remove. */
@@ -4032,9 +4244,7 @@ lock_table_enqueue_waiting(
 		lock_reset_lock_and_trx_wait(lock);
 
 		return(DB_DEADLOCK);
-	}
-
-	if (trx->lock.wait_lock == NULL) {
+	} else if (trx->lock.wait_lock == NULL) {
 		/* Deadlock resolution chose another transaction as a victim,
 		and we accidentally got our lock granted! */
 
@@ -4105,7 +4315,7 @@ lock_table(
 {
 	trx_t*		trx;
 	ulint		err;
-	const lock_t*	lock;
+	const lock_t*	wait_for;
 
 	ut_ad(table && thr);
 
@@ -4133,14 +4343,15 @@ lock_table(
 	/* We have to check if the new lock is compatible with any locks
 	other transactions have in the table lock queue. */
 
-	lock = lock_table_other_has_incompatible(trx, LOCK_WAIT, table, mode);
+	wait_for = lock_table_other_has_incompatible(
+		trx, LOCK_WAIT, table, mode);
 
 	trx_mutex_enter(trx);
 
 	/* Another trx has a request on the table in an incompatible
 	mode: this trx may have to wait */
 
-	if (lock != NULL) {
+	if (wait_for != NULL) {
 		err = lock_table_enqueue_waiting(mode | flags, table, thr);
 	} else {
 		lock_table_create(table, mode | flags, trx);
@@ -5513,10 +5724,11 @@ lock_rec_insert_check_and_lock(
 
 		/* Note that we may get DB_SUCCESS also here! */
 		trx_mutex_enter(trx);
-		err = lock_rec_enqueue_waiting(LOCK_X | LOCK_GAP
-					       | LOCK_INSERT_INTENTION,
-					       block, next_rec_heap_no,
-					       index, thr);
+
+		err = lock_rec_enqueue_waiting(
+			LOCK_X | LOCK_GAP | LOCK_INSERT_INTENTION,
+			block, next_rec_heap_no, index, thr);
+
 		trx_mutex_exit(trx);
 	} else {
 		err = DB_SUCCESS;

=== modified file 'storage/innobase/log/log0recv.c'
--- a/storage/innobase/log/log0recv.c	revid:vasil.dimov@stripped
+++ b/storage/innobase/log/log0recv.c	revid:inaam.rana@stripped
@@ -3477,7 +3477,7 @@ recv_reset_log_files_for_backup(
 		exit(1);
 	}
 
-	os_file_write(name, log_file, buf, 0, 0,
+	os_file_write(name, log_file, buf, 0,
 		      LOG_FILE_HDR_SIZE + OS_FILE_LOG_BLOCK_SIZE);
 	os_file_flush(log_file);
 	os_file_close(log_file);

=== modified file 'storage/innobase/os/os0file.c'
--- a/storage/innobase/os/os0file.c	revid:vasil.dimov@stripped
+++ b/storage/innobase/os/os0file.c	revid:inaam.rana@stripped
@@ -379,12 +379,14 @@ The number should be retrieved before an
 overwrite the error number). If the number is not known to this program,
 the OS error number + 100 is returned.
 @return	error number, or OS error number + 100 */
-UNIV_INTERN
+static
 ulint
-os_file_get_last_error(
-/*===================*/
-	ibool	report_all_errors)	/*!< in: TRUE if we want an error message
-					printed of all errors */
+os_file_get_last_error_low(
+/*=======================*/
+	ibool	report_all_errors,	/*!< in: TRUE if we want an error
+					message printed of all errors */
+	ibool	on_error_silent)	/*!< in: if TRUE then don't print any
+					diagnostic to the log */
 {
 	ulint	err;
 
@@ -474,7 +476,7 @@ os_file_get_last_error(
 	err = (ulint) errno;
 
 	if (report_all_errors
-	    || (err != ENOSPC && err != EEXIST)) {
+	    || (err != ENOSPC && err != EEXIST && !on_error_silent)) {
 
 		ut_print_timestamp(stderr);
 		fprintf(stderr,
@@ -543,10 +545,26 @@ os_file_get_last_error(
 #endif
 }
 
+/***********************************************************************//**
+Retrieves the last error number if an error occurs in a file io function.
+The number should be retrieved before any other OS calls (because they may
+overwrite the error number). If the number is not known to this program,
+the OS error number + 100 is returned.
+@return	error number, or OS error number + 100 */
+UNIV_INTERN
+ulint
+os_file_get_last_error(
+/*===================*/
+	ibool	report_all_errors)	/*!< in: TRUE if we want an error
+					message printed of all errors */
+{
+	return(os_file_get_last_error_low(report_all_errors, FALSE));
+}
+
 /****************************************************************//**
 Does error handling when a file operation fails.
 Conditionally exits (calling exit(3)) based on should_exit value and the
-error type
+error type, if should_exit is TRUE then on_error_silent is ignored.
 @return	TRUE if we should retry the operation */
 static
 ibool
@@ -554,14 +572,18 @@ os_file_handle_error_cond_exit(
 /*===========================*/
 	const char*	name,		/*!< in: name of a file or NULL */
 	const char*	operation,	/*!< in: operation */
-	ibool		should_exit)	/*!< in: call exit(3) if unknown error
+	ibool		should_exit,	/*!< in: call exit(3) if unknown error
 					and this parameter is TRUE */
+	ibool		on_error_silent)/*!< in: if TRUE then don't print
+					any message to the log iff it is
+					an unknown non-fatal error */
 {
 	ulint	err;
 
-	err = os_file_get_last_error(FALSE);
+	err = os_file_get_last_error_low(FALSE, on_error_silent);
 
-	if (err == OS_FILE_DISK_FULL) {
+	switch(err) {
+	case OS_FILE_DISK_FULL:
 		/* We only print a warning about disk full once */
 
 		if (os_has_said_disk_full) {
@@ -569,6 +591,9 @@ os_file_handle_error_cond_exit(
 			return(FALSE);
 		}
 
+		/* Disk full error is reported irrespective of the
+		on_error_silent setting. */
+
 		if (name) {
 			ut_print_timestamp(stderr);
 			fprintf(stderr,
@@ -586,42 +611,53 @@ os_file_handle_error_cond_exit(
 		fflush(stderr);
 
 		return(FALSE);
-	} else if (err == OS_FILE_AIO_RESOURCES_RESERVED) {
 
-		return(TRUE);
-	} else if (err == OS_FILE_AIO_INTERRUPTED) {
+	case OS_FILE_AIO_RESOURCES_RESERVED:
+	case OS_FILE_AIO_INTERRUPTED:
 
 		return(TRUE);
-	} else if (err == OS_FILE_ALREADY_EXISTS
-		   || err == OS_FILE_PATH_ERROR) {
+
+	case OS_FILE_PATH_ERROR:
+	case OS_FILE_ALREADY_EXISTS:
 
 		return(FALSE);
-	} else if (err == OS_FILE_SHARING_VIOLATION) {
+
+	case OS_FILE_SHARING_VIOLATION:
 
 		os_thread_sleep(10000000);  /* 10 sec */
 		return(TRUE);
-	} else if (err == OS_FILE_INSUFFICIENT_RESOURCE) {
 
-		os_thread_sleep(100000);	/* 100 ms */
-		return(TRUE);
-	} else if (err == OS_FILE_OPERATION_ABORTED) {
+	case OS_FILE_OPERATION_ABORTED:
+	case OS_FILE_INSUFFICIENT_RESOURCE:
 
 		os_thread_sleep(100000);	/* 100 ms */
 		return(TRUE);
-	} else {
-		if (name) {
-			fprintf(stderr, "InnoDB: File name %s\n", name);
-		}
 
-		fprintf(stderr, "InnoDB: File operation call: '%s'.\n",
-			operation);
+	default:
+
+		/* If it is an operation that can crash on error then it
+		is better to ignore on_error_silent and print an error message
+		to the log. */
+
+		if (should_exit || !on_error_silent) {
+			if (name) {
+				ut_print_timestamp(stderr);
+				fprintf(stderr,
+					"  InnoDB: File name %s\n", name);
+			}
+
+			ut_print_timestamp(stderr);
+			fprintf(stderr, "  InnoDB: File operation call: "
+				"'%s'.\n", operation);
+		}
 
 		if (should_exit) {
-			fprintf(stderr, "InnoDB: Cannot continue operation.\n");
+			ut_print_timestamp(stderr);
+			fprintf(stderr, "  InnoDB: Cannot continue "
+				"operation.\n");
 
 			fflush(stderr);
-
-			exit(1);
+			ut_error;
 		}
 	}
 
@@ -635,11 +671,11 @@ static
 ibool
 os_file_handle_error(
 /*=================*/
-	const char*	name,	/*!< in: name of a file or NULL */
-	const char*	operation)/*!< in: operation */
+	const char*	name,		/*!< in: name of a file or NULL */
+	const char*	operation)	/*!< in: operation */
 {
 	/* exit in case of unknown error */
-	return(os_file_handle_error_cond_exit(name, operation, TRUE));
+	return(os_file_handle_error_cond_exit(name, operation, TRUE, FALSE));
 }
 
 /****************************************************************//**
@@ -649,11 +685,14 @@ static
 ibool
 os_file_handle_error_no_exit(
 /*=========================*/
-	const char*	name,	/*!< in: name of a file or NULL */
-	const char*	operation)/*!< in: operation */
+	const char*	name,		/*!< in: name of a file or NULL */
+	const char*	operation,	/*!< in: operation */
+	ibool		on_error_silent)/*!< in: if TRUE then don't print
+					any message to the log. */
 {
 	/* don't exit in case of unknown error */
-	return(os_file_handle_error_cond_exit(name, operation, FALSE));
+	return(os_file_handle_error_cond_exit(
+			name, operation, FALSE, on_error_silent));
 }
 
 #undef USE_FILE_LOCK
@@ -819,7 +858,7 @@ os_file_closedir(
 	ret = FindClose(dir);
 
 	if (!ret) {
-		os_file_handle_error_no_exit(NULL, "closedir");
+		os_file_handle_error_no_exit(NULL, "closedir", FALSE);
 
 		return(-1);
 	}
@@ -831,7 +870,7 @@ os_file_closedir(
 	ret = closedir(dir);
 
 	if (ret) {
-		os_file_handle_error_no_exit(NULL, "closedir");
+		os_file_handle_error_no_exit(NULL, "closedir", FALSE);
 	}
 
 	return(ret);
@@ -902,8 +941,7 @@ next_file:
 
 		return(1);
 	} else {
-		os_file_handle_error_no_exit(dirname,
-					     "readdir_next_file");
+		os_file_handle_error_no_exit(NULL, "readdir_next_file", FALSE);
 		return(-1);
 	}
 #else
@@ -988,7 +1026,7 @@ next_file:
 			goto next_file;
 		}
 
-		os_file_handle_error_no_exit(full_path, "stat");
+		os_file_handle_error_no_exit(full_path, "stat", FALSE);
 
 		ut_free(full_path);
 
@@ -1070,13 +1108,8 @@ os_file_create_simple_func(
 /*=======================*/
 	const char*	name,	/*!< in: name of the file or path as a
 				null-terminated string */
-	ulint		create_mode,/*!< in: OS_FILE_OPEN if an existing file is
-				opened (if does not exist, error), or
-				OS_FILE_CREATE if a new file is created
-				(if exists, error), or
-				OS_FILE_CREATE_PATH if new file
-				(if exists, error) and subdirectories along
-				its path are created (if needed)*/
+	os_file_create_t
+			create_mode,/*!< in: create mode */
 	ulint		access_type,/*!< in: OS_FILE_READ_ONLY or
 				OS_FILE_READ_WRITE */
 	ibool*		success)/*!< out: TRUE if succeed, FALSE if error */
@@ -1088,6 +1121,8 @@ os_file_create_simple_func(
 	DWORD		attributes	= 0;
 	ibool		retry;
 
+	ut_a(!(create_mode & OS_FILE_ON_ERROR_SILENT));
+	ut_a(!(create_mode & OS_FILE_ON_ERROR_NO_EXIT));
 try_again:
 	ut_a(name);
 
@@ -1146,6 +1181,9 @@ try_again:
 	int		create_flag;
 	ibool		retry;
 
+	ut_a(!(create_mode & OS_FILE_ON_ERROR_SILENT));
+	ut_a(!(create_mode & OS_FILE_ON_ERROR_NO_EXIT));
+
 try_again:
 	ut_a(name);
 
@@ -1213,10 +1251,8 @@ os_file_create_simple_no_error_handling_
 /*=========================================*/
 	const char*	name,	/*!< in: name of the file or path as a
 				null-terminated string */
-	ulint		create_mode,/*!< in: OS_FILE_OPEN if an existing file
-				is opened (if does not exist, error), or
-				OS_FILE_CREATE if a new file is created
-				(if exists, error) */
+	os_file_create_t
+			create_mode,/*!< in: create mode */
 	ulint		access_type,/*!< in: OS_FILE_READ_ONLY,
 				OS_FILE_READ_WRITE, or
 				OS_FILE_READ_ALLOW_DELETE; the last option is
@@ -1232,6 +1268,9 @@ os_file_create_simple_no_error_handling_
 
 	ut_a(name);
 
+	ut_a(!(create_mode & OS_FILE_ON_ERROR_SILENT));
+	ut_a(!(create_mode & OS_FILE_ON_ERROR_NO_EXIT));
+
 	if (create_mode == OS_FILE_OPEN) {
 		create_flag = OPEN_EXISTING;
 	} else if (create_mode == OS_FILE_CREATE) {
@@ -1278,6 +1317,9 @@ os_file_create_simple_no_error_handling_
 
 	ut_a(name);
 
+	ut_a(!(create_mode & OS_FILE_ON_ERROR_SILENT));
+	ut_a(!(create_mode & OS_FILE_ON_ERROR_NO_EXIT));
+
 	if (create_mode == OS_FILE_OPEN) {
 		if (access_type == OS_FILE_READ_ONLY) {
 			create_flag = O_RDONLY;
@@ -1372,14 +1414,8 @@ os_file_create_func(
 /*================*/
 	const char*	name,	/*!< in: name of the file or path as a
 				null-terminated string */
-	ulint		create_mode,/*!< in: OS_FILE_OPEN if an existing file
-				is opened (if does not exist, error), or
-				OS_FILE_CREATE if a new file is created
-				(if exists, error),
-				OS_FILE_OVERWRITE if a new file is created
-				or an old overwritten;
-				OS_FILE_OPEN_RAW, if a raw device or disk
-				partition should be opened */
+	os_file_create_t
+			create_mode,/*!< in: create mode */
 	ulint		purpose,/*!< in: OS_FILE_AIO, if asynchronous,
 				non-buffered i/o is desired,
 				OS_FILE_NORMAL, if any normal file;
@@ -1390,12 +1426,24 @@ os_file_create_func(
 	ulint		type,	/*!< in: OS_DATA_FILE or OS_LOG_FILE */
 	ibool*		success)/*!< out: TRUE if succeed, FALSE if error */
 {
+	ibool		on_error_no_exit;
+	ibool		on_error_silent;
+
 #ifdef __WIN__
 	os_file_t	file;
 	DWORD		share_mode	= FILE_SHARE_READ;
 	DWORD		create_flag;
 	DWORD		attributes;
 	ibool		retry;
+
+	on_error_no_exit = create_mode & OS_FILE_ON_ERROR_NO_EXIT
+		? TRUE : FALSE;
+	on_error_silent = create_mode & OS_FILE_ON_ERROR_SILENT
+		? TRUE : FALSE;
+
+	create_mode &= ~OS_FILE_ON_ERROR_NO_EXIT;
+	create_mode &= ~OS_FILE_ON_ERROR_SILENT;
+
 try_again:
 	ut_a(name);
 
@@ -1478,23 +1526,17 @@ try_again:
 			  NULL);	/*!< no template file */
 
 	if (file == INVALID_HANDLE_VALUE) {
+		const char*	operation;
+
+		operation = create_mode == OS_FILE_CREATE ? "create" : "open";
+
 		*success = FALSE;
 
-		/* When srv_file_per_table is on, file creation failure may not
-		be critical to the whole instance. Do not crash the server in
-		case of unknown errors.
-		Please note "srv_file_per_table" is a global variable with
-		no explicit synchronization protection. It could be
-		changed during this execution path. It might not have the
-		same value as the one when building the table definition */
-		if (srv_file_per_table) {
-			retry = os_file_handle_error_no_exit(name,
-						create_mode == OS_FILE_CREATE ?
-						"create" : "open");
+		if (on_error_no_exit) {
+			retry = os_file_handle_error_no_exit(
+				name, operation, on_error_silent);
 		} else {
-			retry = os_file_handle_error(name,
-						create_mode == OS_FILE_CREATE ?
-						"create" : "open");
+			retry = os_file_handle_error(name, operation);
 		}
 
 		if (retry) {
@@ -1511,6 +1553,14 @@ try_again:
 	ibool		retry;
 	const char*	mode_str	= NULL;
 
+	on_error_no_exit = create_mode & OS_FILE_ON_ERROR_NO_EXIT
+		? TRUE : FALSE;
+	on_error_silent = create_mode & OS_FILE_ON_ERROR_SILENT
+		? TRUE : FALSE;
+
+	create_mode &= ~OS_FILE_ON_ERROR_NO_EXIT;
+	create_mode &= ~OS_FILE_ON_ERROR_SILENT;
+
 try_again:
 	ut_a(name);
 
@@ -1550,23 +1600,17 @@ try_again:
 	file = open(name, create_flag, os_innodb_umask);
 
 	if (file == -1) {
+		const char*	operation;
+
+		operation = create_mode == OS_FILE_CREATE ? "create" : "open";
+
 		*success = FALSE;
 
-		/* When srv_file_per_table is on, file creation failure may not
-		be critical to the whole instance. Do not crash the server in
-		case of unknown errors.
-		Please note "srv_file_per_table" is a global variable with
-		no explicit synchronization protection. It could be
-		changed during this execution path. It might not have the
-		same value as the one when building the table definition */
-		if (srv_file_per_table) {
-			retry = os_file_handle_error_no_exit(name,
-						create_mode == OS_FILE_CREATE ?
-						"create" : "open");
+		if (on_error_no_exit) {
+			retry = os_file_handle_error_no_exit(
+				name, operation, on_error_silent);
 		} else {
-			retry = os_file_handle_error(name,
-						create_mode == OS_FILE_CREATE ?
-						"create" : "open");
+			retry = os_file_handle_error(name, operation);
 		}
 
 		if (retry) {
@@ -1624,7 +1668,8 @@ UNIV_INTERN
 ibool
 os_file_delete_if_exists(
 /*=====================*/
-	const char*	name)	/*!< in: file path as a null-terminated string */
+	const char*	name)	/*!< in: file path as a null-terminated
+				string */
 {
 #ifdef __WIN__
 	BOOL	ret;
@@ -1670,7 +1715,7 @@ loop:
 	ret = unlink(name);
 
 	if (ret != 0 && errno != ENOENT) {
-		os_file_handle_error_no_exit(name, "delete");
+		os_file_handle_error_no_exit(name, "delete", FALSE);
 
 		return(FALSE);
 	}
@@ -1686,7 +1731,8 @@ UNIV_INTERN
 ibool
 os_file_delete(
 /*===========*/
-	const char*	name)	/*!< in: file path as a null-terminated string */
+	const char*	name)	/*!< in: file path as a null-terminated
+				string */
 {
 #ifdef __WIN__
 	BOOL	ret;
@@ -1733,7 +1779,7 @@ loop:
 	ret = unlink(name);
 
 	if (ret != 0) {
-		os_file_handle_error_no_exit(name, "delete");
+		os_file_handle_error_no_exit(name, "delete", FALSE);
 
 		return(FALSE);
 	}
@@ -1764,7 +1810,7 @@ os_file_rename_func(
 		return(TRUE);
 	}
 
-	os_file_handle_error_no_exit(oldpath, "rename");
+	os_file_handle_error_no_exit(oldpath, "rename", FALSE);
 
 	return(FALSE);
 #else
@@ -1773,7 +1819,7 @@ os_file_rename_func(
 	ret = rename(oldpath, newpath);
 
 	if (ret != 0) {
-		os_file_handle_error_no_exit(oldpath, "rename");
+		os_file_handle_error_no_exit(oldpath, "rename", FALSE);
 
 		return(FALSE);
 	}
@@ -2567,7 +2613,7 @@ try_again:
 #ifdef __WIN__
 error_handling:
 #endif
-	retry = os_file_handle_error_no_exit(NULL, "read");
+	retry = os_file_handle_error_no_exit(NULL, "read", FALSE);
 
 	if (retry) {
 		goto try_again;
@@ -2822,7 +2868,7 @@ os_file_status(
 	} else if (ret) {
 		/* file exists, but stat call failed */
 
-		os_file_handle_error_no_exit(path, "stat");
+		os_file_handle_error_no_exit(path, "stat", FALSE);
 
 		return(FALSE);
 	}
@@ -2850,7 +2896,7 @@ os_file_status(
 	} else if (ret) {
 		/* file exists, but stat call failed */
 
-		os_file_handle_error_no_exit(path, "stat");
+		os_file_handle_error_no_exit(path, "stat", FALSE);
 
 		return(FALSE);
 	}
@@ -2894,7 +2940,7 @@ os_file_get_status(
 	} else if (ret) {
 		/* file exists, but stat call failed */
 
-		os_file_handle_error_no_exit(path, "stat");
+		os_file_handle_error_no_exit(path, "stat", FALSE);
 
 		return(FALSE);
 	}
@@ -2925,7 +2971,7 @@ os_file_get_status(
 	} else if (ret) {
 		/* file exists, but stat call failed */
 
-		os_file_handle_error_no_exit(path, "stat");
+		os_file_handle_error_no_exit(path, "stat", FALSE);
 
 		return(FALSE);
 	}

=== modified file 'storage/innobase/row/row0purge.c'
--- a/storage/innobase/row/row0purge.c	revid:vasil.dimov@stripped
+++ b/storage/innobase/row/row0purge.c	revid:inaam.rana@stripped
@@ -541,6 +541,7 @@ skip_secondaries:
 			= upd_get_nth_field(node->update, i);
 
 		if (dfield_is_ext(&ufield->new_val)) {
+			trx_rseg_t*	rseg;
 			buf_block_t*	block;
 			ulint		internal_offset;
 			byte*		data_field;
@@ -561,6 +562,11 @@ skip_secondaries:
 			trx_undo_decode_roll_ptr(node->roll_ptr,
 						 &is_insert, &rseg_id,
 						 &page_no, &offset);
+
+			rseg = trx_sys_get_nth_rseg(trx_sys, rseg_id);
+			ut_a(rseg != NULL);
+			ut_a(rseg->id == rseg_id);
+
 			mtr_start(&mtr);
 
 			/* We have to acquire an X-latch to the clustered
@@ -581,10 +587,9 @@ skip_secondaries:
 
 			btr_root_get(index, &mtr);
 
-			/* We assume in purge of externally stored fields
-			that the space id of the undo log record is 0! */
+			block = buf_page_get(
+				rseg->space, 0, page_no, RW_X_LATCH, &mtr);
 
-			block = buf_page_get(0, 0, page_no, RW_X_LATCH, &mtr);
 			buf_block_dbg_add_level(block, SYNC_TRX_UNDO_PAGE);
 
 			data_field = buf_block_get_frame(block)

=== modified file 'storage/innobase/srv/srv0mon.c'
--- a/storage/innobase/srv/srv0mon.c	revid:vasil.dimov@stripped
+++ b/storage/innobase/srv/srv0mon.c	revid:inaam.rana@stripped
@@ -1024,7 +1024,7 @@ srv_mon_get_rseg_size(void)
 	for (i = 0; i < TRX_SYS_N_RSEGS; ++i) {
 		const trx_rseg_t*	rseg = trx_sys->rseg_array[i];
 
-		if (rseg) {
+		if (rseg != NULL) {
 			value += rseg->curr_size;
 		}
 	}

=== modified file 'storage/innobase/srv/srv0srv.c'
--- a/storage/innobase/srv/srv0srv.c	revid:vasil.dimov@stripped
+++ b/storage/innobase/srv/srv0srv.c	revid:inaam.rana@stripped
@@ -95,6 +95,16 @@ UNIV_INTERN const char	srv_mysql50_table
 names, where the file name itself may also contain a path */
 
 UNIV_INTERN char*	srv_data_home	= NULL;
+
+/** Rollback files directory, can be absolute. */
+UNIV_INTERN char*	srv_undo_dir = NULL;
+
+/** The number of tablespaces to use for rollback segments. */
+UNIV_INTERN ulint	srv_undo_tablespaces = 8;
+
+/* The number of rollback segments to use */
+UNIV_INTERN ulong	srv_undo_logs = 1;
+
 #ifdef UNIV_LOG_ARCHIVE
 UNIV_INTERN char*	srv_arch_dir	= NULL;
 #endif /* UNIV_LOG_ARCHIVE */
@@ -251,9 +261,6 @@ UNIV_INTERN ulong srv_n_purge_threads =
 /* the number of pages to purge in one batch */
 UNIV_INTERN ulong srv_purge_batch_size = 20;
 
-/* the number of rollback segments to use */
-UNIV_INTERN ulong srv_rollback_segments = TRX_SYS_N_RSEGS;
-
 /* variable counts amount of data read in total (in bytes) */
 UNIV_INTERN ulint srv_data_read = 0;
 

=== modified file 'storage/innobase/srv/srv0start.c'
--- a/storage/innobase/srv/srv0start.c	revid:vasil.dimov@stripped
+++ b/storage/innobase/srv/srv0start.c	revid:inaam.rana@stripped
@@ -64,6 +64,7 @@ Created 2/16/1996 Heikki Tuuri
 #include "srv0start.h"
 #include "srv0srv.h"
 #ifndef UNIV_HOTBACKUP
+# include "trx0rseg.h"
 # include "os0proc.h"
 # include "sync0sync.h"
 # include "buf0flu.h"
@@ -111,7 +112,7 @@ UNIV_INTERN ibool	srv_is_being_started =
 /** TRUE if the server was successfully started */
 UNIV_INTERN ibool	srv_was_started = FALSE;
 /** TRUE if innobase_start_or_create_for_mysql() has been called */
-static ibool	srv_start_has_been_called = FALSE;
+static ibool		srv_start_has_been_called = FALSE;
 
 /** At a shutdown this value climbs from SRV_SHUTDOWN_NONE to
 SRV_SHUTDOWN_CLEANUP and then to SRV_SHUTDOWN_LAST_PHASE, and so on */
@@ -133,6 +134,10 @@ static os_fast_mutex_t	srv_os_test_mutex
 static char*	srv_monitor_file_name;
 #endif /* !UNIV_HOTBACKUP */
 
+/** Default undo tablespace size, in pages of UNIV_PAGE_SIZE bytes (10MB). */
+static const ulint SRV_UNDO_TABLESPACE_SIZE_IN_PAGES =
+	((1024 * 1024) * 10) / UNIV_PAGE_SIZE;
+
 /** */
 #define SRV_N_PENDING_IOS_PER_THREAD	OS_AIO_N_PENDING_IOS_PER_THREAD
 #define SRV_MAX_N_PENDING_SYNC_IOS	100
@@ -955,6 +960,315 @@ skip_size_check:
 	return(DB_SUCCESS);
 }
 
+/*********************************************************************//**
+Create an undo tablespace file and extend it to the requested size.
+@return	DB_SUCCESS, or DB_ERROR if the file cannot be created or sized */
+static
+enum db_err
+srv_undo_tablespace_create(
+/*=======================*/
+	const char*	name,		/*!< in: tablespace name */
+	ulint		size)		/*!< in: tablespace size in pages */
+{
+	os_file_t	fh;
+	ibool		ret;
+	enum db_err	err = DB_SUCCESS;
+
+	os_file_create_subdirs_if_needed(name);
+
+	fh = os_file_create(
+		innodb_file_data_key, name, OS_FILE_CREATE,
+		OS_FILE_NORMAL, OS_DATA_FILE, &ret);
+
+	if (!ret) {
+		/* Whatever the reason for the failure (including the file
+		already existing), fh is not a usable handle and must not
+		be passed on to os_file_set_size(). */
+
+		fprintf(stderr, "InnoDB: Error in creating %s\n", name);
+
+		err = DB_ERROR;
+	} else {
+		/* We created the data file and now write it full of zeros */
+
+		ut_print_timestamp(stderr);
+		fprintf(stderr, "  InnoDB: Data file %s did not"
+				" exist: new to be created\n", name);
+
+		ut_print_timestamp(stderr);
+		fprintf(stderr, "  InnoDB: Setting file %s size to %lu MB\n",
+				name, size >> (20 - UNIV_PAGE_SIZE_SHIFT));
+
+		ut_print_timestamp(stderr);
+		fprintf(stderr, "  InnoDB: Database physically writes the"
+				" file full: wait...\n");
+
+		ret = os_file_set_size(
+			name, fh, (os_offset_t) size << UNIV_PAGE_SIZE_SHIFT);
+
+		if (!ret) {
+			ut_print_timestamp(stderr);
+			fprintf(stderr, "  InnoDB: Error in creating %s:"
+					" probably out of disk space\n", name);
+
+			err = DB_ERROR;
+		}
+
+		/* Only the file on disk is needed from here on; the undo
+		tablespace is opened again by name, so release the handle. */
+		os_file_close(fh);
+	}
+
+	return(err);
+}
+
+/*********************************************************************//**
+Open an undo tablespace file and create its fil space and fil node.
+@return	DB_SUCCESS, or DB_ERROR if the file cannot be opened */
+static
+enum db_err
+srv_undo_tablespace_open(
+/*=====================*/
+	const char*	name,		/*!< in: tablespace name */
+	ulint		space)		/*!< in: tablespace id */
+{
+	os_file_t	fh;
+	enum db_err	err;
+	ibool		ret;
+
+	fh = os_file_create(
+		innodb_file_data_key, name,
+		OS_FILE_OPEN_RETRY
+		| OS_FILE_ON_ERROR_NO_EXIT
+		| OS_FILE_ON_ERROR_SILENT,
+		OS_FILE_NORMAL,
+		OS_DATA_FILE,
+		&ret);
+
+	/* If the file open was successful then load the tablespace. */
+
+	if (ret) {
+		os_offset_t	size;
+		os_offset_t	n_pages;
+
+		size = os_file_get_size(fh);
+		ut_a(size != (os_offset_t) -1);
+
+		ret = os_file_close(fh);
+		ut_a(ret);
+
+		/* Load the tablespace into InnoDB's internal
+		data structures. */
+
+		/* We set the biggest space id to the undo tablespace
+		because InnoDB hasn't opened any other tablespace apart
+		from the system tablespace. */
+
+		fil_set_max_space_id_if_bigger(space);
+
+		/* Set the compressed page size to 0 (non-compressed) */
+
+		fil_space_create(name, space, 0, FIL_TABLESPACE);
+
+		ut_a(fil_validate());
+
+		n_pages = size / UNIV_PAGE_SIZE;
+
+		/* On 64 bit Windows ulint can be 32 bit and os_offset_t
+		is 64 bit. It is OK to cast the n_pages to ulint because
+		the unit has been scaled to pages and they are always
+		32 bit. */
+		fil_node_create(name, (ulint) n_pages, space, FALSE);
+
+		err = DB_SUCCESS;
+	} else {
+		err = DB_ERROR;
+	}
+
+	return(err);
+}
+
+/********************************************************************
+Creates (when creating a new database) and opens the undo tablespaces.
+@return	DB_SUCCESS or error code */
+static
+enum db_err
+srv_undo_tablespaces_init(
+/*======================*/
+	ibool		create_new_db,		/*!< in: TRUE if new db being
+						created */
+	const ulint	n_conf_tablespaces)	/*!< in: configured undo
+						tablespaces */
+{
+	ulint		i;
+	enum db_err	err = DB_SUCCESS;
+	ulint		prev_space_id = 0;
+	ulint		n_undo_tablespaces;
+	ulint		undo_tablespace_ids[TRX_SYS_N_RSEGS + 1];
+
+	ut_a(n_conf_tablespaces <= TRX_SYS_N_RSEGS);
+
+	memset(undo_tablespace_ids, 0x0, sizeof(undo_tablespace_ids));
+
+	/* Create the undo spaces only if we are creating a new
+	instance. We don't allow creating of new undo tablespaces
+	in an existing instance (yet).  This restriction exists because
+	we check in several places for SYSTEM tablespaces to be less than
+	the min of user defined tablespace ids. Once we implement saving
+	the location of the undo tablespaces and their space ids this
+	restriction will/should be lifted. */
+
+	for (i = 0; create_new_db && i < n_conf_tablespaces; ++i) {
+		char	name[OS_FILE_MAX_PATH];
+
+		ut_snprintf(
+			name, sizeof(name),
+			"%s%cundo%03lu",
+			srv_undo_dir, SRV_PATH_SEPARATOR, i + 1);
+
+		/* Undo space ids start from 1. */
+		err = srv_undo_tablespace_create(
+			name, SRV_UNDO_TABLESPACE_SIZE_IN_PAGES);
+
+		if (err != DB_SUCCESS) {
+			ut_print_timestamp(stderr);
+			fprintf(stderr,
+				" InnoDB: Could not create "
+				"undo tablespace '%s'.\n", name);
+
+			return(err);
+		}
+	}
+
+	/* Get the ids of the undo tablespaces in use, excluding the system
+	tablespace (0). For a new instance build undo_tablespace_ids here:
+	ids 1..n in slots 0..n-1, followed by the ULINT_UNDEFINED sentinel
+	in slot n (the array has TRX_SYS_N_RSEGS + 1 slots). */
+
+	if (!create_new_db) {
+		n_undo_tablespaces = trx_rseg_get_n_undo_tablespaces(
+			undo_tablespace_ids);
+	} else {
+		n_undo_tablespaces = n_conf_tablespaces;
+
+		for (i = 1; i <= n_undo_tablespaces; ++i) {
+			undo_tablespace_ids[i - 1] = i;
+		}
+
+		undo_tablespace_ids[n_undo_tablespaces] = ULINT_UNDEFINED;
+	}
+
+	/* Open all the undo tablespaces that are currently in use. If we
+	fail to open any of these it is a fatal error. The tablespace ids
+	should be contiguous. It is a fatal error because they are required
+	for recovery and are referenced by the UNDO logs (a.k.a RBS). */
+
+	for (i = 0; i < n_undo_tablespaces; ++i) {
+		char	name[OS_FILE_MAX_PATH];
+
+		ut_snprintf(
+			name, sizeof(name),
+			"%s%cundo%03lu",
+			srv_undo_dir, SRV_PATH_SEPARATOR,
+			undo_tablespace_ids[i]);
+
+		/* Should be no gaps in undo tablespace ids. */
+		ut_a(prev_space_id + 1 == undo_tablespace_ids[i]);
+
+		/* The system space id should not be in this array. */
+		ut_a(undo_tablespace_ids[i] != 0);
+		ut_a(undo_tablespace_ids[i] != ULINT_UNDEFINED);
+
+		/* Undo space ids start from 1. */
+
+		err = srv_undo_tablespace_open(name, undo_tablespace_ids[i]);
+
+		if (err != DB_SUCCESS) {
+			ut_print_timestamp(stderr);
+			fprintf(stderr,
+				" InnoDB: Error opening undo "
+				"tablespace %s.\n", name);
+
+			return(err);
+		}
+
+		prev_space_id = undo_tablespace_ids[i];
+	}
+
+	/* Open any extra unused undo tablespaces. These must be contiguous.
+	We stop at the first failure. These are undo tablespaces that are
+	not in use and therefore not required by recovery. We only check
+	that there are no gaps. */
+
+	for (i = prev_space_id + 1; i < TRX_SYS_N_RSEGS; ++i) {
+		char	name[OS_FILE_MAX_PATH];
+
+		ut_snprintf(
+			name, sizeof(name),
+			"%s%cundo%03lu", srv_undo_dir, SRV_PATH_SEPARATOR, i);
+
+		/* Undo space ids start from 1. */
+		err = srv_undo_tablespace_open(name, i);
+
+		if (err != DB_SUCCESS) {
+			break;
+		}
+
+		++n_undo_tablespaces;
+	}
+
+	/* If the user says that there are fewer than what we find we
+	tolerate that discrepancy but not the inverse. Because there could
+	be unused undo tablespaces for future use. */
+
+	if (n_conf_tablespaces > n_undo_tablespaces) {
+		ut_print_timestamp(stderr);
+		fprintf(stderr,
+			" InnoDB: Expected to open %lu undo "
+			"tablespaces but was able\n",
+			n_conf_tablespaces);
+		ut_print_timestamp(stderr);
+		fprintf(stderr,
+			" InnoDB: to find only %lu undo "
+			"tablespaces.\n", n_undo_tablespaces);
+		ut_print_timestamp(stderr);
+		fprintf(stderr,
+			" InnoDB: Set the "
+			"innodb_undo_tablespaces parameter to "
+			"the\n");
+		ut_print_timestamp(stderr);
+		fprintf(stderr,
+			" InnoDB: correct value and retry. Suggested "
+			"value is %lu\n", n_undo_tablespaces);
+
+		return(err != DB_SUCCESS ? err : DB_ERROR);
+	}
+
+	if (n_undo_tablespaces > 0) {
+		ut_print_timestamp(stderr);
+		fprintf(stderr,
+			" InnoDB: Opened %lu undo tablespaces\n",
+			n_undo_tablespaces);
+	}
+
+	if (create_new_db) {
+		mtr_t	mtr;
+
+		mtr_start(&mtr);
+
+		/* The undo log tablespace */
+		for (i = 1; i <= n_undo_tablespaces; ++i) {
+
+			fsp_header_init(
+				i, SRV_UNDO_TABLESPACE_SIZE_IN_PAGES, &mtr);
+		}
+
+		mtr_commit(&mtr);
+	}
+
+	return(DB_SUCCESS);
+}
+
 /********************************************************************
 Starts InnoDB and creates a new database if database files
 are not found and the user wants.
@@ -980,8 +1294,6 @@ innobase_start_or_create_for_mysql(void)
 	ulint		err;
 	ulint		i;
 	ulint		io_limit;
-	my_bool		srv_file_per_table_original_value
-		= srv_file_per_table;
 	mtr_t		mtr;
 	ib_bh_t*	ib_bh;
 
@@ -1027,11 +1339,6 @@ innobase_start_or_create_for_mysql(void)
 			"of memory.\n");
 	}
 
-	/* System tables are created in tablespace 0.  Thus, we must
-	temporarily clear srv_file_per_table.  This is ok, because the
-	server will not accept connections (which could modify
-	innodb_file_per_table) until this function has returned. */
-	srv_file_per_table = FALSE;
 #ifdef UNIV_DEBUG
 	ut_print_timestamp(stderr);
 	fprintf(stderr,
@@ -1319,8 +1626,7 @@ innobase_start_or_create_for_mysql(void)
 		    srv_n_write_io_threads,
 		    SRV_MAX_N_PENDING_SYNC_IOS);
 
-	fil_init(srv_file_per_table ? 50000 : 5000,
-		 srv_max_n_open_files);
+	fil_init(srv_file_per_table ? 50000 : 5000, srv_max_n_open_files);
 
 	/* Print time to initialize the buffer pool */
 	ut_print_timestamp(stderr);
@@ -1526,6 +1832,17 @@ innobase_start_or_create_for_mysql(void)
 
 	fil_open_log_and_system_tablespace_files();
 
+	err = srv_undo_tablespaces_init(create_new_db, srv_undo_tablespaces);
+
+	/* If the force recovery is set very high then we carry on regardless
+	of all errors. Basically this is fingers crossed mode. */
+
+	if (err != DB_SUCCESS
+	    && srv_force_recovery < SRV_FORCE_NO_UNDO_LOG_SCAN) {
+
+		return((int) err);
+	}
+
 	if (log_created && !create_new_db
 #ifdef UNIV_LOG_ARCHIVE
 	    && !srv_archive_recovery
@@ -1648,7 +1965,7 @@ innobase_start_or_create_for_mysql(void)
 	} else {
 
 		/* Check if we support the max format that is stamped
-		on the system tablespace. 
+		on the system tablespace.
 		Note:  We are NOT allowed to make any modifications to
 		the TRX_SYS_PAGE_NO page before recovery  because this
 		page also contains the max_trx_id etc. important system
@@ -1794,7 +2111,16 @@ innobase_start_or_create_for_mysql(void)
 	running in single threaded mode essentially. Only the IO threads
 	should be running at this stage. */
 
-	trx_sys_create_rsegs(TRX_SYS_N_RSEGS - 1);
+	ut_a(srv_undo_logs > 0);
+	ut_a(srv_undo_logs <= TRX_SYS_N_RSEGS);
+
+	/* Note: We set the config variable here to the number of rollback
+	segments that are actually active. This allows the user to discover
+	the currently configured number of undo segments in an existing
+	instance. */
+
+	srv_undo_logs = trx_sys_create_rsegs(
+		srv_undo_tablespaces, srv_undo_logs);
 
 	/* Create the thread which watches the timeouts for lock waits */
 	os_thread_create(&lock_wait_timeout_thread, NULL,
@@ -2071,8 +2397,6 @@ innobase_start_or_create_for_mysql(void)
 		ibuf_update_max_tablespace_id();
 	}
 
-	srv_file_per_table = srv_file_per_table_original_value;
-
 	/* Create the buffer pool dump/load thread */
 	os_thread_create(buf_dump_thread, NULL, NULL);
 

=== modified file 'storage/innobase/trx/trx0purge.c'
--- a/storage/innobase/trx/trx0purge.c	revid:vasil.dimov@stripped
+++ b/storage/innobase/trx/trx0purge.c	revid:inaam.rana@stripped
@@ -707,8 +707,10 @@ trx_purge_get_rseg_with_min_trx_id(
 
 	ut_a(purge_sys->rseg->last_page_no != FIL_NULL);
 
-	/* We assume in purge of externally stored fields that space id == 0 */
-	ut_a(purge_sys->rseg->space == 0);
+	/* We assume in purge of externally stored fields that space id is
+	in the range of UNDO tablespace space ids */
+	ut_a(purge_sys->rseg->space <= srv_undo_tablespaces);
+
 	zip_size = purge_sys->rseg->zip_size;
 
 	ut_a(purge_sys->iter.trx_no <= purge_sys->rseg->last_trx_no);
@@ -745,7 +747,8 @@ trx_purge_read_undo_rec(
 		mtr_start(&mtr);
 
 		undo_rec = trx_undo_get_first_rec(
-			0 /* System space id */, zip_size,
+			purge_sys->rseg->space,
+			zip_size,
 			purge_sys->hdr_page_no,
 			purge_sys->hdr_offset, RW_S_LATCH, &mtr);
 

=== modified file 'storage/innobase/trx/trx0rseg.c'
--- a/storage/innobase/trx/trx0rseg.c	revid:vasil.dimov@stripped
+++ b/storage/innobase/trx/trx0rseg.c	revid:inaam.rana@stripped
@@ -67,8 +67,7 @@ trx_rseg_header_create(
 				MTR_MEMO_X_LOCK));
 
 	/* Allocate a new file segment for the rollback segment */
-	block = fseg_create(space, 0,
-			    TRX_RSEG + TRX_RSEG_FSEG_HEADER, mtr);
+	block = fseg_create(space, 0, TRX_RSEG + TRX_RSEG_FSEG_HEADER, mtr);
 
 	if (block == NULL) {
 		/* No space left */
@@ -152,7 +151,10 @@ trx_rseg_mem_free(
 		trx_undo_mem_free(undo);
 	}
 
-	trx_sys_set_nth_rseg(trx_sys, rseg->id, NULL);
+	/* const_cast<trx_rseg_t*>() because this function is
+	like a destructor.  */
+
+	*((trx_rseg_t**) trx_sys->rseg_array + rseg->id) = NULL;
 
 	mem_free(rseg);
 }
@@ -193,7 +195,9 @@ trx_rseg_mem_create(
 
 	mutex_create(rseg_mutex_key, &rseg->mutex, SYNC_RSEG);
 
-	trx_sys_set_nth_rseg(trx_sys, id, rseg);
+	/* const_cast<trx_rseg_t*>() because this function is
+	like a constructor.  */
+	*((trx_rseg_t**) trx_sys->rseg_array + rseg->id) = rseg;
 
 	rseg_header = trx_rsegf_get_new(space, zip_size, page_no, mtr);
 
@@ -268,9 +272,7 @@ trx_rseg_create_instance(
 
 		page_no = trx_sysf_rseg_get_page_no(sys_header, i, mtr);
 
-		if (page_no == FIL_NULL) {
-			trx_sys_set_nth_rseg(trx_sys, i, NULL);
-		} else {
+		if (page_no != FIL_NULL) {
 			ulint		space;
 			ulint		zip_size;
 			trx_rseg_t*	rseg = NULL;
@@ -285,6 +287,8 @@ trx_rseg_create_instance(
 				i, space, zip_size, page_no, ib_bh, mtr);
 
 			ut_a(rseg->id == i);
+		} else {
+			ut_a(trx_sys->rseg_array[i] == NULL);
 		}
 	}
 }
@@ -294,8 +298,9 @@ Creates a rollback segment.
 @return pointer to new rollback segment if create successful */
 UNIV_INTERN
 trx_rseg_t*
-trx_rseg_create(void)
-/*=================*/
+trx_rseg_create(
+/*============*/
+	ulint		space)		/*!< in: id of UNDO tablespace */
 {
 	mtr_t		mtr;
 	ulint		slot_no;
@@ -305,24 +310,25 @@ trx_rseg_create(void)
 
 	/* To obey the latching order, acquire the file space
 	x-latch before the trx_sys->lock. */
-	mtr_x_lock(fil_space_get_latch(TRX_SYS_SPACE, NULL), &mtr);
+	mtr_x_lock(fil_space_get_latch(space, NULL), &mtr);
 
 	slot_no = trx_sysf_rseg_find_free(&mtr);
 
 	if (slot_no != ULINT_UNDEFINED) {
-		ulint		space;
+		ulint		id;
 		ulint		page_no;
 		ulint		zip_size;
 		trx_sysf_t*	sys_header;
 
 		page_no = trx_rseg_header_create(
-			TRX_SYS_SPACE, 0, ULINT_MAX, slot_no, &mtr);
+			space, 0, ULINT_MAX, slot_no, &mtr);
 
 		ut_a(page_no != FIL_NULL);
 
 		sys_header = trx_sysf_get(&mtr);
 
-		space = trx_sysf_rseg_get_space(sys_header, slot_no, &mtr);
+		id = trx_sysf_rseg_get_space(sys_header, slot_no, &mtr);
+		ut_a(id == space);
 
 		zip_size = space ? fil_space_get_zip_size(space) : 0;
 
@@ -351,3 +357,69 @@ trx_rseg_array_init(
 
 	trx_rseg_create_instance(sys_header, ib_bh, mtr);
 }
+
+/********************************************************************
+Collect the unique non-zero space ids referenced by the rollback segment
+slots of the system header. On return space_ids[0..n-1] holds them
+sorted and space_ids[n] is the sentinel ULINT_UNDEFINED, so space_ids
+must have room for TRX_SYS_N_RSEGS + 1 elements.
+@return number (n) of unique rollback tablespaces in use. */
+UNIV_INTERN
+ulint
+trx_rseg_get_n_undo_tablespaces(
+/*============================*/
+	ulint*		space_ids)	/*!< out: array of space ids of
+					UNDO tablespaces */
+{
+	ulint		i;
+	mtr_t		mtr;
+	trx_sysf_t*	sys_header;
+	ulint		n_undo_tablespaces = 0;
+	ulint		space_ids_aux[TRX_SYS_N_RSEGS + 1];
+
+	mtr_start(&mtr);
+
+	sys_header = trx_sysf_get(&mtr);
+
+	for (i = 0; i < TRX_SYS_N_RSEGS; i++) {
+		ulint	page_no;
+		ulint	space;
+
+		page_no = trx_sysf_rseg_get_page_no(sys_header, i, &mtr);
+
+		if (page_no == FIL_NULL) {
+			continue;
+		}
+
+		space = trx_sysf_rseg_get_space(sys_header, i, &mtr);
+
+		if (space != 0) {
+			ulint	j;
+			ibool	found = FALSE;
+
+			for (j = 0; j < n_undo_tablespaces; ++j) {
+				if (space_ids[j] == space) {
+					found = TRUE;
+					break;
+				}
+			}
+
+			if (!found) {
+				ut_a(n_undo_tablespaces <= i);
+				space_ids[n_undo_tablespaces++] = space;
+			}
+		}
+	}
+
+	mtr_commit(&mtr);
+
+	ut_a(n_undo_tablespaces <= TRX_SYS_N_RSEGS);
+
+	space_ids[n_undo_tablespaces] = ULINT_UNDEFINED;
+
+	if (n_undo_tablespaces > 0) {
+		ut_ulint_sort(space_ids, space_ids_aux, 0, n_undo_tablespaces);
+	}
+
+	return(n_undo_tablespaces);
+}

=== modified file 'storage/innobase/trx/trx0sys.c'
--- a/storage/innobase/trx/trx0sys.c	revid:vasil.dimov@stripped
+++ b/storage/innobase/trx/trx0sys.c	revid:inaam.rana@stripped
@@ -1008,7 +1008,9 @@ trx_sys_init_at_db_start(void)
 
 	sys_header = trx_sysf_get(&mtr);
 
-	trx_rseg_array_init(sys_header, ib_bh, &mtr);
+	if (srv_force_recovery < SRV_FORCE_NO_UNDO_LOG_SCAN) {
+		trx_rseg_array_init(sys_header, ib_bh, &mtr);
+	}
 
 	/* VERY important: after the database is started, max_trx_id value is
 	divisible by TRX_SYS_TRX_ID_WRITE_MARGIN, and the 'if' in
@@ -1366,36 +1368,69 @@ trx_sys_file_format_close(void)
 }
 
 /*********************************************************************
-Creates the rollback segments */
+Creates the rollback segments.
+@return number of rollback segments that are active. */
 UNIV_INTERN
-void
+ulint
 trx_sys_create_rsegs(
 /*=================*/
+	ulint	n_spaces,	/*!< number of tablespaces for UNDO logs */
 	ulint	n_rsegs)	/*!< number of rollback segments to create */
 {
-	ulint	new_rsegs = 0;
+	mtr_t	mtr;
+	ulint	n_used;
+
+	ut_a(n_spaces < TRX_SYS_N_RSEGS);
+	ut_a(n_rsegs <= TRX_SYS_N_RSEGS);
+
+	if (srv_force_recovery >= SRV_FORCE_NO_UNDO_LOG_SCAN) {
+		return(ULINT_UNDEFINED);
+	}
+
+	/* This is executed in single-threaded mode therefore it is not
+	necessary to use the same mtr in trx_rseg_create(). n_used cannot
+	change while the function is executing. */
 
-	/* Do not create additional rollback segments if
-	innodb_force_recovery has been set and the database
-	was not shutdown cleanly. */
-	if (!srv_force_recovery && !recv_needed_recovery) {
+	mtr_start(&mtr);
+	n_used = trx_sysf_rseg_find_free(&mtr);
+	mtr_commit(&mtr);
+
+	if (n_used == ULINT_UNDEFINED) {
+		n_used = TRX_SYS_N_RSEGS;
+	}
+
+	/* Do not create additional rollback segments if innodb_force_recovery
+	has been set and the database was not shutdown cleanly. */
+
+	if (!srv_force_recovery && !recv_needed_recovery && n_used < n_rsegs) {
 		ulint	i;
+		ulint	new_rsegs = n_rsegs - n_used;
+
+		for (i = 0; i < new_rsegs; ++i) {
+			ulint	space;
 
-		for (i = 0;  i < n_rsegs; ++i) {
+			/* Tablespace 0 is the system tablespace. All UNDO
+			log tablespaces start from 1. */
+
+			if (n_spaces > 0) {
+				space = (i % n_spaces) + 1;
+			} else {
+				space = 0; /* System tablespace */
+			}
 
-			if (trx_rseg_create() != NULL) {
-				++new_rsegs;
+			if (trx_rseg_create(space) != NULL) {
+				++n_used;
 			} else {
 				break;
 			}
 		}
 	}
 
-	if (new_rsegs > 0) {
-		fprintf(stderr,
-			"InnoDB: %lu rollback segment(s) active.\n",
-		       	new_rsegs);
-	}
+	ut_print_timestamp(stderr);
+	fprintf(stderr, " InnoDB: %lu rollback segment(s) are active.\n",
+		n_used);
+
+	return(n_used);
 }
 
 #else /* !UNIV_HOTBACKUP */
@@ -1470,18 +1505,18 @@ trx_sys_read_file_format_id(
 		ut_print_timestamp(stderr);
 
 		fprintf(stderr,
-"  ibbackup: Error: trying to read system tablespace file format,\n"
-"  ibbackup: but could not open the tablespace file %s!\n",
-			pathname
-		);
+			"  ibbackup: Error: trying to read system tablespace "
+			"file format,\n"
+			"  ibbackup: but could not open the tablespace "
+			"file %s!\n", pathname);
 		return(FALSE);
 	}
 
 	/* Read the page on which file format is stored */
 
 	success = os_file_read_no_error_handling(
-		file, page, TRX_SYS_PAGE_NO * UNIV_PAGE_SIZE, 0, UNIV_PAGE_SIZE
-	);
+		file, page, TRX_SYS_PAGE_NO * UNIV_PAGE_SIZE, UNIV_PAGE_SIZE);
+
 	if (!success) {
 		/* The following call prints an error message */
 		os_file_get_last_error(TRUE);
@@ -1489,10 +1524,11 @@ trx_sys_read_file_format_id(
 		ut_print_timestamp(stderr);
 
 		fprintf(stderr,
-"  ibbackup: Error: trying to read system table space file format,\n"
-"  ibbackup: but failed to read the tablespace file %s!\n",
-			pathname
-		);
+			"  ibbackup: Error: trying to read system tablespace "
+			"file format,\n"
+			"  ibbackup: but failed to read the tablespace "
+			"file %s!\n", pathname);
+
 		os_file_close(file);
 		return(FALSE);
 	}
@@ -1514,7 +1550,6 @@ trx_sys_read_file_format_id(
 	return(TRUE);
 }
 
-
 /*****************************************************************//**
 Reads the file format id from the given per-table data file.
 @return TRUE if call succeeds */
@@ -1546,33 +1581,34 @@ trx_sys_read_pertable_file_format_id(
 	if (!success) {
 		/* The following call prints an error message */
 		os_file_get_last_error(TRUE);
-        
+
 		ut_print_timestamp(stderr);
-        
+
 		fprintf(stderr,
-"  ibbackup: Error: trying to read per-table tablespace format,\n"
-"  ibbackup: but could not open the tablespace file %s!\n",
-			pathname
-		);
+			"  ibbackup: Error: trying to read per-table "
+			"tablespace format,\n"
+			"  ibbackup: but could not open the tablespace "
+			"file %s!\n", pathname);
+
 		return(FALSE);
 	}
 
 	/* Read the first page of the per-table datafile */
 
-	success = os_file_read_no_error_handling(
-		file, page, 0, 0, UNIV_PAGE_SIZE
-	);
+	success = os_file_read_no_error_handling(file, page, 0, UNIV_PAGE_SIZE);
+
 	if (!success) {
 		/* The following call prints an error message */
 		os_file_get_last_error(TRUE);
-        
+
 		ut_print_timestamp(stderr);
-        
+
 		fprintf(stderr,
-"  ibbackup: Error: trying to per-table data file format,\n"
-"  ibbackup: but failed to read the tablespace file %s!\n",
-			pathname
-		);
+			"  ibbackup: Error: trying to per-table data file "
+			"format,\n"
+			"  ibbackup: but failed to read the tablespace "
+			"file %s!\n", pathname);
+
 		os_file_close(file);
 		return(FALSE);
 	}

=== modified file 'storage/innobase/trx/trx0trx.c'
--- a/storage/innobase/trx/trx0trx.c	revid:vasil.dimov@stripped
+++ b/storage/innobase/trx/trx0trx.c	revid:inaam.rana@stripped
@@ -621,12 +621,18 @@ UNIV_INLINE
 trx_rseg_t*
 trx_assign_rseg(
 /*============*/
-	ulint	max_undo_logs)	/*!< in: maximum number of UNDO logs to use */
+	ulint	max_undo_logs,	/*!< in: maximum number of UNDO logs to use */
+	ulint	n_tablespaces)	/*!< in: number of rollback tablespaces */
 {
 	ulint		i;
 	trx_rseg_t*	rseg;
 	static ulint	latest_rseg = 0;
 
+	if (srv_force_recovery >= SRV_FORCE_NO_UNDO_LOG_SCAN) {
+		ut_a(max_undo_logs == ULINT_UNDEFINED);
+		return(NULL);
+	}
+
 	/* This breaks true round robin but that should be OK. */
 
 	ut_a(max_undo_logs > 0 && max_undo_logs <= TRX_SYS_N_RSEGS);
@@ -640,13 +646,20 @@ trx_assign_rseg(
 
 	ut_a(trx_sys->rseg_array[0] != NULL);
 
+	/* Skip the system tablespace if we have more than one tablespace
+	defined for rollback segments. We want all UNDO records to be in
+	the non-system tablespaces. */
+
 	do {
 		rseg = trx_sys->rseg_array[i];
 		ut_a(rseg == NULL || i == rseg->id);
 
 		i = (rseg == NULL) ? 0 : i + 1;
 
-	} while (rseg == NULL);
+	} while (rseg == NULL
+		 || (rseg->space == 0
+		     && n_tablespaces > 0
+		     && trx_sys->rseg_array[1] != NULL));
 
 	return(rseg);
 }
@@ -670,7 +683,7 @@ trx_start_low(
 	/* The initial value for trx->no: IB_ULONGLONG_MAX is used in
 	read_view_open_now: */
 
-	rseg = trx_assign_rseg(srv_rollback_segments);
+	rseg = trx_assign_rseg(srv_undo_logs, srv_undo_tablespaces);
 
 	trx->no = IB_ULONGLONG_MAX;
 

=== modified file 'storage/innobase/trx/trx0undo.c'
--- a/storage/innobase/trx/trx0undo.c	revid:vasil.dimov@stripped
+++ b/storage/innobase/trx/trx0undo.c	revid:inaam.rana@stripped
@@ -1387,8 +1387,6 @@ trx_undo_lists_init(
 /*================*/
 	trx_rseg_t*	rseg)	/*!< in: rollback segment memory object */
 {
-	ulint		page_no;
-	trx_undo_t*	undo;
 	ulint		size	= 0;
 	trx_rsegf_t*	rseg_header;
 	ulint		i;
@@ -1401,10 +1399,12 @@ trx_undo_lists_init(
 
 	mtr_start(&mtr);
 
-	rseg_header = trx_rsegf_get_new(rseg->space, rseg->zip_size,
-					rseg->page_no, &mtr);
+	rseg_header = trx_rsegf_get_new(
+		rseg->space, rseg->zip_size, rseg->page_no, &mtr);
 
 	for (i = 0; i < TRX_RSEG_N_SLOTS; i++) {
+		ulint	page_no;
+
 		page_no = trx_rsegf_get_nth_undo(rseg_header, i, &mtr);
 
 		/* In forced recovery: try to avoid operations which look
@@ -1415,8 +1415,11 @@ trx_undo_lists_init(
 		if (page_no != FIL_NULL
 		    && srv_force_recovery < SRV_FORCE_NO_UNDO_LOG_SCAN) {
 
-			undo = trx_undo_mem_create_at_db_start(rseg, i,
-							       page_no, &mtr);
+			trx_undo_t*	undo;
+
+			undo = trx_undo_mem_create_at_db_start(
+				rseg, i, page_no, &mtr);
+
 			size += undo->size;
 
 			mtr_commit(&mtr);

=== modified file 'storage/innobase/ut/ut0ut.c'
--- a/storage/innobase/ut/ut0ut.c	revid:vasil.dimov@stripped
+++ b/storage/innobase/ut/ut0ut.c	revid:inaam.rana@stripped
@@ -24,6 +24,7 @@ Created 5/11/1994 Heikki Tuuri
 ********************************************************************/
 
 #include "ut0ut.h"
+#include "ut0sort.h"
 
 #ifdef UNIV_NONINL
 #include "ut0ut.ic"
@@ -434,6 +435,21 @@ ut_print_buf(
 	putc(';', file);
 }
 
+/**********************************************************************//**
+Sort function for ulint arrays. */
+UNIV_INTERN
+void
+ut_ulint_sort(
+/*==========*/
+	ulint*	arr,		/*!< in/out: array to sort */
+	ulint*	aux_arr,	/*!< in/out: aux array to use in sort */
+	ulint	low,		/*!< in: lower bound */
+	ulint	high)		/*!< in: upper bound */
+{
+	UT_SORT_FUNCTION_BODY(ut_ulint_sort, arr, aux_arr, low, high,
+			      ut_ulint_cmp);
+}
+
 /*************************************************************//**
 Calculates fast the number rounded up to the nearest power of 2.
 @return	first power of 2 which is >= n */


Attachment: [text/bzr-bundle] bzr/inaam.rana@oracle.com-20110809204318-igczx1h1zdyh9h5t.bundle
Thread
bzr commit into mysql-trunk branch (inaam.rana:3232) Inaam Rana 10 Aug