List:Commits« Previous MessageNext Message »
From:Brian Aker Date:October 22 2007 11:49pm
Subject:bk commit into 5.2 tree (brian:1.2646)
View as plain text  
Below is the list of changes that have just been committed into a local
5.2 repository of brian. When brian does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html

ChangeSet@stripped, 2007-10-22 16:49:44-07:00, brian@stripped +4 -0
  Cleanup. This gives us one buffer type used between read/write which is now the same size. 

  storage/archive/archive_reader.c@stripped, 2007-10-22 16:49:42-07:00, brian@stripped +1 -7
    Rewrote error trap to provide more information. 

  storage/archive/archive_test.c@stripped, 2007-10-22 16:49:42-07:00, brian@stripped +1 -2
    Space cleanup. 

  storage/archive/azio.c@stripped, 2007-10-22 16:49:42-07:00, brian@stripped +84 -60
    Restructured IO to use azio_bucket_st structures. 

  storage/archive/azio.h@stripped, 2007-10-22 16:49:42-07:00, brian@stripped +20 -9
    Introduced bucket structures. Reads and writes are now the same size. 

diff -Nrup a/storage/archive/archive_reader.c b/storage/archive/archive_reader.c
--- a/storage/archive/archive_reader.c	2007-10-21 14:19:49 -07:00
+++ b/storage/archive/archive_reader.c	2007-10-22 16:49:42 -07:00
@@ -118,15 +118,9 @@ int main(int argc, char *argv[])
 
     while ((read= azread_row(&reader_handle, &error)))
     {
-      if (error == Z_STREAM_ERROR || &error)
-      {
-        printf("Table is damaged\n");
-        goto end;
-      }
-
       row_count++;
 
-      if (read > reader_handle.longest_row)
+      if (error == Z_STREAM_ERROR || read > reader_handle.longest_row)
       {
         printf("Table is damaged, row %llu is invalid\n", row_count);
         goto end;
diff -Nrup a/storage/archive/archive_test.c b/storage/archive/archive_test.c
--- a/storage/archive/archive_test.c	2007-10-21 14:57:03 -07:00
+++ b/storage/archive/archive_test.c	2007-10-22 16:49:42 -07:00
@@ -194,8 +194,6 @@ int small_test(az_method method)
   assert(writer_handle.rows == TEST_LOOP_NUM);
 
 
-  /* Test here for falling off the planet */
-
   /* Final Write before closing */
   ret= azwrite_row(&writer_handle, test_string, BUFFER_LEN);
   assert(ret == BUFFER_LEN);
@@ -267,6 +265,7 @@ int small_test(az_method method)
 
   azclose(&writer_handle);
   azclose(&reader_handle);
+
   unlink(TEST_FILENAME);
 
   return 0;
diff -Nrup a/storage/archive/azio.c b/storage/archive/azio.c
--- a/storage/archive/azio.c	2007-10-21 23:25:46 -07:00
+++ b/storage/archive/azio.c	2007-10-22 16:49:42 -07:00
@@ -45,34 +45,49 @@ static void do_aio_cleanup(azio_stream *
 
 static pthread_handler_t run_task(void *p)
 {
-  int fd;
-  char *buffer;
-  size_t offset;
+  unsigned int x;
+  azio_bucket_st *bucket;
   azio_stream *s= (azio_stream *)p;  
 
   my_thread_init();
 
   while (1)
   {
+    bucket= NULL;
     pthread_mutex_lock(&s->container.thresh_mutex);
     while (s->container.ready == AZ_THREAD_FINISHED)
     {
       pthread_cond_wait(&s->container.threshhold, &s->container.thresh_mutex);
     }
-    offset= s->container.offset;
-    fd= s->container.fd;
-    buffer= s->container.buffer;
+    for (x= 0; x < AZ_BUCKET_NUM; x++)
+    {
+      if (s->bucket[x].state == AZ_STATE_HOT)
+      {
+        bucket= &s->bucket[x];
+        break;
+      }
+    }
     pthread_mutex_unlock(&s->container.thresh_mutex);
 
     if (s->container.ready == AZ_THREAD_DEAD)
       break;
 
-    s->container.read_size= my_pread(fd, (uchar *)buffer, AZ_BUFSIZE_READ, 
-                                     offset, MYF(0));
+    if (bucket)
+    {
+      bucket->read_size= my_pread(bucket->fd, (uchar *)bucket->buffer, 
+                                  AZ_BUFSIZE, 
+                                  bucket->offset, MYF(0));
+      if ((ssize_t)bucket->read_size == -1)
+      {
+        WATCHPOINT_ERRNO(errno);
+        assert(0);
+      }
 
-    pthread_mutex_lock(&s->container.thresh_mutex);
-    s->container.ready= AZ_THREAD_FINISHED; 
-    pthread_mutex_unlock(&s->container.thresh_mutex);
+      pthread_mutex_lock(&s->container.thresh_mutex);
+      bucket->state= AZ_STATE_COLD;
+      s->container.ready= AZ_THREAD_FINISHED; 
+      pthread_mutex_unlock(&s->container.thresh_mutex);
+    }
   }
 
   my_thread_end();
@@ -90,10 +105,12 @@ static void azio_kill(azio_stream *s)
   pthread_join(s->container.mainthread, (void *)NULL);
 }
 
+/*
 static size_t azio_return(azio_stream *s)
 {
-  return s->container.read_size;
+  return s->inbuf->read_size;
 }
+*/
 
 /*
   Worried about spin?
@@ -137,10 +154,11 @@ static int azio_start(azio_stream *s)
   return rc;
 }
 
-static int azio_read(azio_stream *s)
+static int azio_read(azio_stream *s, azio_bucket_st *bucket)
 {
   pthread_mutex_lock(&s->container.thresh_mutex);
   s->container.ready= AZ_THREAD_ACTIVE; 
+  bucket->state= AZ_STATE_HOT;
   pthread_mutex_unlock(&s->container.thresh_mutex);
   pthread_cond_broadcast(&s->container.threshhold);
 
@@ -158,6 +176,7 @@ static int azio_read(azio_stream *s)
 */
 int azopen(azio_stream *s, const char *path, int Flags, az_method method)
 {
+  unsigned int x;
   int err;
   int level = Z_DEFAULT_COMPRESSION ; /* compression level */
   int strategy = Z_DEFAULT_STRATEGY; /* compression strategy */
@@ -169,14 +188,12 @@ int azopen(azio_stream *s, const char *p
   s->stream.zfree = (free_func)0;
   s->stream.opaque = (voidpf)0;
 
-
-  s->container.offset= 0;
-  s->container.buffer= (void *)s->buffer1;
   s->container.ready= AZ_THREAD_FINISHED;
 
-  s->inbuf= s->buffer1;
-  s->stream.next_in = s->inbuf;
-  s->stream.next_out = s->outbuf;
+  s->inbuf= &s->bucket[0];
+  s->outbuf= &s->bucket2[0];
+  s->stream.next_in = s->inbuf->buffer;
+  s->stream.next_out = s->outbuf->buffer;
   s->z_err = Z_OK;
   s->back = EOF;
   s->crc = crc32(0L, Z_NULL, 0);
@@ -201,7 +218,7 @@ int azopen(azio_stream *s, const char *p
                        Z_DEFLATED, -MAX_WBITS, 8, strategy);
     /* windowBits is passed < 0 to suppress zlib header */
 
-    s->stream.next_out = s->outbuf;
+    s->stream.next_out = s->outbuf->buffer;
     if (err != Z_OK)
     {
       destroy(s);
@@ -209,7 +226,7 @@ int azopen(azio_stream *s, const char *p
     }
   } else {
     /* Threads are only used when we are running with azio */
-    s->stream.next_in  = s->inbuf;
+    s->stream.next_in  = s->inbuf->buffer;
 
     err = inflateInit2(&(s->stream), -MAX_WBITS);
     /* windowBits is passed < 0 to tell that there is no zlib header.
@@ -224,13 +241,10 @@ int azopen(azio_stream *s, const char *p
       return Z_NULL;
     }
   }
-  s->stream.avail_out = AZ_BUFSIZE_WRITE;
+  s->stream.avail_out = AZ_BUFSIZE;
 
   errno = 0;
   s->file = fd < 0 ? my_open(path, Flags, MYF(0)) : fd;
-#ifdef AZIO_AIO
-  s->container.fd= s->file;
-#endif
 
   if (s->file < 0 ) 
   {
@@ -278,6 +292,13 @@ int azopen(azio_stream *s, const char *p
     break;
   }
 
+  for (x= 0; x < AZ_BUCKET_NUM; x++)
+  {
+    s->bucket[x].state= AZ_STATE_COLD;
+    s->bucket[x].fd= s->file;
+    s->bucket[x].thresh_mutex= &s->container.thresh_mutex;
+  }
+
   return 1;
 }
 
@@ -287,7 +308,7 @@ void write_header(azio_stream *s)
   char buffer[AZHEADER_SIZE + AZMETA_BUFFER_SIZE];
   char *ptr= buffer;
 
-  s->block_size= AZ_BUFSIZE_WRITE;
+  s->block_size= AZ_BUFSIZE;
   s->version = (unsigned char)az_magic[1];
   s->minor_version = (unsigned char)az_magic[2];
 
@@ -341,7 +362,7 @@ int get_byte(s)
       /* if (ferror(s->file)) s->z_err = Z_ERRNO; */
       return EOF;
     }
-    s->stream.next_in = s->inbuf;
+    s->stream.next_in = s->inbuf->buffer;
   }
   s->stream.avail_in--;
   return *(s->stream.next_in)++;
@@ -362,13 +383,13 @@ void check_header(azio_stream *s)
     gzip segment */
   len = s->stream.avail_in;
   if (len < 2) {
-    if (len) s->inbuf[0] = s->stream.next_in[0];
+    if (len) s->inbuf->buffer[0] = s->stream.next_in[0];
     errno = 0;
-    len = (uInt)my_pread(s->file, (uchar *)s->inbuf + len, AZ_BUFSIZE_READ >> len, s->pos, MYF(0));
+    len = (uInt)my_pread(s->file, (uchar *)s->inbuf->buffer + len, AZ_BUFSIZE >> len, s->pos, MYF(0));
     s->pos+= len;
     if (len == 0) s->z_err = Z_ERRNO;
     s->stream.avail_in += len;
-    s->stream.next_in = s->inbuf;
+    s->stream.next_in = s->inbuf->buffer;
   }
 
   /* Now we check the actual header */
@@ -506,7 +527,7 @@ unsigned int azread_internal( azio_strea
       {
         s->z_eof = 1;
       }
-      s->stream.next_in = (Bytef *)s->inbuf;
+      s->stream.next_in = (Bytef *)s->inbuf->buffer;
     }
     s->in += s->stream.avail_in;
     s->out += s->stream.avail_out;
@@ -619,15 +640,16 @@ static unsigned int azwrite(azio_stream 
     if (s->stream.avail_out == 0) 
     {
 
-      s->stream.next_out = s->outbuf;
-      if (my_pwrite(s->file, (uchar *)s->outbuf, AZ_BUFSIZE_WRITE, s->pos, 
-                   MYF(0)) != AZ_BUFSIZE_WRITE) 
+      s->stream.next_out = s->outbuf->buffer;
+      if (my_pwrite(s->file, (uchar *)s->outbuf->buffer, 
+                    AZ_BUFSIZE, s->pos, 
+                    MYF(0)) != AZ_BUFSIZE) 
       {
         s->z_err = Z_ERRNO;
         break;
       }
-      s->pos+= AZ_BUFSIZE_WRITE;
-      s->stream.avail_out = AZ_BUFSIZE_WRITE;
+      s->pos+= AZ_BUFSIZE;
+      s->stream.avail_out = AZ_BUFSIZE;
     }
     s->in += s->stream.avail_in;
     s->out += s->stream.avail_out;
@@ -658,11 +680,11 @@ int do_flush (azio_stream *s, int flush)
 
   for (;;) 
   {
-    len = AZ_BUFSIZE_WRITE - s->stream.avail_out;
+    len = AZ_BUFSIZE - s->stream.avail_out;
 
     if (len != 0) 
     {
-      if ((uInt)my_pwrite(s->file, (uchar *)s->outbuf, len, s->pos, MYF(0)) != len) 
+      if ((uInt)my_pwrite(s->file, (uchar *)s->outbuf->buffer, len, s->pos, MYF(0)) != len) 
       {
         s->z_err = Z_ERRNO;
         assert(0);
@@ -670,8 +692,8 @@ int do_flush (azio_stream *s, int flush)
       }
       s->pos+= len;
       s->check_point= s->pos;
-      s->stream.next_out = s->outbuf;
-      s->stream.avail_out = AZ_BUFSIZE_WRITE;
+      s->stream.next_out = s->outbuf->buffer;
+      s->stream.avail_out = AZ_BUFSIZE;
     }
     if (done) break;
     s->out += s->stream.avail_out;
@@ -762,10 +784,13 @@ int azread_init(azio_stream *s)
   /* Put one in the chamber */
   if (s->method != AZ_METHOD_BLOCK)
   {
+    azio_bucket_st *bucket;
     do_aio_cleanup(s);
-    s->container.offset= s->pos;
-    s->container.buffer= (unsigned char *)s->buffer1;
-    azio_read(s);
+
+    bucket= &s->bucket[0];
+    s->inbuf= &s->bucket[1];
+    bucket->offset= s->pos;
+    azio_read(s, bucket);
     s->aio_inited= 1;
   }
 
@@ -788,7 +813,7 @@ int azrewind (s)
   s->z_eof = 0;
   s->back = EOF;
   s->stream.avail_in = 0;
-  s->stream.next_in = (Bytef *)s->inbuf;
+  s->stream.next_in = (Bytef *)s->inbuf->buffer;
   s->crc = crc32(0L, Z_NULL, 0);
   (void)inflateReset(&s->stream);
   s->in = 0;
@@ -826,10 +851,10 @@ size_t azseek (s, offset, whence)
     /* There was a zmemzero here if inbuf was null -Brian */
     while (offset > 0)  
     {
-      uInt size = AZ_BUFSIZE_WRITE;
-      if (offset < AZ_BUFSIZE_WRITE) size = (uInt)offset;
+      uInt size = AZ_BUFSIZE;
+      if (offset < AZ_BUFSIZE) size = (uInt)offset;
 
-      size = azwrite(s, s->inbuf, size);
+      size = azwrite(s, s->inbuf->buffer, size);
       if (size == 0) return -1L;
 
       offset -= size;
@@ -859,10 +884,10 @@ size_t azseek (s, offset, whence)
   }
   while (offset > 0)  {
     int error;
-    unsigned int size = AZ_BUFSIZE_READ;
-    if (offset < AZ_BUFSIZE_READ) size = (int)offset;
+    unsigned int size = AZ_BUFSIZE;
+    if (offset < AZ_BUFSIZE) size = (int)offset;
 
-    size = azread_internal(s, s->outbuf, size, &error);
+    size = azread_internal(s, s->outbuf->buffer, size, &error);
     if (error <= 0) return -1L;
     offset -= size;
   }
@@ -1038,13 +1063,15 @@ static void do_aio_cleanup(azio_stream *
 */
 static void get_block(azio_stream *s)
 {
-#ifdef AZIO_AIO
   if (s->method == AZ_METHOD_AIO && s->mode == 'r' 
       && s->pos < s->check_point
       && s->aio_inited)
   {
+    azio_bucket_st *bucket;
+
     azio_ready(s);
-    s->stream.avail_in= (unsigned int)azio_return(s);
+    bucket= (s->inbuf == &s->bucket[0]) ? &s->bucket[1] : &s->bucket[0];
+    s->stream.avail_in= (unsigned int)bucket->read_size;
     if ((int)(s->stream.avail_in) == -1)
       goto use_pread;
     else if ((int)(s->stream.avail_in) == 0)
@@ -1053,24 +1080,21 @@ static void get_block(azio_stream *s)
       return;
     }
     s->pos+= s->stream.avail_in;
-    s->inbuf= (Byte *)s->container.buffer;
+    s->inbuf= bucket;
     /* We only aio_read when we know there is more data to be read */
     if (s->pos >= s->check_point)
     {
       s->aio_inited= 0;
       return;
     }
-    s->container.buffer= (s->container.buffer == s->buffer2) ? s->buffer1 : s->buffer2;
-    s->container.offset= s->pos;
-    azio_read(s);
+    bucket= (s->inbuf == &s->bucket[0]) ? &s->bucket[1] : &s->bucket[0];
+    bucket->offset= s->pos;
+    azio_read(s, bucket);
   }
   else
-#endif
   {
-#ifdef AZIO_AIO
 use_pread:
-#endif
-    s->stream.avail_in = (uInt)my_pread(s->file, (uchar *)s->inbuf, AZ_BUFSIZE_READ, s->pos, MYF(0));
+    s->stream.avail_in = (uInt)my_pread(s->file, (uchar *)s->inbuf->buffer, AZ_BUFSIZE, s->pos, MYF(0));
     s->pos+= s->stream.avail_in;
   }
 }
diff -Nrup a/storage/archive/azio.h b/storage/archive/azio.h
--- a/storage/archive/azio.h	2007-10-21 23:26:55 -07:00
+++ b/storage/archive/azio.h	2007-10-22 16:49:42 -07:00
@@ -213,8 +213,8 @@ extern "C" {
 /* The deflate compression method (the only one supported in this version) */
 
 #define Z_NULL  0  /* for initializing zalloc, zfree, opaque */
-#define AZ_BUFSIZE_READ 32768
-#define AZ_BUFSIZE_WRITE 16384
+#define AZ_BUFSIZE 32768
+#define AZ_BUCKET_NUM 3
 
 typedef enum {
   AZ_THREAD_FINISHED,
@@ -223,6 +223,11 @@ typedef enum {
 } az_thread_type;
 
 typedef enum {
+  AZ_STATE_COLD,
+  AZ_STATE_HOT,
+} az_state;
+
+typedef enum {
   AZ_METHOD_BLOCK,
   AZ_METHOD_AIO,
 //  AZ_METHOD_THREAD, 
@@ -230,13 +235,19 @@ typedef enum {
 } az_method;
 
 typedef struct azio_container_st azio_container_st;
+typedef struct azio_bucket_st azio_bucket_st;
 
-struct azio_container_st {
+struct azio_bucket_st {
   int fd;
-  az_thread_type ready;
+  Byte buffer[AZ_BUFSIZE];  /* buffer */
   size_t offset;
   size_t read_size;
-  void *buffer;
+  az_state state;
+  pthread_mutex_t *thresh_mutex;
+};
+
+struct azio_container_st {
+  az_thread_type ready;
   pthread_mutex_t thresh_mutex;
   pthread_cond_t threshhold;
   pthread_t mainthread;            /* Thread descriptor */
@@ -248,10 +259,10 @@ typedef struct azio_stream {
   int      z_err;   /* error code for last stream operation */
   int      z_eof;   /* set if end of input file */
   File     file;   /* .gz file */
-  Byte     *inbuf;  /* input buffer */
-  Byte     buffer1[AZ_BUFSIZE_READ];  /* input buffer */
-  Byte     buffer2[AZ_BUFSIZE_READ];  /* input buffer */
-  Byte     outbuf[AZ_BUFSIZE_WRITE]; /* output buffer */
+  azio_bucket_st     *inbuf;  /* input buffer */
+  azio_bucket_st     *outbuf; /* output buffer */
+  azio_bucket_st bucket[AZ_BUCKET_NUM];
+  azio_bucket_st bucket2[1];
   int      aio_inited; /* Are we good to go */
   uLong    crc;     /* crc32 of uncompressed data */
   char     *msg;    /* error message */
Thread
bk commit into 5.2 tree (brian:1.2646)Brian Aker23 Oct