List:Commits« Previous MessageNext Message »
From:Sunny Bains Date:April 28 2010 7:29am
Subject:bzr commit into mysql-trunk-innodb branch (Sunny.Bains:3043)
View as plain text  
#At file:///Users/sunny/innodb/bzr/mysql-trunk-innodb/ based on revid:sunny.bains@stripped

 3043 Sunny Bains	2010-04-28
      Local commit of the MT purge patch for testing on ReviewBoard

    modified:
      storage/innobase/buf/buf0buf.c
      storage/innobase/handler/ha_innodb.cc
      storage/innobase/include/buf0buf.h
      storage/innobase/include/os0sync.h
      storage/innobase/include/os0sync.ic
      storage/innobase/include/que0que.h
      storage/innobase/include/row0purge.h
      storage/innobase/include/srv0srv.h
      storage/innobase/include/trx0purge.h
      storage/innobase/include/trx0roll.h
      storage/innobase/que/que0que.c
      storage/innobase/row/row0purge.c
      storage/innobase/srv/srv0srv.c
      storage/innobase/srv/srv0start.c
      storage/innobase/trx/trx0purge.c
      storage/innobase/trx/trx0roll.c
      storage/innobase/trx/trx0sys.c
=== modified file 'storage/innobase/buf/buf0buf.c'
--- a/storage/innobase/buf/buf0buf.c	revid:sunny.bains@stripped
+++ b/storage/innobase/buf/buf0buf.c	revid:sunny.bains@stripped
@@ -1244,8 +1244,8 @@ buf_pool_init_instance(
 		buf_pool->no_flush[i] = os_event_create(NULL);
 	}
 
-	/* 3. Initialize LRU fields
-	--------------------------- */
+	buf_pool->watch = mem_zalloc(
+		sizeof(*buf_pool->watch) * BUF_POOL_WATCH_SIZE);
 
 	/* All fields are initialized by mem_zalloc(). */
 

=== modified file 'storage/innobase/handler/ha_innodb.cc'
--- a/storage/innobase/handler/ha_innodb.cc	revid:sunny.bains@stripped
+++ b/storage/innobase/handler/ha_innodb.cc	revid:sunny.bains@stripped
@@ -10708,11 +10708,11 @@ static MYSQL_SYSVAR_ULONG(purge_batch_si
 
 static MYSQL_SYSVAR_ULONG(purge_threads, srv_n_purge_threads,
   PLUGIN_VAR_OPCMDARG | PLUGIN_VAR_READONLY,
-  "Purge threads can be either 0 or 1. Default is 0.",
+  "Purge threads can be from 0 to 32. Default is 0.",
   NULL, NULL,
   0,			/* Default setting */
   0,			/* Minimum value */
-  1, 0);		/* Maximum value */
+  32, 0);		/* Maximum value */
 
 static MYSQL_SYSVAR_ULONG(fast_shutdown, innobase_fast_shutdown,
   PLUGIN_VAR_OPCMDARG,

=== modified file 'storage/innobase/include/buf0buf.h'
--- a/storage/innobase/include/buf0buf.h	revid:sunny.bains@stripped
+++ b/storage/innobase/include/buf0buf.h	revid:sunny.bains@stripped
@@ -66,8 +66,7 @@ Created 11/5/1995 Heikki Tuuri
 #define MAX_BUFFER_POOLS 64		/*!< The maximum number of buffer
 					pools that can be defined */
 
-#define BUF_POOL_WATCH_SIZE 1		/*!< Maximum number of concurrent
-					buffer pool watches */
+#define BUF_POOL_WATCH_SIZE 		(srv_n_purge_threads + 1)
 
 extern	buf_pool_t*	buf_pool_ptr[MAX_BUFFER_POOLS]; /*!< The buffer pools
 					of the database */
@@ -1646,7 +1645,7 @@ struct buf_pool_struct{
 	UT_LIST_BASE_NODE_T(buf_page_t) zip_free[BUF_BUDDY_SIZES];
 					/*!< buddy free lists */
 
-	buf_page_t			watch[BUF_POOL_WATCH_SIZE];
+	buf_page_t*			watch;
 					/*!< Sentinel records for buffer
 					pool watches. Protected by
 				       	buf_pool->mutex. */

=== modified file 'storage/innobase/include/os0sync.h'
--- a/storage/innobase/include/os0sync.h	revid:sunny.bains@stripped
+++ b/storage/innobase/include/os0sync.h	revid:sunny.bains@stripped
@@ -36,6 +36,7 @@ Created 9/6/1995 Heikki Tuuri
 
 #include "univ.i"
 #include "ut0lst.h"
+#include "sync0types.h"
 
 #ifdef __WIN__
 
@@ -282,6 +283,31 @@ os_fast_mutex_free(
 /*===============*/
 	os_fast_mutex_t*	fast_mutex);	/*!< in: mutex to free */
 
+#ifndef HAVE_ATOMIC_BUILTINS
+/**********************************************************//**
+Function that uses a mutex to decrement a variable atomically */
+UNIV_INLINE
+void
+os_atomic_dec_ulint_func(
+/*=====================*/
+	mutex_t*		mutex,		/*!< in: mutex guarding the
+						decrement */
+	ulint*			var,		/*!< in, out: variable to
+						decrement */
+	ulint			delta);		/*!< in: delta to decrement */
+/**********************************************************//**
+Function that uses a mutex to increment a variable atomically */
+UNIV_INLINE
+void
+os_atomic_inc_ulint_func(
+/*=====================*/
+	mutex_t*		mutex,		/*!< in: mutex guarding the
+						increment */
+	ulint*			var,		/*!< in, out: variable to
+						increment */
+	ulint			delta);		/*!< in: delta to increment */
+#endif /* !HAVE_ATOMIC_BUILTINS */
+
 /**********************************************************//**
 Atomic compare-and-swap and increment for InnoDB. */
 
@@ -326,12 +352,27 @@ amount of increment. */
 # define os_atomic_increment_ulint(ptr, amount) \
 	os_atomic_increment(ptr, amount)
 
+/* Returns the resulting value, ptr is pointer to target, amount is the
+amount to decrement. */
+
+# define os_atomic_decrement(ptr, amount) \
+	__sync_sub_and_fetch(ptr, amount)
+
+# define os_atomic_decrement_lint(ptr, amount) \
+	os_atomic_decrement(ptr, amount)
+
+# define os_atomic_decrement_ulint(ptr, amount) \
+	os_atomic_decrement(ptr, amount)
+
 /**********************************************************//**
 Returns the old value of *ptr, atomically sets *ptr to new_val */
 
 # define os_atomic_test_and_set_byte(ptr, new_val) \
 	__sync_lock_test_and_set(ptr, new_val)
 
+# define os_atomic_test_and_set_ulint(ptr, new_val) \
+	__sync_lock_test_and_set(ptr, new_val)
+
 #elif defined(HAVE_IB_SOLARIS_ATOMICS)
 
 #define HAVE_ATOMIC_BUILTINS
@@ -377,7 +418,16 @@ amount of increment. */
 	atomic_add_long_nv((ulong_t*) ptr, amount)
 
 # define os_atomic_increment_ulint(ptr, amount) \
-	atomic_add_long_nv(ptr, amount)
+	atomic_add_ulong_nv(ptr, amount)
+
+/* Returns the resulting value, ptr is pointer to target, amount is the
+amount to decrement. */
+
+# define os_atomic_decrement_lint(ptr, amount) \
+	atomic_dec_long_nv(ptr, amount)
+
+# define os_atomic_decrement_ulint(ptr, amount) \
+	atomic_dec_ulong_nv(ptr, amount)
 
 /**********************************************************//**
 Returns the old value of *ptr, atomically sets *ptr to new_val */
@@ -385,6 +435,9 @@ Returns the old value of *ptr, atomicall
 # define os_atomic_test_and_set_byte(ptr, new_val) \
 	atomic_swap_uchar(ptr, new_val)
 
+# define os_atomic_test_and_set_ulint(ptr, new_val) \
+	atomic_swap_ulong(ptr, new_val)
+
 #elif defined(HAVE_WINDOWS_ATOMICS)
 
 #define HAVE_ATOMIC_BUILTINS
@@ -393,9 +446,10 @@ Returns the old value of *ptr, atomicall
 # ifdef _WIN64
 #  define win_cmp_and_xchg InterlockedCompareExchange64
 #  define win_xchg_and_add InterlockedExchangeAdd64
+#  define win_xchg_and_dec InterlockedExchangeDecrement64
 # else /* _WIN64 */
 #  define win_cmp_and_xchg InterlockedCompareExchange
-#  define win_xchg_and_add InterlockedExchangeAdd
+#  define win_xchg_and_dec InterlockedExchangeDecrement
 # endif
 
 /**********************************************************//**
@@ -426,6 +480,16 @@ amount of increment. */
 	((ulint) (win_xchg_and_add(ptr, amount) + amount))
 
 /**********************************************************//**
+Returns the resulting value, ptr is pointer to target, amount is the
+amount to decrement. */
+
+# define os_atomic_decrement_lint(ptr, amount) \
+	(win_xchg_and_add(ptr, -amount) - amount)
+
+# define os_atomic_decrement_ulint(ptr, amount) \
+	((ulint) (win_xchg_and_add(ptr, -amount) - amount))
+
+/**********************************************************//**
 Returns the old value of *ptr, atomically sets *ptr to new_val.
 InterlockedExchange() operates on LONG, and the LONG will be
 clobbered */
@@ -433,10 +497,20 @@ clobbered */
 # define os_atomic_test_and_set_byte(ptr, new_val) \
 	((byte) InterlockedExchange(ptr, new_val))
 
+# define os_atomic_test_and_set_ulong(ptr, new_val) \
+	InterlockedExchange(ptr, new_val)
+
 #else
 # define IB_ATOMICS_STARTUP_MSG \
 	"Mutexes and rw_locks use InnoDB's own implementation"
 #endif
+#ifdef HAVE_ATOMIC_BUILTINS
+#define os_atomic_inc_ulint(m,v,d)	os_atomic_increment_ulint(v, d)
+#define os_atomic_dec_ulint(m,v,d)	os_atomic_decrement_ulint(v, d)
+#else
+#define os_atomic_inc_ulint(m,v,d)	os_atomic_inc_ulint_func(m, v, d)
+#define os_atomic_dec_ulint(m,v,d)	os_atomic_dec_ulint_func(m, v, d)
+#endif /* HAVE_ATOMIC_BUILTINS */
 
 #ifndef UNIV_NONINL
 #include "os0sync.ic"

=== modified file 'storage/innobase/include/os0sync.ic'
--- a/storage/innobase/include/os0sync.ic	revid:sunny.bains@stripped
+++ b/storage/innobase/include/os0sync.ic	revid:sunny.bains@stripped
@@ -51,3 +51,43 @@ os_fast_mutex_trylock(
 	return((ulint) pthread_mutex_trylock(fast_mutex));
 #endif
 }
+
+#ifndef HAVE_ATOMIC_BUILTINS
+/**********************************************************//**
+Function that uses a mutex to decrement a variable atomically */
+UNIV_INLINE
+void
+os_atomic_dec_ulint_func(
+/*=====================*/
+	mutex_t*	mutex,		/*!< in: mutex guarding the dec */
+	ulint*		var,		/*!< in, out: variable to decrement */
+	ulint		delta)		/*!< in: delta to decrement */
+{
+	mutex_enter(mutex);
+
+	/* We do not expect any caller to decrement by more than
+	the current value, so assert that this holds. */
+	ut_ad(*var >= delta);
+
+	*var -= delta;
+
+	mutex_exit(mutex);
+}
+
+/**********************************************************//**
+Function that uses a mutex to increment a variable atomically */
+UNIV_INLINE
+void
+os_atomic_inc_ulint_func(
+/*=====================*/
+	mutex_t*	mutex,		/*!< in: mutex guarding the increment */
+	ulint*		var,		/*!< in, out: variable to increment */
+	ulint		delta)		/*!< in: delta to increment */
+{
+	mutex_enter(mutex);
+
+	*var += delta;
+
+	mutex_exit(mutex);
+}
+#endif /* !HAVE_ATOMIC_BUILTINS */

=== modified file 'storage/innobase/include/que0que.h'
--- a/storage/innobase/include/que0que.h	revid:sunny.bains@stripped
+++ b/storage/innobase/include/que0que.h	revid:sunny.bains@stripped
@@ -344,6 +344,18 @@ que_eval_sql(
 				dict_sys->mutex around call to pars_sql. */
 	trx_t*		trx);	/*!< in: trx */
 
+/**********************************************************************//**
+Round robin scheduler.
+@return a query thread of the graph moved to QUE_THR_RUNNING state, or
+NULL; the query thread should be executed by que_run_threads by the
+caller */
+UNIV_INTERN
+que_thr_t*
+que_fork_scheduler_round_robin(
+/*===========================*/
+	que_fork_t*	fork,		/*!< in: a query fork */
+	que_thr_t*	thr);		/*!< in: current pos */
+
 /* Query graph query thread node: the fields are protected by the kernel
 mutex with the exceptions named below */
 

=== modified file 'storage/innobase/include/row0purge.h'
--- a/storage/innobase/include/row0purge.h	revid:sunny.bains@stripped
+++ b/storage/innobase/include/row0purge.h	revid:sunny.bains@stripped
@@ -34,6 +34,8 @@ Created 3/14/1997 Heikki Tuuri
 #include "trx0types.h"
 #include "que0types.h"
 #include "row0types.h"
+#include "row0purge.h"
+#include "ut0vec.h"
 
 /********************************************************************//**
 Creates a purge node to a query graph.
@@ -42,8 +44,11 @@ UNIV_INTERN
 purge_node_t*
 row_purge_node_create(
 /*==================*/
-	que_thr_t*	parent,	/*!< in: parent node, i.e., a thr node */
-	mem_heap_t*	heap);	/*!< in: memory heap where created */
+	ulint		thread_id,	/*!< in: id of the thread doing
+					the purge */
+	que_thr_t*	parent,		/*!< in: parent node, i.e., a
+					thr node */
+	mem_heap_t*	heap);		/*!< in: memory heap where created */
 /***********************************************************//**
 Determines if it is possible to remove a secondary index entry.
 Removal is possible if the secondary index entry does not refer to any
@@ -83,7 +88,10 @@ struct purge_node_struct{
 	/*----------------------*/
 	/* Local storage for this graph node */
 	roll_ptr_t	roll_ptr;/* roll pointer to undo log record */
-	trx_undo_rec_t*	undo_rec;/* undo log record */
+	ib_vector_t*    undo_recs;/*!< Undo recs to purge */
+	ulint           thread_id;/*!< id of the thread doing the purge, this
+				maps directly to reservation below */
+
 	trx_undo_inf_t*	reservation;/* reservation for the undo log record in
 				the purge array */
 	undo_no_t	undo_no;/* undo number of the record */

=== modified file 'storage/innobase/include/srv0srv.h'
--- a/storage/innobase/include/srv0srv.h	revid:sunny.bains@stripped
+++ b/storage/innobase/include/srv0srv.h	revid:sunny.bains@stripped
@@ -404,6 +404,7 @@ enum srv_thread_type {
 	SRV_CONSOLE,	/**< thread serving console */
 	SRV_WORKER,	/**< threads serving parallelized queries and
 			queries released from lock wait */
+	SRV_PURGE,	/**< Purge coordinator thread */
 #if 0
 	/* Utility threads */
 	SRV_BUFFER,	/**< thread flushing dirty buffer blocks */
@@ -646,6 +647,43 @@ ibool
 srv_is_master_thread_active(void);
 /*==============================*/
 
+/*********************************************************************//**
+Purge coordinator thread that schedules the purge tasks.
+@return	a dummy parameter */
+UNIV_INTERN
+os_thread_ret_t
+srv_purge_coordinator_thread(
+/*=========================*/
+	void*	arg __attribute__((unused)));	/*!< in: a dummy parameter
+						required by os_thread_create */
+
+/*********************************************************************//**
+Worker thread that reads tasks from the work queue and executes them.
+@return	a dummy parameter */
+UNIV_INTERN
+os_thread_ret_t
+srv_worker_thread(
+/*==============*/
+	void*	arg __attribute__((unused)));	/*!< in: a dummy parameter
+						required by os_thread_create */
+
+/*******************************************************************//**
+Wakes up the worker threads. */
+UNIV_INTERN
+void
+srv_wake_worker_threads(
+/*====================*/
+	ulint	n_workers);			/*!< number of workers to
+						wake up */
+
+/**********************************************************************//**
+Get count of tasks in the queue.
+@return number of tasks in queue  */
+UNIV_INTERN
+ulint
+srv_get_task_queue_length(void);
+/*===========================*/
+
 /** Status variables to be passed to MySQL */
 struct export_var_struct{
 	ulint innodb_data_pending_reads;	/*!< Pending reads */

=== modified file 'storage/innobase/include/trx0purge.h'
--- a/storage/innobase/include/trx0purge.h	revid:sunny.bains@stripped
+++ b/storage/innobase/include/trx0purge.h	revid:sunny.bains@stripped
@@ -68,8 +68,9 @@ Creates the global purge system control 
 mutex. */
 UNIV_INTERN
 void
-trx_purge_sys_create(void);
-/*======================*/
+trx_purge_sys_create(
+/*=================*/
+	ulint	n_purge_threads);	/*!< in: number of purge threads */
 /********************************************************************//**
 Frees the global purge system control structure. */
 UNIV_INTERN
@@ -87,19 +88,6 @@ trx_purge_add_update_undo_to_history(
 	page_t*	undo_page,	/*!< in: update undo log header page,
 				x-latched */
 	mtr_t*	mtr);		/*!< in: mtr */
-/********************************************************************//**
-Fetches the next undo log record from the history list to purge. It must be
-released with the corresponding release function.
-@return copy of an undo log record or pointer to trx_purge_dummy_rec,
-if the whole undo log can skipped in purge; NULL if none left */
-UNIV_INTERN
-trx_undo_rec_t*
-trx_purge_fetch_next_rec(
-/*=====================*/
-	roll_ptr_t*	roll_ptr,/*!< out: roll pointer to undo record */
-	trx_undo_inf_t** cell,	/*!< out: storage cell for the record in the
-				purge array */
-	mem_heap_t*	heap);	/*!< in: memory heap where copied */
 /*******************************************************************//**
 Releases a reserved purge undo record. */
 UNIV_INTERN
@@ -114,8 +102,10 @@ UNIV_INTERN
 ulint
 trx_purge(
 /*======*/
-	ulint	limit);		/*!< in: the maximum number of records to
-				purge in one batch */
+	ulint	n_purge_threads,	/*!< in: number of purge tasks to
+					submit to task queue. */
+	ulint	limit);			/*!< in: the maximum number of
+					records to purge in one batch */
 /******************************************************************//**
 Prints information of the purge system to stderr. */
 UNIV_INTERN
@@ -125,7 +115,6 @@ trx_purge_sys_print(void);
 
 /** The control structure used in the purge operation */
 struct trx_purge_struct{
-	ulint		state;		/*!< Purge system state */
 	sess_t*		sess;		/*!< System session running the purge
 					query */
 	trx_t*		trx;		/*!< System transaction running the purge
@@ -133,17 +122,24 @@ struct trx_purge_struct{
 					of the trx system and it never ends */
 	que_t*		query;		/*!< The query graph which will do the
 					parallelized purge operation */
-	rw_lock_t	latch;		/*!< The latch protecting the purge view.
-					A purge operation must acquire an
+	rw_lock_t	latch;		/*!< The latch protecting the purge
+					view. A purge operation must acquire an
 					x-latch here for the instant at which
 					it changes the purge view: an undo
 					log operation can prevent this by
 					obtaining an s-latch here. */
 	read_view_t*	view;		/*!< The purge will not remove undo logs
 					which are >= this view (purge view) */
-	mutex_t		mutex;		/*!< Mutex protecting the fields below */
+	ulint		n_submitted;	/*!< Count of tasks submitted to the
+					task queue */
+	ulint		n_completed;	/*!< Count of tasks completed */
+
+	mutex_t		mutex;		/*!< Mutex protecting the fields
+					below */
 	ulint		n_pages_handled;/*!< Approximate number of undo log
 					pages processed in purge */
+	ulint		n_pages_handled_start; /*!< The value of n_pages_handled
+					when a purge was initiated */
 	ulint		handle_limit;	/*!< Target of how many pages to get
 					processed in the current purge */
 	/*------------------------------*/
@@ -181,9 +177,27 @@ struct trx_purge_struct{
 					completes */
 };
 
-#define TRX_PURGE_ON		1	/* purge operation is running */
-#define TRX_STOP_PURGE		2	/* purge operation is stopped, or
-					it should be stopped */
+/** Info required to purge a record */
+struct trx_purge_rec_struct {
+	trx_undo_rec_t*	undo_rec;	/*!< Record to purge */
+	roll_ptr_t	roll_ptr;	/*!< File pointer to UNDO record */
+};
+
+typedef struct trx_purge_rec_struct trx_purge_rec_t;
+
+/** Test if purge mutex is owned. */
+#define purge_mutex_own() mutex_own(&purge_sys->mutex)
+
+/** Acquire the purge mutex. */
+#define purge_mutex_enter() do {		\
+	mutex_enter(&purge_sys->mutex);		\
+} while (0)
+
+/** Release the purge mutex. */
+# define purge_mutex_exit() do {	\
+	mutex_exit(&purge_sys->mutex);	\
+} while (0)
+
 #ifndef UNIV_NONINL
 #include "trx0purge.ic"
 #endif

=== modified file 'storage/innobase/include/trx0roll.h'
--- a/storage/innobase/include/trx0roll.h	revid:sunny.bains@stripped
+++ b/storage/innobase/include/trx0roll.h	revid:sunny.bains@stripped
@@ -56,8 +56,9 @@ trx_savept_take(
 Creates an undo number array. */
 UNIV_INTERN
 trx_undo_arr_t*
-trx_undo_arr_create(void);
-/*=====================*/
+trx_undo_arr_create(
+/*================*/
+	ulint	n_purge_threads);	/*!< in: number of purge threads */
 /*******************************************************************//**
 Frees an undo number array. */
 UNIV_INTERN

=== modified file 'storage/innobase/que/que0que.c'
--- a/storage/innobase/que/que0que.c	revid:sunny.bains@stripped
+++ b/storage/innobase/que/que0que.c	revid:sunny.bains@stripped
@@ -273,7 +273,7 @@ que_thr_end_wait(
 	if (next_thr && *next_thr == NULL) {
 		*next_thr = thr;
 	} else {
-		ut_a(0);
+		ut_error;
 		srv_que_task_enqueue_low(thr);
 	}
 }
@@ -332,6 +332,53 @@ que_thr_init_command(
 }
 
 /**********************************************************************//**
+Round robin scheduler.
+@return a query thread of the graph moved to QUE_THR_RUNNING state, or
+NULL; the query thread should be executed by que_run_threads by the
+caller */
+UNIV_INTERN
+que_thr_t*
+que_fork_scheduler_round_robin(
+/*===========================*/
+	que_fork_t*	fork,		/*!< in: a query fork */
+	que_thr_t*	thr)		/*!< in: current pos */
+{
+	/* If no current, start first available. */
+	if (thr == NULL) {
+		thr = UT_LIST_GET_FIRST(fork->thrs);
+	} else {
+		thr = UT_LIST_GET_NEXT(thrs, thr);
+	}
+
+	if (thr) {
+
+		fork->state = QUE_FORK_ACTIVE;
+
+		fork->last_sel_node = NULL;
+
+		switch (thr->state) {
+		case QUE_THR_SUSPENDED:
+			ut_a(!thr->is_active);
+			que_thr_move_to_run_state(thr);
+			break;
+
+		case QUE_THR_COMPLETED:
+		case QUE_THR_COMMAND_WAIT:
+			ut_a(!thr->is_active);
+			que_thr_init_command(thr);
+			break;
+
+		case QUE_THR_LOCK_WAIT:
+		default:
+			ut_error;
+
+		}
+	}
+
+	return(thr);
+}
+
+/**********************************************************************//**
 Starts execution of a command in a query fork. Picks a query thread which
 is not in the QUE_THR_RUNNING state and moves it to that state. If none
 can be chosen, a situation which may arise in parallelized fetches, NULL
@@ -414,6 +461,8 @@ que_fork_start_command(
 
 		thr = completed_thr;
 		que_thr_init_command(thr);
+	} else {
+		ut_error;
 	}
 
 	return(thr);
@@ -456,7 +505,7 @@ que_fork_error_handle(
 
 	que_thr_move_to_run_state(thr);
 
-	ut_a(0);
+	ut_error;
 	srv_que_task_enqueue_low(thr);
 }
 
@@ -764,14 +813,12 @@ que_thr_move_to_run_state(
 
 	if (!thr->is_active) {
 
-		(thr->graph)->n_active_thrs++;
+		thr->graph->n_active_thrs++;
 
 		trx->n_active_thrs++;
 
 		thr->is_active = TRUE;
 
-		ut_ad((thr->graph)->n_active_thrs == 1);
-		ut_ad(trx->n_active_thrs == 1);
 	}
 
 	thr->state = QUE_THR_RUNNING;
@@ -839,9 +886,6 @@ que_thr_dec_refer_count(
 		}
 	}
 
-	ut_ad(fork->n_active_thrs == 1);
-	ut_ad(trx->n_active_thrs == 1);
-
 	fork->n_active_thrs--;
 	trx->n_active_thrs--;
 

=== modified file 'storage/innobase/row/row0purge.c'
--- a/storage/innobase/row/row0purge.c	revid:sunny.bains@stripped
+++ b/storage/innobase/row/row0purge.c	revid:sunny.bains@stripped
@@ -51,14 +51,17 @@ UNIV_INTERN
 purge_node_t*
 row_purge_node_create(
 /*==================*/
-	que_thr_t*	parent,	/*!< in: parent node, i.e., a thr node */
-	mem_heap_t*	heap)	/*!< in: memory heap where created */
+	ulint		thread_id,	/*!< in: id of thread doing purge */
+	que_thr_t*	parent,		/*!< in: parent node  */
+	mem_heap_t*	heap)		/*!< in: memory heap where created */
 {
 	purge_node_t*	node;
 
 	ut_ad(parent && heap);
 
-	node = mem_heap_alloc(heap, sizeof(purge_node_t));
+	node = mem_heap_zalloc(heap, sizeof(purge_node_t));
+
+	node->thread_id = thread_id;
 
 	node->common.type = QUE_NODE_PURGE;
 	node->common.parent = parent;
@@ -481,7 +484,8 @@ static
 void
 row_purge_upd_exist_or_extern(
 /*==========================*/
-	purge_node_t*	node)	/*!< in: row purge node */
+	purge_node_t*	node,		/*!< in: row purge node */
+	trx_undo_rec_t*	undo_rec)	/*!< in: record to purge */
 {
 	mem_heap_t*	heap;
 	dtuple_t*	entry;
@@ -532,7 +536,7 @@ skip_secondaries:
 			byte*		data_field;
 
 			/* We use the fact that new_val points to
-			node->undo_rec and get thus the offset of
+			undo_rec and get thus the offset of
 			dfield data inside the undo record. Then we
 			can calculate from node->roll_ptr the file
 			address of the new_val data */
@@ -540,7 +544,7 @@ skip_secondaries:
 			internal_offset
 				= ((const byte*)
 				   dfield_get_data(&ufield->new_val))
-				- node->undo_rec;
+				- undo_rec;
 
 			ut_a(internal_offset < UNIV_PAGE_SIZE);
 
@@ -596,11 +600,11 @@ static
 ibool
 row_purge_parse_undo_rec(
 /*=====================*/
-	purge_node_t*	node,	/*!< in: row undo node */
-	ibool*		updated_extern,
-				/*!< out: TRUE if an externally stored field
-				was updated */
-	que_thr_t*	thr)	/*!< in: query thread */
+	purge_node_t*		node,		/*!< in: row undo node */
+	trx_undo_rec_t*		undo_rec,	/*!< in: record to purge */
+	ibool*			updated_extern, /*!< out: TRUE if an externally
+						stored field was updated */
+	que_thr_t*		thr)		/*!< in: query thread */
 {
 	dict_index_t*	clust_index;
 	byte*		ptr;
@@ -617,8 +621,10 @@ row_purge_parse_undo_rec(
 
 	trx = thr_get_trx(thr);
 
-	ptr = trx_undo_rec_get_pars(node->undo_rec, &type, &cmpl_info,
-				    updated_extern, &undo_no, &table_id);
+	ptr = trx_undo_rec_get_pars(
+		undo_rec, &type, &cmpl_info, updated_extern,
+		 &undo_no, &table_id);
+
 	node->rec_type = type;
 
 	if (type == TRX_UNDO_UPD_DEL_REC && !(*updated_extern)) {
@@ -641,7 +647,7 @@ row_purge_parse_undo_rec(
 	/* Prevent DROP TABLE etc. from running when we are doing the purge
 	for this row */
 
-	row_mysql_freeze_data_dictionary(trx);
+	rw_lock_s_lock_func(&dict_operation_lock, 0, __FILE__, __LINE__);
 
 	mutex_enter(&(dict_sys->mutex));
 
@@ -650,9 +656,9 @@ row_purge_parse_undo_rec(
 	mutex_exit(&(dict_sys->mutex));
 
 	if (node->table == NULL) {
-		/* The table has been dropped: no need to do purge */
 err_exit:
-		row_mysql_unfreeze_data_dictionary(trx);
+		/* The table has been dropped: no need to do purge */
+		rw_lock_s_unlock_gen(&dict_operation_lock, 0);
 		return(FALSE);
 	}
 
@@ -694,75 +700,83 @@ err_exit:
 /***********************************************************//**
 Fetches an undo log record and does the purge for the recorded operation.
 If none left, or the current purge completed, returns the control to the
-parent node, which is always a query thread node.
-@return	DB_SUCCESS if operation successfully completed, else error code */
+parent node, which is always a query thread node. */
 static
-ulint
+void
 row_purge(
 /*======*/
-	purge_node_t*	node,	/*!< in: row purge node */
-	que_thr_t*	thr)	/*!< in: query thread */
+	purge_node_t*	node,		/*!< in: row purge node */
+	trx_undo_rec_t*	undo_rec,	/*!< in: record to purge */
+	que_thr_t*	thr)		/*!< in: query thread */
 {
-	roll_ptr_t	roll_ptr;
-	ibool		purge_needed;
-	ibool		updated_extern;
-	trx_t*		trx;
-
 	ut_ad(node && thr);
 
-	trx = thr_get_trx(thr);
+	if (undo_rec != &trx_purge_dummy_rec) {
+		ibool	updated_extern;
 
-	node->undo_rec = trx_purge_fetch_next_rec(&roll_ptr,
-						  &(node->reservation),
-						  node->heap);
-	if (!node->undo_rec) {
-		/* Purge completed for this query thread */
+		if (row_purge_parse_undo_rec(
+			node, undo_rec, &updated_extern, thr)) {
 
-		thr->run_node = que_node_get_parent(node);
+			dict_index_t*	clust_index;
 
-		return(DB_SUCCESS);
-	}
+			clust_index = dict_table_get_first_index(node->table);
 
-	node->roll_ptr = roll_ptr;
+			node->found_clust = FALSE;
 
-	if (node->undo_rec == &trx_purge_dummy_rec) {
-		purge_needed = FALSE;
-	} else {
-		purge_needed = row_purge_parse_undo_rec(node, &updated_extern,
-							thr);
-		/* If purge_needed == TRUE, we must also remember to unfreeze
-		data dictionary! */
-	}
+			node->index = dict_table_get_next_index(clust_index);
 
-	if (purge_needed) {
-		node->found_clust = FALSE;
+			if (node->rec_type == TRX_UNDO_DEL_MARK_REC) {
+				row_purge_del_mark(node);
 
-		node->index = dict_table_get_next_index(
-			dict_table_get_first_index(node->table));
+			} else if (updated_extern
+				   || node->rec_type
+				   == TRX_UNDO_UPD_EXIST_REC) {
 
-		if (node->rec_type == TRX_UNDO_DEL_MARK_REC) {
-			row_purge_del_mark(node);
+				row_purge_upd_exist_or_extern(node, undo_rec);
+			}
 
-		} else if (updated_extern
-			   || node->rec_type == TRX_UNDO_UPD_EXIST_REC) {
+			if (node->found_clust) {
+				btr_pcur_close(&(node->pcur));
+			}
 
-			row_purge_upd_exist_or_extern(node);
+			rw_lock_s_unlock_gen(&dict_operation_lock, 0);
 		}
+	}
+}
 
-		if (node->found_clust) {
-			btr_pcur_close(&(node->pcur));
-		}
+/***********************************************************//**
+Reset the purge query thread. */
+UNIV_INLINE
+void
+row_purge_end(
+/*==========*/
+	que_thr_t*	thr)	/*!< in: query thread */
+{
+	purge_node_t*	node;
 
-		row_mysql_unfreeze_data_dictionary(trx);
-	}
+	ut_ad(thr);
 
-	/* Do some cleanup */
-	trx_purge_rec_release(node->reservation);
-	mem_heap_empty(node->heap);
+	node = thr->run_node;
+
+	ut_ad(que_node_get_type(node) == QUE_NODE_PURGE);
 
-	thr->run_node = node;
+	thr->run_node = que_node_get_parent(node);
 
-	return(DB_SUCCESS);
+	node->undo_recs = NULL;
+	node->reservation = NULL;
+
+	ut_a(thr->run_node != NULL);
+
+	mem_heap_empty(node->heap);
+
+	os_atomic_increment(&purge_sys->n_completed, 1);
+#if 0
+	printf("%10lu: %p: %lu:%lu\n",
+		os_thread_pf(os_thread_get_curr_id()),
+		node, 
+		purge_sys->n_submitted,
+	       	purge_sys->n_completed);
+#endif
 }
 
 /***********************************************************//**
@@ -776,7 +790,6 @@ row_purge_step(
 	que_thr_t*	thr)	/*!< in: query thread */
 {
 	purge_node_t*	node;
-	ulint		err;
 
 	ut_ad(thr);
 
@@ -784,9 +797,28 @@ row_purge_step(
 
 	ut_ad(que_node_get_type(node) == QUE_NODE_PURGE);
 
-	err = row_purge(node, thr);
+	ut_a(node->undo_recs);
+
+	if (!ib_vector_is_empty(node->undo_recs)) {
+		trx_purge_rec_t*purge_rec;
+
+		purge_rec = ib_vector_pop(node->undo_recs);
+
+		node->reservation = &purge_sys->arr->infos[node->thread_id];
+		node->roll_ptr = purge_rec->roll_ptr;
+
+		row_purge(node, purge_rec->undo_rec, thr);
 
-	ut_ad(err == DB_SUCCESS);
+		trx_purge_rec_release(node->reservation);
+
+		if (ib_vector_is_empty(node->undo_recs)) {
+			row_purge_end(thr);
+		} else {
+			thr->run_node = node;
+		}
+	} else {
+		row_purge_end(thr);
+	}
 
 	return(thr);
 }

=== modified file 'storage/innobase/srv/srv0srv.c'
--- a/storage/innobase/srv/srv0srv.c	revid:sunny.bains@stripped
+++ b/storage/innobase/srv/srv0srv.c	revid:sunny.bains@stripped
@@ -442,6 +442,8 @@ UNIV_INTERN mysql_pfs_key_t	srv_dict_tmp
 UNIV_INTERN mysql_pfs_key_t	srv_misc_tmpfile_mutex_key;
 /* Key to register srv_sys_t::mutex with performance schema */
 UNIV_INTERN mysql_pfs_key_t	srv_srv_sys_mutex_key;
+/* Key to register srv_sys_t::tasks_mutex with performance schema */
+UNIV_INTERN mysql_pfs_key_t	srv_srv_sys_tasks_mutex_key;
 #endif /* UNIV_PFS_MUTEX */
 
 /* Temporary file for innodb monitor output */
@@ -698,13 +700,15 @@ typedef struct srv_sys_struct	srv_sys_t;
 
 /** The server system struct */
 struct srv_sys_struct{
-	mutex_t		mutex;			/*!< variable protecting the
-						fields in this structure. */
-	srv_table_t*	sys_threads;		/*!< server thread table */
-
+	mutex_t		tasks_mutex;		/*!< variable protecting the
+						tasks queue */
 	UT_LIST_BASE_NODE_T(que_thr_t)
 			tasks;			/*!< task queue */
 
+	mutex_t		mutex;			/*!< variable protecting the
+						fields below. */
+	srv_table_t*	sys_threads;		/*!< server thread table */
+
 	ulint		n_threads[SRV_MASTER + 1];
 						/*!< number of system threads
 						in a thread class */
@@ -1034,6 +1038,9 @@ srv_init(void)
 
 	mutex_create(srv_srv_sys_mutex_key, &srv_sys->mutex, SYNC_THREADS);
 
+	mutex_create(srv_srv_sys_tasks_mutex_key,
+		     &srv_sys->tasks_mutex, SYNC_NO_ORDER_CHECK);
+
 	srv_sys_mutex_enter();
 
 	srv_sys->sys_threads = (srv_slot_t*) &srv_sys[1];
@@ -2613,9 +2620,9 @@ srv_wake_purge_thread_if_not_active(void
 	ut_ad(!srv_sys_mutex_own());
 
 	if (srv_n_purge_threads > 0
-	    && srv_sys->n_threads_active[SRV_WORKER] == 0) {
+	    && srv_sys->n_threads_active[SRV_PURGE] == 0) {
 
-		srv_release_threads(SRV_WORKER, 1);
+		srv_release_threads(SRV_PURGE, 1);
 	}
 }
 
@@ -2646,7 +2653,25 @@ srv_wake_purge_thread(void)
 
 	if (srv_n_purge_threads > 0) {
 
-		srv_release_threads(SRV_WORKER, 1);
+		srv_release_threads(SRV_PURGE, 1);
+	}
+}
+
+/*******************************************************************//**
+Wakes up the worker threads. */
+UNIV_INTERN
+void
+srv_wake_worker_threads(
+/*====================*/
+	ulint	n_workers)		/*!< number of workers to wake up */
+{
+	ut_ad(!mutex_own(&kernel_mutex));
+	ut_ad(!srv_sys_mutex_own());
+
+	if (srv_n_purge_threads > 1) {
+
+		ut_a(n_workers > 0);
+		srv_release_threads(SRV_WORKER, n_workers);
 	}
 }
 
@@ -2710,7 +2735,7 @@ srv_master_do_purge(void)
 			/* Nothing to purge. */
 			n_pages_purged = 0;
 		} else {
-			n_pages_purged = trx_purge(srv_purge_batch_size);
+			n_pages_purged = trx_purge(0, srv_purge_batch_size);
 		}
 
 		srv_sync_log_buffer_in_background();
@@ -3156,50 +3181,143 @@ suspend_thread:
 }
 
 /*********************************************************************//**
-Asynchronous purge thread.
+Fetch and execute a task from the work queue.
+@return	TRUE if a task was executed */
+static
+ibool
+srv_task_execute(void)
+/*==================*/
+{
+	que_thr_t*	thr = NULL;
+
+	/* Normally there shouldn't be any tasks in the work
+	queue, but we play it safe just in case. */
+	if (srv_force_recovery < SRV_FORCE_NO_BACKGROUND
+	    && srv_shutdown_state == 0
+	    && !srv_fast_shutdown) {
+
+		mutex_enter(&srv_sys->tasks_mutex);
+
+		if (UT_LIST_GET_LEN(srv_sys->tasks) > 0) {
+
+			thr = UT_LIST_GET_FIRST(srv_sys->tasks);
+
+			UT_LIST_REMOVE(queue, srv_sys->tasks, thr);
+		}
+
+		mutex_exit(&srv_sys->tasks_mutex);
+
+		if (thr != NULL) {
+			que_run_threads(thr);
+
+			srv_wake_purge_thread_if_not_active();
+		}
+	}
+
+	return(thr != NULL);
+}
+
+/*********************************************************************//**
+Worker thread that reads tasks from the work queue and executes them.
 @return	a dummy parameter */
 UNIV_INTERN
 os_thread_ret_t
-srv_purge_thread(
-/*=============*/
+srv_worker_thread(
+/*==============*/
 	void*	arg __attribute__((unused)))	/*!< in: a dummy parameter
 						required by os_thread_create */
 {
 	srv_slot_t*	slot;
 	ulint		slot_no = ULINT_UNDEFINED;
-	ulint		n_total_purged = ULINT_UNDEFINED;
 
-	ut_a(srv_n_purge_threads == 1);
+#ifdef UNIV_DEBUG_THREAD_CREATION
+	fprintf(stderr, "Worker thread starts, id %lu\n",
+		os_thread_pf(os_thread_get_curr_id()));
+#endif /* UNIV_DEBUG_THREAD_CREATION */
+
+	srv_sys_mutex_enter();
+
+	slot_no = srv_table_reserve_slot(SRV_WORKER);
+
+	++srv_sys->n_threads_active[SRV_WORKER];
+
+	srv_sys_mutex_exit();
+
+	while (srv_shutdown_state != SRV_SHUTDOWN_EXIT_THREADS) {
+		os_event_t	event;
+
+		event = srv_suspend_thread();
+
+		os_event_wait(event);
+
+		srv_task_execute();
+	}
+
+	/* Free the thread local memory. */
+	thr_local_free(os_thread_get_curr_id());
+
+	srv_sys_mutex_enter();
+
+	/* Free the slot for reuse. */
+	slot = srv_table_get_nth_slot(slot_no);
+	slot->in_use = FALSE;
+
+	srv_sys_mutex_exit();
+
+#ifdef UNIV_DEBUG_THREAD_CREATION
+	fprintf(stderr, "Worker thread exits, id %lu\n",
+		os_thread_pf(os_thread_get_curr_id()));
+#endif /* UNIV_DEBUG_THREAD_CREATION */
+
+	/* We count the number of threads in os_thread_exit(). A created
+	thread should always use that to exit and not use return() to exit. */
+	os_thread_exit(NULL);
+
+	OS_THREAD_DUMMY_RETURN;	/* Not reached, avoid compiler warning */
+}
+
+/*********************************************************************//**
+Purge coordinator thread that schedules the purge tasks.
+@return	a dummy parameter */
+UNIV_INTERN
+os_thread_ret_t
+srv_purge_coordinator_thread(
+/*=========================*/
+	void*	arg __attribute__((unused)))	/*!< in: a dummy parameter
+						required by os_thread_create */
+{
+	srv_slot_t*	slot;
+	ulint		slot_no = ULINT_UNDEFINED;
+
+	ut_a(srv_n_purge_threads >= 1);
 
 #ifdef UNIV_PFS_THREAD
 	pfs_register_thread(srv_purge_thread_key);
 #endif /* UNIV_PFS_THREAD */
 
+
 #ifdef UNIV_DEBUG_THREAD_CREATION
-	fprintf(stderr, "InnoDB: Purge thread running, id %lu\n",
+	fprintf(stderr, "Purge coordinator thread starts, id %lu\n",
 		os_thread_pf(os_thread_get_curr_id()));
 #endif /* UNIV_DEBUG_THREAD_CREATION */
 
 	srv_sys_mutex_enter();
 
-	slot_no = srv_table_reserve_slot(SRV_WORKER);
+	slot_no = srv_table_reserve_slot(SRV_PURGE);
 
-	++srv_sys->n_threads_active[SRV_WORKER];
+	++srv_sys->n_threads_active[SRV_PURGE];
 
 	srv_sys_mutex_exit();
 
 	while (srv_shutdown_state != SRV_SHUTDOWN_EXIT_THREADS) {
 
-		ulint	n_pages_purged;
-
 		/* If there are very few records to purge or the last
 		purge didn't purge any records then wait for activity.
-	        We peek at the history len without holding any mutex
+		We peek at the history len without holding any mutex
 		because in the worst case we will end up waiting for
 		the next purge event. */
-		if (trx_sys->rseg_history_len < srv_purge_batch_size
-		    || n_total_purged == 0) {
 
+		if (trx_sys->rseg_history_len < 200) {
 			os_event_t	event;
 
 			event = srv_suspend_thread();
@@ -3207,7 +3325,6 @@ srv_purge_thread(
 			os_event_wait(event);
 		}
 
-		/* Check for shutdown and whether we should do purge at all. */
 		if (srv_force_recovery >= SRV_FORCE_NO_BACKGROUND
 		    || srv_shutdown_state != 0
 		    || srv_fast_shutdown) {
@@ -3215,16 +3332,22 @@ srv_purge_thread(
 			break;
 		}
 
-		n_total_purged = 0;
+		/* If number of threads is 1 then we let trx_purge() do
+		the actual purge for us. */
+		if (srv_n_purge_threads == 1) {
+			ulint	n_pages_purged;
+
+			do {
+				n_pages_purged = trx_purge(
+					0, srv_purge_batch_size);
 
-		/* Purge until there are no more records to purge and there is
-		no change in configuration or server state. */
-		do {
-			n_pages_purged = trx_purge(srv_purge_batch_size);
+			} while (n_pages_purged > 0 && !srv_fast_shutdown);
 
-			n_total_purged += n_pages_purged;
+		} else {
+			trx_purge(srv_n_purge_threads, srv_purge_batch_size);
 
-		} while (n_pages_purged > 0 && !srv_fast_shutdown);
+			srv_task_execute();
+		}
 
 		srv_sync_log_buffer_in_background();
 	}
@@ -3241,7 +3364,7 @@ srv_purge_thread(
 	srv_sys_mutex_exit();
 
 #ifdef UNIV_DEBUG_THREAD_CREATION
-	fprintf(stderr, "InnoDB: Purge thread exiting, id %lu\n",
+	fprintf(stderr, "Purge coordinator exiting, id %lu\n",
 		os_thread_pf(os_thread_get_curr_id()));
 #endif /* UNIV_DEBUG_THREAD_CREATION */
 
@@ -3263,12 +3386,30 @@ srv_que_task_enqueue_low(
 {
 	ut_ad(thr);
 
-	srv_sys_mutex_enter();
+	mutex_enter(&srv_sys->tasks_mutex);
 
 	UT_LIST_ADD_LAST(queue, srv_sys->tasks, thr);
 
-	srv_sys_mutex_exit();
+	mutex_exit(&srv_sys->tasks_mutex);
 
 	srv_release_threads(SRV_WORKER, 1);
 }
 
+/**********************************************************************//**
+Get count of tasks in the queue.
+@return number of tasks in queue  */
+UNIV_INTERN
+ulint
+srv_get_task_queue_length(void)
+/*===========================*/
+{
+	ulint	n_tasks;
+
+	mutex_enter(&srv_sys->tasks_mutex);
+
+	n_tasks = UT_LIST_GET_LEN(srv_sys->tasks);
+
+	mutex_exit(&srv_sys->tasks_mutex);
+
+	return(n_tasks);
+}

=== modified file 'storage/innobase/srv/srv0start.c'
--- a/storage/innobase/srv/srv0start.c	revid:sunny.bains@stripped
+++ b/storage/innobase/srv/srv0start.c	revid:sunny.bains@stripped
@@ -1544,6 +1544,10 @@ innobase_start_or_create_for_mysql(void)
 		after the double write buffer has been created. */
 		trx_sys_create();
 
+		trx_sys_init_at_db_start();
+
+		trx_purge_sys_create(srv_n_purge_threads);
+
 		dict_create();
 
 		srv_startup_is_before_trx_rollback_phase = FALSE;
@@ -1565,6 +1569,8 @@ innobase_start_or_create_for_mysql(void)
 
 		dict_boot();
 
+		trx_purge_sys_create(srv_n_purge_threads);
+
 		trx_sys_init_at_db_start();
 
 		srv_startup_is_before_trx_rollback_phase = FALSE;
@@ -1624,6 +1630,8 @@ innobase_start_or_create_for_mysql(void)
 		dict_boot();
 		trx_sys_init_at_db_start();
 
+		trx_purge_sys_create(srv_n_purge_threads);
+
 		/* Initialize the fsp free limit global variable in the log
 		system */
 		fsp_header_get_free_limit();
@@ -1743,13 +1751,17 @@ innobase_start_or_create_for_mysql(void)
 	os_thread_create(&srv_master_thread, NULL, thread_ids
 			 + (1 + SRV_MAX_N_IO_THREADS));
 
-	/* Currently we allow only a single purge thread. */
-	ut_a(srv_n_purge_threads == 0 || srv_n_purge_threads == 1);
-
 	/* If the user has requested a separate purge thread then
 	start the purge thread. */
-	if (srv_n_purge_threads == 1) {
-		os_thread_create(&srv_purge_thread, NULL, NULL);
+	if (srv_n_purge_threads >= 1) {
+
+		os_thread_create(&srv_purge_coordinator_thread, NULL, NULL);
+
+		for (i = 1; i < srv_n_purge_threads; ++i) {
+			os_thread_create(
+				&srv_worker_thread, NULL,
+			       	thread_ids + 5 + i + SRV_MAX_N_IO_THREADS);
+		}
 	}
 
 #ifdef UNIV_DEBUG
@@ -2005,8 +2017,11 @@ innobase_shutdown_for_mysql(void)
 		/* c. We wake the master thread so that it exits */
 		srv_wake_master_thread();
 
-		/* d. We wake the purge thread so that it exits */
-		srv_wake_purge_thread();
+		/* d. We wake the purge thread(s) so that they exit */
+		if (srv_n_purge_threads > 0) {
+			srv_wake_purge_thread();
+			srv_wake_worker_threads(srv_n_purge_threads - 1);
+		}
 
 		/* e. Exit the i/o threads */
 

=== modified file 'storage/innobase/trx/trx0purge.c'
--- a/storage/innobase/trx/trx0purge.c	revid:sunny.bains@stripped
+++ b/storage/innobase/trx/trx0purge.c	revid:sunny.bains@stripped
@@ -61,6 +61,19 @@ UNIV_INTERN mysql_pfs_key_t	trx_purge_la
 UNIV_INTERN mysql_pfs_key_t	purge_sys_mutex_key;
 #endif /* UNIV_PFS_MUTEX */
 
+/********************************************************************//**
+Fetches the next undo log record from the history list to purge. It must be
+released with the corresponding release function.
+@return copy of an undo log record or pointer to trx_purge_dummy_rec,
+if the whole undo log can be skipped in purge; NULL if none left */
+static
+trx_undo_rec_t*
+trx_purge_fetch_next_rec(
+/*=====================*/
+	roll_ptr_t*	roll_ptr,	/*!< out: roll pointer to undo record */
+	ulint		thread_id,	/*!< in: id of thread doing purge */
+	mem_heap_t*	heap);		/*!< in: memory heap where copied */
+
 /*****************************************************************//**
 Checks if trx_id is >= purge_view: then it is guaranteed that its update
 undo log still exists in the system.
@@ -90,52 +103,60 @@ trx_purge_update_undo_must_exist(
 /*******************************************************************//**
 Stores info of an undo log record during a purge.
 @return	pointer to the storage cell */
-static
+UNIV_INLINE
 trx_undo_inf_t*
-trx_purge_arr_store_info(
+trx_purge_arr_set_info(
 /*=====================*/
-	trx_id_t	trx_no,	/*!< in: transaction number */
-	undo_no_t	undo_no)/*!< in: undo number */
+	ulint		index,		/*!< in: index of cell */
+	trx_id_t	trx_no,		/*!< in: transaction number */
+	undo_no_t	undo_no)	/*!< in: undo number */
 {
 	trx_undo_inf_t*	cell;
-	trx_undo_arr_t*	arr;
-	ulint		i;
+	int		trx_cmp;
 
-	arr = purge_sys->arr;
+	ut_ad(!purge_mutex_own());
 
-	for (i = 0;; i++) {
-		cell = trx_undo_arr_get_nth_info(arr, i);
+	cell = trx_undo_arr_get_nth_info(purge_sys->arr, index);
 
-		if (!(cell->in_use)) {
-			/* Not in use, we may store here */
-			cell->undo_no = undo_no;
-			cell->trx_no = trx_no;
-			cell->in_use = TRUE;
+	++cell->in_use;
 
-			arr->n_used++;
+	trx_cmp = ut_dulint_cmp(cell->trx_no, trx_no);
 
-			return(cell);
-		}
+	/* Keep the max {trx_no, undo_no} pair: overwrite if new pair is >=. */
+	if (trx_cmp < 0
+	    || (trx_cmp == 0
+	        && ut_dulint_cmp(cell->undo_no, undo_no) <= 0)) {
+
+		cell->trx_no = trx_no;
+		cell->undo_no = undo_no;
 	}
+
+	purge_mutex_enter();
+	++purge_sys->arr->n_used;
+	purge_mutex_exit();
+
+	return(cell);
 }
 
 /*******************************************************************//**
 Removes info of an undo log record during a purge. */
 UNIV_INLINE
 void
-trx_purge_arr_remove_info(
+trx_purge_arr_clear_info(
 /*======================*/
 	trx_undo_inf_t*	cell)	/*!< in: pointer to the storage cell */
 {
-	trx_undo_arr_t*	arr;
-
-	arr = purge_sys->arr;
+	ut_ad(!purge_mutex_own());
 
-	cell->in_use = FALSE;
+	--cell->in_use;
 
-	ut_ad(arr->n_used > 0);
+	if (cell->in_use == 0) {
+		memset(cell, 0x0, sizeof(*cell));
+	}
 
-	arr->n_used--;
+	purge_mutex_enter();
+	--purge_sys->arr->n_used;
+	purge_mutex_exit();
 }
 
 /*******************************************************************//**
@@ -149,43 +170,43 @@ trx_purge_arr_get_biggest(
 				if array is empty */
 	undo_no_t*	undo_no)/*!< out: undo number */
 {
-	trx_undo_inf_t*	cell;
-	trx_id_t	pair_trx_no;
-	undo_no_t	pair_undo_no;
-	int		trx_cmp;
-	ulint		n_used;
 	ulint		i;
 	ulint		n;
+	ulint		n_used;
+	trx_id_t	pair_trx_no;
+	undo_no_t	pair_undo_no;
+
+	ut_ad(purge_mutex_own());
 
 	n = 0;
 	n_used = arr->n_used;
 	pair_trx_no = ut_dulint_zero;
 	pair_undo_no = ut_dulint_zero;
 
-	for (i = 0;; i++) {
+	for (i = 0; i < arr->n_cells; i++) {
+		const trx_undo_inf_t*	cell;
+
 		cell = trx_undo_arr_get_nth_info(arr, i);
 
 		if (cell->in_use) {
+			int	trx_cmp;
+
 			n++;
 			trx_cmp = ut_dulint_cmp(cell->trx_no, pair_trx_no);
 
-			if ((trx_cmp > 0)
-			    || ((trx_cmp == 0)
-				&& (ut_dulint_cmp(cell->undo_no,
-						  pair_undo_no) >= 0))) {
+			if (trx_cmp > 0
+			    || (trx_cmp == 0
+				&& ut_dulint_cmp(cell->undo_no,
+						  pair_undo_no) >= 0)) {
 
 				pair_trx_no = cell->trx_no;
 				pair_undo_no = cell->undo_no;
 			}
 		}
-
-		if (n == n_used) {
-			*trx_no = pair_trx_no;
-			*undo_no = pair_undo_no;
-
-			return;
-		}
 	}
+
+	*trx_no = pair_trx_no;
+	*undo_no = pair_undo_no;
 }
 
 /****************************************************************//**
@@ -194,25 +215,27 @@ this query graph.
 @return	own: the query graph */
 static
 que_t*
-trx_purge_graph_build(void)
-/*=======================*/
+trx_purge_graph_build(
+/*==================*/
+	trx_t*		trx,			/*!< in: transaction */
+	ulint		n_purge_threads)	/*!< in: number of purge
+						threads */
 {
+	ulint		i;
 	mem_heap_t*	heap;
 	que_fork_t*	fork;
-	que_thr_t*	thr;
-	/*	que_thr_t*	thr2; */
 
 	heap = mem_heap_create(512);
 	fork = que_fork_create(NULL, NULL, QUE_FORK_PURGE, heap);
-	fork->trx = purge_sys->trx;
-
-	thr = que_thr_create(fork, heap);
+	fork->trx = trx;
 
-	thr->child = row_purge_node_create(thr, heap);
+	for (i = 0; i < n_purge_threads; ++i) {
+		que_thr_t*	thr;
 
-	/*	thr2 = que_thr_create(fork, fork, heap);
+		thr = que_thr_create(fork, heap);
 
-	thr2->child = row_purge_node_create(fork, thr2, heap);	 */
+		thr->child = row_purge_node_create(i, thr, heap);
+	}
 
 	return(fork);
 }
@@ -222,20 +245,13 @@ Creates the global purge system control 
 mutex. */
 UNIV_INTERN
 void
-trx_purge_sys_create(void)
-/*======================*/
+trx_purge_sys_create(
+/*=================*/
+	ulint	n_purge_threads)	/*!< in: number of purge threads */
 {
-	ut_ad(mutex_own(&kernel_mutex));
-
-	purge_sys = mem_alloc(sizeof(trx_purge_t));
-
-	purge_sys->state = TRX_STOP_PURGE;
-
-	purge_sys->n_pages_handled = 0;
+	mutex_enter(&kernel_mutex);
 
-	purge_sys->purge_trx_no = ut_dulint_zero;
-	purge_sys->purge_undo_no = ut_dulint_zero;
-	purge_sys->next_stored = FALSE;
+	purge_sys = mem_zalloc(sizeof(*purge_sys));
 
 	rw_lock_create(trx_purge_latch_key,
 		       &purge_sys->latch, SYNC_PURGE_LATCH);
@@ -243,9 +259,14 @@ trx_purge_sys_create(void)
 	mutex_create(purge_sys_mutex_key,
 		     &purge_sys->mutex, SYNC_PURGE_SYS);
 
+	purge_sys->purge_trx_no = ut_dulint_zero;
+
 	purge_sys->heap = mem_heap_create(256);
 
-	purge_sys->arr = trx_undo_arr_create();
+	/* Handle the case for the traditional mode. */
+	if (n_purge_threads == 0) {
+		n_purge_threads = 1;
+	}
 
 	purge_sys->sess = sess_open();
 
@@ -253,12 +274,17 @@ trx_purge_sys_create(void)
 
 	purge_sys->trx->is_purge = 1;
 
-	ut_a(trx_start_low(purge_sys->trx, ULINT_UNDEFINED));
+	purge_sys->arr = trx_undo_arr_create(n_purge_threads);
 
-	purge_sys->query = trx_purge_graph_build();
+	purge_sys->query = trx_purge_graph_build(
+		purge_sys->trx, n_purge_threads);
 
-	purge_sys->view = read_view_oldest_copy_or_open_new(ut_dulint_zero,
-							    purge_sys->heap);
+	purge_sys->view = read_view_oldest_copy_or_open_new(
+		ut_dulint_zero, purge_sys->heap);
+
+	ut_a(trx_start_low(purge_sys->trx, ULINT_UNDEFINED));
+
+	mutex_exit(&kernel_mutex);
 }
 
 /************************************************************************
@@ -273,8 +299,11 @@ trx_purge_sys_close(void)
 	que_graph_free(purge_sys->query);
 
 	ut_a(purge_sys->sess->trx->is_purge);
+
 	purge_sys->sess->trx->conc_state = TRX_NOT_STARTED;
+
 	sess_close(purge_sys->sess);
+
 	purge_sys->sess = NULL;
 
 	if (purge_sys->view != NULL) {
@@ -360,11 +389,12 @@ trx_purge_add_update_undo_to_history(
 	/* Add the log as the first in the history list */
 	flst_add_first(rseg_header + TRX_RSEG_HISTORY,
 		       undo_header + TRX_UNDO_HISTORY_NODE, mtr);
-	mutex_enter(&kernel_mutex);
-	trx_sys->rseg_history_len++;
-	mutex_exit(&kernel_mutex);
 
-	if (!(trx_sys->rseg_history_len % srv_purge_batch_size)) {
+	os_atomic_inc_ulint(&kernel_mutex, &trx_sys->rseg_history_len, 1);
+
+	if (trx_sys->rseg_history_len 
+	    > (srv_purge_batch_size * srv_n_purge_threads)) {
+
 		/* Inform the purge thread that there is work to do. */
 		srv_wake_purge_thread_if_not_active();
 	}
@@ -460,10 +490,8 @@ loop:
 	flst_cut_end(rseg_hdr + TRX_RSEG_HISTORY,
 		     log_hdr + TRX_UNDO_HISTORY_NODE, n_removed_logs, &mtr);
 
-	mutex_enter(&kernel_mutex);
-	ut_ad(trx_sys->rseg_history_len >= n_removed_logs);
-	trx_sys->rseg_history_len -= n_removed_logs;
-	mutex_exit(&kernel_mutex);
+	os_atomic_dec_ulint(
+		&kernel_mutex, &trx_sys->rseg_history_len, n_removed_logs);
 
 	freed = FALSE;
 
@@ -549,10 +577,10 @@ loop:
 	}
 
 	if (cmp >= 0) {
-		mutex_enter(&kernel_mutex);
-		ut_a(trx_sys->rseg_history_len >= n_removed_logs);
-		trx_sys->rseg_history_len -= n_removed_logs;
-		mutex_exit(&kernel_mutex);
+
+		os_atomic_dec_ulint(
+			&kernel_mutex, &trx_sys->rseg_history_len,
+		       	n_removed_logs);
 
 		flst_truncate_end(rseg_hdr + TRX_RSEG_HISTORY,
 				  log_hdr + TRX_UNDO_HISTORY_NODE,
@@ -609,10 +637,10 @@ trx_purge_truncate_history(void)
 	trx_id_t	limit_trx_no;
 	undo_no_t	limit_undo_no;
 
-	ut_ad(mutex_own(&(purge_sys->mutex)));
+	ut_ad(purge_mutex_own());
 
-	trx_purge_arr_get_biggest(purge_sys->arr, &limit_trx_no,
-				  &limit_undo_no);
+	trx_purge_arr_get_biggest(
+		purge_sys->arr, &limit_trx_no, &limit_undo_no);
 
 	if (ut_dulint_is_zero(limit_trx_no)) {
 
@@ -649,7 +677,7 @@ ibool
 trx_purge_truncate_if_arr_empty(void)
 /*=================================*/
 {
-	ut_ad(mutex_own(&(purge_sys->mutex)));
+	ut_ad(!purge_mutex_own());
 
 	if (purge_sys->arr->n_used == 0) {
 
@@ -873,15 +901,12 @@ trx_purge_get_next_rec(
 	trx_undo_rec_t*	rec;
 	trx_undo_rec_t*	rec_copy;
 	trx_undo_rec_t*	rec2;
-	trx_undo_rec_t*	next_rec;
 	page_t*		undo_page;
 	page_t*		page;
 	ulint		offset;
 	ulint		page_no;
 	ulint		space;
 	ulint		zip_size;
-	ulint		type;
-	ulint		cmpl_info;
 	mtr_t		mtr;
 
 	ut_ad(mutex_own(&(purge_sys->mutex)));
@@ -914,6 +939,10 @@ trx_purge_get_next_rec(
 	rec2 = rec;
 
 	for (;;) {
+		ulint		type;
+		trx_undo_rec_t*	next_rec;
+		ulint		cmpl_info;
+
 		/* Try first to find the next record which requires a purge
 		operation from the same page of the same undo log */
 
@@ -988,34 +1017,24 @@ Fetches the next undo log record from th
 released with the corresponding release function.
 @return copy of an undo log record or pointer to trx_purge_dummy_rec,
 if the whole undo log can skipped in purge; NULL if none left */
-UNIV_INTERN
+static
 trx_undo_rec_t*
 trx_purge_fetch_next_rec(
 /*=====================*/
-	roll_ptr_t*	roll_ptr,/*!< out: roll pointer to undo record */
-	trx_undo_inf_t** cell,	/*!< out: storage cell for the record in the
-				purge array */
-	mem_heap_t*	heap)	/*!< in: memory heap where copied */
+	roll_ptr_t*	roll_ptr,	/*!< out: roll pointer to undo record */
+	ulint		thread_id,	/*!< in: id of the calling thread */
+	mem_heap_t*	heap)		/*!< in: memory heap where copied */
 {
+	dulint		trx_id;
+	dulint		undo_no;
 	trx_undo_rec_t*	undo_rec;
 
-	mutex_enter(&(purge_sys->mutex));
-
-	if (purge_sys->state == TRX_STOP_PURGE) {
-		trx_purge_truncate_if_arr_empty();
-
-		mutex_exit(&(purge_sys->mutex));
-
-		return(NULL);
-	}
+	ut_ad(mutex_own(&purge_sys->mutex));
 
 	if (!purge_sys->next_stored) {
 		trx_purge_choose_next_log();
 
 		if (!purge_sys->next_stored) {
-			purge_sys->state = TRX_STOP_PURGE;
-
-			trx_purge_truncate_if_arr_empty();
 
 			if (srv_print_thread_releases) {
 				fprintf(stderr,
@@ -1024,30 +1043,15 @@ trx_purge_fetch_next_rec(
 					(ulong) purge_sys->n_pages_handled);
 			}
 
-			mutex_exit(&(purge_sys->mutex));
 
 			return(NULL);
 		}
 	}
 
-	if (purge_sys->n_pages_handled >= purge_sys->handle_limit) {
-
-		purge_sys->state = TRX_STOP_PURGE;
-
-		trx_purge_truncate_if_arr_empty();
-
-		mutex_exit(&(purge_sys->mutex));
-
-		return(NULL);
-	}
-
-	if (ut_dulint_cmp(purge_sys->purge_trx_no,
-			  purge_sys->view->low_limit_no) >= 0) {
-		purge_sys->state = TRX_STOP_PURGE;
-
-		trx_purge_truncate_if_arr_empty();
+	trx_id = purge_sys->purge_trx_no;
+	undo_no = purge_sys->purge_undo_no;
 
-		mutex_exit(&(purge_sys->mutex));
+	if (ut_dulint_cmp(trx_id, purge_sys->view->low_limit_no) >= 0) {
 
 		return(NULL);
 	}
@@ -1057,22 +1061,18 @@ trx_purge_fetch_next_rec(
 	ut_dulint_get_low(purge_sys->purge_trx_no),
 	ut_dulint_get_low(purge_sys->purge_undo_no)); */
 
-	*roll_ptr = trx_undo_build_roll_ptr(FALSE, (purge_sys->rseg)->id,
-					    purge_sys->page_no,
-					    purge_sys->offset);
+	*roll_ptr = trx_undo_build_roll_ptr(
+		FALSE, purge_sys->rseg->id,
+		purge_sys->page_no, purge_sys->offset);
 
-	*cell = trx_purge_arr_store_info(purge_sys->purge_trx_no,
-					 purge_sys->purge_undo_no);
-
-	ut_ad(ut_dulint_cmp(purge_sys->purge_trx_no,
-			    (purge_sys->view)->low_limit_no) < 0);
+	ut_ad(ut_dulint_cmp(trx_id, purge_sys->view->low_limit_no) < 0);
 
 	/* The following call will advance the stored values of purge_trx_no
 	and purge_undo_no, therefore we had to store them first */
 
 	undo_rec = trx_purge_get_next_rec(heap);
 
-	mutex_exit(&(purge_sys->mutex));
+	trx_purge_arr_set_info(thread_id, trx_id, undo_no);
 
 	return(undo_rec);
 }
@@ -1085,15 +1085,73 @@ trx_purge_rec_release(
 /*==================*/
 	trx_undo_inf_t*	cell)	/*!< in: storage cell */
 {
-	trx_undo_arr_t*	arr;
+	trx_purge_arr_clear_info(cell);
+}
 
-	mutex_enter(&(purge_sys->mutex));
+/***********************************************************//**
+Fetches undo log records and attaches them round-robin to the purge nodes. */
+static
+void
+trx_purge_attach_undo_recs(
+/*=======================*/
+	ulint		limit)		/*!< no. of records to purge */
+{
+	que_thr_t*	thr;
 
-	arr = purge_sys->arr;
+	ut_ad(mutex_own(&purge_sys->mutex));
 
-	trx_purge_arr_remove_info(cell);
+	thr = UT_LIST_GET_FIRST(purge_sys->query->thrs);
 
-	mutex_exit(&(purge_sys->mutex));
+	while (thr) {
+		purge_node_t*	node;
+
+		ut_a(!thr->is_active);
+
+		/* Get the purge node. */
+		node = thr->child;
+
+		ut_a(que_node_get_type(node) == QUE_NODE_PURGE);
+		ut_a(node->undo_recs == NULL);
+
+		node->undo_recs = ib_vector_create(node->heap, limit);
+
+		thr = UT_LIST_GET_NEXT(thrs, thr);
+	}
+
+	thr = UT_LIST_GET_FIRST(purge_sys->query->thrs);
+
+	while (thr) {
+		purge_node_t*	node;
+		trx_purge_rec_t*purge_rec;
+
+		ut_a(!thr->is_active);
+
+		/* Get the purge node. */
+		node = thr->child;
+		ut_a(que_node_get_type(node) == QUE_NODE_PURGE);
+
+		purge_rec = mem_heap_zalloc(node->heap, sizeof(*purge_rec));
+
+		purge_rec->undo_rec = trx_purge_fetch_next_rec(
+			&purge_rec->roll_ptr, node->thread_id, node->heap);
+
+		if (purge_rec->undo_rec != NULL) {
+
+			ib_vector_push(node->undo_recs, purge_rec);
+
+			if (ib_vector_size(node->undo_recs) >= limit) {
+				break;
+			}
+		} else {
+			break;
+		}
+
+		thr = UT_LIST_GET_NEXT(thrs, thr);
+
+		if (thr == NULL) {
+			thr = UT_LIST_GET_FIRST(purge_sys->query->thrs);
+		}
+	}
 }
 
 /*******************************************************************//**
@@ -1103,27 +1161,45 @@ UNIV_INTERN
 ulint
 trx_purge(
 /*======*/
-	ulint	limit)		/*!< in: the maximum number of records to
-				purge in one batch */
+	ulint	n_purge_threads,	/*!< in: number of purge tasks
+					to submit to the work queue */
+	ulint	limit)			/*!< in: the maximum number of records
+					to purge in one batch */
 {
-	que_thr_t*	thr;
-	/*	que_thr_t*	thr2; */
-	ulint		old_pages_handled;
+	ut_a(purge_sys->n_submitted >= purge_sys->n_completed);
 
-	mutex_enter(&(purge_sys->mutex));
+	mutex_enter(&purge_sys->mutex);
 
-	if (purge_sys->trx->n_active_thrs > 0) {
+	mutex_enter(&kernel_mutex);
 
-		mutex_exit(&(purge_sys->mutex));
+	/* Ensure that the work queue empties out. Note that we also
+	check the active query thread count. This is because the
+	completed count decreases before the query fully completes. */
+	while (purge_sys->n_submitted > purge_sys->n_completed
+	       || purge_sys->query->n_active_thrs > 0) {
+		ulint	n_tasks;
 
-		/* Should not happen */
+		mutex_exit(&purge_sys->mutex);
 
-		ut_error;
+		mutex_exit(&kernel_mutex);
 
-		return(0);
-	}
+		n_tasks = srv_get_task_queue_length();
+
+		if (n_tasks > 0) {
+			srv_wake_worker_threads(n_tasks);
+		}
+
+ 		mutex_enter(&purge_sys->mutex);
+
+		mutex_enter(&kernel_mutex);
+  	}
+
+	mutex_exit(&kernel_mutex);
+
+	ut_a(srv_get_task_queue_length() == 0);
+	ut_a(purge_sys->n_submitted == purge_sys->n_completed);
 
-	rw_lock_x_lock(&(purge_sys->latch));
+	rw_lock_x_lock(&purge_sys->latch);
 
 	mutex_enter(&kernel_mutex);
 
@@ -1157,50 +1233,76 @@ trx_purge(
 		}
 	}
 
-	purge_sys->view = read_view_oldest_copy_or_open_new(ut_dulint_zero,
-							    purge_sys->heap);
+	purge_sys->view = read_view_oldest_copy_or_open_new(
+		ut_dulint_zero, purge_sys->heap);
+
 	mutex_exit(&kernel_mutex);
 
-	rw_lock_x_unlock(&(purge_sys->latch));
+	purge_sys->n_pages_handled_start = purge_sys->n_pages_handled;
 
-	purge_sys->state = TRX_PURGE_ON;
+	/* This is to avoid calling truncate the first time. */
+	if (purge_sys->n_completed > 0) {
+		trx_purge_truncate_if_arr_empty();
+	}
 
-	purge_sys->handle_limit = purge_sys->n_pages_handled + limit;
+	trx_purge_attach_undo_recs(limit);
+
+	rw_lock_x_unlock(&purge_sys->latch);
 
-	old_pages_handled = purge_sys->n_pages_handled;
+	purge_sys->handle_limit = purge_sys->n_pages_handled + limit;
 
 	mutex_exit(&(purge_sys->mutex));
 
-	mutex_enter(&kernel_mutex);
+	/* Do we do an asynchronous purge or not ? */
+	if (n_purge_threads > 0) {
+		ulint		i = 0;
+		que_thr_t*	thr = NULL;
 
-	thr = que_fork_start_command(purge_sys->query);
+		ut_a(purge_sys->n_submitted == purge_sys->n_completed);
 
-	ut_ad(thr);
+		for (i = 0; i < n_purge_threads; ++i) {
 
-	/*	thr2 = que_fork_start_command(purge_sys->query);
+			mutex_enter(&kernel_mutex);
 
-	ut_ad(thr2); */
+			thr = que_fork_scheduler_round_robin(
+				purge_sys->query, thr);
 
+			mutex_exit(&kernel_mutex);
 
-	mutex_exit(&kernel_mutex);
+			ut_a(thr != NULL);
 
-	/*	srv_que_task_enqueue(thr2); */
+			os_atomic_increment(&purge_sys->n_submitted, 1);
 
-	if (srv_print_thread_releases) {
+			srv_que_task_enqueue_low(thr);
+		}
 
-		fputs("Starting purge\n", stderr);
-	}
+	/* Do it synchronously. */
+	} else {
+		que_thr_t*	thr;
 
-	que_run_threads(thr);
+		ut_a(purge_sys->n_submitted == purge_sys->n_completed);
 
-	if (srv_print_thread_releases) {
+		thr = que_fork_start_command(purge_sys->query);
+		ut_ad(thr);
 
-		fprintf(stderr,
-			"Purge ends; pages handled %lu\n",
-			(ulong) purge_sys->n_pages_handled);
-	}
+		os_atomic_increment(&purge_sys->n_submitted, 1);
+
+		que_run_threads(thr);
+
+		if (srv_print_thread_releases) {
+ 
+			fputs("Starting purge\n", stderr);
+		}
+
+		if (srv_print_thread_releases) {
+
+			fprintf(stderr,
+				"Purge ends; pages handled %lu\n",
+				(ulong) purge_sys->n_pages_handled);
+		}
+  	}
 
-	return(purge_sys->n_pages_handled - old_pages_handled);
+	return(purge_sys->n_pages_handled - purge_sys->n_pages_handled_start);
 }
 
 /******************************************************************//**

=== modified file 'storage/innobase/trx/trx0roll.c'
--- a/storage/innobase/trx/trx0roll.c	revid:sunny.bains@stripped
+++ b/storage/innobase/trx/trx0roll.c	revid:sunny.bains@stripped
@@ -633,28 +633,22 @@ Creates an undo number array.
 @return	own: undo number array */
 UNIV_INTERN
 trx_undo_arr_t*
-trx_undo_arr_create(void)
-/*=====================*/
+trx_undo_arr_create(
+/*================*/
+	ulint		n_cells)	/*!< Number of cells */
 {
 	trx_undo_arr_t*	arr;
 	mem_heap_t*	heap;
-	ulint		i;
 
 	heap = mem_heap_create(1024);
 
-	arr = mem_heap_alloc(heap, sizeof(trx_undo_arr_t));
-
-	arr->infos = mem_heap_alloc(heap, sizeof(trx_undo_inf_t)
-				    * UNIV_MAX_PARALLELISM);
-	arr->n_cells = UNIV_MAX_PARALLELISM;
-	arr->n_used = 0;
+	arr = mem_heap_zalloc(heap, sizeof(trx_undo_arr_t));
 
-	arr->heap = heap;
+	arr->infos = mem_heap_zalloc(heap, sizeof(*arr->infos) * n_cells);
 
-	for (i = 0; i < UNIV_MAX_PARALLELISM; i++) {
+	arr->n_cells = n_cells;
 
-		(trx_undo_arr_get_nth_info(arr, i))->in_use = FALSE;
-	}
+	arr->heap = heap;
 
 	return(arr);
 }
@@ -1110,7 +1104,7 @@ trx_rollback(
 	trx->pages_undone = 0;
 
 	if (trx->undo_no_arr == NULL) {
-		trx->undo_no_arr = trx_undo_arr_create();
+		trx->undo_no_arr = trx_undo_arr_create(UNIV_MAX_PARALLELISM);
 	}
 
 	/* Build a 'query' graph which will perform the undo operations */

=== modified file 'storage/innobase/trx/trx0sys.c'
--- a/storage/innobase/trx/trx0sys.c	revid:sunny.bains@stripped
+++ b/storage/innobase/trx/trx0sys.c	revid:sunny.bains@stripped
@@ -1022,8 +1022,6 @@ trx_sys_init_at_db_start(void)
 
 	UT_LIST_INIT(trx_sys->view_list);
 
-	trx_purge_sys_create();
-
 	mutex_exit(&kernel_mutex);
 
 	mtr_commit(&mtr);
@@ -1043,8 +1041,6 @@ trx_sys_create(void)
 	trx_sysf_create(&mtr);
 
 	mtr_commit(&mtr);
-
-	trx_sys_init_at_db_start();
 }
 
 /*****************************************************************//**


Attachment: [text/bzr-bundle] bzr/sunny.bains@oracle.com-20100428072944-10hz7ds7j6941gg5.bundle
Thread
bzr commit into mysql-trunk-innodb branch (Sunny.Bains:3043) Sunny Bains28 Apr