List: Commits
From: ingo  Date: April 26 2006 10:44am
Subject: bk commit into 5.0 tree (ingo:1.2052) BUG#17332
Below is the list of changes that have just been committed into a local
5.0 repository of mydev. When mydev does a push, these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository,
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html

ChangeSet
  1.2052 06/04/26 12:43:48 ingo@stripped +2 -0
  Bug#17332 - changing key_buffer_size on a running server can crash under load
  Not to be pushed. It contains a lot of experimental code.
  
  The key cache had a couple of race conditions in its "on-line"
  resize path.
  
  I rewrote parts of the locking scheme. This fixes a lot of the
  crashes, but I now see index corruption. Hence, this patch is not
  yet complete; it is just a preview for further testing. (A
  standalone sketch of the wait-queue protocol is included below.)
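
  A minimal, self-contained pthread sketch of that wait-queue
  protocol, assuming nothing but POSIX threads. All names here
  (wait_node, waiters, resource_busy, ...) are invented for
  illustration; the real code uses my_thread_var with its 'suspend'
  condition variable and 'next' pointer, plus link_into_queue(),
  add_to_queue(), and release_queue(). Build with: cc -pthread.

    #include <pthread.h>
    #include <stdio.h>
    #include <unistd.h>

    /* One node per thread: a private condition variable plus a queue
       link, standing in for my_thread_var's 'suspend' and 'next'. */
    struct wait_node
    {
      pthread_cond_t suspend;
      struct wait_node *next;
      int queued;               /* stands in for the 'next != NULL' test */
    };

    static pthread_mutex_t cache_lock= PTHREAD_MUTEX_INITIALIZER;
    static struct wait_node *waiters= NULL;  /* single-linked wait queue */
    static int resource_busy= 1;

    /* Waiter: enqueue self, then sleep until the signaller dequeues us.
       The loop matters: a wakeup alone proves nothing, only being
       unlinked from the queue by the signaller does. */
    static void wait_for_resource(struct wait_node *self)
    {
      pthread_mutex_lock(&cache_lock);
      if (resource_busy)
      {
        self->next= waiters;
        waiters= self;
        self->queued= 1;
        while (self->queued)
          pthread_cond_wait(&self->suspend, &cache_lock);
      }
      pthread_mutex_unlock(&cache_lock);
    }

    /* Signaller: release the resource, then signal and unlink every
       waiter in one pass, as release_queue() does in the patch. */
    static void release_resource(void)
    {
      pthread_mutex_lock(&cache_lock);
      resource_busy= 0;
      while (waiters)
      {
        struct wait_node *w= waiters;
        waiters= w->next;
        w->next= NULL;
        w->queued= 0;           /* dequeue, then signal */
        pthread_cond_signal(&w->suspend);
      }
      pthread_mutex_unlock(&cache_lock);
    }

    static void *waiter_thread(void *arg)
    {
      struct wait_node self= { PTHREAD_COND_INITIALIZER, NULL, 0 };
      wait_for_resource(&self);
      printf("waiter %ld resumed\n", (long) arg);
      return NULL;
    }

    int main(void)
    {
      pthread_t t1, t2;
      pthread_create(&t1, NULL, waiter_thread, (void *) 1L);
      pthread_create(&t2, NULL, waiter_thread, (void *) 2L);
      sleep(1);                 /* crude: let both threads enqueue */
      release_resource();
      pthread_join(t1, NULL);
      pthread_join(t2, NULL);
      return 0;
    }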

  mysys/mf_keycache.c
    1.56 06/04/26 12:43:36 ingo@stripped +965 -254
    Bug#17332 - changing key_buffer_size on a running server can crash under load
    Changed and added comments, removed unused defines,
    added new debugging features, fixed queue/mutex handling
    so that a common scheme is followed. See the new big comment
    at the top of the file. Accordingly, created a new queue in
    which the resizer waits for others to leave the cache after
    flushing (a standalone sketch of this handshake follows this
    change list).
    Made small style fixes. Initialized block pointers on unlink.
    Added checks if other threads freed a block where necessary.
    Return a changed block during resize so that its contents are
    not lost when a writer wants to replace just a part of the
    block, and so that the meanwhile obsolete contents do not get
    flushed later. Don't free reassigned blocks when there is an
    attempt to write them during resize. Don't accept new blocks
    into the cache during
    resize. Added a new block status flag BLOCK_IN_USE to
    distinguish between freshly initialized and free blocks.
    Always mark a block BLOCK_IN_FLUSH before releasing the
    mutex for writing. Clear flags and wake waiters after the 
    write. Added handling for NULL return of find_key_block()
    in key_cache_read(). This can now happen during resize.
    Fixed offset usage in key_cache_read() to be symmetric to
    key_cache_insert() and key_cache_write(). Added handling 
    for NULL return of find_key_block() in key_cache_insert().
    Fixed offset usage in key_cache_write() to be symmetric to
    key_cache_read() and key_cache_insert(). Fixed removal from
    the hash_link in key_cache_write().
    In free_block() be careful not to try to free already freed
    pointers of the block. (Re-)initialize more pointers to avoid
    a free block pointing into arbitrary queues.
    Fixed the use of the BLOCK_IN_FLUSH flag at some places.
    Always set it before going to write and clear it after write.
    In flush_cached_blocks() do not free every block immediately 
    after write. They may be used for further reading until all
    blocks are flushed.
    In flush_key_blocks_int() do not try to flush blocks that
    are already BLOCK_IN_FLUSH. Mark BLOCK_IN_FLUSH immediately
    when selecting blocks for flush. Restart flushing until no
    more blocks to flush are found; while writing, more blocks
    might become dirty.
    Changed flush_all_key_blocks() so that it loops over all files
    until no more blocks to flush are found.
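
  The resizer handshake described above boils down to a reference
  count drained under the cache mutex. A minimal standalone analogue
  (all names invented for illustration; the real code keeps the count
  in keycache->cnt_for_resize_op and parks the resizer in the new
  'resizer' queue, waking it via its per-thread suspend condvar rather
  than the dedicated condition variable used here):

    #include <pthread.h>

    static pthread_mutex_t cache_lock= PTHREAD_MUTEX_INITIALIZER;
    static pthread_cond_t  resizer_suspend= PTHREAD_COND_INITIALIZER;
    static unsigned cnt_for_resize_op= 0; /* users inside the cache */
    static int resizer_waiting= 0;

    /* Mirrors inc_counter_for_resize_op(): the count must change
       under cache_lock, or the resizer could miss a user. */
    static void enter_cache_op(void)
    {
      pthread_mutex_lock(&cache_lock);
      cnt_for_resize_op++;
      pthread_mutex_unlock(&cache_lock);
    }

    /* Mirrors dec_counter_for_resize_op(): wake the resizer when the
       last user leaves. */
    static void leave_cache_op(void)
    {
      pthread_mutex_lock(&cache_lock);
      if (!--cnt_for_resize_op && resizer_waiting)
        pthread_cond_signal(&resizer_suspend);
      pthread_mutex_unlock(&cache_lock);
    }

    /* Resizer: after flushing, wait until every user has left before
       freeing or reallocating the cache data structures. */
    static void wait_for_cache_users(void)
    {
      pthread_mutex_lock(&cache_lock);
      resizer_waiting= 1;
      while (cnt_for_resize_op)
        pthread_cond_wait(&resizer_suspend, &cache_lock);
      resizer_waiting= 0;
      pthread_mutex_unlock(&cache_lock);
    }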

  include/keycache.h
    1.7 06/04/26 12:43:36 ingo@stripped +1 -0
    Bug#17332 - changing key_buffer_size on a running server can crash under load
    Added the 'resizer' queue, in which the resizer waits for
    others to leave the cache after flushing (an illustrative
    fragment follows).
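
  The one-line delta is not reproduced in this excerpt, but judging
  from its use as keycache->resizer in the C-file diff below, it
  presumably adds one KEYCACHE_WQUEUE member to st_key_cache. An
  illustrative fragment, not the literal hunk:

    typedef struct st_key_cache
    {
      /* ... existing members, including ... */
      KEYCACHE_WQUEUE resize_queue; /* threads waiting for a previous
                                       resize to finish */
      KEYCACHE_WQUEUE resizer;      /* new: resizer waits here for
                                       users to leave after flushing */
      /* ... */
    } KEY_CACHE;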

# This is a BitKeeper patch.  What follows are the unified diffs for the
# set of deltas contained in the patch.  The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User:	ingo
# Host:	chilla.local
# Root:	/home/mydev/mysql-5.0-bug17332

--- 1.55/mysys/mf_keycache.c	2005-10-11 23:58:17 +02:00
+++ 1.56/mysys/mf_keycache.c	2006-04-26 12:43:36 +02:00
@@ -26,7 +26,7 @@
   When a new block is required it is first tried to pop one from the stack.
   If the stack is empty, it is tried to get a never-used block from the pool.
   If this is empty too, then a block is taken from the LRU ring, flushing it
-  to disk, if neccessary. This is handled in find_key_block().
+  to file, if necessary. This is handled in find_key_block().
   With the new free list, the blocks can have three temperatures:
   hot, warm and cold (which is free). This is remembered in the block header
   by the enum BLOCK_TEMPERATURE temperature variable. Remembering the
@@ -37,6 +37,41 @@
   blocks_unused is the sum of never used blocks in the pool and of currently
   free blocks. blocks_used is the number of blocks fetched from the pool and
   as such gives the maximum number of in-use blocks at any time.
+
+  Key Cache Locking
+  =================
+
+  All key cache locking is done with a single mutex per key cache:
+  keycache->cache_lock. This mutex is locked almost all the time.
+  However it is released for I/O and some copy operations.
+
+  The cache_lock is also released when waiting for some event. Waiting
+  and signalling is done via condition variables. In most cases the
+  thread waits on its thread->suspend condition variable. Every thread
+  has a my_thread_var structure, which contains this variable and a
+  'next' and 'prev' pointer. These pointers are used to insert the
+  thread into a wait queue.
+
+  Since there is only one pair of queue pointers per thread, a thread
+  can be part of one wait queue only.
+
+  Before starting to wait on its condition variable with
+  pthread_cond_wait(), the thread adds itself to a specific wait queue
+  with link_into_queue() (double linked with 'next' + 'prev') or
+  add_to_queue() (single linked with 'next').
+
+  Another thread, when releasing a resource, looks up the waiting thread
+  in the related wait queue. It sends a signal with
+  pthread_cond_signal() to the waiting thread and removes it from the
+  wait queue with unlink_from_queue() or release_queue() respectively.
+
+  There is one exception to this locking scheme. Each block has a
+  reference to a condition variable (condvar). It holds a reference to
+  the thread->suspend condition variable, if that thread is waiting for
+  the block. When that thread is signalled, the reference is cleared.
+  This is similar to the above, but it clearly means that only one
+  thread can wait for a particular block. There is no queue in this
+  case.
 */
 
 #include "mysys_priv.h"
@@ -98,7 +133,6 @@
 /* types of condition variables */
 #define  COND_FOR_REQUESTED 0
 #define  COND_FOR_SAVED     1
-#define  COND_FOR_READERS   2
 
 typedef pthread_cond_t KEYCACHE_CONDVAR;
 
@@ -120,12 +154,13 @@
 };
 
 /* simple states of a block */
-#define BLOCK_ERROR       1   /* an error occured when performing disk i/o   */
+#define BLOCK_ERROR       1   /* an error occurred when performing file i/o  */
 #define BLOCK_READ        2   /* the page is in the block buffer             */
 #define BLOCK_IN_SWITCH   4   /* block is preparing to read new page         */
 #define BLOCK_REASSIGNED  8   /* block does not accept requests for old page */
 #define BLOCK_IN_FLUSH   16   /* block is in flush operation                 */
 #define BLOCK_CHANGED    32   /* block buffer contains a dirty page          */
+#define BLOCK_IN_USE     64   /* block buffer is in use (not free)           */
 
 /* page status, returned by find_key_block */
 #define PAGE_READ               0
@@ -262,6 +297,164 @@
 #define keycache_pthread_cond_signal pthread_cond_signal
 #endif /* defined(KEYCACHE_DEBUG) */
 
+#ifdef EXTRA_DEBUG
+static void check_free_block_list(KEY_CACHE *keycache, BLOCK_LINK *block);
+#define CHECK_FREE_BLOCK_LIST(_k_, _b_)    check_free_block_list((_k_), (_b_))
+#else
+#define CHECK_FREE_BLOCK_LIST(_k_, _b_)    do{/*empty*/;}while(0)
+#endif
+
+static uint keycache_stats_link_block_lru_became_nonempty;
+static uint keycache_stats_unlink_block;
+static uint keycache_stats_unlink_block_lru_became_empty;
+#ifdef NOTUSED
+static uint keycache_stats_unlink_block_noop;
+#endif
+static uint keycache_stats_get_hash_link_enter;
+static uint keycache_stats_get_hash_link_next;
+static uint keycache_stats_get_hash_link_free;
+static uint keycache_stats_get_hash_link_new;
+static uint keycache_stats_get_hash_link_wait;
+static uint keycache_stats_find_key_block_enter;
+static uint keycache_stats_find_key_block_hit;
+static uint keycache_stats_find_key_block_resize_nowrite;
+static uint keycache_stats_find_key_block_resize_changed;
+static uint keycache_stats_find_key_block_resize_free_noflushed;
+static uint keycache_stats_find_key_block_resize_reassigned_noflushed;
+static uint keycache_stats_find_key_block_resize_wait;
+static uint keycache_stats_find_key_block_resize_free_flushed;
+static uint keycache_stats_find_key_block_resize_switching_flushed;
+static uint keycache_stats_find_key_block_resize_reassigned_flushed;
+static uint keycache_stats_find_key_block_hit_read_switching;
+static uint keycache_stats_find_key_block_hit_wait_reassigned_or_switching;
+static uint keycache_stats_find_key_block_miss_resize_in_flush;
+static uint keycache_stats_find_key_block_miss_free_block_list;
+static uint keycache_stats_find_key_block_miss_block_root;
+static uint keycache_stats_find_key_block_miss_wait_insert_lru;
+static uint keycache_stats_find_key_block_miss_take_lru;
+static uint keycache_stats_find_key_block_miss_switching;
+static uint keycache_stats_find_key_block_miss_write_changed;
+static uint keycache_stats_find_key_block_miss_wake_changed;
+static uint keycache_stats_find_key_block_miss_wait_for_readers;
+static uint keycache_stats_find_key_block_miss_unlink_hash;
+static uint keycache_stats_find_key_block_miss_release_queue;
+static uint keycache_stats_find_key_block_miss_block_initialized;
+static uint keycache_stats_find_key_block_miss_page_read;
+static uint keycache_stats_find_key_block_miss_page_wait_to_be_read;
+static uint keycache_stats_find_key_block_hit_page_read;
+static uint keycache_stats_find_key_block_hit_page_wait_to_be_read;
+static uint keycache_stats_read_block_primary;
+static uint keycache_stats_read_block_primary_wake;
+static uint keycache_stats_read_block_secondary_wait;
+static uint keycache_stats_key_cache_read_null_block;
+static uint keycache_stats_key_cache_read_read_block;
+static uint keycache_stats_key_cache_read_error;
+static uint keycache_stats_key_cache_read_copied;
+static uint keycache_stats_key_cache_read_no_cache;
+static uint keycache_stats_key_cache_insert_null_block;
+static uint keycache_stats_key_cache_insert_copied;
+static uint keycache_stats_key_cache_write_null_block;
+static uint keycache_stats_key_cache_write_read_block;
+static uint keycache_stats_key_cache_write_have_block;
+static uint keycache_stats_key_cache_write_changed;
+static uint keycache_stats_key_cache_write_copied;
+static uint keycache_stats_key_cache_write_error;
+static uint keycache_stats_key_cache_write_no_cache;
+static uint keycache_stats_free_block;
+static uint keycache_stats_free_block_wait_for_readers;
+static uint keycache_stats_free_block_unlink_hash;
+static uint keycache_stats_free_block_release_queue;
+static uint keycache_stats_flush_cached_blocks;
+static uint keycache_stats_flush_cached_blocks_write;
+static uint keycache_stats_flush_cached_blocks_error;
+static uint keycache_stats_flush_cached_blocks_release_queue;
+static uint keycache_stats_flush_key_blocks_int;
+static uint keycache_stats_flush_key_blocks_int_wait_flushed;
+static uint keycache_stats_flush_key_blocks_int_wait_switching;
+static uint keycache_stats_flush_key_blocks_int_freed_unchanged;
+static uint keycache_stats_flush_key_blocks;
+static uint keycache_stats_flush_all_key_blocks;
+static uint keycache_stats_flush_all_key_blocks_try;
+static uint keycache_stats_flush_all_key_blocks_scan_changed;
+
+static void print_stats(void)
+{
+  printf("%-68s %10u\n", "keycache_stats_link_block_lru_became_nonempty", keycache_stats_link_block_lru_became_nonempty);
+  printf("%-68s %10u\n", "keycache_stats_unlink_block", keycache_stats_unlink_block);
+  printf("%-68s %10u\n", "keycache_stats_unlink_block_lru_became_empty", keycache_stats_unlink_block_lru_became_empty);
+#ifdef NOTUSED
+  printf("%-68s %10u\n", "keycache_stats_unlink_block_noop", keycache_stats_unlink_block_noop);
+#endif
+  printf("%-68s %10u\n", "keycache_stats_get_hash_link_enter", keycache_stats_get_hash_link_enter);
+  printf("%-68s %10u\n", "keycache_stats_get_hash_link_next", keycache_stats_get_hash_link_next);
+  printf("%-68s %10u\n", "keycache_stats_get_hash_link_free", keycache_stats_get_hash_link_free);
+  printf("%-68s %10u\n", "keycache_stats_get_hash_link_new", keycache_stats_get_hash_link_new);
+  printf("%-68s %10u\n", "keycache_stats_get_hash_link_wait", keycache_stats_get_hash_link_wait);
+  printf("%-68s %10u\n", "keycache_stats_find_key_block_enter", keycache_stats_find_key_block_enter);
+  printf("%-68s %10u\n", "keycache_stats find_key_block restarted", keycache_stats_get_hash_link_enter - keycache_stats_find_key_block_enter);
+  printf("%-68s %10u\n", "keycache_stats_find_key_block_hit", keycache_stats_find_key_block_hit);
+  printf("%-68s %10u\n", "keycache_stats find_key_block miss", keycache_stats_get_hash_link_enter - keycache_stats_find_key_block_hit);
+  printf("%-68s %10u\n", "keycache_stats_find_key_block_resize_nowrite", keycache_stats_find_key_block_resize_nowrite);
+  printf("%-68s %10u\n", "keycache_stats_find_key_block_resize_changed", keycache_stats_find_key_block_resize_changed);
+  printf("%-68s %10u\n", "keycache_stats_find_key_block_resize_free_noflushed", keycache_stats_find_key_block_resize_free_noflushed);
+  printf("%-68s %10u\n", "keycache_stats_find_key_block_resize_reassigned_noflushed", keycache_stats_find_key_block_resize_reassigned_noflushed);
+  printf("%-68s %10u\n", "keycache_stats_find_key_block_resize_wait", keycache_stats_find_key_block_resize_wait);
+  printf("%-68s %10u\n", "keycache_stats_find_key_block_resize_free_flushed", keycache_stats_find_key_block_resize_free_flushed);
+  printf("%-68s %10u\n", "keycache_stats_find_key_block_resize_switching_flushed", keycache_stats_find_key_block_resize_switching_flushed);
+  printf("%-68s %10u\n", "keycache_stats_find_key_block_resize_reassigned_flushed", keycache_stats_find_key_block_resize_reassigned_flushed);
+  printf("%-68s %10u\n", "keycache_stats_find_key_block_hit_read_switching", keycache_stats_find_key_block_hit_read_switching);
+  printf("%-68s %10u\n", "keycache_stats_find_key_block_hit_wait_reassigned_or_switching", keycache_stats_find_key_block_hit_wait_reassigned_or_switching);
+  printf("%-68s %10u\n", "keycache_stats_find_key_block_miss_resize_in_flush", keycache_stats_find_key_block_miss_resize_in_flush);
+  printf("%-68s %10u\n", "keycache_stats_find_key_block_miss_free_block_list", keycache_stats_find_key_block_miss_free_block_list);
+  printf("%-68s %10u\n", "keycache_stats_find_key_block_miss_block_root", keycache_stats_find_key_block_miss_block_root);
+  printf("%-68s %10u\n", "keycache_stats_find_key_block_miss_wait_insert_lru", keycache_stats_find_key_block_miss_wait_insert_lru);
+  printf("%-68s %10u\n", "keycache_stats_find_key_block_miss_take_lru", keycache_stats_find_key_block_miss_take_lru);
+  printf("%-68s %10u\n", "keycache_stats_find_key_block_miss_switching", keycache_stats_find_key_block_miss_switching);
+  printf("%-68s %10u\n", "keycache_stats_find_key_block_miss_write_changed", keycache_stats_find_key_block_miss_write_changed);
+  printf("%-68s %10u\n", "keycache_stats_find_key_block_miss_wake_changed", keycache_stats_find_key_block_miss_wake_changed);
+  printf("%-68s %10u\n", "keycache_stats_find_key_block_miss_wait_for_readers", keycache_stats_find_key_block_miss_wait_for_readers);
+  printf("%-68s %10u\n", "keycache_stats_find_key_block_miss_unlink_hash", keycache_stats_find_key_block_miss_unlink_hash);
+  printf("%-68s %10u\n", "keycache_stats_find_key_block_miss_release_queue", keycache_stats_find_key_block_miss_release_queue);
+  printf("%-68s %10u\n", "keycache_stats_find_key_block_miss_block_initialized", keycache_stats_find_key_block_miss_block_initialized);
+  printf("%-68s %10u\n", "keycache_stats_find_key_block_miss_page_read", keycache_stats_find_key_block_miss_page_read);
+  printf("%-68s %10u\n", "keycache_stats_find_key_block_miss_page_wait_to_be_read", keycache_stats_find_key_block_miss_page_wait_to_be_read);
+  printf("%-68s %10u\n", "keycache_stats_find_key_block_hit_page_read", keycache_stats_find_key_block_hit_page_read);
+  printf("%-68s %10u\n", "keycache_stats_find_key_block_hit_page_wait_to_be_read", keycache_stats_find_key_block_hit_page_wait_to_be_read);
+  printf("%-68s %10u\n", "keycache_stats_read_block_primary", keycache_stats_read_block_primary);
+  printf("%-68s %10u\n", "keycache_stats_read_block_primary_wake", keycache_stats_read_block_primary_wake);
+  printf("%-68s %10u\n", "keycache_stats_read_block_secondary_wait", keycache_stats_read_block_secondary_wait);
+  printf("%-68s %10u\n", "keycache_stats_key_cache_read_null_block", keycache_stats_key_cache_read_null_block);
+  printf("%-68s %10u\n", "keycache_stats_key_cache_read_read_block", keycache_stats_key_cache_read_read_block);
+  printf("%-68s %10u\n", "keycache_stats_key_cache_read_error", keycache_stats_key_cache_read_error);
+  printf("%-68s %10u\n", "keycache_stats_key_cache_read_copied", keycache_stats_key_cache_read_copied);
+  printf("%-68s %10u\n", "keycache_stats_key_cache_read_no_cache", keycache_stats_key_cache_read_no_cache);
+  printf("%-68s %10u\n", "keycache_stats_key_cache_insert_null_block", keycache_stats_key_cache_insert_null_block);
+  printf("%-68s %10u\n", "keycache_stats_key_cache_insert_copied", keycache_stats_key_cache_insert_copied);
+  printf("%-68s %10u\n", "keycache_stats_key_cache_write_null_block", keycache_stats_key_cache_write_null_block);
+  printf("%-68s %10u\n", "keycache_stats_key_cache_write_read_block", keycache_stats_key_cache_write_read_block);
+  printf("%-68s %10u\n", "keycache_stats_key_cache_write_have_block", keycache_stats_key_cache_write_have_block);
+  printf("%-68s %10u\n", "keycache_stats_key_cache_write_changed", keycache_stats_key_cache_write_changed);
+  printf("%-68s %10u\n", "keycache_stats_key_cache_write_copied", keycache_stats_key_cache_write_copied);
+  printf("%-68s %10u\n", "keycache_stats_key_cache_write_error", keycache_stats_key_cache_write_error);
+  printf("%-68s %10u\n", "keycache_stats_key_cache_write_no_cache", keycache_stats_key_cache_write_no_cache);
+  printf("%-68s %10u\n", "keycache_stats_free_block", keycache_stats_free_block);
+  printf("%-68s %10u\n", "keycache_stats_free_block_wait_for_readers", keycache_stats_free_block_wait_for_readers);
+  printf("%-68s %10u\n", "keycache_stats_free_block_unlink_hash", keycache_stats_free_block_unlink_hash);
+  printf("%-68s %10u\n", "keycache_stats_free_block_release_queue", keycache_stats_free_block_release_queue);
+  printf("%-68s %10u\n", "keycache_stats_flush_cached_blocks", keycache_stats_flush_cached_blocks);
+  printf("%-68s %10u\n", "keycache_stats_flush_cached_blocks_write", keycache_stats_flush_cached_blocks_write);
+  printf("%-68s %10u\n", "keycache_stats_flush_cached_blocks_error", keycache_stats_flush_cached_blocks_error);
+  printf("%-68s %10u\n", "keycache_stats_flush_cached_blocks_release_queue", keycache_stats_flush_cached_blocks_release_queue);
+  printf("%-68s %10u\n", "keycache_stats_flush_key_blocks_int", keycache_stats_flush_key_blocks_int);
+  printf("%-68s %10u\n", "keycache_stats_flush_key_blocks_int_wait_flushed", keycache_stats_flush_key_blocks_int_wait_flushed);
+  printf("%-68s %10u\n", "keycache_stats_flush_key_blocks_int_wait_switching", keycache_stats_flush_key_blocks_int_wait_switching);
+  printf("%-68s %10u\n", "keycache_stats_flush_key_blocks_int_freed_unchanged", keycache_stats_flush_key_blocks_int_freed_unchanged);
+  printf("%-68s %10u\n", "keycache_stats_flush_key_blocks", keycache_stats_flush_key_blocks);
+  printf("%-68s %10u\n", "keycache_stats_flush_all_key_blocks", keycache_stats_flush_all_key_blocks);
+  printf("%-68s %10u\n", "keycache_stats_flush_all_key_blocks_try", keycache_stats_flush_all_key_blocks_try);
+  printf("%-68s %10u\n", "keycache_stats_flush_all_key_blocks_scan_changed", keycache_stats_flush_all_key_blocks_scan_changed);
+}
+
 static uint next_power(uint value)
 {
   uint old_value= 1;
@@ -485,7 +678,6 @@
 {
   int blocks;
   struct st_my_thread_var *thread;
-  KEYCACHE_WQUEUE *wqueue;
   DBUG_ENTER("resize_key_cache");
 
   if (!keycache->key_cache_inited)
@@ -501,13 +693,13 @@
   keycache_pthread_mutex_lock(&keycache->cache_lock);
 
 #ifdef THREAD
-  wqueue= &keycache->resize_queue;
   thread= my_thread_var;
-  link_into_queue(wqueue, thread);
-
-  while (wqueue->last_thread->next != thread)
+  while (keycache->resize_in_flush)
   {
+    /* Wait for other resizes to finish. */
+    link_into_queue(&keycache->resize_queue, thread);
     keycache_pthread_cond_wait(&thread->suspend, &keycache->cache_lock);
+    /* The signalling thread unlinks this thread from the queue. */
   }
 #endif
 
@@ -523,11 +715,18 @@
   keycache->resize_in_flush= 0;
   keycache->can_be_used= 0;
 #ifdef THREAD
+  /*
+    Wait until all users of the cache have left before freeing all data
+    structures. Use the resizer "queue" for waiting. There should be
+    only one entry in the "queue". dec_counter_for_resize_op() will wake us.
+  */
   while (keycache->cnt_for_resize_op)
   {
     KEYCACHE_DBUG_PRINT("resize_key_cache: wait",
                         ("suspend thread %ld", thread->id));
+    link_into_queue(&keycache->resizer, thread);
     keycache_pthread_cond_wait(&thread->suspend, &keycache->cache_lock);
+    /* The signalling thread unlinks this thread from the queue. */
   }
 #else
   KEYCACHE_DBUG_ASSERT(keycache->cnt_for_resize_op == 0);
@@ -540,43 +739,86 @@
 
 finish:
 #ifdef THREAD
-  unlink_from_queue(wqueue, thread);
   /* Signal for the next resize request to proceed if any */
-  if (wqueue->last_thread)
+  if ((thread= keycache->resize_queue.last_thread))
   {
+    thread= thread->next;
     KEYCACHE_DBUG_PRINT("resize_key_cache: signal",
-                        ("thread %ld", wqueue->last_thread->next->id));
-    keycache_pthread_cond_signal(&wqueue->last_thread->next->suspend);
+                        ("thread %ld", thread->id));
+    keycache_pthread_cond_signal(&thread->suspend);
+    unlink_from_queue(&keycache->resize_queue, thread);
   }
 #endif
   keycache_pthread_mutex_unlock(&keycache->cache_lock);
-  return blocks;
+  DBUG_RETURN(blocks);
 }
 
 
 /*
-  Increment counter blocking resize key cache operation
+  Increment the counter that blocks a key cache resize operation from finishing.
+
+  SYNOPSIS
+    inc_counter_for_resize_op()
+      keycache                  The key cache.
+
+  DESCRIPTION
+    This is used whenever key_cache_read(), key_cache_insert(), or
+    key_cache_write() wants to copy data to/from a cached block.
+    They release the cache_lock while copying or even reading or
+    writing the file. The resize counter tells resize_key_cache()
+    that someone is still using a cached block. The data structures
+    must not yet be freed.
+    flush_key_blocks() increments the counter too before it flushes
+    the blocks for an index file.
+    The counter is decremented by dec_counter_for_resize_op().
+
+  NOTE
+    It is important that the counter is incremented while cache_lock is held.
+
+  RETURN
+    void
 */
+
 static inline void inc_counter_for_resize_op(KEY_CACHE *keycache)
 {
+  safe_mutex_assert_owner(&keycache->cache_lock);
   keycache->cnt_for_resize_op++;
 }
 
 
 /*
-  Decrement counter blocking resize key cache operation;
-  Signal the operation to proceed when counter becomes equal zero
+  Decrement the counter that blocks a key cache resize operation from finishing.
+
+  SYNOPSIS
+    dec_counter_for_resize_op()
+      keycache                  The key cache.
+
+  DESCRIPTION
+    See description for inc_counter_for_resize_op().
+    Signal the resizing thread to wake up when the counter becomes zero.
+    When the resizer has finished flushing, it waits on the resizer queue
+    until all users of the cache have left.
+
+  NOTE
+    It is important that the counter is decremented while cache_lock is held.
+
+  RETURN
+    void
 */
+
 static inline void dec_counter_for_resize_op(KEY_CACHE *keycache)
 {
 #ifdef THREAD
-  struct st_my_thread_var *last_thread;
+  struct st_my_thread_var *thread;
+
+  safe_mutex_assert_owner(&keycache->cache_lock);
   if (!--keycache->cnt_for_resize_op &&
-      (last_thread= keycache->resize_queue.last_thread))
+      (thread= keycache->resizer.last_thread))
   {
     KEYCACHE_DBUG_PRINT("dec_counter_for_resize_op: signal",
-                        ("thread %ld", last_thread->next->id));
-    keycache_pthread_cond_signal(&last_thread->next->suspend);
+                        ("thread %ld", thread->id));
+    keycache_pthread_cond_signal(&thread->suspend);
+    unlink_from_queue(&keycache->resizer, thread);
   }
 #else
   keycache->cnt_for_resize_op--;
@@ -665,6 +907,7 @@
     pthread_mutex_destroy(&keycache->cache_lock);
     keycache->key_cache_inited= keycache->can_be_used= 0;
     KEYCACHE_DEBUG_CLOSE;
+    /* INGO TEST ONLY print_stats(); */
   }
   DBUG_VOID_RETURN;
 } /* end_key_cache */
@@ -728,8 +971,10 @@
 {
   KEYCACHE_DBUG_PRINT("unlink_from_queue", ("thread %ld", thread->id));
   if (thread->next == thread)
+  {
     /* The queue contains only one member */
     wqueue->last_thread= NULL;
+  }
   else
   {
     thread->next->prev= thread->prev;
@@ -801,10 +1046,10 @@
     thread=next;
     KEYCACHE_DBUG_PRINT("release_queue: signal", ("thread %ld", thread->id));
     keycache_pthread_cond_signal(&thread->suspend);
+    /* Unlink the signalled thread from the queue. */
     next=thread->next;
     thread->next= NULL;
-  }
-  while (thread != last);
+  } while (thread != last);
   wqueue->last_thread= NULL;
 }
 #endif
@@ -921,9 +1166,8 @@
     /* Signal that in the LRU warm sub-chain an available block has appeared */
     struct st_my_thread_var *last_thread=
                                keycache->waiting_for_block.last_thread;
-    struct st_my_thread_var *first_thread= last_thread->next;
-    struct st_my_thread_var *next_thread= first_thread;
-    HASH_LINK *hash_link= (HASH_LINK *) first_thread->opt_info;
+    struct st_my_thread_var *next_thread= last_thread->next;
+    HASH_LINK *hash_link= (HASH_LINK*) next_thread->opt_info;
     struct st_my_thread_var *thread;
     do
     {
@@ -956,6 +1200,7 @@
   KEYCACHE_DBUG_ASSERT(! (!hot && keycache->waiting_for_block.last_thread));
       /* Condition not transformed using DeMorgan, to keep the text identical */
 #endif /* THREAD */
+  safe_mutex_assert_owner(&keycache->cache_lock);
   pins= hot ? &keycache->used_ins : &keycache->used_last;
   ins= *pins;
   if (ins)
@@ -972,6 +1217,7 @@
     /* The LRU chain is empty */
     keycache->used_last= keycache->used_ins= block->next_used= block;
     block->prev_used= &block->next_used;
+    keycache_stats_link_block_lru_became_nonempty++;
   }
   KEYCACHE_THREAD_TRACE("link_block");
 #if defined(KEYCACHE_DEBUG)
@@ -1003,19 +1249,78 @@
 
 static void unlink_block(KEY_CACHE *keycache, BLOCK_LINK *block)
 {
-  if (block->next_used == block)
-    /* The list contains only one member */
-    keycache->used_last= keycache->used_ins= NULL;
+#ifdef NOTUSED
+  /* A block is not necessarily always in the LRU chain. */
+  if (block->next_used)
+  {
+    if (block->prev_used)
+    {
+      /* The block is in the LRU chain. */
+      DBUG_ASSERT((block->next_used->prev_used == &block->next_used) &&
+                  (*block->prev_used == block));
+#else
+      DBUG_ASSERT(block->next_used && block->prev_used &&
+                  (block->next_used->prev_used == &block->next_used) &&
+                  (*block->prev_used == block));
+#endif
+      keycache_stats_unlink_block++;
+      if (block->next_used == block)
+      {
+        /* The list contains only one member */
+        keycache->used_last= keycache->used_ins= NULL;
+        keycache_stats_unlink_block_lru_became_empty++;
+      }
+      else
+      {
+        block->next_used->prev_used= block->prev_used;
+        *block->prev_used= block->next_used;
+        if (keycache->used_last == block)
+          keycache->used_last= STRUCT_PTR(BLOCK_LINK, next_used,
+                                          block->prev_used);
+        if (keycache->used_ins == block)
+          keycache->used_ins=STRUCT_PTR(BLOCK_LINK, next_used, block->prev_used);
+      }
+      block->prev_used= NULL;
+#ifdef NOTUSED
+    }
+    else
+    {
+      /* The block is in the free blocks list. */
+      uint       count= 0;
+      BLOCK_LINK **sblk= &keycache->free_block_list;
+      while (*sblk && (*sblk != block))
+      {
+        /* Detect a loop in the free list. */
+        DBUG_ASSERT(++count < keycache->blocks_used);
+        /* Detect improperly initialized blocks. Maybe linked from LRU. */
+        DBUG_ASSERT(! (*sblk)->prev_used && ! (*sblk)->status);
+        DBUG_ASSERT(! (*sblk)->temperature);
+        sblk= &(*sblk)->next_used;
+      }
+      /* We must find the block in the free list. It can be nowhere else. */
+      DBUG_ASSERT(*sblk);
+      /* In non-debug mode we need to re-initialize the key cache. */
+      if (! *sblk)
+      {
+        /* Resize key cache to recover from cache corruption. */
+      }
+      *sblk= block->next_used;
+    }
+#endif
+    block->next_used= NULL;
+#ifdef NOTUSED
+  }
   else
   {
-    block->next_used->prev_used= block->prev_used;
-    *block->prev_used= block->next_used;
-    if (keycache->used_last == block)
-      keycache->used_last= STRUCT_PTR(BLOCK_LINK, next_used, block->prev_used);
-    if (keycache->used_ins == block)
-      keycache->used_ins=STRUCT_PTR(BLOCK_LINK, next_used, block->prev_used);
+    /*
+      If next_used is NULL, prev_used must also be NULL.
+      LRU is doubly linked using next_used and prev_used.
+      Free list is single linked using next_used, having prev_used == NULL.
+    */
+    DBUG_ASSERT(! block->prev_used);
+    keycache_stats_unlink_block_noop++;
   }
-  block->next_used= NULL;
+#endif
 
   KEYCACHE_THREAD_TRACE("unlink_block");
 #if defined(KEYCACHE_DEBUG)
@@ -1117,7 +1422,11 @@
 static inline void remove_reader(BLOCK_LINK *block)
 {
   if (! --block->hash_link->requests && block->condvar)
+  {
     keycache_pthread_cond_signal(block->condvar);
+    /* Do not signal this thread again. */
+    block->condvar= NULL;
+  }
 }
 
 
@@ -1130,14 +1439,16 @@
 {
 #ifdef THREAD
   struct st_my_thread_var *thread= my_thread_var;
-  while (block->hash_link->requests)
+  /* Block could have been freed while waiting (after an iteration). */
+  while (block->hash_link && block->hash_link->requests)
   {
     KEYCACHE_DBUG_PRINT("wait_for_readers: wait",
                         ("suspend thread %ld  block %u",
                          thread->id, BLOCK_NUMBER(block)));
+    DBUG_ASSERT(! block->condvar);
     block->condvar= &thread->suspend;
     keycache_pthread_cond_wait(&thread->suspend, &keycache->cache_lock);
-    block->condvar= NULL;
+    /* The condition variable is cleared by the signalling thread. */
   }
 #else
   KEYCACHE_DBUG_ASSERT(block->hash_link->requests == 0);
@@ -1177,9 +1488,8 @@
     /* Signal that a free hash link has appeared */
     struct st_my_thread_var *last_thread=
                                keycache->waiting_for_hash_link.last_thread;
-    struct st_my_thread_var *first_thread= last_thread->next;
-    struct st_my_thread_var *next_thread= first_thread;
-    KEYCACHE_PAGE *first_page= (KEYCACHE_PAGE *) (first_thread->opt_info);
+    struct st_my_thread_var *next_thread= last_thread->next;
+    KEYCACHE_PAGE *first_page= (KEYCACHE_PAGE*) next_thread->opt_info;
     struct st_my_thread_var *thread;
 
     hash_link->file= first_page->file;
@@ -1227,9 +1537,11 @@
 #if defined(KEYCACHE_DEBUG)
   int cnt;
 #endif
+  DBUG_ENTER("get_hash_link");
 
   KEYCACHE_DBUG_PRINT("get_hash_link", ("fd: %u  pos: %lu",
                       (uint) file,(ulong) filepos));
+  keycache_stats_get_hash_link_enter++;
 
 restart:
   /*
@@ -1246,6 +1558,7 @@
          (hash_link->diskpos != filepos || hash_link->file != file))
   {
     hash_link= hash_link->next;
+    keycache_stats_get_hash_link_next++;
 #if defined(KEYCACHE_DEBUG)
     cnt++;
     if (! (cnt <= keycache->hash_links_used))
@@ -1268,10 +1581,12 @@
     {
       hash_link= keycache->free_hash_list;
       keycache->free_hash_list= hash_link->next;
+      keycache_stats_get_hash_link_free++;
     }
     else if (keycache->hash_links_used < keycache->hash_links)
     {
       hash_link= &keycache->hash_link_root[keycache->hash_links_used++];
+      keycache_stats_get_hash_link_new++;
     }
     else
     {
@@ -1282,11 +1597,13 @@
       page.file= file;
       page.filepos= filepos;
       thread->opt_info= (void *) &page;
+      keycache_stats_get_hash_link_wait++;
       link_into_queue(&keycache->waiting_for_hash_link, thread);
       KEYCACHE_DBUG_PRINT("get_hash_link: wait",
                         ("suspend thread %ld", thread->id));
       keycache_pthread_cond_wait(&thread->suspend,
                                  &keycache->cache_lock);
+      /* The signalling thread unlinks this thread from the queue. */
       thread->opt_info= NULL;
 #else
       KEYCACHE_DBUG_ASSERT(0);
@@ -1300,7 +1617,7 @@
   /* Register the request for the page */
   hash_link->requests++;
 
-  return hash_link;
+  DBUG_RETURN(hash_link);
 }
 
 
@@ -1313,7 +1630,7 @@
 
     find_key_block()
       keycache            pointer to a key cache data structure
-      file                handler for the file to read page from
+      file                handle for the file to read page from
       filepos             position of the page in the file
       init_hits_left      how to initialize the block counter for the page
       wrmode              <-> get for writing
@@ -1361,6 +1678,7 @@
   DBUG_EXECUTE("check_keycache2",
                test_key_cache(keycache, "start of find_key_block", 0););
 #endif
+  keycache_stats_find_key_block_enter++;
 
 restart:
   /* Find the hash link for the requested page (file, filepos) */
@@ -1369,41 +1687,65 @@
   page_status= -1;
   if ((block= hash_link->block) &&
       block->hash_link == hash_link && (block->status & BLOCK_READ))
+  {
     page_status= PAGE_READ;
+    keycache_stats_find_key_block_hit++;
+  }
 
   if (wrmode && keycache->resize_in_flush)
   {
     /* This is a write request during the flush phase of a resize operation */
-
     if (page_status != PAGE_READ)
     {
-      /* We don't need the page in the cache: we are going to write on disk */
+      /*
+        We don't have the block in the cache. And we don't need it as we
+        are going to write to file. So we happily return NULL.
+      */
       hash_link->requests--;
       unlink_hash(keycache, hash_link);
-      return 0;
+      keycache_stats_find_key_block_resize_nowrite++;
+      DBUG_RETURN(NULL);
     }
+    /* The block is in the cache. */
     if (!(block->status & BLOCK_IN_FLUSH))
     {
-      hash_link->requests--;
+      /* The block is not yet or no longer part of a flush operation. */
+      if (block->status & BLOCK_CHANGED)
+      {
+        /* The block has changed contents, which must not get lost. */
+        reg_requests(keycache, block, 1);
+        *page_st= page_status;
+        keycache_stats_find_key_block_resize_changed++;
+        DBUG_RETURN(block);
+      }
       /*
         Remove block to invalidate the page in the block buffer
-        as we are going to write directly on disk.
+        as we are going to write it directly to file.
         Although we have an exclusive lock for the updated key part
-        the control can be yieded by the current thread as we might
+        the control can be yielded by the current thread as we might
         have unfinished readers of other key parts in the block
         buffer. Still we are guaranteed not to have any readers
         of the key part we are writing into until the block is
-        removed from the cache as we set the BLOCL_REASSIGNED
+        removed from the cache as we set the BLOCK_REASSIGNED
         flag (see the code below that handles reading requests).
       */
-      free_block(keycache, block);
-      return 0;
+      hash_link->requests--;
+      /* We must not free reassigned blocks. They are reused already. */
+      if (! (block->status & BLOCK_REASSIGNED))
+      {
+        keycache_stats_find_key_block_resize_free_noflushed++;
+        free_block(keycache, block);
+      }
+      else
+        keycache_stats_find_key_block_resize_reassigned_noflushed++;
+      DBUG_RETURN(NULL);
     }
-    /* Wait intil the page is flushed on disk */
+    /* Wait until the page is flushed to file. */
     hash_link->requests--;
     {
 #ifdef THREAD
       struct st_my_thread_var *thread= my_thread_var;
+      keycache_stats_find_key_block_resize_wait++;
       add_to_queue(&block->wqueue[COND_FOR_SAVED], thread);
       do
       {
@@ -1411,6 +1753,8 @@
                             ("suspend thread %ld", thread->id));
         keycache_pthread_cond_wait(&thread->suspend,
                                    &keycache->cache_lock);
+        /* The signalling thread unlinks this thread from the queue. */
+        DBUG_ASSERT(! thread->next);
       }
       while(thread->next);
 #else
@@ -1424,10 +1768,24 @@
       */
 #endif
     }
-    /* Invalidate page in the block if it has not been done yet */
-    if (block->status)
+    /*
+      Invalidate page in the block if it has not been done yet, but
+      do not free reassigned or switching blocks. They are reused already.
+    */
+    if (block->status &&
+        ! (block->status & (BLOCK_IN_SWITCH | BLOCK_REASSIGNED)))
+    {
+      keycache_stats_find_key_block_resize_free_flushed++;
       free_block(keycache, block);
-    return 0;
+    }
+    else
+    {
+      if (block->status & BLOCK_IN_SWITCH)
+        keycache_stats_find_key_block_resize_switching_flushed++;
+      if (block->status & BLOCK_REASSIGNED)
+        keycache_stats_find_key_block_resize_reassigned_flushed++;
+    }
+    DBUG_RETURN(NULL);
   }
 
   if (page_status == PAGE_READ &&
@@ -1444,7 +1802,11 @@
        all others are to be suspended, then resubmitted
     */
     if (!wrmode && !(block->status & BLOCK_REASSIGNED))
+    {
+/*ingo: possible flaw.*/
+      keycache_stats_find_key_block_hit_read_switching++;
       reg_requests(keycache, block, 1);
+    }
     else
     {
       hash_link->requests--;
@@ -1454,6 +1816,7 @@
 #ifdef THREAD
         struct st_my_thread_var *thread= my_thread_var;
         /* Put the request into the queue of those waiting for the old page */
+        keycache_stats_find_key_block_hit_wait_reassigned_or_switching++;
         add_to_queue(&block->wqueue[COND_FOR_SAVED], thread);
         /* Wait until the request can be resubmitted */
         do
@@ -1462,6 +1825,8 @@
                               ("suspend thread %ld", thread->id));
           keycache_pthread_cond_wait(&thread->suspend,
                                      &keycache->cache_lock);
+          /* The signalling thread unlinks this thread from the queue. */
+          DBUG_ASSERT(! thread->next);
         }
         while(thread->next);
 #else
@@ -1480,15 +1845,27 @@
     /* This is a request for a new page or for a page not to be removed */
     if (! block)
     {
+      if (keycache->resize_in_flush)
+      {
+        /*
+          Do not allow new blocks to go in the cache now.
+          We would need to throw them out anyway.
+        */
+        hash_link->requests--;
+        keycache_stats_find_key_block_miss_resize_in_flush++;
+        DBUG_RETURN(NULL);
+      }
       /* No block is assigned for the page yet */
       if (keycache->blocks_unused)
       {
         if (keycache->free_block_list)
         {
           /* There is a block in the free list. */
+          safe_mutex_assert_owner(&keycache->cache_lock);
           block= keycache->free_block_list;
           keycache->free_block_list= block->next_used;
           block->next_used= NULL;
+          keycache_stats_find_key_block_miss_free_block_list++;
         }
         else
         {
@@ -1499,9 +1876,10 @@
                                      keycache->key_cache_block_size),
                                     byte*);
           keycache->blocks_used++;
+          keycache_stats_find_key_block_miss_block_root++;
         }
         keycache->blocks_unused--;
-        block->status= 0;
+        block->status= BLOCK_IN_USE;
         block->length= 0;
         block->offset= keycache->key_cache_block_size;
         block->requests= 1;
@@ -1531,6 +1909,7 @@
         {
           struct st_my_thread_var *thread= my_thread_var;
           thread->opt_info= (void *) hash_link;
+          keycache_stats_find_key_block_miss_wait_insert_lru++;
           link_into_queue(&keycache->waiting_for_block, thread);
           do
           {
@@ -1538,6 +1917,8 @@
                                 ("suspend thread %ld", thread->id));
             keycache_pthread_cond_wait(&thread->suspend,
                                        &keycache->cache_lock);
+            /* The signalling thread unlinks this thread from the queue. */
+            DBUG_ASSERT(! thread->next);
           }
           while (thread->next);
           thread->opt_info= NULL;
@@ -1552,11 +1933,13 @@
              Take the first block from the LRU chain
              unlinking it from the chain
           */
+          safe_mutex_assert_owner(&keycache->cache_lock);
           block= keycache->used_last->next_used;
           block->hits_left= init_hits_left;
           block->last_hit_time= 0;
           reg_requests(keycache, block,1);
           hash_link->block= block;
+          keycache_stats_find_key_block_miss_take_lru++;
         }
 
         if (block->hash_link != hash_link &&
@@ -1564,6 +1947,7 @@
         {
 	  /* this is a primary request for a new page */
           block->status|= BLOCK_IN_SWITCH;
+          keycache_stats_find_key_block_miss_switching++;
 
           KEYCACHE_DBUG_PRINT("find_key_block",
                         ("got block %u for new page", BLOCK_NUMBER(block)));
@@ -1573,7 +1957,10 @@
 	    /* The block contains a dirty page - push it out of the cache */
 
             KEYCACHE_DBUG_PRINT("find_key_block", ("block is dirty"));
+            keycache_stats_find_key_block_miss_write_changed++;
 
+            /* Mark block in flush to avoid flushing by another thread. */
+            block->status|= BLOCK_IN_FLUSH;
             keycache_pthread_mutex_unlock(&keycache->cache_lock);
             /*
 	      The call is thread safe because only the current
@@ -1586,6 +1973,23 @@
 			     MYF(MY_NABP | MY_WAIT_IF_FULL));
             keycache_pthread_mutex_lock(&keycache->cache_lock);
 	    keycache->global_cache_write++;
+            /*
+              Unmark the block. The changed flag could also be cleared by
+              link_to_file_list() below, but we can enter a wait state in
+              between, so better clear it immediately.
+            */
+            block->status&= ~(BLOCK_IN_FLUSH | BLOCK_CHANGED);
+            keycache->blocks_changed--;
+            keycache->global_blocks_changed--;
+            /*
+              Wake possible waiting requests to write to the block page.
+              It might happen only during an operation to resize the key cache.
+            */
+            if (block->wqueue[COND_FOR_SAVED].last_thread)
+            {
+              release_queue(&block->wqueue[COND_FOR_SAVED]);
+              keycache_stats_find_key_block_miss_wake_changed++;
+            }
           }
 
           block->status|= BLOCK_REASSIGNED;
@@ -1597,17 +2001,27 @@
 	      (we could have avoided this waiting, if we had read
 	      a page in the cache in a sweep, without yielding control)
             */
+            keycache_stats_find_key_block_miss_wait_for_readers++;
             wait_for_readers(keycache, block);
 
             /* Remove the hash link for this page from the hash table */
-            unlink_hash(keycache, block->hash_link);
+            /* While waiting, the block might have been freed. */
+            if (block->hash_link)
+            {
+              unlink_hash(keycache, block->hash_link);
+              keycache_stats_find_key_block_miss_unlink_hash++;
+            }
             /* All pending requests for this page must be resubmitted */
             if (block->wqueue[COND_FOR_SAVED].last_thread)
+            {
+/*ingo: possible flaw.*/
               release_queue(&block->wqueue[COND_FOR_SAVED]);
+              keycache_stats_find_key_block_miss_release_queue++;
+            }
           }
           link_to_file_list(keycache, block, file,
                             (my_bool)(block->hash_link ? 1 : 0));
-          block->status= error? BLOCK_ERROR : 0;
+          block->status= (error? BLOCK_ERROR : 0) | BLOCK_IN_USE;
           block->length= 0;
           block->offset= keycache->key_cache_block_size;
           block->hash_link= hash_link;
@@ -1615,6 +2029,7 @@
 
           KEYCACHE_DBUG_ASSERT(block->hash_link->block == block);
           KEYCACHE_DBUG_ASSERT(hash_link->block->hash_link == hash_link);
+          keycache_stats_find_key_block_miss_block_initialized++;
         }
         else
         {
@@ -1623,15 +2038,24 @@
                               ("block->hash_link: %p  hash_link: %p  "
                                "block->status: %u", block->hash_link,
                                hash_link, block->status ));
+          /*
+            If the block is not fully initialized,
+            someone is reading it in already.
+          */
           page_status= (((block->hash_link == hash_link) &&
                          (block->status & BLOCK_READ)) ?
                         PAGE_READ : PAGE_WAIT_TO_BE_READ);
+          if (page_status == PAGE_READ)
+            keycache_stats_find_key_block_miss_page_read++;
+          else
+            keycache_stats_find_key_block_miss_page_wait_to_be_read++;
         }
       }
       keycache->global_cache_read++;
     }
     else
     {
+      /* There is already a block assigned for this file and position. */
       reg_requests(keycache, block, 1);
       KEYCACHE_DBUG_PRINT("find_key_block",
                           ("block->hash_link: %p  hash_link: %p  "
@@ -1640,6 +2064,10 @@
       page_status= (((block->hash_link == hash_link) &&
                      (block->status & BLOCK_READ)) ?
                     PAGE_READ : PAGE_WAIT_TO_BE_READ);
+      if (page_status == PAGE_READ)
+        keycache_stats_find_key_block_hit_page_read++;
+      else
+        keycache_stats_find_key_block_hit_page_wait_to_be_read++;
     }
   }
 
@@ -1660,7 +2088,7 @@
 
 
 /*
-  Read into a key cache block buffer from disk.
+  Read into a key cache block buffer from file.
 
   SYNOPSIS
 
@@ -1688,6 +2116,7 @@
                        uint min_length, my_bool primary)
 {
   uint got_length;
+  DBUG_ENTER("read_block");
 
   /* On entry cache_lock is locked */
 
@@ -1698,11 +2127,12 @@
       This code is executed only by threads
       that submitted primary requests
     */
+    keycache_stats_read_block_primary++;
 
     KEYCACHE_DBUG_PRINT("read_block",
                         ("page to be read by primary request"));
 
-    /* Page is not in buffer yet, is to be read from disk */
+    /* Page is not in buffer yet, is to be read from file. */
     keycache_pthread_mutex_unlock(&keycache->cache_lock);
     /*
       Here other threads may step in and register as secondary readers.
@@ -1715,14 +2145,17 @@
       block->status|= BLOCK_ERROR;
     else
     {
-      block->status= BLOCK_READ;
+      block->status|= BLOCK_READ;
       block->length= got_length;
     }
     KEYCACHE_DBUG_PRINT("read_block",
                         ("primary request: new page in cache"));
     /* Signal that all pending requests for this page now can be processed */
     if (block->wqueue[COND_FOR_REQUESTED].last_thread)
+    {
       release_queue(&block->wqueue[COND_FOR_REQUESTED]);
+      keycache_stats_read_block_primary_wake++;
+    }
   }
   else
   {
@@ -1736,6 +2169,7 @@
 #ifdef THREAD
       struct st_my_thread_var *thread= my_thread_var;
       /* Put the request into a queue and wait until it can be processed */
+      keycache_stats_read_block_secondary_wait++;
       add_to_queue(&block->wqueue[COND_FOR_REQUESTED], thread);
       do
       {
@@ -1743,6 +2177,8 @@
                             ("suspend thread %ld", thread->id));
         keycache_pthread_cond_wait(&thread->suspend,
                                    &keycache->cache_lock);
+        /* The signalling thread unlinks this thread from the queue. */
+        DBUG_ASSERT(! thread->next);
       }
       while (thread->next);
 #else
@@ -1753,6 +2189,7 @@
     KEYCACHE_DBUG_PRINT("read_block",
                         ("secondary request: new page in cache"));
   }
+  DBUG_VOID_RETURN;
 }
 
 
@@ -1763,7 +2200,7 @@
 
     key_cache_read()
       keycache            pointer to a key cache data structure
-      file                handler for the file for the block of data to be read
+      file                handle for the file for the block of data to be read
       filepos             position of the block of data in the file
       level               determines the weight of the data
       buff                buffer to where the data must be placed
@@ -1791,7 +2228,6 @@
 		     int return_buffer __attribute__((unused)))
 {
   int error=0;
-  uint offset= 0;
   byte *start= buff;
   DBUG_ENTER("key_cache_read");
   DBUG_PRINT("enter", ("fd: %u  pos: %lu  length: %u",
@@ -1802,12 +2238,12 @@
     /* Key cache is used */
     reg1 BLOCK_LINK *block;
     uint read_length;
-    uint status;
     int page_st;
 
     /* Read data in key_cache_block_size increments */
     do
     {
+      uint offset;
       keycache_pthread_mutex_lock(&keycache->cache_lock);
       if (!keycache->can_be_used)
       {
@@ -1828,71 +2264,100 @@
       inc_counter_for_resize_op(keycache);
       keycache->global_cache_r_requests++;
       block=find_key_block(keycache, file, filepos, level, 0, &page_st);
-      if (block->status != BLOCK_ERROR && page_st != PAGE_READ)
-      {
-        /* The requested page is to be read into the block buffer */
-        read_block(keycache, block,
-                   keycache->key_cache_block_size, read_length+offset,
-                   (my_bool)(page_st == PAGE_TO_BE_READ));
-      }
-      else if (! (block->status & BLOCK_ERROR) &&
-               block->length < read_length + offset)
+      if (!block)
       {
         /*
-           Impossible if nothing goes wrong:
-           this could only happen if we are using a file with
-           small key blocks and are trying to read outside the file
+          It happens only for requests submitted during a resize operation.
+          Block is not in cache and shall not go in now. Read (this piece of)
+          the requested buffer directly. Do not read more than (the rest of)
+          the block as the next block could be in the cache again (and even
+          changed).
+          We can release the resize counter and cache_lock here because
+          we can assume that the thread has a table lock. No other thread
+          will try to read the same block.
         */
-        my_errno= -1;
-        block->status|= BLOCK_ERROR;
+        dec_counter_for_resize_op(keycache);
+        keycache_stats_key_cache_read_null_block++;
+        keycache_pthread_mutex_unlock(&keycache->cache_lock);
+        error= my_pread(file, buff, read_length, filepos + offset,
+                        MYF(MY_NABP));
+        if (error)
+          DBUG_RETURN((byte *) 0);
       }
-
-      if (! ((status= block->status) & BLOCK_ERROR))
+      else
       {
+        if (block->status != BLOCK_ERROR && page_st != PAGE_READ)
+        {
+          /* The requested page is to be read into the block buffer */
+          read_block(keycache, block,
+                     keycache->key_cache_block_size, read_length+offset,
+                     (my_bool)(page_st == PAGE_TO_BE_READ));
+          keycache_stats_key_cache_read_read_block++;
+        }
+        else if (! (block->status & BLOCK_ERROR) &&
+                 block->length < read_length + offset)
+        {
+          /*
+            Impossible if nothing goes wrong:
+            this could only happen if we are using a file with
+            small key blocks and are trying to read outside the file
+          */
+          my_errno= -1;
+          block->status|= BLOCK_ERROR;
+          keycache_stats_key_cache_read_error++;
+        }
+
+        if (! (block->status & BLOCK_ERROR))
+        {
 #ifndef THREAD
-        if (! return_buffer)
+          if (! return_buffer)
 #endif
-        {
+          {
 #if !defined(SERIALIZED_READ_FROM_CACHE)
-          keycache_pthread_mutex_unlock(&keycache->cache_lock);
+            keycache_pthread_mutex_unlock(&keycache->cache_lock);
 #endif
 
-          /* Copy data from the cache buffer */
-          if (!(read_length & 511))
-            bmove512(buff, block->buffer+offset, read_length);
-          else
-            memcpy(buff, block->buffer+offset, (size_t) read_length);
+            /* Copy data from the cache buffer */
+            if (!(read_length & 511))
+              bmove512(buff, block->buffer+offset, read_length);
+            else
+              memcpy(buff, block->buffer+offset, (size_t) read_length);
 
 #if !defined(SERIALIZED_READ_FROM_CACHE)
-          keycache_pthread_mutex_lock(&keycache->cache_lock);
+            keycache_pthread_mutex_lock(&keycache->cache_lock);
 #endif
+            keycache_stats_key_cache_read_copied++;
+          }
         }
-      }
-
-      remove_reader(block);
-      /*
-         Link the block into the LRU chain
-         if it's the last submitted request for the block
-      */
-      unreg_request(keycache, block, 1);
-
-      dec_counter_for_resize_op(keycache);
+        else
+          error= 1;
 
-      keycache_pthread_mutex_unlock(&keycache->cache_lock);
+        /*
+          Remove myself as a reader of this hash_link and wake a
+          waiting thread, if I was the last one.
+        */
+        remove_reader(block);
+        /*
+          Link the block into the LRU chain
+          if it's the last submitted request for the block
+        */
+        unreg_request(keycache, block, 1);
+        /* Decrement resize counter before breaking on error. */
+        dec_counter_for_resize_op(keycache);
 
-      if (status & BLOCK_ERROR)
-        DBUG_RETURN((byte *) 0);
+        keycache_pthread_mutex_unlock(&keycache->cache_lock);
 
 #ifndef THREAD
-      /* This is only true if we where able to read everything in one block */
-      if (return_buffer)
-	DBUG_RETURN(block->buffer);
+        /* This is only true if we were able to read everything in one block */
+        if (return_buffer)
+          DBUG_RETURN(block->buffer);
 #endif
+      }
       buff+= read_length;
       filepos+= read_length+offset;
 
-    } while ((length-= read_length));
-    DBUG_RETURN(start);
+    } while ((length-= read_length) && ! error);
+    DBUG_RETURN(error ? (byte*) 0 : start);
   }
 
 no_key_cache:					/* Key cache is not used */
@@ -1900,7 +2365,8 @@
   /* We can't use mutex here as the key cache may not be initialized */
   keycache->global_cache_r_requests++;
   keycache->global_cache_read++;
-  if (my_pread(file, (byte*) buff, length, filepos+offset, MYF(MY_NABP)))
+  keycache_stats_key_cache_read_no_cache++;
+  if (my_pread(file, (byte*) buff, length, filepos, MYF(MY_NABP)))
     error= 1;
   DBUG_RETURN(error ? (byte*) 0 : start);
 }
@@ -1912,7 +2378,7 @@
   SYNOPSIS
     key_cache_insert()
     keycache            pointer to a key cache data structure
-    file                handler for the file to insert data from
+    file                handle for the file to insert data from
     filepos             position of the block of data in the file to insert
     level               determines the weight of the data
     buff                buffer to read data from
@@ -1961,7 +2427,19 @@
       inc_counter_for_resize_op(keycache);
       keycache->global_cache_r_requests++;
       block= find_key_block(keycache, file, filepos, level, 0, &page_st);
-      if (block->status != BLOCK_ERROR && page_st != PAGE_READ)
+      if (!block)
+      {
+        /*
+          This happens only for requests submitted during a resize operation.
+          The block is not in the cache and shall not go in now.
+          This means that a preload during resizing may have no effect.
+        */
+        dec_counter_for_resize_op(keycache);
+        keycache_stats_key_cache_insert_null_block++;
+        keycache_pthread_mutex_unlock(&keycache->cache_lock);
+        DBUG_RETURN(0);
+      }
+      if (block->status != BLOCK_ERROR && page_st == PAGE_TO_BE_READ)
       {
         /* The requested page is to be read into the block buffer */
 #if !defined(SERIALIZED_READ_FROM_CACHE)
@@ -1982,7 +2460,8 @@
         keycache_pthread_mutex_lock(&keycache->cache_lock);
         /* Here we are alone again. */
 #endif
-        block->status= BLOCK_READ;
+        keycache_stats_key_cache_insert_copied++;
+        block->status|= BLOCK_READ;
         block->length= read_length+offset;
         KEYCACHE_DBUG_PRINT("key_cache_insert",
                             ("primary request: new page in cache"));
@@ -1990,16 +2469,19 @@
         if (block->wqueue[COND_FOR_REQUESTED].last_thread)
           release_queue(&block->wqueue[COND_FOR_REQUESTED]);
       }
+      error= (block->status & BLOCK_ERROR);
 
+      /*
+        Remove myself as a reader of this hash_link and wake a
+        waiting thread, if I was the last one.
+      */
       remove_reader(block);
       /*
          Link the block into the LRU chain
          if it's the last submitted request for the block
       */
       unreg_request(keycache, block, 1);
-
-      error= (block->status & BLOCK_ERROR);
-
+      /* Decrement resize counter before breaking on error. */
       dec_counter_for_resize_op(keycache);
 
       keycache_pthread_mutex_unlock(&keycache->cache_lock);
@@ -2023,7 +2505,7 @@
 
     key_cache_write()
       keycache            pointer to a key cache data structure
-      file                handler for the file to write data to
+      file                handle for the file to write data to
       filepos             position in the file to write data to
       level               determines the weight of the data
       buff                buffer with the data
@@ -2049,7 +2531,6 @@
                     uint block_length  __attribute__((unused)),
                     int dont_write)
 {
-  reg1 BLOCK_LINK *block;
   int error=0;
   DBUG_ENTER("key_cache_write");
   DBUG_PRINT("enter",
@@ -2059,7 +2540,11 @@
 
   if (!dont_write)
   {
-    /* Force writing from buff into disk */
+    /*
+      Force writing from buff to file.
+      This branch is not taken in the server.
+    */
+    /* INGO TEST ONLY */ DBUG_ASSERT(0);
     keycache->global_cache_write++;
     if (my_pwrite(file, buff, length, filepos, MYF(MY_NABP | MY_WAIT_IF_FULL)))
       DBUG_RETURN(1);
@@ -2073,6 +2558,7 @@
   if (keycache->can_be_used)
   {
     /* Key cache is used */
+    reg1 BLOCK_LINK *block;
     uint read_length;
     int page_st;
 
@@ -2097,71 +2583,113 @@
       block= find_key_block(keycache, file, filepos, level, 1, &page_st);
       if (!block)
       {
-        /* It happens only for requests submitted during resize operation */
+        /*
+          This happens only for requests submitted during a resize operation.
+          The block is not in the cache and shall not go in now.
+          We can release the resize counter and cache_lock here because
+          we can assume that the thread holds a table lock. Since the block
+          is not in the cache, it also cannot be written by a resizer.
+        */
         dec_counter_for_resize_op(keycache);
+	if (dont_write)
+          keycache->global_cache_write++;
+        keycache_stats_key_cache_write_null_block++;
 	keycache_pthread_mutex_unlock(&keycache->cache_lock);
 	if (dont_write)
         {
-          keycache->global_cache_w_requests++;
-          keycache->global_cache_write++;
-          if (my_pwrite(file, (byte*) buff, length, filepos,
+          /*
+            This branch is always taken in the server during resize.
+            Write just until the end of the block. We need to call
+            find_key_block() for every block. If the requested write
+            (buff, length) spans multiple blocks, every block could be
+            in a different state.
+          */
+          if (my_pwrite(file, buff, read_length, filepos + offset,
 		        MYF(MY_NABP | MY_WAIT_IF_FULL)))
             error=1;
 	}
-        goto next_block;
+        /* INGO TEST ONLY */ else DBUG_ASSERT(0);
       }
-
-      if (block->status != BLOCK_ERROR && page_st != PAGE_READ &&
-          (offset || read_length < keycache->key_cache_block_size))
-        read_block(keycache, block,
-                   offset + read_length >= keycache->key_cache_block_size?
-                   offset : keycache->key_cache_block_size,
-                   offset,(my_bool)(page_st == PAGE_TO_BE_READ));
-
-      if (!dont_write)
+      else
       {
-	/* buff has been written to disk at start */
-        if ((block->status & BLOCK_CHANGED) &&
-            (!offset && read_length >= keycache->key_cache_block_size))
-             link_to_file_list(keycache, block, block->hash_link->file, 1);
-      }
-      else if (! (block->status & BLOCK_CHANGED))
-        link_to_changed_list(keycache, block);
+        /* No resize in progress or block changed and not in flush. */
+        if (block->status != BLOCK_ERROR && page_st != PAGE_READ &&
+            (offset || read_length < keycache->key_cache_block_size))
+        {
+          /*
+            A block has been allocated, but not yet read.
+            The write will not replace the whole block, so we need to
+            read the remaining contents first.
+          */
+          read_block(keycache, block,
+                     offset + read_length >= keycache->key_cache_block_size?
+                     offset : keycache->key_cache_block_size,
+                     offset,(my_bool)(page_st == PAGE_TO_BE_READ));
+          keycache_stats_key_cache_write_read_block++;
+        }
+        else
+          keycache_stats_key_cache_write_have_block++;
 
-      set_if_smaller(block->offset, offset);
-      set_if_bigger(block->length, read_length+offset);
+        if (!dont_write)
+        {
+          /*
+            buff has been written to file at start.
+            This branch is not taken in the server.
+          */
+          /* INGO TEST ONLY */ DBUG_ASSERT(0);
+          if ((block->status & BLOCK_CHANGED) &&
+              (!offset && read_length >= keycache->key_cache_block_size))
+            link_to_file_list(keycache, block, block->hash_link->file, 1);
+        }
+        else if (! (block->status & BLOCK_CHANGED))
+        {
+          link_to_changed_list(keycache, block);
+          keycache_stats_key_cache_write_changed++;
+        }
 
-      if (! (block->status & BLOCK_ERROR))
-      {
-        if (!(read_length & 511))
-	  bmove512(block->buffer+offset, buff, read_length);
+        set_if_smaller(block->offset, offset);
+        set_if_bigger(block->length, read_length+offset);
+
+        if (! (block->status & BLOCK_ERROR))
+        {
+          if (!(read_length & 511))
+            bmove512(block->buffer+offset, buff, read_length);
+          else
+            memcpy(block->buffer+offset, buff, (size_t) read_length);
+          keycache_stats_key_cache_write_copied++;
+        }
         else
-          memcpy(block->buffer+offset, buff, (size_t) read_length);
-      }
+        {
+          error= 1;
+          keycache_stats_key_cache_write_error++;
+        }
 
-      block->status|=BLOCK_READ;
+        block->status|=BLOCK_READ;
 
-      /* Unregister the request */
-      block->hash_link->requests--;
-      unreg_request(keycache, block, 1);
+#ifdef OLD
+        block->hash_link->requests--;
+#else
+        /*
+          Remove myself as a reader of this hash_link and wake a
+          waiting thread, if I was the last one.
+        */
+        remove_reader(block);
+#endif
+        /*
+          Link the block into the LRU chain
+          if it's the last submitted request for the block
+        */
+        unreg_request(keycache, block, 1);
+        /* Decrement resize counter before breaking on error. */
+        dec_counter_for_resize_op(keycache);
 
-      if (block->status & BLOCK_ERROR)
-      {
         keycache_pthread_mutex_unlock(&keycache->cache_lock);
-        error= 1;
-        break;
       }
 
-      dec_counter_for_resize_op(keycache);
-
-      keycache_pthread_mutex_unlock(&keycache->cache_lock);
-
-    next_block:
       buff+= read_length;
       filepos+= read_length+offset;
-      offset= 0;
 
-    } while ((length-= read_length));
+    } while ((length-= read_length) && ! error);
     goto end;
   }
 
@@ -2169,12 +2697,15 @@
   /* Key cache is not used */
   if (dont_write)
   {
+    /* This branch is always taken in the server. */
     keycache->global_cache_w_requests++;
     keycache->global_cache_write++;
+    keycache_stats_key_cache_write_no_cache++;
     if (my_pwrite(file, (byte*) buff, length, filepos,
 		  MYF(MY_NABP | MY_WAIT_IF_FULL)))
       error=1;
   }
+  /* INGO TEST ONLY */ else DBUG_ASSERT(0);
 
 end:
 #if !defined(DBUG_OFF) && defined(EXTRA_DEBUG)
@@ -2197,6 +2728,10 @@
   KEYCACHE_DBUG_PRINT("free_block",
                       ("block %u to be freed, hash_link %p",
                        BLOCK_NUMBER(block), block->hash_link));
+
+  DBUG_ASSERT(block->status);
+
+  keycache_stats_free_block++;
   if (block->hash_link)
   {
     /*
@@ -2206,11 +2741,23 @@
       later.
     */
     block->status|= BLOCK_REASSIGNED;
+    keycache_stats_free_block_wait_for_readers++;
     wait_for_readers(keycache, block);
-    unlink_hash(keycache, block->hash_link);
+    /* While waiting, the block might have been freed. */
+    if (block->hash_link)
+    {
+      unlink_hash(keycache, block->hash_link);
+      keycache_stats_free_block_unlink_hash++;
+    }
   }
 
+  /* Forgot keycache->blocks_changed-- ? */
+  DBUG_ASSERT(! (block->status & BLOCK_CHANGED));
+
   unlink_changed(block);
+  block->next_changed= NULL;
+  block->prev_changed= NULL;
+
   block->status= 0;
   block->length= 0;
   block->offset= keycache->key_cache_block_size;
@@ -2220,20 +2767,34 @@
   unreg_request(keycache, block, 0);
   block->hash_link= NULL;
 
-  /* Remove the free block from the LRU ring. */
-  unlink_block(keycache, block);
+  /* Remove the free block from the LRU ring if it is part of it. */
+  if (block->prev_used)
+    unlink_block(keycache, block);
+
   if (block->temperature == BLOCK_WARM)
     keycache->warm_blocks--;
   block->temperature= BLOCK_COLD;
-  /* Insert the free block in the free list. */
-  block->next_used= keycache->free_block_list;
-  keycache->free_block_list= block;
+
+  /* Insert the free block in the free list if not already there. */
+  if (! block->next_used)
+  {
+    block->next_used= keycache->free_block_list;
+    keycache->free_block_list= block;
+  }
+#ifdef EXTRA_DEBUG
+  else
+    CHECK_FREE_BLOCK_LIST(keycache, block);
+#endif
+
   /* Keep track of the number of currently unused blocks. */
   keycache->blocks_unused++;
 
   /* All pending requests for this page must be resubmitted. */
   if (block->wqueue[COND_FOR_SAVED].last_thread)
+  {
     release_queue(&block->wqueue[COND_FOR_SAVED]);
+    keycache_stats_free_block_release_queue++;
+  }
 }
 
 
@@ -2245,24 +2806,42 @@
 
 
 /*
-  Flush a portion of changed blocks to disk,
-  free used blocks if requested
+  Flush a portion of changed blocks to file.
+
+  SYNOPSIS
+    flush_cached_blocks()
+      keycache                  The key cache
+      file                      Handle for the file to flush to
+      cache                     Array of blocks to flush
+      end                       Pointer past last array element
+
+  NOTE
+    Formerly this function freed flushed blocks. Now it only links
+    the blocks to the file list (the list of non-changed blocks).
+    flush_key_blocks_int() frees all file blocks after all changed
+    blocks are flushed. This way the blocks can be used for reading as
+    long as possible.
+
+  RETURN
+    0           OK
+    != 0        Error number
 */
 
 static int flush_cached_blocks(KEY_CACHE *keycache,
                                File file, BLOCK_LINK **cache,
-                               BLOCK_LINK **end,
-                               enum flush_type type)
+                               BLOCK_LINK **end)
 {
   int error;
   int last_errno= 0;
   uint count= (uint) (end-cache);
+  DBUG_ENTER("flush_cached_blocks");
 
-  /* Don't lock the cache during the flush */
+  keycache_stats_flush_cached_blocks++;
+  /* Don't lock the cache during the sort. */
   keycache_pthread_mutex_unlock(&keycache->cache_lock);
   /*
      As all blocks referred in 'cache' are marked by BLOCK_IN_FLUSH
-     we are guarunteed no thread will change them
+     we are guaranteed no thread will change them.
   */
   qsort((byte*) cache, count, sizeof(*cache), (qsort_cmp) cmp_sec_link);
 
@@ -2280,58 +2859,67 @@
                      block->hash_link->diskpos+ block->offset,
                      MYF(MY_NABP | MY_WAIT_IF_FULL));
     keycache_pthread_mutex_lock(&keycache->cache_lock);
+    keycache_stats_flush_cached_blocks_write++;
     keycache->global_cache_write++;
+    block->status&= ~BLOCK_IN_FLUSH;
     if (error)
     {
       block->status|= BLOCK_ERROR;
       if (!last_errno)
         last_errno= errno ? errno : -1;
+      keycache_stats_flush_cached_blocks_error++;
     }
     /*
-      Let to proceed for possible waiting requests to write to the block page.
+      Wake possible waiting requests to write to the block page.
       It might happen only during an operation to resize the key cache.
     */
     if (block->wqueue[COND_FOR_SAVED].last_thread)
-      release_queue(&block->wqueue[COND_FOR_SAVED]);
-    /* type will never be FLUSH_IGNORE_CHANGED here */
-    if (! (type == FLUSH_KEEP || type == FLUSH_FORCE_WRITE))
     {
-      keycache->blocks_changed--;
-      keycache->global_blocks_changed--;
-      free_block(keycache, block);
-    }
-    else
-    {
-      block->status&= ~BLOCK_IN_FLUSH;
-      link_to_file_list(keycache, block, file, 1);
-      unreg_request(keycache, block, 1);
+      release_queue(&block->wqueue[COND_FOR_SAVED]);
+      keycache_stats_flush_cached_blocks_release_queue++;
     }
 
+    /*
+      Do not free the block now.
+      It can be used for reading while flushing other blocks.
+      link_to_file_list() clears the BLOCK_CHANGED flag.
+    */
+    link_to_file_list(keycache, block, file, 1);
+    unreg_request(keycache, block, 1);
   }
-  return last_errno;
+  DBUG_RETURN(last_errno);
 }
 
 
 /*
-  flush all key blocks for a file to disk, but don't do any mutex locks
+  flush all key blocks for a file, but don't do any mutex locks
 
+  SYNOPSIS
     flush_key_blocks_int()
-      keycache            pointer to a key cache data structure
-      file                handler for the file to flush to
-      flush_type          type of the flush
+      keycache                  Pointer to a key cache data structure
+      file                      Handle for the file to flush to
+      type                      Type of the flush
+      blocks_flushed       OUT  Additional number of blocks flushed by this
+                                run. Added to the referenced variable.
 
   NOTES
     This function doesn't do any mutex locks because it needs to be called both
     from flush_key_blocks and flush_all_key_blocks (the later one does the
     mutex lock in the resize_key_cache() function).
 
+    But it releases the mutex for every write in flush_cached_blocks().
+    Hence it is possible for multiple threads to execute this function
+    at the same time. Every block selected by this function is marked as
+    BLOCK_IN_FLUSH. This makes other threads skip it when flushing.
+
   RETURN
     0   ok
     1  error
 */
 
 static int flush_key_blocks_int(KEY_CACHE *keycache,
-				File file, enum flush_type type)
+				File file, enum flush_type type,
+                                uint *blocks_flushed)
 {
   BLOCK_LINK *cache_buff[FLUSH_CACHE],**cache;
   int last_errno= 0;
@@ -2344,6 +2932,7 @@
                  test_key_cache(keycache, "start of flush_key_blocks", 0););
 #endif
 
+  keycache_stats_flush_key_blocks_int++;
   cache= cache_buff;
   if (keycache->disk_blocks > 0 &&
       (!my_disable_flush_key_blocks || type != FLUSH_KEEP))
@@ -2353,6 +2942,7 @@
     uint count= 0;
     BLOCK_LINK **pos,**end;
     BLOCK_LINK *first_in_switch= NULL;
+    BLOCK_LINK *last_in_flush;
     BLOCK_LINK *block, *next;
 #if defined(KEYCACHE_DEBUG)
     uint cnt=0;
@@ -2361,14 +2951,16 @@
     if (type != FLUSH_IGNORE_CHANGED)
     {
       /*
-         Count how many key blocks we have to cache to be able
-         to flush all dirty pages with minimum seek moves
+        Count how many changed key blocks we have. We will collect them
+        in a "cache" array later and sort the array by file position,
+        so that all dirty pages are flushed with minimum seek moves.
       */
       for (block= keycache->changed_blocks[FILE_HASH(file)] ;
            block ;
            block= block->next_changed)
       {
-        if (block->hash_link->file == file)
+        if ((block->hash_link->file == file) &&
+            ! (block->status & BLOCK_IN_FLUSH))
         {
           count++;
           KEYCACHE_DBUG_ASSERT(count<= keycache->blocks_used);
@@ -2386,6 +2978,7 @@
 
     /* Retrieve the blocks and write them to a buffer to be flushed */
 restart:
+    last_in_flush= NULL;
     end= (pos= cache)+count;
     for (block= keycache->changed_blocks[FILE_HASH(file)] ;
          block ;
@@ -2398,61 +2991,105 @@
       next= block->next_changed;
       if (block->hash_link->file == file)
       {
-        /*
-           Mark the block with BLOCK_IN_FLUSH in order not to let
-           other threads to use it for new pages and interfere with
-           our sequence ot flushing dirty file pages
-        */
-        block->status|= BLOCK_IN_FLUSH;
-
-        if (! (block->status & BLOCK_IN_SWITCH))
+        if (! (block->status & BLOCK_IN_FLUSH))
         {
-	  /*
-	    We care only for the blocks for which flushing was not
-	    initiated by other threads as a result of page swapping
+          /*
+            Mark the block with BLOCK_IN_FLUSH in order not to let other
+            threads use it for new pages and interfere with our sequence
+            of flushing dirty file pages. This also prevents other
+            threads from flushing the same block.
           */
-          reg_requests(keycache, block, 1);
-          if (type != FLUSH_IGNORE_CHANGED)
+          block->status|= BLOCK_IN_FLUSH;
+
+          if (! (block->status & BLOCK_IN_SWITCH))
           {
-	    /* It's not a temporary file */
-            if (pos == end)
+            /*
+              We care only for the blocks for which flushing was not
+              initiated by other threads as a result of page swapping
+            */
+            reg_requests(keycache, block, 1);
+            if (type != FLUSH_IGNORE_CHANGED)
             {
-	      /*
-		This happens only if there is not enough
-		memory for the big block
-              */
-              if ((error= flush_cached_blocks(keycache, file, cache,
-                                              end,type)))
-                last_errno=error;
-              /*
-		Restart the scan as some other thread might have changed
-		the changed blocks chain: the blocks that were in switch
-		state before the flush started have to be excluded
-              */
-              goto restart;
+              /* It's not a temporary file */
+              if (pos == end)
+              {
+                /*
+                  This happens only if there is not enough
+                  memory for the big block
+                */
+                if ((error= flush_cached_blocks(keycache, file, cache, end)))
+                  last_errno=error;
+                /*
+                  Restart the scan as some other thread might have changed
+                  the changed blocks chain: the blocks that were in switch
+                  state before the flush started have to be excluded
+                */
+                goto restart;
+              }
+              *pos++= block;
+            }
+            else
+            {
+              /* It's a temporary file */
+              block->status&= ~BLOCK_CHANGED;
+              keycache->blocks_changed--;
+              keycache->global_blocks_changed--;
+              free_block(keycache, block);
             }
-            *pos++= block;
           }
           else
           {
-            /* It's a temporary file */
-            keycache->blocks_changed--;
-	    keycache->global_blocks_changed--;
-            free_block(keycache, block);
+            /* Link the block into a list of blocks 'in switch' */
+            unlink_changed(block);
+            link_changed(block, &first_in_switch);
           }
         }
         else
         {
-	  /* Link the block into a list of blocks 'in switch' */
-          unlink_changed(block);
-          link_changed(block, &first_in_switch);
+          /* Remember the last block found to be in flush. */
+          last_in_flush= block;
         }
       }
     }
     if (pos != cache)
     {
-      if ((error= flush_cached_blocks(keycache, file, cache, pos, type)))
+      if ((error= flush_cached_blocks(keycache, file, cache, pos)))
         last_errno= error;
+      /*
+        We need another scan as some other thread might have changed the
+        changed blocks chain again while we released the lock for
+        writing each block. The flush is complete only after no more
+        changed blocks are found while the lock is held.
+      */
+      *blocks_flushed+= pos - cache;
+      goto restart;
+    }
+    else if (last_in_flush)
+    {
+      /*
+        There are no blocks to be flushed by this thread, but there are
+        blocks in flush by other threads. Wait until one of them is flushed.
+      */
+#ifdef THREAD
+      struct st_my_thread_var *thread= my_thread_var;
+      keycache_stats_flush_key_blocks_int_wait_flushed++;
+      add_to_queue(&last_in_flush->wqueue[COND_FOR_SAVED], thread);
+      do
+      {
+        KEYCACHE_DBUG_PRINT("flush_key_blocks_int: wait",
+                            ("suspend thread %ld", thread->id));
+        keycache_pthread_cond_wait(&thread->suspend,
+                                   &keycache->cache_lock);
+        /* The signalling thread unlinks this thread from the queue. */
+        DBUG_ASSERT(! thread->next);
+      }
+      while (thread->next);
+#else
+      KEYCACHE_DBUG_ASSERT(0);
+      /* No parallel requests in single-threaded case */
+#endif
+      /* Be sure not to lose a block. They may be flushed in random order. */
+      goto restart;
     }
     /* Wait until list of blocks in switch is empty */
     while (first_in_switch)
@@ -2464,6 +3101,7 @@
       {
 #ifdef THREAD
         struct st_my_thread_var *thread= my_thread_var;
+        keycache_stats_flush_key_blocks_int_wait_switching++;
         add_to_queue(&block->wqueue[COND_FOR_SAVED], thread);
         do
         {
@@ -2471,6 +3109,8 @@
                               ("suspend thread %ld", thread->id));
           keycache_pthread_cond_wait(&thread->suspend,
                                      &keycache->cache_lock);
+          /* The signalling thread unlinks this thread from the queue. */
+          DBUG_ASSERT(! thread->next);
         }
         while (thread->next);
 #else
@@ -2489,6 +3129,7 @@
 #if defined(KEYCACHE_DEBUG)
       cnt=0;
 #endif
+      /* Release all non-changed blocks of this file. */
       for (block= keycache->file_blocks[FILE_HASH(file)] ;
            block ;
            block= next)
@@ -2498,12 +3139,34 @@
         KEYCACHE_DBUG_ASSERT(cnt <= keycache->blocks_used);
 #endif
         next= block->next_changed;
-        if (block->hash_link->file == file &&
+        /* If hash_link is NULL, block has already been freed. */
+        if (block->hash_link && (block->hash_link->file == file) &&
             (! (block->status & BLOCK_CHANGED)
              || type == FLUSH_IGNORE_CHANGED))
         {
           reg_requests(keycache, block, 1);
+          /*
+            Keep the count of changed blocks correct.
+            Changed blocks do not come here without FLUSH_IGNORE_CHANGED.
+            They have already been flushed above.
+            free_block() contains an assertion to verify this.
+          */
+          if ((type == FLUSH_IGNORE_CHANGED) &&
+              (block->status & BLOCK_CHANGED))
+          {
+            block->status&= ~BLOCK_CHANGED;
+            keycache->blocks_changed--;
+            keycache->global_blocks_changed--;
+          }
           free_block(keycache, block);
+#ifdef INGO_OLD
+          /* If the block was really freed, all its links must be clear. */
+          if (! block->status)
+            DBUG_ASSERT(!block->temperature &&
+                        !block->next_used && !block->prev_used &&
+                        !block->next_changed && !block->prev_changed);
+#endif
+          keycache_stats_flush_key_blocks_int_freed_unchanged++;
         }
       }
     }
@@ -2522,14 +3185,14 @@
 
 
 /*
-  Flush all blocks for a file to disk
+  Flush all blocks for a file.
 
   SYNOPSIS
 
     flush_key_blocks()
-      keycache            pointer to a key cache data structure
-      file                handler for the file to flush to
-      flush_type          type of the flush
+      keycache            Pointer to a key cache data structure
+      file                Handle for the file to flush to
+      flush_type          Type of the flush
 
   RETURN
     0   ok
@@ -2540,14 +3203,16 @@
                      File file, enum flush_type type)
 {
   int res;
+  uint blocks_flushed= 0;
   DBUG_ENTER("flush_key_blocks");
   DBUG_PRINT("enter", ("keycache: 0x%lx", keycache));
 
   if (keycache->disk_blocks <= 0)
     DBUG_RETURN(0);
   keycache_pthread_mutex_lock(&keycache->cache_lock);
+  keycache_stats_flush_key_blocks++;
   inc_counter_for_resize_op(keycache);
-  res= flush_key_blocks_int(keycache, file, type);
+  res= flush_key_blocks_int(keycache, file, type, &blocks_flushed);
   dec_counter_for_resize_op(keycache);
   keycache_pthread_mutex_unlock(&keycache->cache_lock);
   DBUG_RETURN(res);
@@ -2555,7 +3220,7 @@
 
 
 /*
-  Flush all blocks in the key cache to disk
+  Flush all blocks in the key cache to their files.
 */
 
 static int flush_all_key_blocks(KEY_CACHE *keycache)
@@ -2563,27 +3228,46 @@
 #if defined(KEYCACHE_DEBUG)
   uint cnt=0;
 #endif
-  while (keycache->blocks_changed > 0)
+  uint blocks_flushed;
+  DBUG_ENTER("flush_all_key_blocks");
+
+  /*
+    Loop flushing all changed blocks until no blocks for flush are found.
+    Do not rely on keycache->blocks_changed. It may be inaccurate.
+  */
+  keycache_stats_flush_all_key_blocks++;
+  do
   {
-    BLOCK_LINK *block;
-    for (block= keycache->used_last->next_used ; ; block=block->next_used)
+    BLOCK_LINK  *block;
+    uint        idx;
+
+    blocks_flushed= 0;
+    keycache_stats_flush_all_key_blocks_try++;
+    /* Step through all changed block hashes to find all changed blocks. */
+    for (idx= 0; idx < CHANGED_BLOCKS_HASH; idx++)
     {
-      if (block->hash_link)
+      keycache_stats_flush_all_key_blocks_scan_changed++;
+      for (block= keycache->changed_blocks[idx] ;
+           block ;
+           block= block->next_changed)
       {
+        if (block->hash_link)
+        {
 #if defined(KEYCACHE_DEBUG)
-        cnt++;
-        KEYCACHE_DBUG_ASSERT(cnt <= keycache->blocks_used);
+          cnt++;
+          KEYCACHE_DBUG_ASSERT(cnt <= keycache->blocks_used);
 #endif
-        if (flush_key_blocks_int(keycache, block->hash_link->file,
-				 FLUSH_RELEASE))
-          return 1;
-        break;
+          if (flush_key_blocks_int(keycache, block->hash_link->file,
+                                   FLUSH_RELEASE, &blocks_flushed))
+            DBUG_RETURN(1);
+          break;
+        }
+        if (block == keycache->used_last)
+          break;
       }
-      if (block == keycache->used_last)
-        break;
     }
-  }
-  return 0;
+  } while (blocks_flushed);
+  DBUG_RETURN(0);
 }
 
 
@@ -2624,7 +3308,7 @@
 
 #ifndef DBUG_OFF
 /*
-  Test if disk-cache is ok
+  Test if key-cache is ok
 */
 static void test_key_cache(KEY_CACHE *keycache __attribute__((unused)),
                            const char *where __attribute__((unused)),
@@ -2851,3 +3535,30 @@
 #endif /* defined(KEYCACHE_DEBUG_LOG) */
 
 #endif /* defined(KEYCACHE_DEBUG) */
+
+#ifdef EXTRA_DEBUG
+/* Check the free block list and verify that 'block' is in it. */
+static void check_free_block_list(KEY_CACHE *keycache, BLOCK_LINK *block)
+{
+  uint       count= 0;
+  my_bool    found= FALSE;
+  BLOCK_LINK **sblk= &keycache->free_block_list;
+
+  while (*sblk)
+  {
+    if (*sblk == block)
+      found= TRUE;
+
+    /* Detect a loop in the free list. */
+    DBUG_ASSERT(++count < keycache->blocks_used);
+
+    /* Detect improperly initialized blocks. Maybe still linked from LRU. */
+    DBUG_ASSERT(! (*sblk)->prev_used && ! (*sblk)->status);
+    DBUG_ASSERT(! (*sblk)->temperature);
+
+    sblk= &(*sblk)->next_used;
+  }
+  /* The block must be in the free list. It can be nowhere else. */
+  DBUG_ASSERT(found);
+}
+#endif /*EXTRA_DEBUG*/
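
[Editor's note] The patch relies in many places on the wait queue
protocol: a waiter links itself into a queue and suspends on its own
condition variable; the waking thread unlinks every waiter before
signalling it, so thread->next == NULL means "I have been released".
Below is a minimal standalone C illustration of that hand-shake. It
is not the mysys implementation; the sketch_* names, the struct
layout and the circular-list details are assumptions for the sketch.

#include <pthread.h>

struct waiter
{
  pthread_cond_t suspend;            /* private condition variable      */
  struct waiter  *next;              /* non-NULL while linked in queue  */
};

struct wqueue
{
  struct waiter *last;               /* newest element of circular list */
};

static pthread_mutex_t cache_lock= PTHREAD_MUTEX_INITIALIZER;

/* Link a waiter into the circular queue. cache_lock must be held. */
static void sketch_add_to_queue(struct wqueue *queue, struct waiter *thread)
{
  if (! queue->last)
    thread->next= thread;            /* first element points to itself */
  else
  {
    thread->next= queue->last->next;
    queue->last->next= thread;
  }
  queue->last= thread;
}

/* Unlink and wake all waiters. cache_lock must be held. */
static void sketch_release_queue(struct wqueue *queue)
{
  struct waiter *last= queue->last;
  struct waiter *next= last->next;
  struct waiter *thread;
  do
  {
    thread= next;
    next= thread->next;
    pthread_cond_signal(&thread->suspend);
    thread->next= NULL;              /* mark the waiter as released */
  } while (thread != last);
  queue->last= NULL;
}

/* Wait until released. A spurious wakeup leaves thread->next set. */
static void sketch_wait_on_queue(struct wqueue *queue, struct waiter *thread)
{
  sketch_add_to_queue(queue, thread);
  do
  {
    pthread_cond_wait(&thread->suspend, &cache_lock);
  } while (thread->next);
}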

--- 1.6/include/keycache.h	2005-09-14 13:18:11 +02:00
+++ 1.7/include/keycache.h	2006-04-26 12:43:36 +02:00
@@ -73,6 +73,7 @@
   BLOCK_LINK *used_ins;          /* ptr to the insertion block in LRU chain  */
   pthread_mutex_t cache_lock;    /* to lock access to the cache structure    */
   KEYCACHE_WQUEUE resize_queue;  /* threads waiting during resize operation  */
+  KEYCACHE_WQUEUE resizer;       /* thread currently resizing the key cache  */
   KEYCACHE_WQUEUE waiting_for_hash_link; /* waiting for a free hash link     */
   KEYCACHE_WQUEUE waiting_for_block;    /* requests waiting for a free block */
   BLOCK_LINK *changed_blocks[CHANGED_BLOCKS_HASH]; /* hash for dirty file bl.*/
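
[Editor's note] The changeset comment says the new 'resizer' queue
lets the resizing thread wait for all other threads to leave the
cache after flushing. One plausible shape of that wait, assuming the
queue protocol sketched above and the existing cnt_for_resize_op
counter (this is an illustration, not code from the patch):

  /* Resizer side: park until all other users have left the cache. */
  struct st_my_thread_var *thread= my_thread_var;
  add_to_queue(&keycache->resizer, thread);
  do
  {
    keycache_pthread_cond_wait(&thread->suspend, &keycache->cache_lock);
  } while (thread->next);

  /* User side, e.g. in dec_counter_for_resize_op(): the last thread
     leaving the cache wakes the resizer. */
  if (! --keycache->cnt_for_resize_op && keycache->resizer.last_thread)
    release_queue(&keycache->resizer);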
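
[Editor's note] To summarize the new flush driver: because the cache
mutex is released for every block write, other threads can re-dirty
blocks while a flush runs, so both flush_key_blocks_int() and
flush_all_key_blocks() now restart their scans until a complete pass
finds nothing left to flush. A condensed restatement of that loop
follows; it compiles only in the context of mysys/mf_keycache.c
(KEY_CACHE, BLOCK_LINK, CHANGED_BLOCKS_HASH and flush_key_blocks_int
are the declarations from the patch), and the sketch_ name is mine:

static int sketch_flush_all_key_blocks(KEY_CACHE *keycache)
{
  uint blocks_flushed;
  do
  {
    uint idx;
    blocks_flushed= 0;
    /* Scan every changed-block hash chain for dirty blocks. */
    for (idx= 0; idx < CHANGED_BLOCKS_HASH; idx++)
    {
      BLOCK_LINK *block= keycache->changed_blocks[idx];
      if (block && block->hash_link)
      {
        /* flush_key_blocks_int() adds the number of blocks it wrote. */
        if (flush_key_blocks_int(keycache, block->hash_link->file,
                                 FLUSH_RELEASE, &blocks_flushed))
          return 1;
      }
    }
    /* A write may have re-dirtied blocks; rescan until a clean pass. */
  } while (blocks_flushed);
  return 0;
}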