List:Internals« Previous MessageNext Message »
From:sasha Date:November 3 2001 11:54pm
Subject:bk commit into 4.0 tree
View as plain text  
Below is the list of changes that have just been committed into a
4.0 repository of sasha. When sasha does a push, they will be propogated to 
the main repository and within 24 hours after the push to the public repository. 
For information on how to access the public repository
see http://www.mysql.com/doc/I/n/Installing_source_tree.html

ChangeSet@stripped, 2001-11-03 16:54:00-07:00, sasha@stripped
  more work on IO_CACHE
  portability fixes for systems with broken syscalls that do not interrupt on 
  a signal
  temporary commit - will not be pushed, need to sync up

  BitKeeper/etc/ignore
    1.104 01/11/03 16:53:59 sasha@stripped +2 -0
    Added mysys/#mf_iocache.c# mysys/test_io_cache to the ignore list

  sql/mysqld.cc
    1.238 01/11/03 16:53:59 sasha@stripped +60 -11
    make shutdown work on broken systems

  sql/sql_repl.cc
    1.62 01/11/03 16:53:59 sasha@stripped +11 -7
    make sure slave can be stopped on broken systems in all cases, clean-up

  include/my_sys.h
    1.57 01/11/03 16:53:58 sasha@stripped +18 -2
    work on READ_APPEND cache

  mysys/Makefile.am
    1.26 01/11/03 16:53:58 sasha@stripped +4 -0
    change to test IO_CACHE

  mysys/mf_iocache.c
    1.11 01/11/03 16:53:58 sasha@stripped +195 -22
    work on READ_APPEND cache

# This is a BitKeeper patch.  What follows are the unified diffs for the
# set of deltas contained in the patch.  The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User:	sasha
# Host:	mysql.sashanet.com
# Root:	/home/sasha/src/bk/mysql-4.0

--- 1.56/include/my_sys.h	Tue Oct 23 13:27:19 2001
+++ 1.57/include/my_sys.h	Sat Nov  3 16:53:58 2001
@@ -293,6 +293,16 @@
 struct st_io_cache;
 typedef int (*IO_CACHE_CALLBACK)(struct st_io_cache*);
 
+#ifdef THREAD
+#define lock_append_buffer(info) \
+ pthread_mutex_lock(&(info)->append_buffer_lock)
+#define unlock_append_buffer(info) \
+ pthread_mutex_unlock(&(info)->append_buffer_lock)
+#else
+#define lock_append_buffer(info)
+#define unlock_append_buffer(info)
+#endif
+
 typedef struct st_io_cache		/* Used when cacheing files */
 {
   my_off_t pos_in_file,end_of_file;
@@ -301,7 +311,7 @@
 			     that will use a buffer allocated somewhere
 			     else
 			   */
-  byte *append_buffer, *append_pos, *append_end;
+  byte *append_buffer, *append_read_pos, *append_write_pos, *append_end;
 /* for append buffer used in READ_APPEND cache */
 #ifdef THREAD
   pthread_mutex_t append_buffer_lock;
@@ -348,10 +358,15 @@
    _my_b_get(info))
 
 #define my_b_write(info,Buffer,Count) \
+  ((info)->type != SEQ_READ_APPEND) ? (\
   ((info)->rc_pos + (Count) <= (info)->rc_end ?\
    (memcpy((info)->rc_pos,Buffer,(size_t) (Count)), \
     ((info)->rc_pos+=(Count)),0) :\
-   _my_b_write(info,Buffer,Count))
+   _my_b_write(info,Buffer,Count))) : \
+   ((info)->append_write_pos + (Count) <= (info)->append_end ?\
+   (memcpy((info)->append_write_pos,Buffer,(size_t)Count), \
+   ((info)->append_write_pos+=(Count),0)) : \
+   _my_b_append(info,Buffer,Count))
 
 	/* my_b_write_byte dosn't have any err-check */
 #define my_b_write_byte(info,chr) \
@@ -564,6 +579,7 @@
 extern int _my_b_get(IO_CACHE *info);
 extern int _my_b_async_read(IO_CACHE *info,byte *Buffer,uint Count);
 extern int _my_b_write(IO_CACHE *info,const byte *Buffer,uint Count);
+extern int _my_b_append(IO_CACHE *info,const byte *Buffer,uint Count);
 extern int my_block_write(IO_CACHE *info, const byte *Buffer,
 			  uint Count, my_off_t pos);
 extern int flush_io_cache(IO_CACHE *info);

--- 1.25/mysys/Makefile.am	Sun Oct  7 20:00:05 2001
+++ 1.26/mysys/Makefile.am	Sat Nov  3 16:53:58 2001
@@ -95,6 +95,10 @@
 	$(CP) $(srcdir)/my_vsnprintf.c test_vsnprintf.c
 	$(LINK) $(FLAGS) -DMAIN ./test_vsnprintf.c $(LDADD) $(LIBS)
 	$(RM) -f test_vsnprintf.*
+test_io_cache: mf_iocache.c $(LIBRARIES)
+	$(CP) $(srcdir)/mf_iocache.c test_io_cache.c
+	$(LINK) $(FLAGS) -DMAIN ./test_io_cache.c $(LDADD) $(LIBS)
+	$(RM) -f test_io_cache.*
 
 test_dir: test_dir.c $(LIBRARIES)
 	$(LINK) $(FLAGS) -DMAIN $(srcdir)/test_dir.c $(LDADD) $(LIBS)

--- 1.10/mysys/mf_iocache.c	Tue Oct 23 13:27:20 2001
+++ 1.11/mysys/mf_iocache.c	Sat Nov  3 16:53:58 2001
@@ -41,6 +41,10 @@
 #include <assert.h>
 #include <errno.h>
 
+#ifdef MAIN
+#include <my_dir.h>
+#endif
+
 static void init_read_function(IO_CACHE* info, enum cache_type type);
 
 static void init_read_function(IO_CACHE* info, enum cache_type type)
@@ -152,7 +156,7 @@
   info->rc_request_pos=info->rc_pos=info->buffer;
   if (type == SEQ_READ_APPEND)
   {
-    info->append_pos = info->append_buffer;
+    info->append_read_pos = info->append_write_pos = info->append_buffer;
     info->append_end = info->append_buffer + info->buffer_length;
 #ifdef THREAD    
     pthread_mutex_init(&info->append_buffer_lock,MY_MUTEX_INIT_FAST);
@@ -277,6 +281,10 @@
 			  ~(my_off_t) 0);
     }
   }
+  if (info->type == SEQ_READ_APPEND)
+  {
+    info->append_read_pos = info->append_write_pos = info->append_buffer;
+  }
   info->type=type;
   info->error=0;
   init_read_function(info,type);
@@ -294,7 +302,7 @@
   info->inited=0;
 #endif
   DBUG_RETURN(0);
-} /* init_io_cache */
+} /* reinit_io_cache */
 
 
 
@@ -377,11 +385,19 @@
   return 0;
 }
 
+/* Do sequential read from the SEQ_READ_APPEND cache
+   we do this in three stages:
+    - first read from info->buffer
+    - then if there are still data to read, try the file descriptor
+    - afterwards, if there are still data to read, try append buffer
+*/
+
 int _my_b_seq_read(register IO_CACHE *info, byte *Buffer, uint Count)
 {
-  uint length,diff_length,left_length;
+  uint length,diff_length,left_length,save_count;
   my_off_t max_length, pos_in_file;
-  
+  save_count=Count;
+  /* first, read the regular buffer */
   if ((left_length=(uint) (info->rc_end-info->rc_pos)))
   {
     dbug_assert(Count >= left_length);	/* User is not using my_b_read() */
@@ -390,30 +406,33 @@
     Count-=left_length;
   }
   /* pos_in_file always point on where info->buffer was read */
-  pos_in_file=info->pos_in_file+(uint) (info->rc_end - info->buffer);
+  if ((pos_in_file=info->pos_in_file+(uint) (info->rc_end - info->buffer)) >=
+      info->end_of_file)
+  {
+    info->pos_in_file=pos_in_file;
+    goto read_append_buffer;
+  }
   /* no need to seek since the read is guaranteed to be sequential */
   diff_length=(uint) (pos_in_file & (IO_SIZE-1));
-#ifdef THREAD
-  pthread_mutex_lock(&info->append_buffer_lock);
-#endif  
-#ifdef THREAD
-  pthread_mutex_unlock(&info->append_buffer_lock);
-#endif  
+  
+  /* now the second stage begins - read from file descriptor */
   if (Count >= (uint) (IO_SIZE+(IO_SIZE-diff_length)))
   {					/* Fill first intern buffer */
     uint read_length;
     if (info->end_of_file == pos_in_file)
     {					/* End of file */
-      info->error=(int) left_length;
-      return 1;
+      goto read_append_buffer;
     }
     length=(Count & (uint) ~(IO_SIZE-1))-diff_length;
     if ((read_length=my_read(info->file,Buffer,(uint) length,info->myflags))
 	!= (uint) length)
     {
-      info->error= read_length == (uint) -1 ? -1 :
-	(int) (read_length+left_length);
-      return 1;
+      if (read_length != (uint)-1)
+      {
+	Count -= read_length;
+	Buffer += read_length;
+      }
+      goto read_append_buffer;
     }
     Count-=length;
     Buffer+=length;
@@ -422,15 +441,13 @@
     diff_length=0;
   }
   max_length=info->read_length-diff_length;
-  if (info->type != READ_FIFO &&
-      (info->end_of_file - pos_in_file) < max_length)
+  if ((info->end_of_file - pos_in_file) < max_length)
     max_length = info->end_of_file - pos_in_file;
   if (!max_length)
   {
     if (Count)
     {
-      info->error= left_length;		/* We only got this many char */
-      return 1;
+      goto read_append_buffer;
     }
     length=0;				/* Didn't read any chars */
   }
@@ -439,15 +456,36 @@
 	   length == (uint) -1)
   {
     if (length != (uint) -1)
+    {
       memcpy(Buffer,info->buffer,(size_t) length);
-    info->error= length == (uint) -1 ? -1 : (int) (length+left_length);
-    return 1;
+      Count -= length;
+      Buffer += length;
+    }
+    goto read_append_buffer;
   }
   info->rc_pos=info->buffer+Count;
   info->rc_end=info->buffer+length;
   info->pos_in_file=pos_in_file;
   memcpy(Buffer,info->buffer,(size_t) Count);
   return 0;
+read_append_buffer:
+  lock_append_buffer(info);
+  if (!Count) return 0;
+  {
+    uint copy_len = (uint)(info->append_read_pos -
+			   info->append_write_pos);
+    dbug_assert(info->append_read_pos <= info->append_write_pos);
+    if (copy_len > Count)
+      copy_len = Count;
+    memcpy(Buffer, info->append_read_pos,
+	 copy_len);
+    info->append_read_pos += copy_len;
+    Count -= copy_len;
+    if (Count)
+      info->error = save_count - Count; 
+  }
+  unlock_append_buffer(info);
+  return Count ? 1 : 0;
 }
 
 #ifdef HAVE_AIOWAIT
@@ -672,6 +710,31 @@
   return 0;
 }
 
+int _my_b_append(register IO_CACHE *info, const byte *Buffer, uint Count)
+{
+  uint rest_length,length;
+
+  rest_length=(uint) (info->append_end -
+		      info->append_write_pos);
+  memcpy(info->append_write_pos,Buffer,(size_t) rest_length);
+  Buffer+=rest_length;
+  Count-=rest_length;
+  info->append_write_pos+=rest_length;
+  if (flush_io_cache(info))
+    return 1;
+  if (Count >= IO_SIZE)
+  {					/* Fill first intern buffer */
+    length=Count & (uint) ~(IO_SIZE-1);
+    if (my_write(info->file,Buffer,(uint) length,info->myflags | MY_NABP))
+      return info->error= -1;
+    Count-=length;
+    Buffer+=length;
+  }
+  memcpy(info->append_write_pos,Buffer,(size_t) Count);
+  info->append_write_pos+=Count;
+  return 0;
+}
+
 
 /*
   Write a block to disk where part of the data may be inside the record
@@ -756,6 +819,30 @@
       DBUG_RETURN(0);
     }
   }
+  else if (info->type == SEQ_READ_APPEND)
+  {
+    if (info->file == -1)
+    {
+      if (real_open_cached_file(info))
+	DBUG_RETURN((info->error= -1));
+    }
+    lock_append_buffer(info);
+    if (info->append_write_pos != info->append_buffer)
+    {
+      length=(uint) (info->append_write_pos - info->append_buffer);
+      info->append_read_pos=info->append_write_pos=info->append_buffer;
+      info->append_end=(info->append_buffer+info->buffer_length-
+		    (info->pos_in_file & (IO_SIZE-1)));
+      if (my_write(info->file,info->buffer,length,info->myflags | MY_NABP))
+      {
+	unlock_append_buffer(info);
+	DBUG_RETURN((info->error= -1));
+      }
+      unlock_append_buffer(info);
+      DBUG_RETURN(0);
+    }
+    unlock_append_buffer(info);
+  }
 #ifdef HAVE_AIOWAIT
   else if (info->type != READ_NET)
   {
@@ -783,4 +870,90 @@
   }
   DBUG_RETURN(error);
 } /* end_io_cache */
+
+#ifdef MAIN
+void die(const char* fmt, ...)
+{
+  va_list va_args;
+  va_start(va_args,fmt);
+  fprintf(stderr,"Error:");
+  vfprintf(stderr, fmt,va_args);
+  fprintf(stderr,", errno=%d\n", errno);
+  exit(1);
+}
+
+int open_file(const char* fname, IO_CACHE* info, int cache_size)
+{
+  int fd;
+  if ((fd=my_open(fname,O_CREAT|O_APPEND|O_RDWR,MYF(MY_WME))) < 0)
+    die("Could not open %s", fname);
+  if (init_io_cache(info, fd, cache_size, SEQ_READ_APPEND, 0,0,MYF(MY_WME)))
+    die("failed in init_io_cache()");
+  return fd;
+}
+
+void close_file(IO_CACHE* info)
+{
+  end_io_cache(info);
+  my_close(info->file, MYF(MY_WME));
+}
+
+int main(int argc, char** argv)
+{
+  IO_CACHE sra_cache; /* SEQ_READ_APPEND */
+  MY_STAT status;
+  const char* fname="/tmp/iocache.test";
+  int cache_size=16384;
+  char llstr_buf[22];
+  int max_block,total_bytes=0;
+  int i,num_loops=100,error=0;
+  char *p;
+  char* block, *block_end;
+  MY_INIT(argv[0]);
+  max_block = cache_size*3;
+  if (!(block=(char*)my_malloc(max_block,MYF(MY_WME))))
+    die("Not enough memory to allocate test block");
+  block_end = block + max_block;
+  for (p = block,i=0; p < block_end;i++)
+  {
+    *p++ = (char)i;
+  }
+  if (my_stat(fname,&status, MYF(0)) &&
+      my_delete(fname,MYF(MY_WME)))
+    {
+      die("Delete of %s failed, aborting", fname);
+    }
+  open_file(fname,&sra_cache, cache_size);
+  for (i = 0; i < num_loops; i++)
+  {
+    char buf[4];
+    int block_size = abs(rand() % max_block);
+    int4store(buf, block_size);
+    if (my_b_write(&sra_cache,buf,4) ||
+	my_b_write(&sra_cache, block, block_size))
+      die("write failed");
+    total_bytes += 4+block_size;
+  }
+  close_file(&sra_cache);
+  my_free(block,MYF(MY_WME));
+  if (!my_stat(fname,&status,MYF(MY_WME)))
+    die("%s failed to stat, but I had just closed it,\
+ wonder how that happened");
+  printf("Final size of %s is %s, wrote %d bytes\n",fname,
+	 llstr(status.st_size,llstr_buf),
+	 total_bytes);
+  my_delete(fname, MYF(MY_WME));
+  /* check correctness of tests */
+  if (total_bytes != status.st_size)
+  {
+    fprintf(stderr,"Not the same number of bytes acutally  in file as bytes \
+supposedly written\n");
+    error=1;
+  }
+  exit(error);
+  return 0;
+}
+#endif
+
+
 

--- 1.237/sql/mysqld.cc	Sat Oct 20 02:22:23 2001
+++ 1.238/sql/mysqld.cc	Sat Nov  3 16:53:59 2001
@@ -45,6 +45,11 @@
 char pstack_file_name[80];
 #endif /* __linux__ */
 
+#if defined(HAVE_DEC_3_2_THREADS) || defined(SIGNALS_DONT_BREAK_READ)
+#define HAVE_CLOSE_SERVER_SOCK 1
+void close_server_sock();
+#endif  
+
 extern "C" {					// Because of SCO 3.2V4.2
 #include <errno.h>
 #include <sys/stat.h>
@@ -453,16 +458,7 @@
       sql_print_error("Got error %d from pthread_cond_timedwait",error);
 #endif
 #if defined(HAVE_DEC_3_2_THREADS) || defined(SIGNALS_DONT_BREAK_READ)
-    if (ip_sock != INVALID_SOCKET)
-    {
-      DBUG_PRINT("error",("closing TCP/IP and socket files"));
-      VOID(shutdown(ip_sock,2));
-      VOID(closesocket(ip_sock));
-      VOID(shutdown(unix_sock,2));
-      VOID(closesocket(unix_sock));
-      VOID(unlink(mysql_unix_port));
-      ip_sock=unix_sock= INVALID_SOCKET;
-    }
+    close_server_sock();
 #endif
   }
   (void) pthread_mutex_unlock(&LOCK_thread_count);
@@ -577,10 +573,37 @@
   DBUG_VOID_RETURN;
 }
 
+#ifdef HAVE_CLOSE_SERVER_SOCK
+void close_server_sock()
+{
+  DBUG_ENTER("close_server_sock");
+  if (ip_sock != INVALID_SOCKET)
+  {
+    DBUG_PRINT("info",("closing TCP/IP socket"));
+    VOID(shutdown(ip_sock,2));
+    VOID(closesocket(ip_sock));
+    ip_sock=INVALID_SOCKET;
+  }
+  if (unix_sock != INVALID_SOCKET)
+  {
+    DBUG_PRINT("info",("closing Unix socket"));
+    VOID(shutdown(unix_sock,2));
+    VOID(closesocket(unix_sock));
+    VOID(unlink(mysql_unix_port));
+    unix_sock=INVALID_SOCKET;
+  }
+  DBUG_VOID_RETURN;
+}
+#endif
+
 void kill_mysql(void)
 {
   DBUG_ENTER("kill_mysql");
 
+#ifdef SIGNALS_DONT_BREAK_READ
+  close_server_sock(); /* force accept to wake up */
+#endif  
+  
 #if defined(__WIN__)
   {
     if (!SetEvent(hEventShutdown))
@@ -604,6 +627,7 @@
 #endif
     DBUG_PRINT("quit",("After pthread_kill"));
     shutdown_in_progress=1;			// Safety if kill didn't work
+    abort_loop=1;
     DBUG_VOID_RETURN;
 }
 
@@ -2023,6 +2047,7 @@
   sql_print_error("Before Lock_thread_count");
 #endif
   (void) pthread_mutex_lock(&LOCK_thread_count);
+  DBUG_PRINT("quit", ("Got thread_count mutex"));
   select_thread_in_use=0;			// For close_connections
   (void) pthread_cond_broadcast(&COND_thread_count);
   (void) pthread_mutex_unlock(&LOCK_thread_count);
@@ -2054,10 +2079,14 @@
 #endif /* HAVE_OPENSSL */
   /* Wait until cleanup is done */
   (void) pthread_mutex_lock(&LOCK_thread_count);
+  DBUG_PRINT("quit", ("Got thread_count mutex for clean up wait"));
+  
   while (!ready_to_exit)
   {
+    DBUG_PRINT("quit", ("not yet ready to exit"));
     pthread_cond_wait(&COND_thread_count,&LOCK_thread_count);
   }
+  DBUG_PRINT("quit", ("ready to exit"));
   (void) pthread_mutex_unlock(&LOCK_thread_count);
   my_end(opt_endinfo ? MY_CHECK_ERROR | MY_GIVE_INFO : 0);
   exit(0);
@@ -2253,6 +2282,20 @@
   DBUG_VOID_RETURN;
 }
 
+#ifdef SIGNALS_DONT_BREAK_READ
+inline void kill_broken_server()
+{
+  /* hack to get around signals ignored in syscalls for problem OS's */
+  if (unix_sock == INVALID_SOCKET || ip_sock ==INVALID_SOCKET)
+  {
+    select_thread_in_use = 0;
+    kill_server((void*)MYSQL_KILL_SIGNAL); /* never returns */
+  }
+}
+#define MAYBE_BROKEN_SYSCALL kill_broken_server();
+#else
+#define MAYBE_BROKEN_SYSCALL
+#endif
 
 	/* Handle new connections and spawn new process to handle them */
 
@@ -2288,6 +2331,7 @@
 #endif
 
   DBUG_PRINT("general",("Waiting for connections."));
+  MAYBE_BROKEN_SYSCALL;
   while (!abort_loop)
   {
     readFDs=clientFDs;
@@ -2302,12 +2346,15 @@
 	if (!select_errors++ && !abort_loop)	/* purecov: inspected */
 	  sql_print_error("mysqld: Got error %d from select",socket_errno); /* purecov: inspected */
       }
+      MAYBE_BROKEN_SYSCALL
       continue;
     }
 #endif	/* HPUX */
     if (abort_loop)
+    {
+      MAYBE_BROKEN_SYSCALL;
       break;
-
+    }
     /*
     ** Is this a new connection request
     */
@@ -2343,6 +2390,7 @@
       if (new_sock != INVALID_SOCKET ||
 	  (socket_errno != SOCKET_EINTR && socket_errno != SOCKET_EAGAIN))
 	break;
+      MAYBE_BROKEN_SYSCALL;
 #if !defined(NO_FCNTL_NONBLOCK)
       if (!(test_flags & TEST_BLOCKING))
       {
@@ -2359,6 +2407,7 @@
     {
       if ((error_count++ & 255) == 0)		// This can happen often
 	sql_perror("Error in accept");
+      MAYBE_BROKEN_SYSCALL;
       if (socket_errno == SOCKET_ENFILE || socket_errno == SOCKET_EMFILE)
 	sleep(1);				// Give other threads some time
       continue;

--- 1.103/BitKeeper/etc/ignore	Fri Oct 19 08:46:44 2001
+++ 1.104/BitKeeper/etc/ignore	Sat Nov  3 16:53:59 2001
@@ -424,3 +424,5 @@
 vio/test-sslclient
 vio/test-sslserver
 vio/viotest-ssl
+mysys/#mf_iocache.c#
+mysys/test_io_cache

--- 1.61/sql/sql_repl.cc	Tue Oct 23 13:27:20 2001
+++ 1.62/sql/sql_repl.cc	Sat Nov  3 16:53:59 2001
@@ -38,6 +38,13 @@
 static int binlog_dump_count = 0;
 #endif
 
+#ifdef SIGNAL_WITH_VIO_CLOSE
+#define KICK_SLAVE { slave_thd->close_active_vio(); \
+  thr_alarm_kill(slave_real_id); }
+#else
+#define KICK_SLAVE thr_alarm_kill(slave_real_id);
+#endif
+
 static Slave_log_event* find_slave_event(IO_CACHE* log,
 					 const char* log_file_name,
 					 char* errmsg);
@@ -700,10 +707,7 @@
   if (slave_running)
   {
     abort_slave = 1;
-    thr_alarm_kill(slave_real_id);
-#ifdef SIGNAL_WITH_VIO_CLOSE
-    slave_thd->close_active_vio();
-#endif
+    KICK_SLAVE;
     // do not abort the slave in the middle of a query, so we do not set
     // thd->killed for the slave thread
     thd->proc_info = "waiting for slave to die";
@@ -728,7 +732,7 @@
 #endif
       pthread_cond_timedwait(&COND_slave_stopped, &LOCK_slave, &abstime);
       if (slave_running)
-        thr_alarm_kill(slave_real_id);
+        KICK_SLAVE;
     }
   }
   else
@@ -818,7 +822,7 @@
   if ((slave_was_running = slave_running))
   {
     abort_slave = 1;
-    thr_alarm_kill(slave_real_id);
+    KICK_SLAVE;
     thd->proc_info = "waiting for slave to die";
     while (slave_running)
       pthread_cond_wait(&COND_slave_stopped, &LOCK_slave); // wait until done
@@ -1470,7 +1474,7 @@
   if ((slave_was_running = slave_running))
   {
     abort_slave = 1;
-    thr_alarm_kill(slave_real_id);
+    KICK_SLAVE;
     thd->proc_info = "waiting for slave to die";
     while (slave_running)
       pthread_cond_wait(&COND_slave_stopped, &LOCK_slave); // wait until done
Thread
bk commit into 4.0 treesasha4 Nov