List: Commits
From: ingo  Date: June 1 2006 1:31pm
Subject: bk commit into 5.0 tree (ingo:1.2138) BUG#17332
Below is the list of changes that have just been committed into a local
5.0 repository of mydev. When mydev does a push, these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository,
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html

ChangeSet
  1.2138 06/06/01 15:31:09 ingo@stripped +2 -0
  Bug#17332 - changing key_buffer_size on a running server can crash under load
  
  The key cache had a couple of race conditions for "on-line"
  resize.
  
  I rewrote parts of the locking scheme, fixed block status
  handling, initialized block pointers more thoroughly, checked
  whether blocks had been freed before using their pointers, and
  changed the key cache flushing loop so that no blocks can be
  forgotten.
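
  As an illustration only (a minimal sketch, not the MySQL sources;
  waiter_t/wqueue_t and the function names are invented), this is the
  wait-queue pattern the reworked locking scheme relies on: a single
  cache mutex, each waiter sleeping on its own condition variable
  after linking itself into a circular single-linked queue, and the
  signalling thread unlinking the waiter before waking it:

  #include <pthread.h>
  #include <stddef.h>

  typedef struct waiter
  {
    pthread_cond_t suspend;      /* per-thread condvar (like thread->suspend) */
    struct waiter *next;         /* links the thread into one wait queue */
  } waiter_t;

  typedef struct
  {
    waiter_t *last;              /* tail of circular list; last->next is head */
  } wqueue_t;

  /* Caller holds cache_lock. Link self into the queue and sleep until
     the signalling thread has unlinked us (it sets self->next to NULL). */
  static void wait_on_queue(wqueue_t *q, waiter_t *self,
                            pthread_mutex_t *cache_lock)
  {
    if (q->last)
    {
      self->next= q->last->next; /* new tail points at the old head */
      q->last->next= self;
    }
    else
      self->next= self;          /* queue was empty */
    q->last= self;
    do
      pthread_cond_wait(&self->suspend, cache_lock);
    while (self->next);          /* also guards against spurious wakeups */
  }

  /* Caller holds cache_lock. Wake and unlink the oldest waiter, if any. */
  static void release_first(wqueue_t *q)
  {
    waiter_t *head;
    if (!q->last)
      return;
    head= q->last->next;
    if (head == q->last)
      q->last= NULL;             /* head was the only waiter */
    else
      q->last->next= head->next;
    head->next= NULL;            /* marks the waiter as unlinked */
    pthread_cond_signal(&head->suspend);
  }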

  mysys/mf_keycache.c
    1.57 06/06/01 15:31:03 ingo@stripped +607 -251
    Bug#17332 - changing key_buffer_size on a running server can crash under load
    Changed and added comments, removed unused defines,
    added new debugging features, fixed queue/mutex handling
    so that a common scheme is followed. See the new big comment
    at the top of the file. As a consequence, created a new queue
    in which the resizer waits for other threads to leave the cache
    after flushing.
    Made small style fixes. Initialized block pointers on unlink.
    Added checks, where necessary, for whether another thread has
    freed a block.
    Return a changed block during resize so that its contents are
    not lost when a writer replaces only part of the block, and so
    that the meanwhile obsolete contents do not get flushed later.
    Don't free reassigned blocks when a write on them is attempted
    during resize. Added a new block status flag BLOCK_IN_USE to
    distinguish between freshly initialized and free blocks.
    Always mark a block as BLOCK_IN_FLUSH before releasing the
    mutex for writing. Fixed removal of the request from the
    hash_link in key_cache_write().
    In free_block() be careful not to free pointers of the block
    that have already been freed. (Re-)initialize more pointers so
    that a free block does not point into arbitrary queues.
    Fixed the use of the BLOCK_IN_FLUSH flag in some places.
    Always set it before writing and clear it after the write.
    In flush_cached_blocks() do not free every block immediately 
    after write. They may be used for further reading until all
    blocks are flushed.
    In flush_key_blocks_int() do not try to flush blocks that
    are already BLOCK_IN_FLUSH. Mark BLOCK_IN_FLUSH immediately
    when selecting blocks for flush. Restart flushing until no
    more blocks to flush are found; while writing, more blocks
    might become dirty.
    Changed flush_all_key_blocks() so that it loops over all files
    until no more blocks for flush are found.
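
    To make the flush restart logic concrete, here is a simplified,
    hypothetical sketch (invented types, not flush_key_blocks_int()
    itself): each pass claims unclaimed dirty blocks by setting
    BLOCK_IN_FLUSH before the mutex would be released for the write,
    and the scan restarts until a pass finds nothing left to flush:

    #define BLOCK_IN_FLUSH 16      /* same flag values as mf_keycache.c */
    #define BLOCK_CHANGED  32

    typedef struct block
    {
      struct block *next_changed;  /* chain of changed (dirty) blocks */
      int file;                    /* file the block belongs to */
      unsigned int status;
    } block_t;

    typedef struct
    {
      block_t *changed_blocks;     /* the real cache hashes this by file */
    } cache_sketch_t;

    /* One pass: claim and write dirty blocks of 'file' not yet in flush.
       Returns how many blocks this pass flushed. */
    static unsigned int flush_pass(cache_sketch_t *c, int file)
    {
      unsigned int flushed= 0;
      block_t *b;
      for (b= c->changed_blocks; b; b= b->next_changed)
      {
        if (b->file != file || !(b->status & BLOCK_CHANGED) ||
            (b->status & BLOCK_IN_FLUSH))
          continue;                /* skip clean or already claimed blocks */
        b->status|= BLOCK_IN_FLUSH; /* claim before releasing the mutex */
        /* ... release mutex, write the block, re-acquire mutex ... */
        b->status&= ~(BLOCK_IN_FLUSH | BLOCK_CHANGED);
        flushed++;
      }
      return flushed;
    }

    /* Restart until a pass flushes nothing: while the mutex is released
       for a write, other threads may dirty more blocks. */
    static void flush_until_clean(cache_sketch_t *c, int file)
    {
      while (flush_pass(c, file))
        ;
    }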

  include/keycache.h
    1.7 06/06/01 15:31:03 ingo@stripped +1 -0
    Bug#17332 - changing key_buffer_size on a running server can crash under load
    Added the 'resizer' queue, in which the resizer waits
    for other threads to leave the cache after flushing.
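
    A sketch of the shape of this change (struct members abbreviated;
    only the 'resizer' member is new, and the KEYCACHE_WQUEUE layout
    shown here is assumed from how the queues are used in the diff):

    struct st_my_thread_var;                 /* forward declaration */

    typedef struct st_keycache_wqueue
    {
      struct st_my_thread_var *last_thread;  /* tail of circular wait queue */
    } KEYCACHE_WQUEUE;

    typedef struct st_key_cache
    {
      /* ... other members elided ... */
      KEYCACHE_WQUEUE resize_queue;  /* threads waiting to start a resize */
      KEYCACHE_WQUEUE resizer;       /* new: the resizer waits here until
                                        all users have left the cache */
    } KEY_CACHE;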

# This is a BitKeeper patch.  What follows are the unified diffs for the
# set of deltas contained in the patch.  The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User:	ingo
# Host:	chilla.local
# Root:	/home/mydev/mysql-5.0-bug17332

--- 1.56/mysys/mf_keycache.c	2006-04-20 01:22:53 +02:00
+++ 1.57/mysys/mf_keycache.c	2006-06-01 15:31:03 +02:00
@@ -26,7 +26,7 @@
   When a new block is required it is first tried to pop one from the stack.
   If the stack is empty, it is tried to get a never-used block from the pool.
   If this is empty too, then a block is taken from the LRU ring, flushing it
-  to disk, if neccessary. This is handled in find_key_block().
+  to file, if necessary. This is handled in find_key_block().
   With the new free list, the blocks can have three temperatures:
   hot, warm and cold (which is free). This is remembered in the block header
   by the enum BLOCK_TEMPERATURE temperature variable. Remembering the
@@ -37,6 +37,42 @@
   blocks_unused is the sum of never used blocks in the pool and of currently
   free blocks. blocks_used is the number of blocks fetched from the pool and
   as such gives the maximum number of in-use blocks at any time.
+
+  Key Cache Locking
+  =================
+
+  All key cache locking is done with a single mutex per key cache:
+  keycache->cache_lock. This mutex is locked almost all the time
+  when executing code in this file (mf_keycache.c).
+  However it is released for I/O and some copy operations.
+
+  The cache_lock is also released when waiting for some event. Waiting
+  and signalling is done via condition variables. In most cases the
+  thread waits on its thread->suspend condition variable. Every thread
+  has a my_thread_var structure, which contains this variable and a
+  'next' and 'prev' pointer. These pointers are used to insert the
+  thread into a wait queue.
+
+  Since there is only one pair of queue pointers per thread, a thread
+  can be part of one wait queue only.
+
+  Before starting to wait on its condition variable with
+  pthread_cond_wait(), the thread adds itself to a specific wait queue
+  with link_into_queue() (double linked with 'next' + 'prev') or
+  add_to_queue() (single linked with 'next').
+
+  Another thread, when releasing a resource, looks up the waiting thread
+  in the related wait queue. It sends a signal with
+  pthread_cond_signal() to the waiting thread and removes it from the
+  wait queue with unlink_from_queue() or release_queue() respectively.
+
+  There is one exception to this locking scheme. Each block has a
+  reference to a condition variable (condvar). It holds a reference to
+  the thread->suspend condition variable, if that thread is waiting for
+  the block. When that thread is signalled, the reference is cleared.
+  This is similar to the above, but it clearly means that only one
+  thread can wait for a particular block. There is no queue in this
+  case.
 */
 
 #include "mysys_priv.h"
@@ -73,7 +109,7 @@
       #define KEYCACHE_DEBUG_LOG  <LOG NAME>
     if the name is not defined, it's set by default;
     if the KEYCACHE_DEBUG flag is not set up and we are in a debug
-    mode, i.e. when ! defined(DBUG_OFF), the debug information from the
+    mode, i.e. when !defined(DBUG_OFF), the debug information from the
     module is sent to the regular debug log.
 
   Example of the settings:
@@ -98,7 +134,6 @@
 /* types of condition variables */
 #define  COND_FOR_REQUESTED 0
 #define  COND_FOR_SAVED     1
-#define  COND_FOR_READERS   2
 
 typedef pthread_cond_t KEYCACHE_CONDVAR;
 
@@ -120,12 +155,13 @@ struct st_hash_link
 };
 
 /* simple states of a block */
-#define BLOCK_ERROR       1   /* an error occured when performing disk i/o   */
+#define BLOCK_ERROR       1   /* an error occurred when performing file i/o  */
 #define BLOCK_READ        2   /* the is page in the block buffer             */
 #define BLOCK_IN_SWITCH   4   /* block is preparing to read new page         */
 #define BLOCK_REASSIGNED  8   /* block does not accept requests for old page */
 #define BLOCK_IN_FLUSH   16   /* block is in flush operation                 */
 #define BLOCK_CHANGED    32   /* block buffer contains a dirty page          */
+#define BLOCK_IN_USE     64   /* block buffer is in use (not free)           */
 
 /* page status, returned by find_key_block */
 #define PAGE_READ               0
@@ -139,7 +175,7 @@ enum BLOCK_TEMPERATURE { BLOCK_COLD /*fr
 struct st_block_link
 {
   struct st_block_link
-    *next_used, **prev_used;   /* to connect links in the LRU chain (ring)   */
+    *next_used, **prev_used;    /* link in LRU ring or free_block_list */
   struct st_block_link
     *next_changed, **prev_changed; /* for lists of file dirty/clean blocks   */
   struct st_hash_link *hash_link; /* backward ptr to referring hash_link     */
@@ -178,7 +214,7 @@ static void test_key_cache(KEY_CACHE *ke
 
 #define DEFAULT_KEYCACHE_DEBUG_LOG  "keycache_debug.log"
 
-#if defined(KEYCACHE_DEBUG) && ! defined(KEYCACHE_DEBUG_LOG)
+#if defined(KEYCACHE_DEBUG) && !defined(KEYCACHE_DEBUG_LOG)
 #define KEYCACHE_DEBUG_LOG  DEFAULT_KEYCACHE_DEBUG_LOG
 #endif
 
@@ -209,7 +245,7 @@ static void keycache_debug_print _VARARG
               keycache_debug_print m; }
 
 #define KEYCACHE_DBUG_ASSERT(a)                                               \
-            { if (! (a) && keycache_debug_log) fclose(keycache_debug_log);    \
+            { if (!(a) && keycache_debug_log) fclose(keycache_debug_log);    \
               assert(a); }
 #else
 #define KEYCACHE_DBUG_PRINT(l, m)  DBUG_PRINT(l, m)
@@ -262,6 +298,7 @@ static int keycache_pthread_cond_signal(
 #define keycache_pthread_cond_signal pthread_cond_signal
 #endif /* defined(KEYCACHE_DEBUG) */
 
+
 static uint next_power(uint value)
 {
   uint old_value= 1;
@@ -318,7 +355,7 @@ int init_key_cache(KEY_CACHE *keycache, 
   keycache->global_cache_w_requests= keycache->global_cache_r_requests= 0;
   keycache->global_cache_read= keycache->global_cache_write= 0;
   keycache->disk_blocks= -1;
-  if (! keycache->key_cache_inited)
+  if (!keycache->key_cache_inited)
   {
     keycache->key_cache_inited= 1;
     keycache->in_init= 0;
@@ -485,7 +522,6 @@ int resize_key_cache(KEY_CACHE *keycache
 {
   int blocks;
   struct st_my_thread_var *thread;
-  KEYCACHE_WQUEUE *wqueue;
   DBUG_ENTER("resize_key_cache");
 
   if (!keycache->key_cache_inited)
@@ -501,13 +537,19 @@ int resize_key_cache(KEY_CACHE *keycache
   keycache_pthread_mutex_lock(&keycache->cache_lock);
 
 #ifdef THREAD
-  wqueue= &keycache->resize_queue;
+  /*
+    If another thread is resizing the key cache already, we have to wait
+    for it. For this purpose we queue up in the resize_queue. When the
+    current resizer finishes, it will signal the first thread in the
+    queue and remove it from the queue.
+  */
   thread= my_thread_var;
-  link_into_queue(wqueue, thread);
-
-  while (wqueue->last_thread->next != thread)
+  while (keycache->resize_in_flush)
   {
+    /* Wait for other resizes to finish. */
+    link_into_queue(&keycache->resize_queue, thread);
     keycache_pthread_cond_wait(&thread->suspend, &keycache->cache_lock);
+    /* The signalling thread unlinks this thread from the queue. */
   }
 #endif
 
@@ -522,12 +564,22 @@ int resize_key_cache(KEY_CACHE *keycache
   }
   keycache->resize_in_flush= 0;
   keycache->can_be_used= 0;
+
 #ifdef THREAD
+  /*
+    Wait until all users of the cache have left before freeing all data
+    structures. Use the resizer "queue" for waiting. There should be
+    only one entry in the "queue". dec_counter_for_resize_op() will wake
+    us when it is called from the last thread that used a page. It will
+    also remove us from the "queue".
+  */
   while (keycache->cnt_for_resize_op)
   {
     KEYCACHE_DBUG_PRINT("resize_key_cache: wait",
                         ("suspend thread %ld", thread->id));
+    link_into_queue(&keycache->resizer, thread);
     keycache_pthread_cond_wait(&thread->suspend, &keycache->cache_lock);
+    /* The signalling thread unlinks this thread from the queue. */
   }
 #else
   KEYCACHE_DBUG_ASSERT(keycache->cnt_for_resize_op == 0);
@@ -540,23 +592,50 @@ int resize_key_cache(KEY_CACHE *keycache
 
 finish:
 #ifdef THREAD
-  unlink_from_queue(wqueue, thread);
-  /* Signal for the next resize request to proceeed if any */
-  if (wqueue->last_thread)
+  /*
+    If there is any thread waiting in the resize_queue then wake it and
+    remove it from the queue. It is the next one to resize the key cache.
+  */
+  if ((thread= keycache->resize_queue.last_thread))
   {
+    thread= thread->next;
     KEYCACHE_DBUG_PRINT("resize_key_cache: signal",
-                        ("thread %ld", wqueue->last_thread->next->id));
-    keycache_pthread_cond_signal(&wqueue->last_thread->next->suspend);
+                        ("thread %ld", thread->id));
+    keycache_pthread_cond_signal(&thread->suspend);
+    unlink_from_queue(&keycache->resize_queue, thread);
   }
 #endif
+
   keycache_pthread_mutex_unlock(&keycache->cache_lock);
-  return blocks;
+  DBUG_RETURN(blocks);
 }
 
 
 /*
-  Increment counter blocking resize key cache operation
+  Increment counter blocking resize key cache operation to finish.
+
+  SYNOPSIS
+    inc_counter_for_resize_op()
+      keycache                  The key cache.
+
+  DESCRIPTION
+    This is used whenever key_cache_read(), key_cache_insert(), or
+    key_cache_write() want to copy data to/from a cached block.
+    They release the cache_lock while copying or even reading or
+    writing the file. The resize counter tells resize_key_cache()
+    that someone is still using a cached block. The data structures
+    must not yet be freed.
+    flush_key_blocks() increments the counter too before it flushes
+    the blocks for an index file.
+    The counter is decremented by dec_counter_for_resize_op().
+
+  NOTE
+    It is important that the counter is incremented while cache_lock is held.
+
+  RETURN
+    void
 */
+
 static inline void inc_counter_for_resize_op(KEY_CACHE *keycache)
 {
   keycache->cnt_for_resize_op++;
@@ -564,19 +643,38 @@ static inline void inc_counter_for_resiz
 
 
 /*
-  Decrement counter blocking resize key cache operation;
-  Signal the operation to proceed when counter becomes equal zero
+  Decrement counter blocking resize key cache operation to finish.
+
+  SYNOPSIS
+    dec_counter_for_resize_op()
+      keycache                  The key cache.
+
+  DESCRIPTION
+    See description for inc_counter_for_resize_op().
+    When the counter becomes zero, wake the resizer and unlink it from
+    the resizer queue.
+    When the resizer has finished flushing, it waits on the resizer queue
+    until users of the cache have left.
+
+  NOTE
+    It is important that the counter is decremented while cache_lock is held.
+
+  RETURN
+    void
 */
+
 static inline void dec_counter_for_resize_op(KEY_CACHE *keycache)
 {
 #ifdef THREAD
-  struct st_my_thread_var *last_thread;
+  struct st_my_thread_var *thread;
+
   if (!--keycache->cnt_for_resize_op &&
-      (last_thread= keycache->resize_queue.last_thread))
+      (thread= keycache->resizer.last_thread))
   {
     KEYCACHE_DBUG_PRINT("dec_counter_for_resize_op: signal",
-                        ("thread %ld", last_thread->next->id));
-    keycache_pthread_cond_signal(&last_thread->next->suspend);
+                        ("thread %ld", thread->id));
+    keycache_pthread_cond_signal(&thread->suspend);
+    unlink_from_queue(&keycache->resizer, thread);
   }
 #else
   keycache->cnt_for_resize_op--;
@@ -648,7 +746,7 @@ void end_key_cache(KEY_CACHE *keycache, 
       keycache->block_root= NULL;
     }
     keycache->disk_blocks= -1;
-    /* Reset blocks_changed to be safe if flush_all_key_blocks is called */
+    /* Reset blocks_changed to be safe. */
     keycache->blocks_changed= 0;
   }
 
@@ -692,7 +790,7 @@ static void link_into_queue(KEYCACHE_WQU
                                    struct st_my_thread_var *thread)
 {
   struct st_my_thread_var *last;
-  if (! (last= wqueue->last_thread))
+  if (!(last= wqueue->last_thread))
   {
     /* Queue is empty */
     thread->next= thread;
@@ -728,8 +826,10 @@ static void unlink_from_queue(KEYCACHE_W
 {
   KEYCACHE_DBUG_PRINT("unlink_from_queue", ("thread %ld", thread->id));
   if (thread->next == thread)
+  {
     /* The queue contains only one member */
     wqueue->last_thread= NULL;
+  }
   else
   {
     thread->next->prev= thread->prev;
@@ -763,7 +863,7 @@ static inline void add_to_queue(KEYCACHE
                                 struct st_my_thread_var *thread)
 {
   struct st_my_thread_var *last;
-  if (! (last= wqueue->last_thread))
+  if (!(last= wqueue->last_thread))
     thread->next= thread;
   else
   {
@@ -801,10 +901,10 @@ static void release_queue(KEYCACHE_WQUEU
     thread=next;
     KEYCACHE_DBUG_PRINT("release_queue: signal", ("thread %ld", thread->id));
     keycache_pthread_cond_signal(&thread->suspend);
+    /* Unlink the signalled thread from the queue. */
     next=thread->next;
     thread->next= NULL;
-  }
-  while (thread != last);
+  } while (thread != last);
   wqueue->last_thread= NULL;
 }
 #endif
@@ -818,6 +918,8 @@ static inline void unlink_changed(BLOCK_
 {
   if (block->next_changed)
     block->next_changed->prev_changed= block->prev_changed;
+  /* A block must be either in dirty or clean chain (unless free). */
+  DBUG_ASSERT(block->prev_changed);
   *block->prev_changed= block->next_changed;
 }
 
@@ -866,7 +968,7 @@ static inline void link_to_changed_list(
   unlink_changed(block);
   link_changed(block,
                &keycache->changed_blocks[FILE_HASH(block->hash_link->file)]);
-  block->status|=BLOCK_CHANGED;
+  block->status|= BLOCK_CHANGED;
   keycache->blocks_changed++;
   keycache->global_blocks_changed++;
 }
@@ -914,25 +1016,34 @@ static void link_block(KEY_CACHE *keycac
   BLOCK_LINK *ins;
   BLOCK_LINK **pins;
 
-  KEYCACHE_DBUG_ASSERT(! (block->hash_link && block->hash_link->requests));
+  KEYCACHE_DBUG_ASSERT(!(block->hash_link && block->hash_link->requests));
 #ifdef THREAD
   if (!hot && keycache->waiting_for_block.last_thread)
   {
-    /* Signal that in the LRU warm sub-chain an available block has appeared */
+    /*
+      This branch does not link the block into the LRU. There are threads
+      waiting to grab a block for their purpose. They want to take it from
+      the LRU. So we avoid the effort of linking it in and out. By assigning
+      hash_link->block= block we pretend that the block is already unlinked
+      by another thread.
+
+      The hash_link to be used is taken from my_thread_var->opt_info of
+      the first thread in the queue. Then all threads are signalled that
+      wait for the same hash_link.
+    */
     struct st_my_thread_var *last_thread=
                                keycache->waiting_for_block.last_thread;
-    struct st_my_thread_var *first_thread= last_thread->next;
-    struct st_my_thread_var *next_thread= first_thread;
-    HASH_LINK *hash_link= (HASH_LINK *) first_thread->opt_info;
+    struct st_my_thread_var *next_thread= last_thread->next;
     struct st_my_thread_var *thread;
+    HASH_LINK *hash_link= (HASH_LINK*) next_thread->opt_info;
+
+    hash_link->block= block;
     do
     {
+      /* Step through the chain that is modified by unlink_from_queue(). */
       thread= next_thread;
       next_thread= thread->next;
-      /*
-         We notify about the event all threads that ask
-         for the same page as the first thread in the queue
-      */
+      /* Notify each thread that asks for the same page. */
       if ((HASH_LINK *) thread->opt_info == hash_link)
       {
         KEYCACHE_DBUG_PRINT("link_block: signal", ("thread %ld", thread->id));
@@ -940,9 +1051,8 @@ static void link_block(KEY_CACHE *keycac
         unlink_from_queue(&keycache->waiting_for_block, thread);
         block->requests++;
       }
-    }
-    while (thread != last_thread);
-    hash_link->block= block;
+    } while (thread != last_thread);
+
     KEYCACHE_THREAD_TRACE("link_block: after signaling");
 #if defined(KEYCACHE_DEBUG)
     KEYCACHE_DBUG_PRINT("link_block",
@@ -953,7 +1063,7 @@ static void link_block(KEY_CACHE *keycac
     return;
   }
 #else /* THREAD */
-  KEYCACHE_DBUG_ASSERT(! (!hot && keycache->waiting_for_block.last_thread));
+  KEYCACHE_DBUG_ASSERT(!(!hot && keycache->waiting_for_block.last_thread));
       /* Condition not transformed using DeMorgan, to keep the text identical */
 #endif /* THREAD */
   pins= hot ? &keycache->used_ins : &keycache->used_last;
@@ -1003,9 +1113,15 @@ static void link_block(KEY_CACHE *keycac
 
 static void unlink_block(KEY_CACHE *keycache, BLOCK_LINK *block)
 {
+  DBUG_ASSERT(block->next_used && block->prev_used &&
+              (block->next_used->prev_used == &block->next_used) &&
+              (*block->prev_used == block));
+
   if (block->next_used == block)
+  {
     /* The list contains only one member */
     keycache->used_last= keycache->used_ins= NULL;
+  }
   else
   {
     block->next_used->prev_used= block->prev_used;
@@ -1015,15 +1131,17 @@ static void unlink_block(KEY_CACHE *keyc
     if (keycache->used_ins == block)
       keycache->used_ins=STRUCT_PTR(BLOCK_LINK, next_used, block->prev_used);
   }
+  block->prev_used= NULL;
   block->next_used= NULL;
 
   KEYCACHE_THREAD_TRACE("unlink_block");
 #if defined(KEYCACHE_DEBUG)
   keycache->blocks_available--;
   KEYCACHE_DBUG_PRINT("unlink_block",
-    ("unlinked block %u  status=%x   #requests=%u  #available=%u",
-     BLOCK_NUMBER(block), block->status,
-     block->requests, keycache->blocks_available));
+                      ("unlinked block %u  status=%x  "
+                       "#requests=%u  #available=%u",
+                       BLOCK_NUMBER(block), block->status,
+                       block->requests, keycache->blocks_available));
   KEYCACHE_DBUG_ASSERT(keycache->blocks_available >= 0);
 #endif
 }
@@ -1034,7 +1152,7 @@ static void unlink_block(KEY_CACHE *keyc
 */
 static void reg_requests(KEY_CACHE *keycache, BLOCK_LINK *block, int count)
 {
-  if (! block->requests)
+  if (!block->requests)
     /* First request for the block unlinks it */
     unlink_block(keycache, block);
   block->requests+=count;
@@ -1073,7 +1191,7 @@ static void reg_requests(KEY_CACHE *keyc
 static void unreg_request(KEY_CACHE *keycache,
                           BLOCK_LINK *block, int at_end)
 {
-  if (! --block->requests)
+  if (!--block->requests)
   {
     my_bool hot;
     if (block->hits_left)
@@ -1116,8 +1234,12 @@ static void unreg_request(KEY_CACHE *key
 
 static inline void remove_reader(BLOCK_LINK *block)
 {
-  if (! --block->hash_link->requests && block->condvar)
+  if (!--block->hash_link->requests && block->condvar)
+  {
     keycache_pthread_cond_signal(block->condvar);
+    /* Do not signal this thread again. */
+    block->condvar= NULL;
+  }
 }
 
 
@@ -1130,14 +1252,16 @@ static inline void wait_for_readers(KEY_
 {
 #ifdef THREAD
   struct st_my_thread_var *thread= my_thread_var;
-  while (block->hash_link->requests)
+  /* Block could have been freed while waiting (after an iteration). */
+  while (block->hash_link && block->hash_link->requests)
   {
     KEYCACHE_DBUG_PRINT("wait_for_readers: wait",
                         ("suspend thread %ld  block %u",
                          thread->id, BLOCK_NUMBER(block)));
+    DBUG_ASSERT(!block->condvar);
     block->condvar= &thread->suspend;
     keycache_pthread_cond_wait(&thread->suspend, &keycache->cache_lock);
-    block->condvar= NULL;
+    /* The condition variable is cleared by the signalling thread. */
   }
 #else
   KEYCACHE_DBUG_ASSERT(block->hash_link->requests == 0);
@@ -1177,9 +1301,8 @@ static void unlink_hash(KEY_CACHE *keyca
     /* Signal that a free hash link has appeared */
     struct st_my_thread_var *last_thread=
                                keycache->waiting_for_hash_link.last_thread;
-    struct st_my_thread_var *first_thread= last_thread->next;
-    struct st_my_thread_var *next_thread= first_thread;
-    KEYCACHE_PAGE *first_page= (KEYCACHE_PAGE *) (first_thread->opt_info);
+    struct st_my_thread_var *next_thread= last_thread->next;
+    KEYCACHE_PAGE *first_page= (KEYCACHE_PAGE*) next_thread->opt_info;
     struct st_my_thread_var *thread;
 
     hash_link->file= first_page->file;
@@ -1200,15 +1323,14 @@ static void unlink_hash(KEY_CACHE *keyca
         keycache_pthread_cond_signal(&thread->suspend);
         unlink_from_queue(&keycache->waiting_for_hash_link, thread);
       }
-    }
-    while (thread != last_thread);
+    } while (thread != last_thread);
     link_hash(&keycache->hash_root[KEYCACHE_HASH(hash_link->file,
 					         hash_link->diskpos)],
               hash_link);
     return;
   }
 #else /* THREAD */
-  KEYCACHE_DBUG_ASSERT(! (keycache->waiting_for_hash_link.last_thread));
+  KEYCACHE_DBUG_ASSERT(!(keycache->waiting_for_hash_link.last_thread));
 #endif /* THREAD */
   hash_link->next= keycache->free_hash_list;
   keycache->free_hash_list= hash_link;
@@ -1227,6 +1349,7 @@ static HASH_LINK *get_hash_link(KEY_CACH
 #if defined(KEYCACHE_DEBUG)
   int cnt;
 #endif
+  DBUG_ENTER("get_hash_link");
 
   KEYCACHE_DBUG_PRINT("get_hash_link", ("fd: %u  pos: %lu",
                       (uint) file,(ulong) filepos));
@@ -1248,7 +1371,7 @@ restart:
     hash_link= hash_link->next;
 #if defined(KEYCACHE_DEBUG)
     cnt++;
-    if (! (cnt <= keycache->hash_links_used))
+    if (!(cnt <= keycache->hash_links_used))
     {
       int i;
       for (i=0, hash_link= *start ;
@@ -1261,7 +1384,7 @@ restart:
     KEYCACHE_DBUG_ASSERT(cnt <= keycache->hash_links_used);
 #endif
   }
-  if (! hash_link)
+  if (!hash_link)
   {
     /* There is no hash link in the hash table for the pair (file, filepos) */
     if (keycache->free_hash_list)
@@ -1287,6 +1410,7 @@ restart:
                         ("suspend thread %ld", thread->id));
       keycache_pthread_cond_wait(&thread->suspend,
                                  &keycache->cache_lock);
+      /* The signalling thread unlinks this thread from the queue. */
       thread->opt_info= NULL;
 #else
       KEYCACHE_DBUG_ASSERT(0);
@@ -1300,7 +1424,7 @@ restart:
   /* Register the request for the page */
   hash_link->requests++;
 
-  return hash_link;
+  DBUG_RETURN(hash_link);
 }
 
 
@@ -1313,7 +1437,7 @@ restart:
 
     find_key_block()
       keycache            pointer to a key cache data structure
-      file                handler for the file to read page from
+      file                handle for the file to read page from
       filepos             position of the page in the file
       init_hits_left      how initialize the block counter for the page
       wrmode              <-> get for writing
@@ -1377,29 +1501,48 @@ restart:
 
     if (page_status != PAGE_READ)
     {
-      /* We don't need the page in the cache: we are going to write on disk */
-      hash_link->requests--;
-      unlink_hash(keycache, hash_link);
-      return 0;
+      /*
+        We don't have the block in the cache. And we don't want to have
+        new changed blocks at this stage. We do not need to expect that
+        another thread could try to read the same page until the caller
+        finishes writing itself. It should have an exclusive write lock
+        on the table or index. So we happily return NULL.
+      */
+      if (!--hash_link->requests && !block)
+        unlink_hash(keycache, hash_link);
+      DBUG_RETURN(NULL);
     }
+    /* The block is in the cache. */
     if (!(block->status & BLOCK_IN_FLUSH))
     {
-      hash_link->requests--;
+      /* The block is not yet or no longer part of a flush operation. */
+      if (block->status & BLOCK_CHANGED)
+      {
+        /* The block has changed contents, which must not get lost. */
+        reg_requests(keycache, block, 1);
+        *page_st= page_status;
+        DBUG_RETURN(block);
+      }
       /*
         Remove block to invalidate the page in the block buffer
-        as we are going to write directly on disk.
+        as we are going to write it directly to file.
         Although we have an exlusive lock for the updated key part
-        the control can be yieded by the current thread as we might
+        the control can be yielded by the current thread as we might
         have unfinished readers of other key parts in the block
         buffer. Still we are guaranteed not to have any readers
         of the key part we are writing into until the block is
-        removed from the cache as we set the BLOCL_REASSIGNED
+        removed from the cache as we set the BLOCK_REASSIGNED
         flag (see the code below that handles reading requests).
       */
-      free_block(keycache, block);
-      return 0;
+      hash_link->requests--;
+      /* We must not free switching blocks. They are reused already. */
+      if (!(block->status & (BLOCK_IN_SWITCH | BLOCK_REASSIGNED)))
+      {
+        free_block(keycache, block);
+      }
+      DBUG_RETURN(NULL);
     }
-    /* Wait intil the page is flushed on disk */
+    /* Wait until the page is flushed to file. */
     hash_link->requests--;
     {
 #ifdef THREAD
@@ -1411,8 +1554,9 @@ restart:
                             ("suspend thread %ld", thread->id));
         keycache_pthread_cond_wait(&thread->suspend,
                                    &keycache->cache_lock);
-      }
-      while(thread->next);
+        /* The signalling thread unlinks this thread from the queue. */
+        DBUG_ASSERT(!thread->next);
+      } while(thread->next);
 #else
       KEYCACHE_DBUG_ASSERT(0);
       /*
@@ -1424,10 +1568,14 @@ restart:
       */
 #endif
     }
-    /* Invalidate page in the block if it has not been done yet */
-    if (block->status)
+    /*
+      Invalidate page in the block if it has not been done yet, but
+      do not free switching blocks. They are reused already.
+    */
+    if (block->status &&
+        !(block->status & (BLOCK_IN_SWITCH | BLOCK_REASSIGNED)))
       free_block(keycache, block);
-    return 0;
+    DBUG_RETURN(NULL);
   }
 
   if (page_status == PAGE_READ &&
@@ -1444,9 +1592,20 @@ restart:
        all others are to be suspended, then resubmitted
     */
     if (!wrmode && !(block->status & BLOCK_REASSIGNED))
+    {
+      /* This is a read for a page that is in flush (ongoing write). */
       reg_requests(keycache, block, 1);
+    }
     else
     {
+      /*
+        This is either a write for a page in flush
+        or a read (or write) for an already flushed page that is
+        waiting for other readers to finish.
+        Anyway, we must not access it. We have to wait for it to be
+        reinitialized before we try to get a new block. Otherwise we
+        would find the same page again and again.
+      */
       hash_link->requests--;
       KEYCACHE_DBUG_PRINT("find_key_block",
                           ("request waiting for old page to be saved"));
@@ -1462,8 +1621,9 @@ restart:
                               ("suspend thread %ld", thread->id));
           keycache_pthread_cond_wait(&thread->suspend,
                                      &keycache->cache_lock);
-        }
-        while(thread->next);
+          /* The signalling thread unlinks this thread from the queue. */
+          DBUG_ASSERT(!thread->next);
+        } while(thread->next);
 #else
         KEYCACHE_DBUG_ASSERT(0);
           /* No parallel requests in single-threaded case */
@@ -1478,7 +1638,7 @@ restart:
   else
   {
     /* This is a request for a new page or for a page not to be removed */
-    if (! block)
+    if (!block)
     {
       /* No block is assigned for the page yet */
       if (keycache->blocks_unused)
@@ -1501,7 +1661,8 @@ restart:
           keycache->blocks_used++;
         }
         keycache->blocks_unused--;
-        block->status= 0;
+        /* Mark block in use to distinguish newly initialized from free. */
+        block->status= BLOCK_IN_USE;
         block->length= 0;
         block->offset= keycache->key_cache_block_size;
         block->requests= 1;
@@ -1520,17 +1681,17 @@ restart:
       {
 	/* There are no never used blocks, use a block from the LRU chain */
 
-        /*
-          Wait until a new block is added to the LRU chain;
-          several threads might wait here for the same page,
-          all of them must get the same block
-        */
-
 #ifdef THREAD
-        if (! keycache->used_last)
+        if (!keycache->used_last)
         {
+          /*
+            The LRU chain is empty. Wait until a block is added.
+            If several threads wait for the same page,
+            they are all signalled by link_block() at the same time.
+            my_thread_var->opt_info holds the requested page.
+          */
           struct st_my_thread_var *thread= my_thread_var;
-          thread->opt_info= (void *) hash_link;
+          thread->opt_info= (void*) hash_link;
           link_into_queue(&keycache->waiting_for_block, thread);
           do
           {
@@ -1538,15 +1699,22 @@ restart:
                                 ("suspend thread %ld", thread->id));
             keycache_pthread_cond_wait(&thread->suspend,
                                        &keycache->cache_lock);
-          }
-          while (thread->next);
+            /* The signalling thread unlinks this thread from the queue. */
+            DBUG_ASSERT(!thread->next);
+          } while (thread->next);
+          /* Avoid confusion. */
           thread->opt_info= NULL;
         }
 #else
         KEYCACHE_DBUG_ASSERT(keycache->used_last);
 #endif
+        /*
+          If we had to wait for a block to be put to the LRU chain, it
+          is now assigned to the hash_link. Otherwise we have to pick it
+          ourselves.
+        */
         block= hash_link->block;
-        if (! block)
+        if (!block)
         {
           /*
              Take the first block from the LRU chain
@@ -1555,12 +1723,13 @@ restart:
           block= keycache->used_last->next_used;
           block->hits_left= init_hits_left;
           block->last_hit_time= 0;
+          /* Register request and unlink from LRU. */
           reg_requests(keycache, block,1);
           hash_link->block= block;
         }
 
         if (block->hash_link != hash_link &&
-	    ! (block->status & BLOCK_IN_SWITCH) )
+	    !(block->status & BLOCK_IN_SWITCH) )
         {
 	  /* this is a primary request for a new page */
           block->status|= BLOCK_IN_SWITCH;
@@ -1574,6 +1743,8 @@ restart:
 
             KEYCACHE_DBUG_PRINT("find_key_block", ("block is dirty"));
 
+            /* Mark block in flush to avoid flushing by another thread. */
+            block->status|= BLOCK_IN_FLUSH;
             keycache_pthread_mutex_unlock(&keycache->cache_lock);
             /*
 	      The call is thread safe because only the current
@@ -1599,15 +1770,24 @@ restart:
             */
             wait_for_readers(keycache, block);
 
-            /* Remove the hash link for this page from the hash table */
+            /*
+              Remove the hash link for this page from the hash table.
+              Assert that the block has not been freed.
+              BLOCK_REASSIGNED should assure this.
+            */
+            DBUG_ASSERT(block->hash_link);
             unlink_hash(keycache, block->hash_link);
-            /* All pending requests for this page must be resubmitted */
-            if (block->wqueue[COND_FOR_SAVED].last_thread)
-              release_queue(&block->wqueue[COND_FOR_SAVED]);
+            /* Assigned later, so unnecessary here: block->hash_link= NULL; */
           }
+
+          /* All pending requests for this page must be resubmitted. */
+          if (block->wqueue[COND_FOR_SAVED].last_thread)
+            release_queue(&block->wqueue[COND_FOR_SAVED]);
+
           link_to_file_list(keycache, block, file,
                             (my_bool)(block->hash_link ? 1 : 0));
-          block->status= error? BLOCK_ERROR : 0;
+          /* Mark block in use to distinguish newly initialized from free. */
+          block->status= (error ? BLOCK_ERROR : 0) | BLOCK_IN_USE;
           block->length= 0;
           block->offset= keycache->key_cache_block_size;
           block->hash_link= hash_link;
@@ -1623,6 +1803,10 @@ restart:
                               ("block->hash_link: %p  hash_link: %p  "
                                "block->status: %u", block->hash_link,
                                hash_link, block->status ));
+          /*
+            If the block is not fully initialized,
+            someone is reading it in already.
+          */
           page_status= (((block->hash_link == hash_link) &&
                          (block->status & BLOCK_READ)) ?
                         PAGE_READ : PAGE_WAIT_TO_BE_READ);
@@ -1632,6 +1816,7 @@ restart:
     }
     else
     {
+      /* There is already a block assigned for this file and position. */
       reg_requests(keycache, block, 1);
       KEYCACHE_DBUG_PRINT("find_key_block",
                           ("block->hash_link: %p  hash_link: %p  "
@@ -1660,7 +1845,7 @@ restart:
 
 
 /*
-  Read into a key cache block buffer from disk.
+  Read into a key cache block buffer from file.
 
   SYNOPSIS
 
@@ -1688,6 +1873,7 @@ static void read_block(KEY_CACHE *keycac
                        uint min_length, my_bool primary)
 {
   uint got_length;
+  DBUG_ENTER("read_block");
 
   /* On entry cache_lock is locked */
 
@@ -1702,7 +1888,7 @@ static void read_block(KEY_CACHE *keycac
     KEYCACHE_DBUG_PRINT("read_block",
                         ("page to be read by primary request"));
 
-    /* Page is not in buffer yet, is to be read from disk */
+    /* Page is not in buffer yet, is to be read from file. */
     keycache_pthread_mutex_unlock(&keycache->cache_lock);
     /*
       Here other threads may step in and register as secondary readers.
@@ -1715,7 +1901,7 @@ static void read_block(KEY_CACHE *keycac
       block->status|= BLOCK_ERROR;
     else
     {
-      block->status= BLOCK_READ;
+      block->status|= BLOCK_READ;
       block->length= got_length;
     }
     KEYCACHE_DBUG_PRINT("read_block",
@@ -1743,8 +1929,9 @@ static void read_block(KEY_CACHE *keycac
                             ("suspend thread %ld", thread->id));
         keycache_pthread_cond_wait(&thread->suspend,
                                    &keycache->cache_lock);
-      }
-      while (thread->next);
+        /* The signalling thread unlinks this thread from the queue. */
+        DBUG_ASSERT(!thread->next);
+      } while (thread->next);
 #else
       KEYCACHE_DBUG_ASSERT(0);
       /* No parallel requests in single-threaded case */
@@ -1753,6 +1940,7 @@ static void read_block(KEY_CACHE *keycac
     KEYCACHE_DBUG_PRINT("read_block",
                         ("secondary request: new page in cache"));
   }
+  DBUG_VOID_RETURN;
 }
 
 
@@ -1763,7 +1951,7 @@ static void read_block(KEY_CACHE *keycac
 
     key_cache_read()
       keycache            pointer to a key cache data structure
-      file                handler for the file for the block of data to be read
+      file                handle for the file for the block of data to be read
       filepos             position of the block of data in the file
       level               determines the weight of the data
       buff                buffer to where the data must be placed
@@ -1791,7 +1979,6 @@ byte *key_cache_read(KEY_CACHE *keycache
 		     int return_buffer __attribute__((unused)))
 {
   int error=0;
-  uint offset= 0;
   byte *start= buff;
   DBUG_ENTER("key_cache_read");
   DBUG_PRINT("enter", ("fd: %u  pos: %lu  length: %u",
@@ -1801,12 +1988,12 @@ byte *key_cache_read(KEY_CACHE *keycache
   {
     /* Key cache is used */
     reg1 BLOCK_LINK *block;
+    uint offset;
     uint read_length;
     uint status;
     int page_st;
 
     offset= (uint) (filepos & (keycache->key_cache_block_size-1));
-    /* Read data in key_cache_block_size increments */
     do
     {
       keycache_pthread_mutex_lock(&keycache->cache_lock);
@@ -1815,10 +2002,12 @@ byte *key_cache_read(KEY_CACHE *keycache
 	keycache_pthread_mutex_unlock(&keycache->cache_lock);
 	goto no_key_cache;
       }
+      /* Read data in key_cache_block_size increments */
       filepos-= offset;
       read_length= length;
       set_if_smaller(read_length, keycache->key_cache_block_size-offset);
       KEYCACHE_DBUG_ASSERT(read_length > 0);
+      /* read_length is the part of the block to be read. */
 
 #ifndef THREAD
       if (block_length > keycache->key_cache_block_size || offset)
@@ -1835,7 +2024,7 @@ byte *key_cache_read(KEY_CACHE *keycache
                    keycache->key_cache_block_size, read_length+offset,
                    (my_bool)(page_st == PAGE_TO_BE_READ));
       }
-      else if (! (block->status & BLOCK_ERROR) &&
+      else if (!(block->status & BLOCK_ERROR) &&
                block->length < read_length + offset)
       {
         /*
@@ -1847,10 +2036,10 @@ byte *key_cache_read(KEY_CACHE *keycache
         block->status|= BLOCK_ERROR;
       }
 
-      if (! ((status= block->status) & BLOCK_ERROR))
+      if (!((status= block->status) & BLOCK_ERROR))
       {
 #ifndef THREAD
-        if (! return_buffer)
+        if (!return_buffer)
 #endif
         {
 #if !defined(SERIALIZED_READ_FROM_CACHE)
@@ -1869,13 +2058,19 @@ byte *key_cache_read(KEY_CACHE *keycache
         }
       }
 
+      /*
+        Remove myself as a reader of this hash_link and wake a
+        waiting thread, if I was the last one.
+      */
       remove_reader(block);
+
       /*
-         Link the block into the LRU chain
-         if it's the last submitted request for the block
+        Link the block into the LRU chain
+        if it's the last submitted request for the block.
       */
       unreg_request(keycache, block, 1);
 
+      /* Decrement resize counter before breaking on error. */
       dec_counter_for_resize_op(keycache);
 
       keycache_pthread_mutex_unlock(&keycache->cache_lock);
@@ -1896,12 +2091,13 @@ byte *key_cache_read(KEY_CACHE *keycache
     DBUG_RETURN(start);
   }
 
-no_key_cache:					/* Key cache is not used */
+no_key_cache:
+  /* Key cache is not used */
 
   /* We can't use mutex here as the key cache may not be initialized */
   keycache->global_cache_r_requests++;
   keycache->global_cache_read++;
-  if (my_pread(file, (byte*) buff, length, filepos+offset, MYF(MY_NABP)))
+  if (my_pread(file, (byte*) buff, length, filepos, MYF(MY_NABP)))
     error= 1;
   DBUG_RETURN(error ? (byte*) 0 : start);
 }
@@ -1913,7 +2109,7 @@ no_key_cache:					/* Key cache is not us
   SYNOPSIS
     key_cache_insert()
     keycache            pointer to a key cache data structure
-    file                handler for the file to insert data from
+    file                handle for the file to insert data from
     filepos             position of the block of data in the file to insert
     level               determines the weight of the data
     buff                buffer to read data from
@@ -1939,10 +2135,10 @@ int key_cache_insert(KEY_CACHE *keycache
   {
     /* Key cache is used */
     reg1 BLOCK_LINK *block;
+    uint offset;
     uint read_length;
+    uint status;
     int page_st;
-    int error;
-    uint offset;
 
     offset= (uint) (filepos & (keycache->key_cache_block_size-1));
     do
@@ -1958,11 +2154,12 @@ int key_cache_insert(KEY_CACHE *keycache
       read_length= length;
       set_if_smaller(read_length, keycache->key_cache_block_size-offset);
       KEYCACHE_DBUG_ASSERT(read_length > 0);
+      /* read_length is the part of the block to be replaced. */
 
       inc_counter_for_resize_op(keycache);
       keycache->global_cache_r_requests++;
       block= find_key_block(keycache, file, filepos, level, 0, &page_st);
-      if (block->status != BLOCK_ERROR && page_st != PAGE_READ)
+      if (!((status= block->status) & BLOCK_ERROR) && page_st != PAGE_READ)
       {
         /* The requested page is to be read into the block buffer */
 #if !defined(SERIALIZED_READ_FROM_CACHE)
@@ -1983,7 +2180,7 @@ int key_cache_insert(KEY_CACHE *keycache
         keycache_pthread_mutex_lock(&keycache->cache_lock);
         /* Here we are alone again. */
 #endif
-        block->status= BLOCK_READ;
+        block->status|= BLOCK_READ;
         block->length= read_length+offset;
         KEYCACHE_DBUG_PRINT("key_cache_insert",
                             ("primary request: new page in cache"));
@@ -1992,20 +2189,24 @@ int key_cache_insert(KEY_CACHE *keycache
           release_queue(&block->wqueue[COND_FOR_REQUESTED]);
       }
 
+      /*
+        Remove myself as a reader of this hash_link and wake a
+        waiting thread, if I was the last one.
+      */
       remove_reader(block);
+
       /*
-         Link the block into the LRU chain
-         if it's the last submitted request for the block
+        Link the block into the LRU chain
+        if it's the last submitted request for the block.
       */
       unreg_request(keycache, block, 1);
 
-      error= (block->status & BLOCK_ERROR);
-
+      /* Decrement resize counter before breaking on error. */
       dec_counter_for_resize_op(keycache);
 
       keycache_pthread_mutex_unlock(&keycache->cache_lock);
 
-      if (error)
+      if (status & BLOCK_ERROR)
         DBUG_RETURN(1);
 
       buff+= read_length;
@@ -2025,7 +2226,7 @@ int key_cache_insert(KEY_CACHE *keycache
 
     key_cache_write()
       keycache            pointer to a key cache data structure
-      file                handler for the file to write data to
+      file                handle for the file to write data to
       filepos             position in the file to write data to
       level               determines the weight of the data
       buff                buffer with the data
@@ -2051,7 +2252,6 @@ int key_cache_write(KEY_CACHE *keycache,
                     uint block_length  __attribute__((unused)),
                     int dont_write)
 {
-  reg1 BLOCK_LINK *block;
   int error=0;
   DBUG_ENTER("key_cache_write");
   DBUG_PRINT("enter",
@@ -2061,7 +2261,10 @@ int key_cache_write(KEY_CACHE *keycache,
 
   if (!dont_write)
   {
-    /* Force writing from buff into disk */
+    /*
+      Force writing from buff to file.
+      This branch is not taken in the server.
+    */
     keycache->global_cache_write++;
     if (my_pwrite(file, buff, length, filepos, MYF(MY_NABP | MY_WAIT_IF_FULL)))
       DBUG_RETURN(1);
@@ -2075,9 +2278,11 @@ int key_cache_write(KEY_CACHE *keycache,
   if (keycache->can_be_used)
   {
     /* Key cache is used */
+    reg1 BLOCK_LINK *block;
+    uint offset;
     uint read_length;
+    uint status;
     int page_st;
-    uint offset;
 
     offset= (uint) (filepos & (keycache->key_cache_block_size-1));
     do
@@ -2093,13 +2298,21 @@ int key_cache_write(KEY_CACHE *keycache,
       read_length= length;
       set_if_smaller(read_length, keycache->key_cache_block_size-offset);
       KEYCACHE_DBUG_ASSERT(read_length > 0);
+      /* read_length is the part of the block to be replaced. */
 
       inc_counter_for_resize_op(keycache);
       keycache->global_cache_w_requests++;
       block= find_key_block(keycache, file, filepos, level, 1, &page_st);
       if (!block)
       {
-        /* It happens only for requests submitted during resize operation */
+        /*
+          It happens only for requests submitted during resize
+          operation. Block is not in cache and shall not go in now. We
+          can release the resize counter and cache_lock here because we
+          can assume that the thread has a table or index lock. Since
+          the block is not in cache, it also cannot be written by a
+          resizer.
+        */
         dec_counter_for_resize_op(keycache);
 	keycache_pthread_mutex_unlock(&keycache->cache_lock);
 	if (dont_write)
@@ -2127,13 +2340,13 @@ int key_cache_write(KEY_CACHE *keycache,
             (!offset && read_length >= keycache->key_cache_block_size))
              link_to_file_list(keycache, block, block->hash_link->file, 1);
       }
-      else if (! (block->status & BLOCK_CHANGED))
+      else if (!(block->status & BLOCK_CHANGED))
         link_to_changed_list(keycache, block);
 
       set_if_smaller(block->offset, offset);
       set_if_bigger(block->length, read_length+offset);
 
-      if (! (block->status & BLOCK_ERROR))
+      if (!((status= block->status) & BLOCK_ERROR))
       {
         if (!(read_length & 511))
 	  bmove512(block->buffer+offset, buff, read_length);
@@ -2141,23 +2354,31 @@ int key_cache_write(KEY_CACHE *keycache,
           memcpy(block->buffer+offset, buff, (size_t) read_length);
       }
 
-      block->status|=BLOCK_READ;
+      block->status|= BLOCK_READ;
 
-      /* Unregister the request */
-      block->hash_link->requests--;
+      /*
+        Remove myself as a reader of this hash_link and wake a
+        waiting thread, if I was the last one.
+      */
+      remove_reader(block);
+
+      /*
+        Link the block into the LRU chain
+        if it's the last submitted request for the block.
+      */
       unreg_request(keycache, block, 1);
 
-      if (block->status & BLOCK_ERROR)
+      /* Decrement resize counter before breaking on error. */
+      dec_counter_for_resize_op(keycache);
+
+      keycache_pthread_mutex_unlock(&keycache->cache_lock);
+
+      if (status & BLOCK_ERROR)
       {
-        keycache_pthread_mutex_unlock(&keycache->cache_lock);
         error= 1;
         break;
       }
 
-      dec_counter_for_resize_op(keycache);
-
-      keycache_pthread_mutex_unlock(&keycache->cache_lock);
-
     next_block:
       buff+= read_length;
       filepos+= read_length+offset;
@@ -2169,8 +2390,11 @@ int key_cache_write(KEY_CACHE *keycache,
 
 no_key_cache:
   /* Key cache is not used */
+
   if (dont_write)
   {
+    /* This branch is taken in the server if key cache is not used. */
+    /* We can't use mutex here as the key cache may not be initialized */
     keycache->global_cache_w_requests++;
     keycache->global_cache_write++;
     if (my_pwrite(file, (byte*) buff, length, filepos,
@@ -2199,7 +2423,15 @@ static void free_block(KEY_CACHE *keycac
   KEYCACHE_DBUG_PRINT("free_block",
                       ("block %u to be freed, hash_link %p",
                        BLOCK_NUMBER(block), block->hash_link));
-  if (block->hash_link)
+  /*
+    Assert that the block is not free already.
+    And that it is in a clean state.
+  */
+  DBUG_ASSERT(block->status &&
+              !(block->status & (BLOCK_IN_SWITCH | BLOCK_REASSIGNED |
+                                 BLOCK_IN_FLUSH | BLOCK_CHANGED)));
+
+  if (likely(block->hash_link))
   {
     /*
       While waiting for readers to finish, new readers might request the
@@ -2209,10 +2441,20 @@ static void free_block(KEY_CACHE *keycac
     */
     block->status|= BLOCK_REASSIGNED;
     wait_for_readers(keycache, block);
+    /*
+      Assert that the block has not been freed.
+      BLOCK_REASSIGNED should assure this.
+    */
+    DBUG_ASSERT(block->hash_link && block->status);
     unlink_hash(keycache, block->hash_link);
+    block->hash_link= NULL;
   }
 
+  /* A block must be either in dirty or clean chain (unless free). */
   unlink_changed(block);
+  block->next_changed= NULL;
+  block->prev_changed= NULL;
+
   block->status= 0;
   block->length= 0;
   block->offset= keycache->key_cache_block_size;
@@ -2220,16 +2462,22 @@ static void free_block(KEY_CACHE *keycac
   KEYCACHE_DBUG_PRINT("free_block",
                       ("block is freed"));
   unreg_request(keycache, block, 0);
-  block->hash_link= NULL;
 
-  /* Remove the free block from the LRU ring. */
-  unlink_block(keycache, block);
+  /* Remove the free block from the LRU ring if it is part of it. */
+  if (block->prev_used)
+    unlink_block(keycache, block);
   if (block->temperature == BLOCK_WARM)
     keycache->warm_blocks--;
   block->temperature= BLOCK_COLD;
-  /* Insert the free block in the free list. */
+
+  /*
+    Insert the free block in the free list.
+    Assert it is neither in the LRU ring nor in free_block_list.
+  */
+  DBUG_ASSERT(!block->next_used);
   block->next_used= keycache->free_block_list;
   keycache->free_block_list= block;
+
   /* Keep track of the number of currently unused blocks. */
   keycache->blocks_unused++;
 
@@ -2247,24 +2495,41 @@ static int cmp_sec_link(BLOCK_LINK **a, 
 
 
 /*
-  Flush a portion of changed blocks to disk,
-  free used blocks if requested
+  Flush a portion of changed blocks to file.
+
+  SYNOPSIS
+    flush_cached_blocks()
+      keycache                  The key cache
+      file                      Handle for the file to flush to
+      cache                     Array of blocks to flush
+      end                       Pointer past last array element
+
+  NOTE
+    Formerly this function freed flushed blocks. Now it only links
+    the blocks to the file list (list of non-changed blocks).
+    flush_key_blocks_int() frees all file blocks after all changed
+    blocks are flushed. This way the blocks can be used for reading as
+    long as possible.
+
+  RETURN
+    0           OK
+    != 0        Error number
 */
 
 static int flush_cached_blocks(KEY_CACHE *keycache,
                                File file, BLOCK_LINK **cache,
-                               BLOCK_LINK **end,
-                               enum flush_type type)
+                               BLOCK_LINK **end)
 {
   int error;
   int last_errno= 0;
   uint count= (uint) (end-cache);
+  DBUG_ENTER("flush_cached_blocks");
 
-  /* Don't lock the cache during the flush */
+  /* Don't lock the cache during the sort. */
   keycache_pthread_mutex_unlock(&keycache->cache_lock);
   /*
      As all blocks referred in 'cache' are marked by BLOCK_IN_FLUSH
-     we are guarunteed no thread will change them
+     we are guaranteed no thread will change them.
   */
   qsort((byte*) cache, count, sizeof(*cache), (qsort_cmp) cmp_sec_link);
 
@@ -2289,43 +2554,49 @@ static int flush_cached_blocks(KEY_CACHE
       if (!last_errno)
         last_errno= errno ? errno : -1;
     }
+
     /*
-      Let to proceed for possible waiting requests to write to the block page.
+      Wake possible waiting requests to write to the block page.
       It might happen only during an operation to resize the key cache.
     */
     if (block->wqueue[COND_FOR_SAVED].last_thread)
       release_queue(&block->wqueue[COND_FOR_SAVED]);
-    /* type will never be FLUSH_IGNORE_CHANGED here */
-    if (! (type == FLUSH_KEEP || type == FLUSH_FORCE_WRITE))
-    {
-      keycache->blocks_changed--;
-      keycache->global_blocks_changed--;
-      free_block(keycache, block);
-    }
-    else
-    {
-      block->status&= ~BLOCK_IN_FLUSH;
-      link_to_file_list(keycache, block, file, 1);
-      unreg_request(keycache, block, 1);
-    }
 
+    /* Flush of this block is complete. */
+    block->status&= ~BLOCK_IN_FLUSH;
+
+    /*
+      Do not free the block now.
+      It can be used for reading while flushing other blocks.
+      link_to_file_list() clears the BLOCK_CHANGED flag.
+    */
+    link_to_file_list(keycache, block, file, 1);
+    unreg_request(keycache, block, 1);
   }
-  return last_errno;
+  DBUG_RETURN(last_errno);
 }
 
 
 /*
-  flush all key blocks for a file to disk, but don't do any mutex locks
+  flush all key blocks for a file, but don't do any mutex locks
 
+  SYNOPSIS
     flush_key_blocks_int()
-      keycache            pointer to a key cache data structure
-      file                handler for the file to flush to
-      flush_type          type of the flush
+      keycache                  Pointer to a key cache data structure
+      file                      Handle for the file to flush to
+      type                      Type of the flush
+      blocks_flushed       OUT  Additional number of blocks flushed by this
+                                run. Added to the referenced variable.
 
   NOTES
-    This function doesn't do any mutex locks because it needs to be called both
-    from flush_key_blocks and flush_all_key_blocks (the later one does the
-    mutex lock in the resize_key_cache() function).
+    This function doesn't do any mutex locks because it needs to be
+    called both from flush_key_blocks() and flush_all_key_blocks() (the
+    latter one does the mutex lock in the resize_key_cache() function).
+
+    But it releases the mutex for every write in flush_cached_blocks().
+    Hence it is possible for multiple threads to execute this function
+    at the same time. Every block selected by this function is marked as
+    BLOCK_IN_FLUSH. This makes other threads skip it when flushing.
 
   RETURN
     0   ok
@@ -2333,7 +2604,8 @@ static int flush_cached_blocks(KEY_CACHE
 */
 
 static int flush_key_blocks_int(KEY_CACHE *keycache,
-				File file, enum flush_type type)
+                                File file, enum flush_type type,
+                                uint *blocks_flushed)
 {
   BLOCK_LINK *cache_buff[FLUSH_CACHE],**cache;
   int last_errno= 0;
@@ -2355,6 +2627,7 @@ static int flush_key_blocks_int(KEY_CACH
     uint count= 0;
     BLOCK_LINK **pos,**end;
     BLOCK_LINK *first_in_switch= NULL;
+    BLOCK_LINK *last_in_flush;
     BLOCK_LINK *block, *next;
 #if defined(KEYCACHE_DEBUG)
     uint cnt=0;
@@ -2363,14 +2636,17 @@ static int flush_key_blocks_int(KEY_CACH
     if (type != FLUSH_IGNORE_CHANGED)
     {
       /*
-         Count how many key blocks we have to cache to be able
-         to flush all dirty pages with minimum seek moves
+        Count how many changed key blocks we have. We will collect them
+        in a "cache" array later and sort the array by file position,
+        so that all dirty pages are flushed with minimum seek moves.
       */
       for (block= keycache->changed_blocks[FILE_HASH(file)] ;
            block ;
            block= block->next_changed)
       {
-        if (block->hash_link->file == file)
+        /* Ignore blocks that are in flush already. */
+        if ((block->hash_link->file == file) &&
+            !(block->status & BLOCK_IN_FLUSH))
         {
           count++;
           KEYCACHE_DBUG_ASSERT(count<= keycache->blocks_used);
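
For reference, the "cache" array mentioned here is sorted by file position
before the writes are issued (cmp_sec_link() plays this role in
mf_keycache.c). A sketch of that ordering step, assuming simplified
stand-ins for the real BLOCK_LINK and HASH_LINK types:

#include <stdlib.h>

typedef struct { unsigned long diskpos; } HASH_LINK_SK;
typedef struct { HASH_LINK_SK *hash_link; } BLOCK_LINK_SK;

static int cmp_by_diskpos(const void *a, const void *b)
{
  unsigned long pa= (*(const BLOCK_LINK_SK *const *) a)->hash_link->diskpos;
  unsigned long pb= (*(const BLOCK_LINK_SK *const *) b)->hash_link->diskpos;
  return (pa < pb) ? -1 : (pa > pb) ? 1 : 0;
}

/* Sort the collected dirty blocks so writes proceed in file order,
   minimizing seeks. */
static void sort_cache(BLOCK_LINK_SK **cache, size_t count)
{
  qsort(cache, count, sizeof(BLOCK_LINK_SK *), cmp_by_diskpos);
}
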
@@ -2388,6 +2664,7 @@ static int flush_key_blocks_int(KEY_CACH
 
     /* Retrieve the blocks and write them to a buffer to be flushed */
 restart:
+    last_in_flush= NULL;
     end= (pos= cache)+count;
     for (block= keycache->changed_blocks[FILE_HASH(file)] ;
          block ;
@@ -2400,63 +2677,115 @@ restart:
       next= block->next_changed;
       if (block->hash_link->file == file)
       {
-        /*
-           Mark the block with BLOCK_IN_FLUSH in order not to let
-           other threads to use it for new pages and interfere with
-           our sequence ot flushing dirty file pages
-        */
-        block->status|= BLOCK_IN_FLUSH;
-
-        if (! (block->status & BLOCK_IN_SWITCH))
+        if (!(block->status & BLOCK_IN_FLUSH))
         {
-	  /*
-	    We care only for the blocks for which flushing was not
-	    initiated by other threads as a result of page swapping
+          /*
+            Mark the block with BLOCK_IN_FLUSH in order not to let other
+            threads use it for new pages and interfere with our sequence
+            of flushing dirty file pages. This also prevents other
+            threads from flushing the same block.
           */
-          reg_requests(keycache, block, 1);
-          if (type != FLUSH_IGNORE_CHANGED)
+          block->status|= BLOCK_IN_FLUSH;
+
+          if (!(block->status & BLOCK_IN_SWITCH))
           {
-	    /* It's not a temporary file */
-            if (pos == end)
+            /*
+              We care only for the blocks for which flushing was not
+              initiated by other threads as a result of page swapping
+            */
+            reg_requests(keycache, block, 1);
+            if (type != FLUSH_IGNORE_CHANGED)
             {
-	      /*
-		This happens only if there is not enough
-		memory for the big block
-              */
-              if ((error= flush_cached_blocks(keycache, file, cache,
-                                              end,type)))
-                last_errno=error;
-              /*
-		Restart the scan as some other thread might have changed
-		the changed blocks chain: the blocks that were in switch
-		state before the flush started have to be excluded
-              */
-              goto restart;
+              /* It's not a temporary file */
+              if (pos == end)
+              {
+                /*
+                  This happens only if there is not enough
+                  memory for the big block
+                */
+                if ((error= flush_cached_blocks(keycache, file, cache, end)))
+                  last_errno=error;
+                /*
+                  Restart the scan as some other thread might have changed
+                  the changed blocks chain: the blocks that were in switch
+                  state before the flush started have to be excluded
+                */
+                *blocks_flushed+= pos - cache;
+                goto restart;
+              }
+              *pos++= block;
+            }
+            else
+            {
+              /* It's a temporary file */
+              block->status&= ~BLOCK_CHANGED;
+              keycache->blocks_changed--;
+              keycache->global_blocks_changed--;
+              free_block(keycache, block);
             }
-            *pos++= block;
           }
           else
           {
-            /* It's a temporary file */
-            keycache->blocks_changed--;
-	    keycache->global_blocks_changed--;
-            free_block(keycache, block);
+            /*
+              Link the block into a list of blocks 'in switch'.
+              It will be relinked to a clean file chain by another thread.
+            */
+            unlink_changed(block);
+            link_changed(block, &first_in_switch);
           }
         }
         else
         {
-	  /* Link the block into a list of blocks 'in switch' */
-          unlink_changed(block);
-          link_changed(block, &first_in_switch);
+          /* Remember the last block found to be in flush. */
+          last_in_flush= block;
         }
       }
     }
+
     if (pos != cache)
     {
-      if ((error= flush_cached_blocks(keycache, file, cache, pos, type)))
+      if ((error= flush_cached_blocks(keycache, file, cache, pos)))
         last_errno= error;
+      /*
+        We need another scan as some other thread might have changed the
+        changed blocks chain again while we released the lock for
+        writing each block. The flush is complete only after no more
+        changed blocks are found while the lock is held.
+      */
+      *blocks_flushed+= pos - cache;
+      goto restart;
     }
-    /* Wait until list of blocks in switch is empty */
+    else if (last_in_flush)
+    {
+      /*
+        This thread found no blocks to flush, but other threads still
+        have blocks in flush. Wait until one of those blocks is flushed.
+      */
+#ifdef THREAD
+      struct st_my_thread_var *thread= my_thread_var;
+      add_to_queue(&last_in_flush->wqueue[COND_FOR_SAVED], thread);
+      do
+      {
+        KEYCACHE_DBUG_PRINT("flush_key_blocks_int: wait",
+                            ("suspend thread %ld", thread->id));
+        keycache_pthread_cond_wait(&thread->suspend,
+                                   &keycache->cache_lock);
+        /* The signalling thread unlinks this thread from the queue. */
+        DBUG_ASSERT(!thread->next);
+      } while (thread->next);
+#else
+      KEYCACHE_DBUG_ASSERT(0);
+      /* No parallel requests in single-threaded case */
+#endif
+      /* Be sure not to lose any blocks; they may be flushed in random order. */
+      goto restart;
+    }
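
The restart idiom above (mark a block BLOCK_IN_FLUSH, release cache_lock
for the write, then rescan from the top because the changed_blocks chain
may have been relinked) is the core of the fix. A minimal self-contained
sketch of the same pattern, with generic types rather than the key cache
code:

#include <pthread.h>
#include <stddef.h>

struct item { struct item *next; int dirty; int busy; };

/* Flush every dirty item of a mutex-protected list when the write itself
   must happen without the mutex. Dropping the mutex invalidates the
   iterator, so restart from the head until a full pass under the lock
   finds nothing left to do. */
static void flush_list(struct item **head, pthread_mutex_t *lock,
                       void (*write_out)(struct item *))
{
  struct item *it;
  pthread_mutex_lock(lock);
restart:
  for (it= *head; it; it= it->next)
  {
    if (it->dirty && !it->busy)
    {
      it->busy= 1;                 /* like BLOCK_IN_FLUSH: others skip it */
      pthread_mutex_unlock(lock);  /* do the I/O without the mutex */
      write_out(it);
      pthread_mutex_lock(lock);
      it->dirty= 0;
      it->busy= 0;
      goto restart;                /* the list may have changed meanwhile */
    }
  }
  pthread_mutex_unlock(lock);
}
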
+
+    /*
+      Wait until the list of blocks in switch is empty. The threads that
+      are switching these blocks will relink them to clean file chains
+      while we wait and thus empty the 'first_in_switch' chain.
+    */
     while (first_in_switch)
     {
 #if defined(KEYCACHE_DEBUG)
@@ -2473,8 +2802,9 @@ restart:
                               ("suspend thread %ld", thread->id));
           keycache_pthread_cond_wait(&thread->suspend,
                                      &keycache->cache_lock);
-        }
-        while (thread->next);
+          /* The signalling thread unlinks this thread from the queue. */
+          DBUG_ASSERT(!thread->next);
+        } while (thread->next);
 #else
         KEYCACHE_DBUG_ASSERT(0);
         /* No parallel requests in single-threaded case */
@@ -2485,8 +2815,9 @@ restart:
       KEYCACHE_DBUG_ASSERT(cnt <= keycache->blocks_used);
 #endif
     }
-    /* The following happens very seldom */
-    if (! (type == FLUSH_KEEP || type == FLUSH_FORCE_WRITE))
+
+    /* Release all file blocks for FLUSH_RELEASE or FLUSH_IGNORE_CHANGED. */
+    if ((type != FLUSH_KEEP) && (type != FLUSH_FORCE_WRITE))
     {
 #if defined(KEYCACHE_DEBUG)
       cnt=0;
@@ -2499,10 +2830,14 @@ restart:
         cnt++;
         KEYCACHE_DBUG_ASSERT(cnt <= keycache->blocks_used);
 #endif
+        /* Read the next pointer now; the block may be freed below. */
         next= block->next_changed;
-        if (block->hash_link->file == file &&
-            (! (block->status & BLOCK_CHANGED)
-             || type == FLUSH_IGNORE_CHANGED))
+
+        /* There cannot be any changed blocks in the file_blocks chain. */
+        DBUG_ASSERT(!(block->status & BLOCK_CHANGED));
+
+        /* If hash_link is NULL, block has already been freed. */
+        if (block->hash_link && (block->hash_link->file == file))
         {
           reg_requests(keycache, block, 1);
           free_block(keycache, block);
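
The two checks above, reading next_changed before the block can disappear
and testing hash_link for NULL, are the usual pattern for walking a list
whose elements may be freed during the walk. A minimal sketch of that
pattern with hypothetical types:

#include <stdlib.h>

struct node { struct node *next; int release_me; };

/* Take the successor before the body may free the current element. */
static struct node *sweep(struct node *head)
{
  struct node **prev= &head;
  struct node *n, *next;
  for (n= head; n; n= next)
  {
    next= n->next;                /* survives freeing of n */
    if (n->release_me)
    {
      *prev= next;
      free(n);
    }
    else
      prev= &n->next;
  }
  return head;
}
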
@@ -2524,14 +2859,14 @@ restart:
 
 
 /*
-  Flush all blocks for a file to disk
+  Flush all blocks for a file.
 
   SYNOPSIS
 
     flush_key_blocks()
-      keycache            pointer to a key cache data structure
-      file                handler for the file to flush to
-      flush_type          type of the flush
+      keycache            Pointer to a key cache data structure
+      file                Handle for the file to flush to
+      flush_type          Type of the flush
 
   RETURN
     0   ok
@@ -2542,6 +2877,7 @@ int flush_key_blocks(KEY_CACHE *keycache
                      File file, enum flush_type type)
 {
   int res;
+  uint blocks_flushed= 0;
   DBUG_ENTER("flush_key_blocks");
   DBUG_PRINT("enter", ("keycache: 0x%lx", keycache));
 
@@ -2549,7 +2885,7 @@ int flush_key_blocks(KEY_CACHE *keycache
     DBUG_RETURN(0);
   keycache_pthread_mutex_lock(&keycache->cache_lock);
   inc_counter_for_resize_op(keycache);
-  res= flush_key_blocks_int(keycache, file, type);
+  res= flush_key_blocks_int(keycache, file, type, &blocks_flushed);
   dec_counter_for_resize_op(keycache);
   keycache_pthread_mutex_unlock(&keycache->cache_lock);
   DBUG_RETURN(res);
@@ -2557,35 +2893,58 @@ int flush_key_blocks(KEY_CACHE *keycache
 
 
 /*
-  Flush all blocks in the key cache to disk
+  Flush all blocks in the key cache to their files.
 */
 
 static int flush_all_key_blocks(KEY_CACHE *keycache)
 {
+  uint blocks_flushed;
+  DBUG_ENTER("flush_all_key_blocks");
+  safe_mutex_assert_owner(&keycache->cache_lock);
+
+  /* Flush all changed blocks in a loop until no more blocks to flush are found. */
+  do
+  {
+    BLOCK_LINK  *block;
+    uint        idx;
 #if defined(KEYCACHE_DEBUG)
-  uint cnt=0;
+    uint        cnt=0;
 #endif
-  while (keycache->blocks_changed > 0)
-  {
-    BLOCK_LINK *block;
-    for (block= keycache->used_last->next_used ; ; block=block->next_used)
+
+    blocks_flushed= 0;
+    /* Step through all changed_blocks chains. */
+    for (idx= 0; idx < CHANGED_BLOCKS_HASH; idx++)
     {
-      if (block->hash_link)
+      /* Step through all blocks in a changed_blocks chain. */
+      for (block= keycache->changed_blocks[idx] ;
+           block ;
+           block= block->next_changed)
       {
+        uint old_blocks_flushed= blocks_flushed;
+
+        /* The changed_blocks chains cannot contain free blocks. */
+        DBUG_ASSERT(block->hash_link);
 #if defined(KEYCACHE_DEBUG)
         cnt++;
         KEYCACHE_DBUG_ASSERT(cnt <= keycache->blocks_used);
 #endif
         if (flush_key_blocks_int(keycache, block->hash_link->file,
-				 FLUSH_RELEASE))
-          return 1;
-        break;
+                                 FLUSH_RELEASE, &blocks_flushed))
+          DBUG_RETURN(1);
+
+        /*
+          If any blocks have been flushed, this chain has changed and
+          must not be traversed further. It is re-evaluated on the next
+          iteration of the outer loop.
+        */
+        if (blocks_flushed != old_blocks_flushed)
+          break;
       }
-      if (block == keycache->used_last)
-        break;
     }
-  }
-  return 0;
+
+  } while (blocks_flushed);
+
+  DBUG_RETURN(0);
 }
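
The shape of the new loop is a fixed-point iteration: keep re-scanning
every changed_blocks chain until one complete pass flushes nothing, and
abandon a chain as soon as flushing may have relinked it. A simplified
sketch of that loop shape, with hypothetical types rather than the
committed code:

#define NCHAINS 128               /* stands in for CHANGED_BLOCKS_HASH */

struct entry { struct entry *next; };

/* flush_chain_entry() returns how many entries it flushed and may
   relink any chain as a side effect. */
static void flush_everything(struct entry *chains[NCHAINS],
                             unsigned (*flush_chain_entry)(struct entry *))
{
  unsigned flushed, idx, n;
  struct entry *e;
  do
  {
    flushed= 0;
    for (idx= 0; idx < NCHAINS; idx++)
    {
      for (e= chains[idx]; e; e= e->next)
      {
        n= flush_chain_entry(e);
        if (n)
        {
          flushed+= n;
          break;                  /* chain may have changed; rescan later */
        }
      }
    }
  } while (flushed);
}
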
 
 
@@ -2626,7 +2985,7 @@ int reset_key_cache_counters(const char 
 
 #ifndef DBUG_OFF
 /*
-  Test if disk-cache is ok
+  Test if key-cache is ok
 */
 static void test_key_cache(KEY_CACHE *keycache __attribute__((unused)),
                            const char *where __attribute__((unused)),
@@ -2667,8 +3026,7 @@ static void keycache_dump(KEY_CACHE *key
               thread->id,(uint) page->file,(ulong) page->filepos);
       if (++i == MAX_QUEUE_LEN)
         break;
-    }
-    while (thread != last);
+    } while (thread != last);
 
   i=0;
   thread=last=waiting_for_block.last_thread;
@@ -2684,8 +3042,7 @@ static void keycache_dump(KEY_CACHE *key
         (uint) hash_link->file,(ulong) hash_link->diskpos);
       if (++i == MAX_QUEUE_LEN)
         break;
-    }
-    while (thread != last);
+    } while (thread != last);
 
   for (i=0 ; i< keycache->blocks_used ; i++)
   {
@@ -2710,8 +3067,7 @@ static void keycache_dump(KEY_CACHE *key
                   "thread:%u\n", thread->id);
           if (++i == MAX_QUEUE_LEN)
             break;
-        }
-        while (thread != last);
+        } while (thread != last);
       }
     }
   }
@@ -2724,8 +3080,7 @@ static void keycache_dump(KEY_CACHE *key
       block= block->next_used;
       fprintf(keycache_dump_file,
               "block:%u, ", BLOCK_NUMBER(block));
-    }
-    while (block != keycache->used_last);
+    } while (block != keycache->used_last);
   }
   fprintf(keycache_dump_file, "\n");
 
@@ -2853,3 +3208,4 @@ void keycache_debug_log_close(void)
 #endif /* defined(KEYCACHE_DEBUG_LOG) */
 
 #endif /* defined(KEYCACHE_DEBUG) */
+

--- 1.6/include/keycache.h	2005-09-14 13:18:11 +02:00
+++ 1.7/include/keycache.h	2006-06-01 15:31:03 +02:00
@@ -73,6 +73,7 @@ typedef struct st_key_cache
   BLOCK_LINK *used_ins;          /* ptr to the insertion block in LRU chain  */
   pthread_mutex_t cache_lock;    /* to lock access to the cache structure    */
   KEYCACHE_WQUEUE resize_queue;  /* threads waiting during resize operation  */
+  KEYCACHE_WQUEUE resizer;       /* thread currently resizing the key cache  */
   KEYCACHE_WQUEUE waiting_for_hash_link; /* waiting for a free hash link     */
   KEYCACHE_WQUEUE waiting_for_block;    /* requests waiting for a free block */
   BLOCK_LINK *changed_blocks[CHANGED_BLOCKS_HASH]; /* hash for dirty file bl.*/
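
The new 'resizer' queue supports the scheme described above: the resizing
thread waits until every other thread has left the cache after flushing.
Stripped of the key cache specifics, the idea is a drain gate, where the
counter plays the role of cnt_for_resize_op maintained by
inc_counter_for_resize_op()/dec_counter_for_resize_op(). A self-contained
sketch of the idea, an assumed simplification rather than the committed
code:

#include <pthread.h>

struct gate
{
  pthread_mutex_t lock;
  pthread_cond_t  drained;   /* plays the role of the resizer queue */
  unsigned        inside;    /* like cnt_for_resize_op */
};

static void op_enter(struct gate *g)
{
  pthread_mutex_lock(&g->lock);
  g->inside++;               /* inc_counter_for_resize_op() analogue */
  pthread_mutex_unlock(&g->lock);
}

static void op_leave(struct gate *g)
{
  pthread_mutex_lock(&g->lock);
  if (--g->inside == 0)
    pthread_cond_signal(&g->drained);  /* wake a waiting resizer */
  pthread_mutex_unlock(&g->lock);
}

static void resize_exclusive(struct gate *g, void (*do_resize)(void))
{
  pthread_mutex_lock(&g->lock);
  while (g->inside)          /* sleep until all users have left */
    pthread_cond_wait(&g->drained, &g->lock);
  do_resize();               /* exclusive access while holding the lock */
  pthread_mutex_unlock(&g->lock);
}
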