List:Commits« Previous MessageNext Message »
From:ingo Date:August 5 2006 10:41am
Subject:bk commit into 4.1 tree (istruewing:1.2521) BUG#8283
View as plain text  
Below is the list of changes that have just been committed into a local
4.1 repository of istruewing. When istruewing does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html

ChangeSet@stripped, 2006-08-05 12:41:11+02:00, istruewing@stripped +9 -0
  Bug#8283 - OPTIMIZE TABLE causes data loss
  
  [Not to be pushed. There is an issue with myisamchk -p left.
   I will fix it later. But please review the new design.]
  
  OPTIMIZE TABLE with myisam_repair_threads > 1 performs a non-quick 
  parallel repair. This means that it does not only rebuild all 
  indexes, but also the data file.
  
  Non-quick parallel repair works so that there is one thread per 
  index. The first of the threads rebuilds also the new data file.
  
  The problem was that all threads shared the read io cache on the
  old data file. If there were holes (deleted records) in the table,
  the first thread skipped them, writing only contiguous, non-deleted
  records to the new data file. Then it built the new index so that
  its entries pointed to the correct record positions. But the other
  threads didn't know the new record positions, but put the positions
  from the old data file into the index.
  
  The new design is so that there is a shared io cache which is filled
  by the first thread and read by the other threads. Now they know the
  new record positions.

  include/my_sys.h@stripped, 2006-08-05 12:41:08+02:00, istruewing@stripped +13 -7
    Bug#8283 - OPTIMIZE TABLE causes data loss
    Redesign of io_cache_share.

  myisam/mi_cache.c@stripped, 2006-08-05 12:41:08+02:00, istruewing@stripped +15 -1
    Bug#8283 - OPTIMIZE TABLE causes data loss
    Fixed implications of the new parallel repair design.

  myisam/mi_check.c@stripped, 2006-08-05 12:41:08+02:00, istruewing@stripped +148 -18
    Bug#8283 - OPTIMIZE TABLE causes data loss
    Implemented a new parallel repair design.

  myisam/mi_open.c@stripped, 2006-08-05 12:41:08+02:00, istruewing@stripped +4 -1
    Bug#8283 - OPTIMIZE TABLE causes data loss
    Added DBUG output.

  myisam/myisamdef.h@stripped, 2006-08-05 12:41:09+02:00, istruewing@stripped +9 -2
    Bug#8283 - OPTIMIZE TABLE causes data loss
    Added DBUG output.

  myisam/sort.c@stripped, 2006-08-05 12:41:09+02:00, istruewing@stripped +78 -53
    Bug#8283 - OPTIMIZE TABLE causes data loss
    Added implications of the new parallel repair design.
    Renamed 'info' -> 'sort_param'.
    Added DBUG output.

  mysql-test/r/myisam.result@stripped, 2006-08-05 12:41:09+02:00, istruewing@stripped +126 -0
    Bug#8283 - OPTIMIZE TABLE causes data loss
    Added test results.

  mysql-test/t/myisam.test@stripped, 2006-08-05 12:41:09+02:00, istruewing@stripped +93 -0
    Bug#8283 - OPTIMIZE TABLE causes data loss
    Added test cases.

  mysys/mf_iocache.c@stripped, 2006-08-05 12:41:09+02:00, istruewing@stripped +475 -79
    Bug#8283 - OPTIMIZE TABLE causes data loss
    Redesign of io_cache_share.

# This is a BitKeeper patch.  What follows are the unified diffs for the
# set of deltas contained in the patch.  The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User:	istruewing
# Host:	chilla.local
# Root:	/home/mydev/mysql-4.1-bug8283

--- 1.144/include/my_sys.h	2006-08-05 12:41:16 +02:00
+++ 1.145/include/my_sys.h	2006-08-05 12:41:16 +02:00
@@ -326,11 +326,17 @@ typedef int (*IO_CACHE_CALLBACK)(struct 
 typedef struct st_io_cache_share
 {
   /* to sync on reads into buffer */
-  pthread_mutex_t mutex;
-  pthread_cond_t  cond;
-  int             count, total;
-  /* actual IO_CACHE that filled the buffer */
-  struct st_io_cache *active;
+  pthread_mutex_t       mutex;
+  pthread_cond_t        cond;
+  int                   running_threads; /* threads not in lock. */
+  int                   total_threads;   /* threads sharing the cache. */
+  int                   error;           /* Last error. */
+  byte                  *buffer;         /* The read buffer. */
+  byte                  *read_end;       /* Behind last valid byte of buffer. */
+  /* Offset in file corresponding to the first byte of buffer. */
+  my_off_t              pos_in_file;
+  /* If a synchronized write cache is the source of the data. */
+  struct st_io_cache    *source_cache;
 #ifdef NOT_YET_IMPLEMENTED
   /* whether the structure should be free'd */
   my_bool alloced;
@@ -672,8 +678,8 @@ extern void setup_io_cache(IO_CACHE* inf
 extern int _my_b_read(IO_CACHE *info,byte *Buffer,uint Count);
 #ifdef THREAD
 extern int _my_b_read_r(IO_CACHE *info,byte *Buffer,uint Count);
-extern void init_io_cache_share(IO_CACHE *info,
-				IO_CACHE_SHARE *s, uint num_threads);
+extern void init_io_cache_share(IO_CACHE *read_cache, IO_CACHE_SHARE *cshare,
+                                IO_CACHE *write_cache, uint num_threads);
 extern void remove_io_thread(IO_CACHE *info);
 #endif
 extern int _my_b_seq_read(IO_CACHE *info,byte *Buffer,uint Count);

--- 1.14/myisam/mi_cache.c	2006-08-05 12:41:16 +02:00
+++ 1.15/myisam/mi_cache.c	2006-08-05 12:41:16 +02:00
@@ -50,6 +50,13 @@ int _mi_read_cache(IO_CACHE *info, byte 
     if ((my_off_t) read_length > (my_off_t) (info->pos_in_file-pos))
       read_length=(uint) (info->pos_in_file-pos);
     info->seek_not_done=1;
+    /*
+      For non-quick parallel repair, info->file can be -1. In this case
+      we must not come here. We come here only if we want to read below
+      the cache buffer start. This must not happen with non-quick
+      prarllel repair.
+    */
+    DBUG_ASSERT(info->file >= 0);
     if (my_pread(info->file,buff,read_length,pos,MYF(MY_NABP)))
       DBUG_RETURN(1);
     if (!(length-=read_length))
@@ -71,7 +78,14 @@ int _mi_read_cache(IO_CACHE *info, byte 
   }
   else
     in_buff_length=0;
-  if (flag & READING_NEXT)
+  /*
+    For non-quick parallel repair, info->file can be -1. In this case we
+    must read through the cache. We are reading from a buffer that was
+    filled by the write thread. It writes the new data file so that
+    record parts are contiguous. There is no need to read besides the
+    cache buffer.
+  */
+  if ((flag & READING_NEXT) || (info->file < 0))
   {
     if (pos != (info->pos_in_file +
 		(uint) (info->read_end - info->request_pos)))

--- 1.157/myisam/mi_check.c	2006-08-05 12:41:16 +02:00
+++ 1.158/myisam/mi_check.c	2006-08-05 12:41:16 +02:00
@@ -2352,6 +2352,28 @@ err:
     Each key is handled by a separate thread.
     TODO: make a number of threads a parameter
 
+    In parallel repair we use one thread per index. There are two modes:
+
+    Quick
+
+      Only the indexes are rebuilt. All threads share a read buffer.
+      Every thread that needs fresh data in the buffer enters the shared
+      cache lock. The last thread joining the lock reads the buffer from
+      the data file and wakes all other threads.
+
+    Non-quick
+
+      The data file is rebuilt and all indexes are rebuilt to point to
+      the new record positions. One thread is the master thread. It
+      reads from the old data file and writes to the new data file. It
+      also creates one of the indexes. The other threads read from a
+      buffer which is filled by the master. If they need fresh data,
+      they enter the shared cache lock. If the masters write buffer is
+      full, it flushes it to the new data file and enters the shared
+      cache lock too. When all threads joined in the lock, the master
+      copies its write buffer to the read buffer for the other threads
+      and wakes them.
+
   RESULT
     0	ok
     <>0	Error
@@ -2374,6 +2396,7 @@ int mi_repair_parallel(MI_CHECK *param, 
   ulong   *rec_per_key_part;
   HA_KEYSEG *keyseg;
   char llbuff[22];
+  IO_CACHE new_data_cache; /* For non-quick repair. */
   IO_CACHE_SHARE io_share;
   SORT_INFO sort_info;
   ulonglong key_map=share->state.key_map;
@@ -2395,19 +2418,51 @@ int mi_repair_parallel(MI_CHECK *param, 
   if (info->s->options & (HA_OPTION_CHECKSUM | HA_OPTION_COMPRESS_RECORD))
     param->testflag|=T_CALC_CHECKSUM;
 
+  /*
+    Quick repair (not touching data file, rebuilding indexes):
+    {
+      Read  cache is (MI_CHECK *param)->read_cache using info->dfile.
+    }
+
+    Non-quick repair (rebuilding data file and indexes):
+    {
+      Master thread:
+
+        Read  cache is (MI_CHECK *param)->read_cache using info->dfile.
+        Write cache is (MI_INFO   *info)->rec_cache  using new_file.
+
+      Slave threads:
+
+        Read  cache is new_data_cache synced to master rec_cache.
+
+      The final assignment of the filedescriptors is done after the
+      cache creation.
+
+      Don't check file size on new_data_cache, as the resulting file size
+      is not known yet.
+
+      As rec_cache and new_data_cache are synced, write_buffer_length is
+      used for the read cache 'new_data_cache'. Both start at the same
+      position 'new_header_length'.
+    }
+  */
+  DBUG_PRINT("info", ("is quick repair: %d", rep_quick));
   bzero((char*)&sort_info,sizeof(sort_info));
   if (!(sort_info.key_block=
-	alloc_key_blocks(param,
-			 (uint) param->sort_key_blocks,
-			 share->base.max_key_block_length))
-      || init_io_cache(&param->read_cache,info->dfile,
-		       (uint) param->read_buffer_length,
-		       READ_CACHE,share->pack.header_length,1,MYF(MY_WME)) ||
-      (! rep_quick &&
-       init_io_cache(&info->rec_cache,info->dfile,
-		     (uint) param->write_buffer_length,
-		     WRITE_CACHE,new_header_length,1,
-		     MYF(MY_WME | MY_WAIT_IF_FULL) & param->myf_rw)))
+	alloc_key_blocks(param, (uint) param->sort_key_blocks,
+			 share->base.max_key_block_length)) ||
+      init_io_cache(&param->read_cache, info->dfile,
+                    (uint) param->read_buffer_length,
+                    READ_CACHE, share->pack.header_length, 1, MYF(MY_WME)) ||
+      (!rep_quick &&
+       (init_io_cache(&info->rec_cache, info->dfile,
+                      (uint) param->write_buffer_length,
+                      WRITE_CACHE, new_header_length, 1,
+                      MYF(MY_WME | MY_WAIT_IF_FULL) & param->myf_rw) ||
+        init_io_cache(&new_data_cache, info->dfile,
+                      (uint) param->write_buffer_length,
+                      READ_CACHE, new_header_length, 1,
+                      MYF(MY_WME | MY_DONT_CHECK_FILESIZE)))))
     goto err;
   sort_info.key_block_end=sort_info.key_block+param->sort_key_blocks;
   info->opt_flag|=WRITE_CACHE_USED;
@@ -2440,6 +2495,8 @@ int mi_repair_parallel(MI_CHECK *param, 
     }
     share->state.dellink= HA_OFFSET_ERROR;
     info->rec_cache.file=new_file;
+    /* This read cache is filled from the write cache, not from a file. */
+    new_data_cache.file= -1;
   }
 
   info->update= (short) (HA_STATE_CHANGED | HA_STATE_ROW_CHANGED);
@@ -2590,13 +2647,42 @@ int mi_repair_parallel(MI_CHECK *param, 
   pthread_cond_init(&sort_info.cond, 0);
   pthread_mutex_lock(&sort_info.mutex);
 
-  init_io_cache_share(&param->read_cache, &io_share, i);
+  /*
+    Initialize the I/O cache share for use with the read caches and, in
+    case of non-quick repair, the write cache. When all threads join on
+    the cache lock, the writer copies the write cache contents to the
+    read caches.
+  */
+  if (i > 1)
+  {
+    if (rep_quick)
+      init_io_cache_share(&param->read_cache, &io_share, NULL, i);
+    else
+      init_io_cache_share(&new_data_cache, &io_share, &info->rec_cache, i);
+  }
+  else
+    io_share.total_threads= 0; /* share not used */
+
   (void) pthread_attr_init(&thr_attr);
   (void) pthread_attr_setdetachstate(&thr_attr,PTHREAD_CREATE_DETACHED);
 
   for (i=0 ; i < sort_info.total_keys ; i++)
   {
-    sort_param[i].read_cache=param->read_cache;
+    /*
+      Copy the properly initialized IO_CACHE structure so that every
+      thread has its own copy. In quick mode param->read_cache is shared
+      for use by all threads. In non-quick mode all threads but the
+      first copy the shared new_data_cache, which is synchronized to the
+      write cache of the first thread. The first thread copies
+      param->read_cache, which is not shared.
+    */
+    if (rep_quick || !i)
+      sort_param[i].read_cache= param->read_cache;
+    else
+      sort_param[i].read_cache= new_data_cache;
+    DBUG_PRINT("io_cache_share", ("thread: %u  read_cache: 0x%lx",
+                                  i, (long) &sort_param[i].read_cache));
+
     /*
       two approaches: the same amount of memory for each thread
       or the memory for the same number of keys for each thread...
@@ -2614,7 +2700,9 @@ int mi_repair_parallel(MI_CHECK *param, 
 		       (void *) (sort_param+i)))
     {
       mi_check_print_error(param,"Cannot start a repair thread");
-      remove_io_thread(&param->read_cache);
+      /* Cleanup: Detach from the share. Avoid others to be blocked. */
+      if (io_share.total_threads)
+        remove_io_thread(&sort_param[i].read_cache);
       sort_info.got_error=1;
     }
     else
@@ -2636,7 +2724,11 @@ int mi_repair_parallel(MI_CHECK *param, 
 
   if (sort_param[0].fix_datafile)
   {
-    if (write_data_suffix(&sort_info,1) || end_io_cache(&info->rec_cache))
+    /*
+      Destroy the write cache. The master thread did already detach from
+      the share by remove_io_thread().
+    */
+    if (end_io_cache(&info->rec_cache))
       goto err;
     if (param->testflag & T_SAFE_REPAIR)
     {
@@ -2651,8 +2743,14 @@ int mi_repair_parallel(MI_CHECK *param, 
       sort_param->filepos;
     /* Only whole records */
     share->state.version=(ulong) time((time_t*) 0);
+
+    /*
+      Exchange the data file descriptor of the table, so that we use the
+      new file from now on.
+     */
     my_close(info->dfile,MYF(0));
     info->dfile=new_file;
+
     share->data_file_type=sort_info.new_data_file_type;
     share->pack.header_length=(ulong) new_header_length;
   }
@@ -2707,7 +2805,18 @@ int mi_repair_parallel(MI_CHECK *param, 
 
 err:
   got_error|= flush_blocks(param, share->key_cache, share->kfile);
+  /*
+    Destroy the write cache. The master thread did already detach from
+    the share by remove_io_thread() or it was not yet started.
+  */
   VOID(end_io_cache(&info->rec_cache));
+  /*
+    Destroy the new data cache in case of non-quick repair. All slave
+    threads did either detach from the share by remove_io_thread()
+    already or they were not yet started.
+  */
+  if (!rep_quick)
+    VOID(end_io_cache(&new_data_cache));
   if (!got_error)
   {
     /* Replace the actual file with the temporary file */
@@ -3103,9 +3212,30 @@ static int sort_get_next_record(MI_SORT_
 			      llstr(sort_param->start_recpos,llbuff));
 	  goto try_next;
 	}
-	if (_mi_read_cache(&sort_param->read_cache,to,block_info.filepos,
-			   block_info.data_len,
-			   (found_record == 1 ? READING_NEXT : 0)))
+        /*
+          Copy information that is already read. Avoid accessing data
+          below the cache start. This could happen if the header
+          streched over the end of the previous buffer contents.
+        */
+        {
+          uint header_len= (uint) (block_info.filepos - pos);
+          uint prefetch_len= (MI_BLOCK_INFO_HEADER_LENGTH - header_len);
+
+          if (prefetch_len > block_info.data_len)
+            prefetch_len= block_info.data_len;
+          if (prefetch_len)
+          {
+            memcpy(to, block_info.header + header_len, prefetch_len);
+            block_info.filepos+= prefetch_len;
+            block_info.data_len-= prefetch_len;
+            left_length-= prefetch_len;
+            to+= prefetch_len;
+          }
+        }
+        if (block_info.data_len &&
+            _mi_read_cache(&sort_param->read_cache,to,block_info.filepos,
+                           block_info.data_len,
+                           (found_record == 1 ? READING_NEXT : 0)))
 	{
 	  mi_check_print_info(param,
 			      "Read error for block at: %s (error: %d); Skipped",

--- 1.85/myisam/mi_open.c	2006-08-05 12:41:16 +02:00
+++ 1.86/myisam/mi_open.c	2006-08-05 12:41:16 +02:00
@@ -201,7 +201,10 @@ MI_INFO *mi_open(const char *name, int m
 	 ((open_flags & HA_OPEN_ABORT_IF_CRASHED) &&
 	  (my_disable_locking && share->state.open_count))))
     {
-      DBUG_PRINT("error",("Table is marked as crashed"));
+      DBUG_PRINT("error",("Table is marked as crashed. open_flags: %u  "
+                          "changed: %u  open_count: %u  !locking: %d",
+                          open_flags, share->state.changed,
+                          share->state.open_count, my_disable_locking));
       my_errno=((share->state.changed & STATE_CRASHED_ON_REPAIR) ?
 		HA_ERR_CRASHED_ON_REPAIR : HA_ERR_CRASHED_ON_USAGE);
       goto err;

--- 1.81/myisam/myisamdef.h	2006-08-05 12:41:16 +02:00
+++ 1.82/myisam/myisamdef.h	2006-08-05 12:41:16 +02:00
@@ -361,8 +361,15 @@ typedef struct st_mi_sort_param
 #define mi_putint(x,y,nod) { uint16 boh=(nod ? (uint16) 32768 : 0) + (uint16) (y);\
 			  mi_int2store(x,boh); }
 #define mi_test_if_nod(x) (x[0] & 128 ? info->s->base.key_reflength : 0)
-#define mi_mark_crashed(x) (x)->s->state.changed|=STATE_CRASHED
-#define mi_mark_crashed_on_repair(x) { (x)->s->state.changed|=STATE_CRASHED|STATE_CRASHED_ON_REPAIR ; (x)->update|= HA_STATE_CHANGED; }
+#define mi_mark_crashed(x) do{(x)->s->state.changed|= STATE_CRASHED; \
+                              DBUG_PRINT("error", ("Marked table crashed"));} \
+                           while(0)
+#define mi_mark_crashed_on_repair(x) do{(x)->s->state.changed|= \
+                                        STATE_CRASHED|STATE_CRASHED_ON_REPAIR; \
+                                        (x)->update|= HA_STATE_CHANGED; \
+                                        DBUG_PRINT("error", \
+                                                   ("Marked table crashed"));} \
+                                     while(0)
 #define mi_is_crashed(x) ((x)->s->state.changed & STATE_CRASHED)
 #define mi_is_crashed_on_repair(x) ((x)->s->state.changed & STATE_CRASHED_ON_REPAIR)
 

--- 1.45/myisam/sort.c	2006-08-05 12:41:16 +02:00
+++ 1.46/myisam/sort.c	2006-08-05 12:41:16 +02:00
@@ -309,7 +309,7 @@ static ha_rows NEAR_F find_all_keys(MI_S
 
 pthread_handler_decl(thr_find_all_keys,arg)
 {
-  MI_SORT_PARAM *info= (MI_SORT_PARAM*) arg;
+  MI_SORT_PARAM *sort_param= (MI_SORT_PARAM*) arg;
   int error;
   uint memavl,old_memavl,keys,sort_length;
   uint idx, maxbuffer;
@@ -321,32 +321,33 @@ pthread_handler_decl(thr_find_all_keys,a
 
   if (my_thread_init())
     goto err;
-  if (info->sort_info->got_error)
+  DBUG_ENTER("thr_find_all_keys");
+  if (sort_param->sort_info->got_error)
     goto err;
 
-  if (info->keyinfo->flag && HA_VAR_LENGTH_KEY)
+  if (sort_param->keyinfo->flag && HA_VAR_LENGTH_KEY)
   {
-    info->write_keys=write_keys_varlen;
-    info->read_to_buffer=read_to_buffer_varlen;
-    info->write_key=write_merge_key_varlen;
+    sort_param->write_keys=     write_keys_varlen;
+    sort_param->read_to_buffer= read_to_buffer_varlen;
+    sort_param->write_key=      write_merge_key_varlen;
   }
   else
   {
-    info->write_keys=write_keys;
-    info->read_to_buffer=read_to_buffer;
-    info->write_key=write_merge_key;
+    sort_param->write_keys=     write_keys;
+    sort_param->read_to_buffer= read_to_buffer;
+    sort_param->write_key=      write_merge_key;
   }
 
-  my_b_clear(&info->tempfile);
-  my_b_clear(&info->tempfile_for_exceptions);
-  bzero((char*) &info->buffpek,sizeof(info->buffpek));
-  bzero((char*) &info->unique, sizeof(info->unique));
+  my_b_clear(&sort_param->tempfile);
+  my_b_clear(&sort_param->tempfile_for_exceptions);
+  bzero((char*) &sort_param->buffpek, sizeof(sort_param->buffpek));
+  bzero((char*) &sort_param->unique,  sizeof(sort_param->unique));
   sort_keys= (uchar **) NULL;
 
-  memavl=max(info->sortbuff_size, MIN_SORT_MEMORY);
-  idx=      info->sort_info->max_records;
-  sort_length=  info->key_length;
-  maxbuffer= 1;
+  memavl=       max(sort_param->sortbuff_size, MIN_SORT_MEMORY);
+  idx=          sort_param->sort_info->max_records;
+  sort_length=  sort_param->key_length;
+  maxbuffer=    1;
 
   while (memavl >= MIN_SORT_MEMORY)
   {
@@ -363,18 +364,19 @@ pthread_handler_decl(thr_find_all_keys,a
             (keys=(memavl-sizeof(BUFFPEK)*maxbuffer)/
              (sort_length+sizeof(char*))) <= 1)
         {
-          mi_check_print_error(info->sort_info->param,
+          mi_check_print_error(sort_param->sort_info->param,
                                "sort_buffer_size is to small");
           goto err;
         }
       }
       while ((maxbuffer= (int) (idx/(keys-1)+1)) != skr);
     }
-    if ((sort_keys=(uchar **)my_malloc(keys*(sort_length+sizeof(char*))+
-				       ((info->keyinfo->flag & HA_FULLTEXT) ?
-					HA_FT_MAXBYTELEN : 0), MYF(0))))
+    if ((sort_keys= (uchar**)
+         my_malloc(keys*(sort_length+sizeof(char*))+
+                   ((sort_param->keyinfo->flag & HA_FULLTEXT) ?
+                    HA_FT_MAXBYTELEN : 0), MYF(0))))
     {
-      if (my_init_dynamic_array(&info->buffpek, sizeof(BUFFPEK),
+      if (my_init_dynamic_array(&sort_param->buffpek, sizeof(BUFFPEK),
 				maxbuffer, maxbuffer/2))
         my_free((gptr) sort_keys,MYF(0));
       else
@@ -386,71 +388,93 @@ pthread_handler_decl(thr_find_all_keys,a
   }
   if (memavl < MIN_SORT_MEMORY)
   {
-    mi_check_print_error(info->sort_info->param,"Sort buffer to small"); /* purecov: tested */
+    mi_check_print_error(sort_param->sort_info->param, "Sort buffer too small");
     goto err; /* purecov: tested */
   }
 
-  if (info->sort_info->param->testflag & T_VERBOSE)
-    printf("Key %d - Allocating buffer for %d keys\n",info->key+1,keys);
-  info->sort_keys=sort_keys;
+  if (sort_param->sort_info->param->testflag & T_VERBOSE)
+    printf("Key %d - Allocating buffer for %d keys\n",
+           sort_param->key + 1, keys);
+  sort_param->sort_keys= sort_keys;
 
   idx=error=0;
   sort_keys[0]=(uchar*) (sort_keys+keys);
 
-  while (!(error=info->sort_info->got_error) &&
-         !(error=(*info->key_read)(info,sort_keys[idx])))
+  DBUG_PRINT("info", ("reading keys"));
+  while (!(error= sort_param->sort_info->got_error) &&
+         !(error= (*sort_param->key_read)(sort_param, sort_keys[idx])))
   {
-    if (info->real_key_length > info->key_length)
+    if (sort_param->real_key_length > sort_param->key_length)
     {
-      if (write_key(info,sort_keys[idx], &info->tempfile_for_exceptions))
+      if (write_key(sort_param, sort_keys[idx],
+                    &sort_param->tempfile_for_exceptions))
         goto err;
       continue;
     }
 
     if (++idx == keys)
     {
-      if (info->write_keys(info,sort_keys,idx-1,
-		     (BUFFPEK *)alloc_dynamic(&info->buffpek),
-		     &info->tempfile))
+      if (sort_param->write_keys(sort_param, sort_keys, idx - 1,
+                                 (BUFFPEK*) alloc_dynamic(&sort_param->buffpek),
+                                 &sort_param->tempfile))
         goto err;
       sort_keys[0]=(uchar*) (sort_keys+keys);
-      memcpy(sort_keys[0],sort_keys[idx-1],(size_t) info->key_length);
+      memcpy(sort_keys[0], sort_keys[idx - 1], (size_t) sort_param->key_length);
       idx=1;
     }
-    sort_keys[idx]=sort_keys[idx-1]+info->key_length;
+    sort_keys[idx]= sort_keys[idx - 1] + sort_param->key_length;
   }
   if (error > 0)
     goto err;
-  if (info->buffpek.elements)
+  if (sort_param->buffpek.elements)
   {
-    if (info->write_keys(info,sort_keys, idx,
-		   (BUFFPEK *) alloc_dynamic(&info->buffpek), &info->tempfile))
+    if (sort_param->write_keys(sort_param, sort_keys, idx,
+                               (BUFFPEK*) alloc_dynamic(&sort_param->buffpek),
+                               &sort_param->tempfile))
       goto err;
-    info->keys=(info->buffpek.elements-1)*(keys-1)+idx;
+    sort_param->keys= (sort_param->buffpek.elements - 1) * (keys - 1) + idx;
   }
   else
-    info->keys=idx;
+    sort_param->keys= idx;
 
-  info->sort_keys_length=keys;
+  sort_param->sort_keys_length= keys;
   goto ok;
 
 err:
-  info->sort_info->got_error=1; /* no need to protect this with a mutex */
+  DBUG_PRINT("error", ("got some error"));
+  sort_param->sort_info->got_error= 1; /* no need to protect with a mutex */
   if (sort_keys)
     my_free((gptr) sort_keys,MYF(0));
-  info->sort_keys=0;
-  delete_dynamic(& info->buffpek);
-  close_cached_file(&info->tempfile);
-  close_cached_file(&info->tempfile_for_exceptions);
+  sort_param->sort_keys= 0;
+  delete_dynamic(& sort_param->buffpek);
+  close_cached_file(&sort_param->tempfile);
+  close_cached_file(&sort_param->tempfile_for_exceptions);
 
 ok:
-  remove_io_thread(&info->read_cache);
-  pthread_mutex_lock(&info->sort_info->mutex);
-  info->sort_info->threads_running--;
-  pthread_cond_signal(&info->sort_info->cond);
-  pthread_mutex_unlock(&info->sort_info->mutex);
+  if (sort_param->fix_datafile && sort_param->master)
+  {
+    MI_INFO *info= sort_param->sort_info->info;
+    write_data_suffix(sort_param->sort_info, 1);
+    flush_io_cache(&info->rec_cache);
+    /*
+      Detach from the share if the writer is involved. Avoid others to
+      be blocked. This will also indicate EOF to the readers.
+    */
+    if (info->rec_cache.share)
+      remove_io_thread(&info->rec_cache);
+  }
+  /* Detach from the share if any. Avoid others to be blocked. */
+  if (sort_param->read_cache.share)
+    remove_io_thread(&sort_param->read_cache);
+
+  pthread_mutex_lock(&sort_param->sort_info->mutex);
+  sort_param->sort_info->threads_running--;
+  pthread_cond_signal(&sort_param->sort_info->cond);
+  pthread_mutex_unlock(&sort_param->sort_info->mutex);
+
+  DBUG_PRINT("exit", ("======== ending thread ========"));
   my_thread_end();
-  return NULL;
+  DBUG_RETURN(NULL);
 }
 
 
@@ -466,6 +490,7 @@ int thr_write_keys(MI_SORT_PARAM *sort_p
   MYISAM_SHARE *share=info->s;
   MI_SORT_PARAM *sinfo;
   byte *mergebuf=0;
+  DBUG_ENTER("thr_write_keys");
   LINT_INIT(length);
 
   for (i= 0, sinfo= sort_param ;
@@ -604,7 +629,7 @@ int thr_write_keys(MI_SORT_PARAM *sort_p
     }
   }
   my_free((gptr) mergebuf,MYF(MY_ALLOW_ZERO_PTR));
-  return got_error;
+  DBUG_RETURN(got_error);
 }
 #endif /* THREAD */
 

--- 1.48/mysys/mf_iocache.c	2006-08-05 12:41:16 +02:00
+++ 1.49/mysys/mf_iocache.c	2006-08-05 12:41:16 +02:00
@@ -70,6 +70,10 @@ static void my_aiowait(my_aio_result *re
 #define IO_ROUND_UP(X) (((X)+IO_SIZE-1) & ~(IO_SIZE-1))
 #define IO_ROUND_DN(X) ( (X)            & ~(IO_SIZE-1))
 
+#ifdef THREAD
+static void copy_to_read_buffer(IO_CACHE *write_cache,
+                                const byte *write_buffer, uint write_length);
+#endif
 
 /*
   Setup internal pointers inside IO_CACHE
@@ -500,65 +504,336 @@ int _my_b_read(register IO_CACHE *info, 
   DBUG_RETURN(0);
 }
 
+
 #ifdef THREAD
-/* Prepare IO_CACHE for shared use */
-void init_io_cache_share(IO_CACHE *info, IO_CACHE_SHARE *s, uint num_threads)
+/*
+  Prepare IO_CACHE for shared use.
+
+  SYNOPSIS
+    init_io_cache_share()
+      read_cache                A read cache. This will be copied for
+                                every thread after setup.
+      cshare                    The share.
+      write_cache               If non-NULL a write cache that is to be
+                                synchronized with the read caches.
+      num_threads               Number of threads sharing the cache
+                                including the write thread if any.
+
+  DESCRIPTION
+
+    The shared cache is used so: One IO_CACHE is initialized with
+    init_io_cache(). This includes the allocation of a buffer. Then a
+    share is allocated and init_io_cache_share() is called with the io
+    cache and the share. Then the io cache is copied for each thread. So
+    every thread has its own copy of IO_CACHE. But the allocated buffer
+    is shared because cache->buffer is the same for all caches.
+
+    One thread reads data from the file into the buffer. All threads
+    read from the buffer, but every thread maintains its own set of
+    pointers into the buffer. When all threads have used up the buffer
+    contents, one of the threads reads the next block of data into the
+    buffer. To accomplish this, each thread enters the cache lock before
+    accessing the buffer. They wait in lock_io_cache() until all threads
+    joined the lock. The last thread entering the lock is in charge of
+    reading from file to buffer. It wakes all threads when done.
+
+    Synchronizing a write cache to the read caches works so: Whenever
+    the write buffer needs a flush, the write thread enters the lock and
+    waits for all other threads to enter the lock too. They do this when
+    they have used up the read buffer. When all threads are in the lock,
+    the write thread copies the write buffer to the read buffer and
+    wakes all threads.
+
+    share->running_threads is the number of threads not being in the
+    cache lock. When entering lock_io_cache() the number is decreased.
+    When the thread that fills the buffer enters unlock_io_cache() the
+    number is reset to the number of threads. The condition
+    running_threads == 0 means that all threads are in the lock. Bumping
+    up the number to the full count is non-intuitive. But increasing the
+    number by one for each thread that leaves the lock could lead to a
+    solo run of one thread. The last thread to join a lock reads from
+    file to buffer, wakes the other threads, processes the data in the
+    cache and enters the lock again. If no other thread left the lock
+    meanwhile, it would think it's the last one again and read the next
+    block...
+
+    The share has copies of 'error', 'buffer', 'read_end', and
+    'pos_in_file' from the thread that filled the buffer. We may not be
+    able to access this information directly from its cache because the
+    thread may be removed from the share before the variables could be
+    copied by all other threads. Or, if a write buffer is synchronized,
+    it would change its 'pos_in_file' after waking the other threads,
+    possibly before they could copy its value.
+
+    However, the 'buffer' variable in the share is for a synchronized
+    write cache. It needs to know where to put the data. Otherwise it
+    would need access to the read cache of one of the threads that is
+    not yet removed from the share.
+
+  RETURN
+    void
+*/
+
+void init_io_cache_share(IO_CACHE *read_cache, IO_CACHE_SHARE *cshare,
+                         IO_CACHE *write_cache, uint num_threads)
 {
-  DBUG_ASSERT(info->type == READ_CACHE);
-  pthread_mutex_init(&s->mutex, MY_MUTEX_INIT_FAST);
-  pthread_cond_init (&s->cond, 0);
-  s->total=s->count=num_threads-1;
-  s->active=0;
-  info->share=s;
-  info->read_function=_my_b_read_r;
-  info->current_pos= info->current_end= 0;
+  DBUG_ENTER("init_io_cache_share");
+  DBUG_PRINT("io_cache_share", ("read_cache: 0x%lx  share: 0x%lx  "
+                                "write_cache: 0x%lx  threads: %u",
+                                read_cache, cshare, write_cache, num_threads));
+
+  DBUG_ASSERT((num_threads > 1) && (read_cache->type == READ_CACHE) &&
+              (!write_cache || (write_cache->type == WRITE_CACHE)));
+
+  pthread_mutex_init(&cshare->mutex, MY_MUTEX_INIT_FAST);
+  pthread_cond_init(&cshare->cond, 0);
+
+  cshare->running_threads= num_threads;
+  cshare->total_threads=   num_threads;
+  cshare->error=           read_cache->error;
+  cshare->buffer=          read_cache->buffer;
+  cshare->read_end=        NULL; /* Buffer contents is not yet valid. */
+  cshare->pos_in_file=     0;
+  cshare->source_cache=    write_cache; /* Can be NULL. */
+
+  read_cache->share=         cshare;
+  read_cache->read_function= _my_b_read_r;
+  read_cache->current_pos=   NULL;
+  read_cache->current_end=   NULL;
+
+  if (write_cache)
+    write_cache->share= cshare;
+
+  DBUG_VOID_RETURN;
 }
 
+
 /*
-  Remove a thread from shared access to IO_CACHE
-  Every thread should do that on exit for not
-  to deadlock other threads
+  Remove a thread from shared access to IO_CACHE.
+
+  SYNOPSIS
+    remove_io_thread()
+      cache                     The IO_CACHE to be removed from the share.
+
+  NOTE
+
+    Every thread must do that on exit for not to deadlock other threads.
+
+  RETURN
+    void
 */
-void remove_io_thread(IO_CACHE *info)
+
+void remove_io_thread(IO_CACHE *cache)
 {
-  IO_CACHE_SHARE *s=info->share;
+  IO_CACHE_SHARE *cshare= cache->share;
+  uint total;
+  DBUG_ENTER("remove_io_thread");
+
+  pthread_mutex_lock(&cshare->mutex);
+  DBUG_PRINT("io_cache_share", ("%s: 0x%lx",
+                                (cache == cshare->source_cache) ?
+                                "writer" : "reader", cache));
+
+  /* Remove from share. */
+  total= --cshare->total_threads;
+  DBUG_PRINT("io_cache_share", ("remaining threads: %u", total));
+
+  /* Detach from share. */
+  cache->share= NULL;
+
+  /* If the writer goes, let the readers know. */
+  if (cache == cshare->source_cache)
+  {
+    DBUG_PRINT("io_cache_share", ("writer leaves"));
+    cshare->source_cache= NULL;
+  }
+
+  /* If all threads are waiting for me to join the lock, wake them. */
+  if (!--cshare->running_threads)
+  {
+    DBUG_PRINT("io_cache_share", ("the last running thread leaves, wake all"));
+    pthread_cond_broadcast(&cshare->cond);
+    /* The last running thread can clear the buffer. Support debugging. */
+    cshare->read_end= cshare->buffer;
+  }
+
+  pthread_mutex_unlock(&cshare->mutex);
 
-  pthread_mutex_lock(&s->mutex);
-  s->total--;
-  if (! s->count--)
-    pthread_cond_signal(&s->cond);
-  pthread_mutex_unlock(&s->mutex);
+  if (!total)
+  {
+    DBUG_PRINT("io_cache_share", ("last thread removed, destroy share"));
+    pthread_cond_destroy (&cshare->cond);
+    pthread_mutex_destroy(&cshare->mutex);
+  }
 }
 
-static int lock_io_cache(IO_CACHE *info, my_off_t pos)
+
+/*
+  Lock IO cache and wait for all other threads to join.
+
+  SYNOPSIS
+    lock_io_cache()
+      cache                     The cache of the thread entering the lock.
+      pos                       File position of the block to read.
+                                Unused for the write thread.
+
+  DESCRIPTION
+
+    Wait for all threads to finish with the current buffer. We want
+    all threads to proceed in concert. The last thread to join
+    lock_io_cache() will read the block from file and all threads start
+    to use it. Then they will join again for reading the next block.
+
+    The waiting threads detect a fresh buffer by comparing
+    cshare->pos_in_file with the position they want to process next.
+    Since the first block may start at position 0, we take
+    cshare->read_end as an additional condition. This variable is
+    initialized to NULL and will be set after a block of data is written
+    to the buffer.
+
+  RETURN
+    1           OK, lock in place, go ahead and read.
+    0           OK, unlocked, another thread did the read.
+*/
+
+static int lock_io_cache(IO_CACHE *cache, my_off_t pos)
 {
-  int total;
-  IO_CACHE_SHARE *s=info->share;
+  IO_CACHE_SHARE *cshare= cache->share;
+  DBUG_ENTER("lock_io_cache");
+
+  /* Enter the lock. */
+  pthread_mutex_lock(&cshare->mutex);
+  cshare->running_threads--;
+  DBUG_PRINT("io_cache_share", ("%s: 0x%lx  pos: %lu  running: %u",
+                                (cache == cshare->source_cache) ?
+                                "writer" : "reader", cache, (ulong) pos,
+                                cshare->running_threads));
 
-  pthread_mutex_lock(&s->mutex);
-  if (!s->count)
+  if (cshare->source_cache)
   {
-    s->count=s->total;
-    return 1;
+    /* A write cache is synchronized to the read caches. */
+
+    if (cache == cshare->source_cache)
+    {
+      /* The writer waits until all readers are here. */
+      while (cshare->running_threads)
+      {
+        DBUG_PRINT("io_cache_share", ("writer waits in lock"));
+        pthread_cond_wait(&cshare->cond, &cshare->mutex);
+      }
+      DBUG_PRINT("io_cache_share", ("writer awoke, going to copy"));
+
+      /* Stay locked. Leave the lock later by unlock_io_cache(). */
+      DBUG_RETURN(1);
+    }
+
+    /* The last thread wakes the writer. */
+    if (!cshare->running_threads)
+    {
+      DBUG_PRINT("io_cache_share", ("waking writer"));
+      pthread_cond_broadcast(&cshare->cond);
+    }
+
+    /*
+      Readers wait until the data is copied from the writer. Another
+      reason to stop waiting is the removal of the write thread. If this
+      happens, we leave the lock with old data in the buffer.
+    */
+    while ((!cshare->read_end || (cshare->pos_in_file < pos)) &&
+           cshare->source_cache)
+    {
+      DBUG_PRINT("io_cache_share", ("reader waits in lock"));
+      pthread_cond_wait(&cshare->cond, &cshare->mutex);
+    }
   }
+  else
+  {
+    /*
+      There are read caches only. The last thread arriving in
+      lock_io_cache() continues with a locked cache and reads the block.
+    */
+    if (!cshare->running_threads)
+    {
+      DBUG_PRINT("io_cache_share", ("last thread joined, going to read"));
+      /* Stay locked. Leave the lock later by unlock_io_cache(). */
+      DBUG_RETURN(1);
+    }
 
-  total=s->total;
-  s->count--;
-  while (!s->active || s->active->pos_in_file < pos)
-    pthread_cond_wait(&s->cond, &s->mutex);
+    /*
+      All other threads wait until the requested block is read by the
+      last thread arriving. Another reason to stop waiting is the
+      removal of a thread. If this leads to all threads being in the
+      lock, we have to continue also. The first of the awaken threads
+      will then do the read.
+    */
+    while ((!cshare->read_end || (cshare->pos_in_file < pos)) &&
+           cshare->running_threads)
+    {
+      DBUG_PRINT("io_cache_share", ("reader waits in lock"));
+      pthread_cond_wait(&cshare->cond, &cshare->mutex);
+    }
 
-  if (s->total < total &&
-      (!s->active || s->active->pos_in_file < pos))
-    return 1;
+    /* If the block is not yet read, continue with a locked cache and read. */
+    if (!cshare->read_end || (cshare->pos_in_file < pos))
+    {
+      DBUG_PRINT("io_cache_share", ("reader awoke, going to read"));
+      /* Stay locked. Leave the lock later by unlock_io_cache(). */
+      DBUG_RETURN(1);
+    }
 
-  pthread_mutex_unlock(&s->mutex);
-  return 0;
+    /* Another thread did read the block already. */
+  }
+  DBUG_PRINT("io_cache_share", ("reader awoke, going to process %u bytes",
+                                cshare->read_end ? (uint)
+                                (cshare->read_end - cshare->buffer) : 0));
+
+  /*
+    Leave the lock. Do not call unlock_io_cache() later. The thread that
+    filled the buffer did this and marked all threads as running.
+  */
+  pthread_mutex_unlock(&cshare->mutex);
+  DBUG_RETURN(0);
 }
 
-static void unlock_io_cache(IO_CACHE *info)
+
+/*
+  Unlock IO cache.
+
+  SYNOPSIS
+    unlock_io_cache()
+      cache                     The cache of the thread leaving the lock.
+
+  NOTE
+    This is called by the thread that filled the buffer. It marks all
+    threads as running and awakes them. This must not be done by any
+    other thread.
+
+    The reason for resetting running_threads to total_threads before
+    waking all other threads is that it could be possible that this
+    thread is so fast with processing the buffer that it enters the lock
+    before even one other thread has left it. If every awoken thread
+    would increase running_threads by one, this thread could think that
+    he is again the last to join and would not wait for the other
+    threads to process the data.
+
+  RETURN
+    void
+*/
+
+static void unlock_io_cache(IO_CACHE *cache)
 {
-  pthread_cond_broadcast(&info->share->cond);
-  pthread_mutex_unlock(&info->share->mutex);
+  IO_CACHE_SHARE *cshare= cache->share;
+  DBUG_ENTER("unlock_io_cache");
+  DBUG_PRINT("io_cache_share", ("%s: 0x%lx  pos: %lu  running: %u",
+                                (cache == cshare->source_cache) ?
+                                "writer" : "reader",
+                                cache, (ulong) cshare->pos_in_file,
+                                cshare->total_threads));
+
+  cshare->running_threads= cshare->total_threads;
+  pthread_cond_broadcast(&cshare->cond);
+  pthread_mutex_unlock(&cshare->mutex);
+  DBUG_VOID_RETURN;
 }
 
 
@@ -567,7 +842,7 @@ static void unlock_io_cache(IO_CACHE *in
 
   SYNOPSIS
     _my_b_read_r()
-      info                      IO_CACHE pointer
+      cache                     IO_CACHE pointer
       Buffer                    Buffer to retrieve count bytes from file
       Count                     Number of bytes to read into Buffer
 
@@ -579,7 +854,7 @@ static void unlock_io_cache(IO_CACHE *in
 
     It works as follows: when a thread tries to read from a file (that
     is, after using all the data from the (shared) buffer), it just
-    hangs on lock_io_cache(), wating for other threads. When the very
+    hangs on lock_io_cache(), waiting for other threads. When the very
     last thread attempts a read, lock_io_cache() returns 1, the thread
     does actual IO and unlock_io_cache(), which signals all the waiting
     threads that data is in the buffer.
@@ -599,16 +874,17 @@ static void unlock_io_cache(IO_CACHE *in
     1      Error: can't read requested characters
 */
 
-int _my_b_read_r(register IO_CACHE *info, byte *Buffer, uint Count)
+int _my_b_read_r(register IO_CACHE *cache, byte *Buffer, uint Count)
 {
   my_off_t pos_in_file;
   uint length, diff_length, left_length;
+  IO_CACHE_SHARE *cshare= cache->share;
   DBUG_ENTER("_my_b_read_r");
 
-  if ((left_length= (uint) (info->read_end - info->read_pos)))
+  if ((left_length= (uint) (cache->read_end - cache->read_pos)))
   {
     DBUG_ASSERT(Count >= left_length);	/* User is not using my_b_read() */
-    memcpy(Buffer, info->read_pos, (size_t) (left_length));
+    memcpy(Buffer, cache->read_pos, (size_t) (left_length));
     Buffer+= left_length;
     Count-= left_length;
   }
@@ -616,55 +892,141 @@ int _my_b_read_r(register IO_CACHE *info
   {
     int cnt, len;
 
-    pos_in_file= info->pos_in_file + (info->read_end - info->buffer);
+    pos_in_file= cache->pos_in_file + (cache->read_end - cache->buffer);
     diff_length= (uint) (pos_in_file & (IO_SIZE-1));
     length=IO_ROUND_UP(Count+diff_length)-diff_length;
-    length=(length <= info->read_length) ?
-                   length + IO_ROUND_DN(info->read_length - length) :
-                   length - IO_ROUND_UP(length - info->read_length) ;
-    if (info->type != READ_FIFO &&
-	(length > (info->end_of_file - pos_in_file)))
-      length= (uint) (info->end_of_file - pos_in_file);
+    length= ((length <= cache->read_length) ?
+             length + IO_ROUND_DN(cache->read_length - length) :
+             length - IO_ROUND_UP(length - cache->read_length));
+    if (cache->type != READ_FIFO &&
+	(length > (cache->end_of_file - pos_in_file)))
+      length= (uint) (cache->end_of_file - pos_in_file);
     if (length == 0)
     {
-      info->error= (int) left_length;
+      cache->error= (int) left_length;
       DBUG_RETURN(1);
     }
-    if (lock_io_cache(info, pos_in_file))
+    if (lock_io_cache(cache, pos_in_file))
     {
-      info->share->active=info;
-      if (info->seek_not_done)             /* File touched, do seek */
-        VOID(my_seek(info->file,pos_in_file,MY_SEEK_SET,MYF(0)));
-      len=(int)my_read(info->file,info->buffer, length, info->myflags);
-      info->read_end=info->buffer + (len == -1 ? 0 : len);
-      info->error=(len == (int)length ? 0 : len);
-      info->pos_in_file=pos_in_file;
-      unlock_io_cache(info);
+      /* With a synchronized write/read cache we won't come here. */
+      DBUG_ASSERT(!cshare->source_cache);
+      /*
+        Unless the writer has gone before this thread entered the lock.
+        Simulate EOF in this case.
+      */
+      if (cache->file < 0)
+        len= 0;
+      else
+      {
+        if (cache->seek_not_done)             /* File touched, do seek */
+          VOID(my_seek(cache->file, pos_in_file, MY_SEEK_SET, MYF(0)));
+        len= (int) my_read(cache->file, cache->buffer, length, cache->myflags);
+      }
+      DBUG_PRINT("io_cache_share", ("read %d bytes" len));
+
+      cache->read_end=    cache->buffer + (len == -1 ? 0 : len);
+      cache->error=       (len == (int)length ? 0 : len);
+      cache->pos_in_file= pos_in_file;
+
+      /* Copy important values to the share. */
+      cshare->error=       cache->error;
+      cshare->read_end=    cache->read_end;
+      cshare->pos_in_file= pos_in_file;
+
+      /* Mark all threads as running and wake them. */
+      unlock_io_cache(cache);
     }
     else
     {
-      info->error=          info->share->active->error;
-      info->read_end=       info->share->active->read_end;
-      info->pos_in_file=    info->share->active->pos_in_file;
-      len= (info->error == -1 ? -1 : info->read_end-info->buffer);
+      /*
+        With a synchronized write/read cache we always come here.
+        If the writer was removed from the share while this thread was
+        asleep, we need to simulate an EOF condition.
+      */
+      if (!cshare->read_end || (cshare->pos_in_file < pos_in_file))
+      {
+        DBUG_PRINT("io_cache_share", ("reader found writer removed. EOF"));
+        cshare->read_end= cshare->buffer; /* Empty buffer. */
+        cshare->error= 0; /* EOF is not an error. */
+      }
+
+      /* Copy important values from the share. */
+      cache->error=       cshare->error;
+      cache->read_end=    cshare->read_end;
+      cache->pos_in_file= cshare->pos_in_file;
+
+      len= ((cache->error == -1) ? -1 : cache->read_end - cache->buffer);
     }
-    info->read_pos=info->buffer;
-    info->seek_not_done=0;
+    cache->read_pos=      cache->buffer;
+    cache->seek_not_done= 0;
     if (len <= 0)
     {
-      info->error= (int) left_length;
+      DBUG_PRINT("io_cache_share", ("reader error. len %d  left %u",
+                                    len, left_length));
+      cache->error= (int) left_length;
       DBUG_RETURN(1);
     }
     cnt= ((uint) len > Count) ? (int) Count : len;
-    memcpy(Buffer, info->read_pos, (size_t) cnt);
+    memcpy(Buffer, cache->read_pos, (size_t) cnt);
     Count -= cnt;
     Buffer+= cnt;
     left_length+= cnt;
-    info->read_pos+= cnt;
+    cache->read_pos+= cnt;
   }
   DBUG_RETURN(0);
 }
+
+
+/*
+  Copy data from write cache to read cache.
+
+  SYNOPSIS
+    copy_to_read_buffer()
+      write_cache               The write cache.
+      write_buffer              The source of data, mostly the cache buffer.
+      write_length              The number of bytes to copy.
+
+  NOTE
+    The write thread will wait for all read threads to join the cache
+    lock. Then it copies the data over and wakes the read threads.
+
+  RETURN
+    void
+*/
+
+static void copy_to_read_buffer(IO_CACHE *write_cache,
+                                const byte *write_buffer, uint write_length)
+{
+  IO_CACHE_SHARE *cshare= write_cache->share;
+
+  /*
+    write_length is usually less or equal to buffer_length.
+    It can be bigger if _my_b_write() is called with a big length.
+  */
+  while (write_length)
+  {
+    uint copy_length= min(write_length, write_cache->buffer_length);
+#if !defined(DBUG_OFF)
+    int  rc=
 #endif
+    lock_io_cache(write_cache, write_cache->pos_in_file);
+    /* The writing thread does always have the lock when it awakes. */
+    DBUG_ASSERT(rc);
+
+    memcpy(cshare->buffer, write_buffer, copy_length);
+
+    cshare->error=       0;
+    cshare->read_end=    cshare->buffer + copy_length;
+    cshare->pos_in_file= write_cache->pos_in_file;
+
+    /* Mark all threads as running and wake them. */
+    unlock_io_cache(write_cache);
+
+    write_buffer+= copy_length;
+    write_length-= copy_length;
+  }
+}
+#endif /*THREAD*/
 
 
 /*
@@ -1016,6 +1378,7 @@ int _my_b_write(register IO_CACHE *info,
   Buffer+=rest_length;
   Count-=rest_length;
   info->write_pos+=rest_length;
+
   if (flush_io_cache(info))
     return 1;
   if (Count >= IO_SIZE)
@@ -1028,6 +1391,23 @@ int _my_b_write(register IO_CACHE *info,
     }
     if (my_write(info->file,Buffer,(uint) length,info->myflags | MY_NABP))
       return info->error= -1;
+
+#ifdef THREAD
+    /*
+      In case of non-quick parallel repair the index creators are fed
+      from the data file writer by direct write cache to read cache
+      copy. Simulate this here by direct caller buffer to read cache
+      copy. Do it after the write so that the cache readers actions on
+      the flushed part can go in parallel with the write of the extra
+      stuff. copy_to_read_buffer() synchronizes writer and readers so
+      that after this call the readers can act on the extra stuff while
+      the writer can go ahead and prepare the next output.
+      copy_to_read_buffer() relies on info->pos_in_file.
+    */
+    if (info->share)
+      copy_to_read_buffer(info, Buffer, length);
+#endif
+
     Count-=length;
     Buffer+=length;
     info->pos_in_file+=length;
@@ -1048,6 +1428,11 @@ int my_b_append(register IO_CACHE *info,
 {
   uint rest_length,length;
 
+#ifdef THREAD
+  /* Assert that we cannot come here with a shared read+write cache. */
+  DBUG_ASSERT(!info->share || !info->share->source_cache);
+#endif
+
   lock_append_buffer(info);
   rest_length=(uint) (info->write_end - info->write_pos);
   if (Count <= rest_length)
@@ -1108,6 +1493,11 @@ int my_block_write(register IO_CACHE *in
   uint length;
   int error=0;
 
+#ifdef THREAD
+  /* Assert that we cannot come here with a shared read+write cache. */
+  DBUG_ASSERT(!info->share || !info->share->source_cache);
+#endif
+
   if (pos < info->pos_in_file)
   {
     /* Of no overlap, write everything without buffering */
@@ -1184,6 +1574,18 @@ int my_b_flush_io_cache(IO_CACHE *info, 
 
     if ((length=(uint) (info->write_pos - info->write_buffer)))
     {
+#ifdef THREAD
+      /*
+        In case of non-quick parallel repair the index creators are fed
+        from the data file writer by direct write cache to read cache
+        copy. Do it before the write here so that the readers can work
+        in parallel with the write. copy_to_read_buffer() relies on
+        info->pos_in_file.
+      */
+      if (info->share)
+        copy_to_read_buffer(info, info->write_buffer, length);
+#endif
+
       pos_in_file=info->pos_in_file;
       /*
 	If we have append cache, we always open the file with
@@ -1262,16 +1664,10 @@ int end_io_cache(IO_CACHE *info)
 
 #ifdef THREAD
   /*
-    if IO_CACHE is shared between several threads, only one
-    thread needs to call end_io_cache() - just as init_io_cache()
-    should be called only once and then memcopy'ed
+    Every thread must call remove_io_thread(). The last one destroys
+    the share.
   */
-  if (info->share)
-  {
-    pthread_cond_destroy (&info->share->cond);
-    pthread_mutex_destroy(&info->share->mutex);
-    info->share=0;
-  }
+  DBUG_ASSERT(!info->share || !info->share->total_threads);
 #endif
 
   if ((pre_close=info->pre_close))

--- 1.61/mysql-test/r/myisam.result	2006-08-05 12:41:16 +02:00
+++ 1.62/mysql-test/r/myisam.result	2006-08-05 12:41:16 +02:00
@@ -769,3 +769,129 @@ a	b
 xxxxxxxxx	bbbbbb
 xxxxxxxxx	bbbbbb
 DROP TABLE t1;
+SET @@myisam_repair_threads=2;
+SHOW VARIABLES LIKE 'myisam_repair%';
+Variable_name	Value
+myisam_repair_threads	2
+CREATE TABLE t1 (
+`_id` int(11) NOT NULL default '0',
+`url` text,
+`email` text,
+`description` text,
+`loverlap` int(11) default NULL,
+`roverlap` int(11) default NULL,
+`lneighbor_id` int(11) default NULL,
+`rneighbor_id` int(11) default NULL,
+`length_` int(11) default NULL,
+`sequence` mediumtext,
+`name` text,
+`_obj_class` text NOT NULL,
+PRIMARY KEY  (`_id`),
+UNIQUE KEY `sequence_name_index` (`name`(50)),
+KEY (`length_`)
+) ENGINE=MyISAM DEFAULT CHARSET=latin1;
+INSERT INTO t1 VALUES
+(1,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,'sample1',''),
+(2,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,'sample2',''),
+(3,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,'sample3',''),
+(4,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,'sample4',''),
+(5,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,'sample5',''),
+(6,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,'sample6',''),
+(7,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,'sample7',''),
+(8,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,'sample8',''),
+(9,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,'sample9','');
+SELECT _id FROM t1;
+_id
+1
+2
+3
+4
+5
+6
+7
+8
+9
+DELETE FROM t1 WHERE _id < 8;
+SHOW TABLE STATUS LIKE 't1';
+Name	Engine	Version	Row_format	Rows	Avg_row_length	Data_length	Max_data_length	Index_length	Data_free	Auto_increment	Create_time	Update_time	Check_time	Collation	Checksum	Create_options	Comment
+t1	MyISAM	9	Dynamic	2	#	#	#	#	140	#	#	#	#	#	#		
+CHECK TABLE t1 EXTENDED;
+Table	Op	Msg_type	Msg_text
+test.t1	check	status	OK
+OPTIMIZE TABLE t1;
+Table	Op	Msg_type	Msg_text
+test.t1	optimize	status	OK
+CHECK TABLE t1 EXTENDED;
+Table	Op	Msg_type	Msg_text
+test.t1	check	status	OK
+SHOW TABLE STATUS LIKE 't1';
+Name	Engine	Version	Row_format	Rows	Avg_row_length	Data_length	Max_data_length	Index_length	Data_free	Auto_increment	Create_time	Update_time	Check_time	Collation	Checksum	Create_options	Comment
+t1	MyISAM	9	Dynamic	2	#	#	#	#	0	#	#	#	#	#	#		
+SELECT _id FROM t1;
+_id
+8
+9
+DROP TABLE t1;
+CREATE TABLE t1 (
+`_id` int(11) NOT NULL default '0',
+`url` text,
+`email` text,
+`description` text,
+`loverlap` int(11) default NULL,
+`roverlap` int(11) default NULL,
+`lneighbor_id` int(11) default NULL,
+`rneighbor_id` int(11) default NULL,
+`length_` int(11) default NULL,
+`sequence` mediumtext,
+`name` text,
+`_obj_class` text NOT NULL,
+PRIMARY KEY  (`_id`),
+UNIQUE KEY `sequence_name_index` (`name`(50)),
+KEY (`length_`)
+) ENGINE=MyISAM DEFAULT CHARSET=latin1;
+INSERT INTO t1 VALUES
+(1,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,'sample1',''),
+(2,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,'sample2',''),
+(3,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,'sample3',''),
+(4,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,'sample4',''),
+(5,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,'sample5',''),
+(6,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,'sample6',''),
+(7,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,'sample7',''),
+(8,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,'sample8',''),
+(9,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,'sample9','');
+SELECT _id FROM t1;
+_id
+1
+2
+3
+4
+5
+6
+7
+8
+9
+DELETE FROM t1 WHERE _id < 8;
+SHOW TABLE STATUS LIKE 't1';
+Name	Engine	Version	Row_format	Rows	Avg_row_length	Data_length	Max_data_length	Index_length	Data_free	Auto_increment	Create_time	Update_time	Check_time	Collation	Checksum	Create_options	Comment
+t1	MyISAM	9	Dynamic	2	#	#	#	#	140	#	#	#	#	#	#		
+CHECK TABLE t1 EXTENDED;
+Table	Op	Msg_type	Msg_text
+test.t1	check	status	OK
+REPAIR TABLE t1 QUICK;
+Table	Op	Msg_type	Msg_text
+test.t1	repair	status	OK
+CHECK TABLE t1 EXTENDED;
+Table	Op	Msg_type	Msg_text
+test.t1	check	status	OK
+SHOW TABLE STATUS LIKE 't1';
+Name	Engine	Version	Row_format	Rows	Avg_row_length	Data_length	Max_data_length	Index_length	Data_free	Auto_increment	Create_time	Update_time	Check_time	Collation	Checksum	Create_options	Comment
+t1	MyISAM	9	Dynamic	2	#	#	#	#	140	#	#	#	#	#	#		
+SELECT _id FROM t1;
+_id
+8
+9
+DROP TABLE t1;
+SET @@myisam_repair_threads=1;
+SHOW VARIABLES LIKE 'myisam_repair%';
+Variable_name	Value
+myisam_repair_threads	1

--- 1.47/mysql-test/t/myisam.test	2006-08-05 12:41:16 +02:00
+++ 1.48/mysql-test/t/myisam.test	2006-08-05 12:41:16 +02:00
@@ -726,4 +726,97 @@ UPDATE t1 AS ta1,t1 AS ta2 SET ta1.b='aa
 SELECT * FROM t1;
 DROP TABLE t1;
 
+#
+# Bug#8283 - OPTIMIZE TABLE causes data loss
+#
+SET @@myisam_repair_threads=2;
+SHOW VARIABLES LIKE 'myisam_repair%';
+#
+# Test OPTIMIZE. This creates a new data file.
+CREATE TABLE t1 (
+  `_id` int(11) NOT NULL default '0',
+  `url` text,
+  `email` text,
+  `description` text,
+  `loverlap` int(11) default NULL,
+  `roverlap` int(11) default NULL,
+  `lneighbor_id` int(11) default NULL,
+  `rneighbor_id` int(11) default NULL,
+  `length_` int(11) default NULL,
+  `sequence` mediumtext,
+  `name` text,
+  `_obj_class` text NOT NULL,
+  PRIMARY KEY  (`_id`),
+  UNIQUE KEY `sequence_name_index` (`name`(50)),
+  KEY (`length_`)
+) ENGINE=MyISAM DEFAULT CHARSET=latin1;
+#
+INSERT INTO t1 VALUES
+  (1,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,'sample1',''),
+  (2,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,'sample2',''),
+  (3,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,'sample3',''),
+  (4,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,'sample4',''),
+  (5,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,'sample5',''),
+  (6,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,'sample6',''),
+  (7,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,'sample7',''),
+  (8,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,'sample8',''),
+  (9,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,'sample9','');
+#
+SELECT _id FROM t1;
+DELETE FROM t1 WHERE _id < 8;
+--replace_column 6 # 7 # 8 # 9 # 11 # 12 # 13 # 14 # 15 # 16 #
+SHOW TABLE STATUS LIKE 't1';
+CHECK TABLE t1 EXTENDED;
+OPTIMIZE TABLE t1;
+CHECK TABLE t1 EXTENDED;
+--replace_column 6 # 7 # 8 # 9 # 11 # 12 # 13 # 14 # 15 # 16 #
+SHOW TABLE STATUS LIKE 't1';
+SELECT _id FROM t1;
+DROP TABLE t1;
+#
+# Test REPAIR QUICK. This retains the old data file.
+CREATE TABLE t1 (
+  `_id` int(11) NOT NULL default '0',
+  `url` text,
+  `email` text,
+  `description` text,
+  `loverlap` int(11) default NULL,
+  `roverlap` int(11) default NULL,
+  `lneighbor_id` int(11) default NULL,
+  `rneighbor_id` int(11) default NULL,
+  `length_` int(11) default NULL,
+  `sequence` mediumtext,
+  `name` text,
+  `_obj_class` text NOT NULL,
+  PRIMARY KEY  (`_id`),
+  UNIQUE KEY `sequence_name_index` (`name`(50)),
+  KEY (`length_`)
+) ENGINE=MyISAM DEFAULT CHARSET=latin1;
+#
+INSERT INTO t1 VALUES
+  (1,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,'sample1',''),
+  (2,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,'sample2',''),
+  (3,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,'sample3',''),
+  (4,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,'sample4',''),
+  (5,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,'sample5',''),
+  (6,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,'sample6',''),
+  (7,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,'sample7',''),
+  (8,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,'sample8',''),
+  (9,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,'sample9','');
+#
+SELECT _id FROM t1;
+DELETE FROM t1 WHERE _id < 8;
+--replace_column 6 # 7 # 8 # 9 # 11 # 12 # 13 # 14 # 15 # 16 #
+SHOW TABLE STATUS LIKE 't1';
+CHECK TABLE t1 EXTENDED;
+REPAIR TABLE t1 QUICK;
+CHECK TABLE t1 EXTENDED;
+--replace_column 6 # 7 # 8 # 9 # 11 # 12 # 13 # 14 # 15 # 16 #
+SHOW TABLE STATUS LIKE 't1';
+SELECT _id FROM t1;
+DROP TABLE t1;
+#
+SET @@myisam_repair_threads=1;
+SHOW VARIABLES LIKE 'myisam_repair%';
+
 # End of 4.1 tests
Thread
bk commit into 4.1 tree (istruewing:1.2521) BUG#8283ingo5 Aug