List:Commits« Previous MessageNext Message »
From:stewart Date:November 9 2007 3:29am
Subject:[patch 11/11] Massive azio bugfix (mainly for O_DIRECT) and error reporting
View as plain text  
Also update restore compressed test case.

---
 mysql-test/suite/ndb/r/ndb_restore_compressed.result   |    8 
 storage/ndb/src/common/util/azio.c                     |  350 ++++++++++-------
 storage/ndb/src/kernel/blocks/ndbfs/PosixAsyncFile.cpp |   32 +
 storage/ndb/tools/restore/Restore.cpp                  |    2 
 4 files changed, 248 insertions(+), 144 deletions(-)

Index: telco-6.2/storage/ndb/src/common/util/azio.c
===================================================================
--- telco-6.2.orig/storage/ndb/src/common/util/azio.c	2007-11-09 11:34:25.360495832 +1100
+++ telco-6.2/storage/ndb/src/common/util/azio.c	2007-11-09 11:34:25.860520356 +1100
@@ -5,7 +5,9 @@
 
 /*
  * This version has been hacked around by Stewart Smith to get the following:
- * - Direct IO
+ * - Direct IO (512 byte aligned IO, IO in multibles of 512 bytes)
+ * - No memory dynamic memory allocation (all done on startup)
+ * - A kinda broken flush (see point 1: O_DIRECT) :)
  */
 
 /* gzio.c -- IO on .gz files
@@ -30,6 +32,13 @@
 #include <string.h>
 #include <stdlib.h>
 
+#ifdef HAVE_VALGRIND
+#include <valgrind/memcheck.h>
+#else
+#define VALGRIND_MAKE_MEM_DEFINED(a,b) do {} while(0);
+#define VALGRIND_MAKE_MEM_NOACCESS(a,b) do {} while(0);
+#endif
+
 static int const gz_magic[2] = {0x1f, 0x8b}; /* gzip magic header */
 static int const az_magic[3] = {0xfe, 0x03, 0x01}; /* az magic header */
 
@@ -46,44 +55,52 @@ static int const az_magic[3] = {0xfe, 0x
 int az_open(azio_stream *s, const char *path, int Flags, File  fd);
 int do_flush(azio_stream *file, int flush);
 int    get_byte(azio_stream *s);
-void   check_header(azio_stream *s);
-void write_header(azio_stream *s);
+char* get_block(azio_stream *s, int blksz);
+int   check_header(azio_stream *s);
+int write_header(azio_stream *s);
 int    destroy(azio_stream *s);
-void putLong(File file, uLong x);
+void putLong(azio_stream *s, uLong x);
 uLong  getLong(azio_stream *s);
 void read_header(azio_stream *s, unsigned char *buffer);
 
 size_t az_inflate_mem_size()
 {
-  return sizeof(struct inflate_state)
-    + ((1U << MAX_WBITS)*sizeof(unsigned char));
+  return sizeof(struct inflate_state)            // state
+    + ((1U << MAX_WBITS)*sizeof(unsigned char)); // window
 }
 
 size_t az_deflate_mem_size()
 {
   return sizeof(deflate_state)
-    + ((1U << MAX_WBITS)*(2*sizeof(Byte)))
-    + ((1U << MAX_WBITS)*sizeof(Pos))
-    + ((1U << (AZ_MEMLEVEL+7))*sizeof(Pos))
-    + ((1U << (AZ_MEMLEVEL+6))*(sizeof(ush)+2));
+    + ((1U << MAX_WBITS)*(2*sizeof(Byte)))       // window = wsize,2*|Byte|
+    + ((1U << MAX_WBITS)*sizeof(Pos))            // prev   = wsize,|Pos|
+    + ((1U << (AZ_MEMLEVEL+7))*sizeof(Pos))      // head   = hashsize,|Pos|
+    + ((1U << (AZ_MEMLEVEL+6))*(sizeof(ush)+2)); // overlay= lit_bufsize,|ush|+2
 }
 
 voidpf az_alloc(voidpf opaque, uInt items, uInt size)
 {
   struct az_alloc_rec *r = (struct az_alloc_rec*)opaque;
 
-  if((items * size) > r->mfree)
+  if((items * size) > r->mfree || r->mfree==0)
     abort();
 
+  assert(r->mfree <= r->size);
+
+  void * retval= (r->mem + r->size) - r->mfree;
+  memset(retval, 0, items*size);
+  VALGRIND_MAKE_MEM_DEFINED(retval,items*size);
   r->mfree -= items*size;
 
-  return (r->mem + r->size) - r->mfree;
+  return retval;
 }
 void az_free(voidpf opaque, voidpf address)
 {
   // Oh how we hack.
   struct az_alloc_rec *r = (struct az_alloc_rec*)opaque;
-  r->mfree = r->size;
+  r->mfree= r->size;
+  if(r->mfree==r->size)
+    VALGRIND_MAKE_MEM_NOACCESS(r->mem,r->size);
 }
 
 
@@ -95,6 +112,10 @@ void az_free(voidpf opaque, voidpf addre
   insufficient memory to allocate the (de)compression state; errno
   can be checked to distinguish the two cases (if errno is zero, the
   zlib error is Z_MEM_ERROR).
+
+  IO errors in my_errno.
+
+  NOTE: If called without a fd, my_open *WILL* malloc()
 */
 int az_open (azio_stream *s, const char *path, int Flags, File fd)
 {
@@ -102,18 +123,21 @@ int az_open (azio_stream *s, const char 
   int level = Z_DEFAULT_COMPRESSION; /* compression level */
   int strategy = Z_DEFAULT_STRATEGY; /* compression strategy */
 
-  s->stream.zalloc = (alloc_func)az_alloc;
-  s->stream.zfree = (free_func)az_free;
+  if(s->stream.opaque)
+  {
+    s->stream.zalloc = (alloc_func)az_alloc;
+    s->stream.zfree = (free_func)az_free;
+  }
 //  s->stream.opaque = (voidpf)r;
   s->bufalloced = 0;
   if(!s->inbuf)
   {
     err= posix_memalign(&(s->inbuf),512,AZ_BUFSIZE_READ);
     if(err)
-      return err;
+      return -err;
     err= posix_memalign(&(s->outbuf),512,AZ_BUFSIZE_WRITE);
     if(err)
-      return err;
+      return -err;
     s->bufalloced = 1;
   }
   memset(s->inbuf, 0, AZ_BUFSIZE_READ);
@@ -195,26 +219,31 @@ int az_open (azio_stream *s, const char 
     s->frm_length= 0;
     s->dirty= 1; /* We create the file dirty */
     s->start = AZHEADER_SIZE + AZMETA_BUFFER_SIZE;
-    write_header(s);
-    my_seek(s->file, 0, MY_SEEK_END, MYF(0));
+    if(write_header(s))
+      return Z_NULL;
+    if(my_seek(s->file, 0, MY_SEEK_END, MYF(0)) == MY_FILEPOS_ERROR)
+      return Z_NULL;
   }
-  else if (s->mode == 'w') 
+  else if (s->mode == 'w')
   {
-    my_pread(s->file, s->inbuf, AZHEADER_SIZE + AZMETA_BUFFER_SIZE, 0,
-             MYF(0));
+    if(my_pread(s->file, s->inbuf, AZHEADER_SIZE + AZMETA_BUFFER_SIZE, 0,
+                MYF(0)) < AZHEADER_SIZE + AZMETA_BUFFER_SIZE)
+      return Z_NULL;
     read_header(s, s->inbuf); /* skip the .az header */
-    my_seek(s->file, 0, MY_SEEK_END, MYF(0));
+    if(my_seek(s->file, 0, MY_SEEK_END, MYF(0)) == MY_FILEPOS_ERROR)
+      return Z_NULL;
   }
   else
   {
-    check_header(s); /* skip the .az header */
+    if(check_header(s)!=0) /* skip the .az header */
+      return Z_NULL;
   }
 
   return 1;
 }
 
 
-void write_header(azio_stream *s)
+int write_header(azio_stream *s)
 {
   char *buffer= (char*)s->outbuf;
   char *ptr= buffer;
@@ -250,8 +279,11 @@ void write_header(azio_stream *s)
   *(ptr + AZ_DIRTY_POS)= (unsigned char)s->dirty; /* Start of Data Block Index Block
*/
 
   /* Always begin at the begining, and end there as well */
-  my_pwrite(s->file, (uchar*) buffer, AZHEADER_SIZE + AZMETA_BUFFER_SIZE, 0,
-            MYF(0));
+  if(my_pwrite(s->file, (uchar*) buffer, AZHEADER_SIZE + AZMETA_BUFFER_SIZE, 0,
+               MYF(0)) == -1)
+    return -1;
+
+  return 0;
 }
 
 /* ===========================================================================
@@ -273,31 +305,64 @@ int azdopen(azio_stream *s, File fd, int
   return az_open (s, NULL, Flags, fd);
 }
 
-/* ===========================================================================
-  Read a byte from a azio_stream; update next_in and avail_in. Return EOF
-  for end of file.
-  IN assertion: the stream s has been sucessfully opened for reading.
-*/
-int get_byte(s)
-  azio_stream *s;
+/*
+  Read from azio_stream into buffer.
+  Reads up to AZ_BUFSIZE_READ bytes.
+
+  Number of Bytes read is in: s->stream.avail_in
+
+  return 0 on success
+ */
+int read_buffer(azio_stream *s)
 {
   if (s->z_eof) return EOF;
-  if (s->stream.avail_in == 0) 
+  my_errno= 0;
+  if (s->stream.avail_in == 0)
   {
-    errno = 0;
     s->stream.avail_in = my_read(s->file, (uchar *)s->inbuf, AZ_BUFSIZE_READ,
MYF(0));
-    if (s->stream.avail_in == 0) 
+    if(s->stream.avail_in > 0)
+      my_errno= 0;
+    if (s->stream.avail_in == 0)
     {
       s->z_eof = 1;
-      /* if (ferror(s->file)) s->z_err = Z_ERRNO; */
-      return EOF;
     }
     s->stream.next_in = s->inbuf;
   }
+  return my_errno;
+}
+
+/*
+  Read a byte from a azio_stream; update next_in and avail_in.
+
+  returns EOF on error;
+*/
+int get_byte(s)
+  azio_stream *s;
+{
+  if (s->stream.avail_in == 0)
+    if(read_buffer(s))
+      return EOF;
   s->stream.avail_in--;
   return *(s->stream.next_in)++;
 }
 
+/*
+ * Gets block of size blksz
+ * *MUST* be < buffer size
+ * *MUST* be aligned to IO size (i.e. not be only partially in buffer)
+ */
+char* get_block(azio_stream *s,int blksz)
+{
+  char *r= s->stream.next_in;
+  if (s->stream.avail_in == 0)
+    if(read_buffer(s))
+      return NULL;
+  s->stream.avail_in-=blksz;
+  s->stream.next_in+=blksz;
+
+  return r;
+}
+
 /* ===========================================================================
   Check the gzip header of a azio_stream opened for reading. Set the stream
   mode to transparent if the gzip magic header is not present; set s->err
@@ -307,33 +372,21 @@ int get_byte(s)
   s->stream.avail_in is zero for the first time, but may be non-zero
   for concatenated .gz files.
 */
-void check_header(azio_stream *s)
+int check_header(azio_stream *s)
 {
   int method; /* method uchar */
   int flags;  /* flags uchar */
   uInt len;
   int c;
 
-  /* Assure two bytes in the buffer so we can peek ahead -- handle case
-    where first byte of header is at the end of the buffer after the last
-    gzip segment */
-  len = s->stream.avail_in;
-  if (len < 2) {
-    if (len) s->inbuf[0] = s->stream.next_in[0];
-    errno = 0;
-    len = (uInt)my_read(s->file, (uchar *)s->inbuf + len, AZ_BUFSIZE_READ >>
len, MYF(0));
-    if (len == 0) s->z_err = Z_ERRNO;
-    s->stream.avail_in += len;
-    s->stream.next_in = s->inbuf;
-    if (s->stream.avail_in < 2) {
-      s->transparent = s->stream.avail_in;
-      return;
-    }
-  }
+  if(s->stream.avail_in==0)
+    if(c= read_buffer(s))
+      return 0;
 
   /* Peek ahead to check the gzip magic header */
   if ( s->stream.next_in[0] == gz_magic[0]  && s->stream.next_in[1] ==
gz_magic[1])
   {
+    abort(); // FIXME: stewart broke it, massively
     s->stream.avail_in -= 2;
     s->stream.next_in += 2;
     s->version= (unsigned char)2;
@@ -343,7 +396,7 @@ void check_header(azio_stream *s)
     flags = get_byte(s);
     if (method != Z_DEFLATED || (flags & RESERVED) != 0) {
       s->z_err = Z_DATA_ERROR;
-      return;
+      return 0;
     }
 
     /* Discard time, xflags and OS code: */
@@ -365,26 +418,33 @@ void check_header(azio_stream *s)
       for (len = 0; len < 2; len++) (void)get_byte(s);
     }
     s->z_err = s->z_eof ? Z_DATA_ERROR : Z_OK;
-    s->start = my_tell(s->file, MYF(0)) - s->stream.avail_in;
+    s->start = my_tell(s->file, MYF(0));
+    if(s->start == -1)
+      return my_errno;
+    s->start-= s->stream.avail_in;
   }
-  else if ( s->stream.next_in[0] == az_magic[0]  && s->stream.next_in[1] ==
az_magic[1])
+  else if (    s->stream.next_in[0] == az_magic[0]
+            && s->stream.next_in[1] == az_magic[1])
   {
-    unsigned char buffer[AZHEADER_SIZE + AZMETA_BUFFER_SIZE];
-
-    for (len = 0; len < (AZHEADER_SIZE + AZMETA_BUFFER_SIZE); len++) 
-      buffer[len]= get_byte(s);
-    s->z_err = s->z_eof ? Z_DATA_ERROR : Z_OK;
-    read_header(s, buffer);
-    for (; len < s->start; len++) 
-      get_byte(s);
+    if(s->stream.avail_in < AZHEADER_SIZE + AZMETA_BUFFER_SIZE)
+    {
+      s->z_err = Z_DATA_ERROR;
+      return s->z_err;
+    }
+    char *header_block = get_block(s,512);
+    if(!header_block)
+      return my_errno;
+    read_header(s, header_block);
   }
   else
   {
     s->transparent = 1;
     s->z_err = Z_OK;
 
-    return;
+    return 0;
   }
+
+  return 0;
 }
 
 void read_header(azio_stream *s, unsigned char *buffer)
@@ -458,19 +518,19 @@ unsigned int ZEXPORT azread ( azio_strea
   *error= 0;
 
   if (s->mode != 'r')
-  { 
+  {
     *error= Z_STREAM_ERROR;
     return 0;
   }
 
   if (s->z_err == Z_DATA_ERROR || s->z_err == Z_ERRNO)
-  { 
+  {
     *error= s->z_err;
     return 0;
   }
 
   if (s->z_err == Z_STREAM_END)  /* EOF */
-  { 
+  {
     return 0;
   }
 
@@ -487,12 +547,11 @@ unsigned int ZEXPORT azread ( azio_strea
     start++;
     if (s->last) {
       s->z_err = Z_STREAM_END;
-      { 
+      {
         return 1;
       }
     }
   }
-
   while (s->stream.avail_out != 0) {
 
     if (s->transparent) {
@@ -515,20 +574,18 @@ unsigned int ZEXPORT azread ( azio_strea
       len -= s->stream.avail_out;
       s->in  += len;
       s->out += len;
-      if (len == 0) s->z_eof = 1;
-      { 
-        return len;
-      }
+      if (len == 0)
+        s->z_eof = 1;
+      return len;
     }
-    if (s->stream.avail_in == 0 && !s->z_eof) {
 
-      errno = 0;
-      s->stream.avail_in = (uInt)my_read(s->file, (uchar *)s->inbuf,
AZ_BUFSIZE_READ, MYF(0));
-      if (s->stream.avail_in == 0) 
+
+    if (s->stream.avail_in == 0 && !s->z_eof) {
+      read_buffer(s);
+      if (s->stream.avail_in == 0)
       {
         s->z_eof = 1;
       }
-      s->stream.next_in = (Bytef *)s->inbuf;
     }
     s->in += s->stream.avail_in;
     s->out += s->stream.avail_out;
@@ -541,20 +598,21 @@ unsigned int ZEXPORT azread ( azio_strea
       s->crc = crc32(s->crc, start, (uInt)(s->stream.next_out - start));
       start = s->stream.next_out;
 
-      if (getLong(s) != s->crc) {
+      uInt gotcrc = getLong(s);
+      if (gotcrc != s->crc) {
         s->z_err = Z_DATA_ERROR;
       } else {
         (void)getLong(s);
         /* The uncompressed length returned by above getlong() may be
          * different from s->out in case of concatenated .gz files.
          * Check for such files:
-       */
+         *//*
         check_header(s);
         if (s->z_err == Z_OK) 
         {
           inflateReset(&(s->stream));
           s->crc = crc32(0L, Z_NULL, 0);
-        }
+          }*/
       }
     }
     if (s->z_err != Z_OK || s->z_eof) break;
@@ -565,13 +623,46 @@ unsigned int ZEXPORT azread ( azio_strea
       (s->z_err == Z_DATA_ERROR || s->z_err == Z_ERRNO))
   {
     *error= s->z_err;
-
     return 0;
   }
 
   return (len - s->stream.avail_out);
 }
 
+/*
+  Write last remaining 512 byte block of write buffer, 0 padded.
+ */
+int flush_write_buffer(azio_stream *s)
+{
+  uInt real_len = AZ_BUFSIZE_WRITE - s->stream.avail_out;
+  uInt len = ((real_len+0x1FF)>>9)<<9;
+
+  memset(s->outbuf+real_len, 0, s->stream.avail_out);
+
+  s->check_point= my_tell(s->file, MYF(0));
+
+  my_write(s->file,(uchar*)s->outbuf,len,MYF(0));
+
+  s->dirty= AZ_STATE_CLEAN;
+
+  return 0;
+}
+
+int write_buffer(azio_stream *s)
+{
+  if (s->stream.avail_out == 0)
+  {
+    s->stream.next_out = s->outbuf;
+    if (my_write(s->file, (uchar *)s->outbuf, AZ_BUFSIZE_WRITE,
+                 MYF(0)) != AZ_BUFSIZE_WRITE)
+    {
+      s->z_err = Z_ERRNO;
+      return my_errno;
+    }
+    s->stream.avail_out = AZ_BUFSIZE_WRITE;
+  }
+  return 0;
+}
 
 /* ===========================================================================
   Writes the given number of uncompressed bytes into the compressed file.
@@ -579,25 +670,19 @@ unsigned int ZEXPORT azread ( azio_strea
 */
 unsigned int azwrite (azio_stream *s, const void*  buf, unsigned int len)
 {
+  int i;
   s->stream.next_in = (Bytef*)buf;
   s->stream.avail_in = len;
 
   s->rows++;
 
-  while (s->stream.avail_in != 0) 
-  {
-    if (s->stream.avail_out == 0) 
-    {
+  for(i=0;i<len;i++)
+    memcmp(buf,s,1);
 
-      s->stream.next_out = s->outbuf;
-      if (my_write(s->file, (uchar *)s->outbuf, AZ_BUFSIZE_WRITE, 
-                   MYF(0)) != AZ_BUFSIZE_WRITE) 
-      {
-        s->z_err = Z_ERRNO;
-        break;
-      }
-      s->stream.avail_out = AZ_BUFSIZE_WRITE;
-    }
+  while (s->stream.avail_in != 0)
+  {
+    if(write_buffer(s))
+      return 0;
     s->in += s->stream.avail_in;
     s->out += s->stream.avail_out;
     s->z_err = deflate(&(s->stream), Z_NO_FLUSH);
@@ -631,24 +716,11 @@ int do_flush (azio_stream *s, int flush)
 
   s->stream.avail_in = 0; /* should be zero already anyway */
 
-  for (;;) 
+  for (;;)
   {
     len = AZ_BUFSIZE_WRITE - s->stream.avail_out;
+    write_buffer(s);
 
-    memset(s->outbuf+len,0,s->stream.avail_out);
-
-    if (len != 0) 
-    {
-      s->check_point= my_tell(s->file, MYF(0));
-      if ((uInt)my_write(s->file, (uchar *)s->outbuf, AZ_BUFSIZE_WRITE, MYF(0)) !=
len) 
-      {
-        s->z_err = Z_ERRNO;
-        return Z_ERRNO;
-      }
-      my_seek(s->file, s->stream.avail_out, SEEK_CUR, MYF(0));
-      s->stream.next_out = s->outbuf;
-      s->stream.avail_out = AZ_BUFSIZE_WRITE;
-    }
     if (done) break;
     s->out += s->stream.avail_out;
     s->z_err = deflate(&(s->stream), flush);
@@ -670,10 +742,14 @@ int do_flush (azio_stream *s, int flush)
   else
     s->dirty= AZ_STATE_SAVED; /* Mark it clean, we should be good now */
 
-  afterwrite_pos= my_tell(s->file, MYF(0));
-  write_header(s);
-  my_seek(s->file, afterwrite_pos, SEEK_SET, MYF(0));
-
+/*  afterwrite_pos= my_tell(s->file, MYF(0));
+  if(afterwrite_pos == -1)
+    return Z_ERRNO;
+  if(write_header(s) == -1)
+    return Z_ERRNO;
+  if(my_seek(s->file, afterwrite_pos, SEEK_SET, MYF(0)) == MY_FILEPOS_ERROR)
+    return Z_ERRNO;
+*/
   return  s->z_err == Z_STREAM_END ? Z_OK : s->z_err;
 }
 
@@ -686,8 +762,9 @@ int ZEXPORT azflush (s, flush)
   if (s->mode == 'r') 
   {
     unsigned char buffer[AZHEADER_SIZE + AZMETA_BUFFER_SIZE];
-    my_pread(s->file, (uchar*) buffer, AZHEADER_SIZE + AZMETA_BUFFER_SIZE, 0,
-             MYF(0));
+    if(my_pread(s->file, (uchar*) buffer, AZHEADER_SIZE + AZMETA_BUFFER_SIZE, 0,
+                MYF(0)) == -1)
+      return Z_ERRNO;
     read_header(s, buffer); /* skip the .az header */
 
     return Z_OK;
@@ -698,7 +775,8 @@ int ZEXPORT azflush (s, flush)
     err= do_flush(s, flush);
 
     if (err) return err;
-    my_sync(s->file, MYF(0));
+    if(my_sync(s->file, MYF(0)) == -1)
+      return Z_ERRNO;
     return  s->z_err == Z_STREAM_END ? Z_OK : s->z_err;
   }
 }
@@ -818,17 +896,18 @@ my_off_t ZEXPORT aztell (file)
 
 
 /* ===========================================================================
-  Outputs a long in LSB order to the given file
+  Outputs a long in LSB order to the given azio_stream
 */
-void putLong (File file, uLong x)
+void putLong (azio_stream *s, uLong x)
 {
   int n;
-  uchar buffer[1];
 
-  for (n = 0; n < 4; n++) 
+  for (n = 0; n < 4; n++)
   {
-    buffer[0]= (int)(x & 0xff);
-    my_write(file, buffer, 1, MYF(0));
+    s->stream.avail_out--;
+    *(s->stream.next_out) = x & 0xff;
+    s->stream.next_out++;
+    write_buffer(s);
     x >>= 8;
   }
 }
@@ -863,14 +942,19 @@ int azclose (azio_stream *s)
 
   if (s->mode == 'w') 
   {
-    if (do_flush(s, Z_FINISH) != Z_OK)
+    int r= do_flush(s, Z_FINISH);
+    if(r!= Z_OK)
+    {
       return destroy(s);
+    }
+
+    putLong(s, s->crc);
+    putLong(s, (uLong)(s->in & 0xffffffff));
+    putLong(s, 0x4E444244);
+
+    flush_write_buffer(s);
 
-    putLong(s->file, s->crc);
-    putLong(s->file, (uLong)(s->in & 0xffffffff));
-    s->dirty= AZ_STATE_CLEAN;
-    s->check_point= my_tell(s->file, MYF(0));
-    write_header(s);
+//    write_header(s);
   }
 
   return destroy(s);
Index: telco-6.2/storage/ndb/src/kernel/blocks/ndbfs/PosixAsyncFile.cpp
===================================================================
--- telco-6.2.orig/storage/ndb/src/kernel/blocks/ndbfs/PosixAsyncFile.cpp	2007-11-09
11:34:25.360495832 +1100
+++ telco-6.2/storage/ndb/src/kernel/blocks/ndbfs/PosixAsyncFile.cpp	2007-11-09
11:34:25.860520356 +1100
@@ -249,7 +249,6 @@ no_odirect:
   theFd = ::open(theFileName.c_str(), new_flags, mode);
   if (-1 == theFd)
   {
-    ndbout_c("1 ERROR opening %s",theFileName.c_str());
     PRINT_ERRORANDFLAGS(new_flags);
     if ((errno == ENOENT) && (new_flags & O_CREAT))
     {
@@ -264,7 +263,6 @@ no_odirect:
 	  goto no_odirect;
 	}
 #endif
-        ndbout_c("2 ERROR opening %s",theFileName.c_str());
         PRINT_ERRORANDFLAGS(new_flags);
         request->error = errno;
 	return;
@@ -427,11 +425,15 @@ no_odirect:
 #endif
   }
   if(use_gz)
-    if(azdopen(&azf, theFd, new_flags) < 1)
+  {
+    int err;
+    if((err= azdopen(&azf, theFd, new_flags)) < 1)
     {
-      ndbout_c("Stewart's brain broke");
+      ndbout_c("Stewart's brain broke: %d %d %s",
+               err, my_errno, theFileName.c_str());
       abort();
     }
+  }
 }
 
 int PosixAsyncFile::readBuffer(Request *req, char *buf,
@@ -481,7 +483,14 @@ int PosixAsyncFile::readBuffer(Request *
     if (return_value == -1 && errno == EINTR) {
       DEBUG(ndbout_c("EINTR in read"));
       continue;
-    } else if (return_value == -1){
+    } else if (return_value < 1){
+      if(my_errno==0 && errno==0 && error==0 &&
azf.z_err==Z_STREAM_END)
+        break;
+      DEBUG(ndbout_c("ERROR DURING %sRead: %d off: %d from
%s",(use_gz)?"gz":"",size,offset,theFileName.c_str()));
+      ndbout_c("ERROR IN PosixAsyncFile::readBuffer %d %d %d %d",
+               my_errno, errno, azf.z_err, error);
+      if(use_gz)
+        return my_errno;
       return errno;
     } else {
       bytes_read = return_value;
@@ -575,7 +584,9 @@ int PosixAsyncFile::writeBuffer(const ch
     if (return_value == -1 && errno == EINTR) {
       bytes_written = 0;
       DEBUG(ndbout_c("EINTR in write"));
-    } else if (return_value == -1){
+    } else if (return_value == -1 || return_value < 1){
+      ndbout_c("ERROR IN PosixAsyncFile::writeBuffer %d %d %d",
+               my_errno, errno, azf.z_err);
       if(use_gz)
         return my_errno;
       return errno;
@@ -612,7 +623,7 @@ void PosixAsyncFile::closeReq(Request *r
   if(use_gz)
     r= azclose(&azf);
   else
-    ::close(theFd);
+    r= ::close(theFd);
   use_gz= 0;
   Byte *a,*b;
   a= azf.inbuf;
@@ -748,6 +759,13 @@ void PosixAsyncFile::endReq()
   if (azfBufferUnaligned)
     ndbd_free(azfBufferUnaligned, (AZ_BUFSIZE_READ*AZ_BUFSIZE_WRITE)
               +NDB_O_DIRECT_WRITE_ALIGNMENT-1);
+
+  if(az_mempool.mem)
+    ndbd_free(az_mempool.mem,az_mempool.size);
+
+  az_mempool.mem = NULL;
+  theWriteBufferUnaligned = NULL;
+  azfBufferUnaligned = NULL;
 }
 
 
Index: telco-6.2/mysql-test/suite/ndb/r/ndb_restore_compressed.result
===================================================================
--- telco-6.2.orig/mysql-test/suite/ndb/r/ndb_restore_compressed.result	2007-11-09
11:33:59.859245082 +1100
+++ telco-6.2/mysql-test/suite/ndb/r/ndb_restore_compressed.result	2007-11-09
11:34:25.860520356 +1100
@@ -142,10 +142,10 @@ drop table t1_c,t2_c,t3_c,t4_c,t5_c,t6_c
 ForceVarPart: 0
 ForceVarPart: 1
 select * from information_schema.columns where table_name = "t1_c";
-TABLE_CATALOG	TABLE_SCHEMA	TABLE_NAME	COLUMN_NAME	ORDINAL_POSITION	COLUMN_DEFAULT	IS_NULLABLE	DATA_TYPE	CHARACTER_MAXIMUM_LENGTH	CHARACTER_OCTET_LENGTH	NUMERIC_PRECISION	NUMERIC_SCALE	CHARACTER_SET_NAME	COLLATION_NAME	COLUMN_TYPE	COLUMN_KEY	EXTRA	PRIVILEGES	COLUMN_COMMENT
-NULL	test	t1_c	capgoaledatta	1	NULL	NO	mediumint	NULL	NULL	7	0	NULL	NULL	mediumint(5)
unsigned	PRI	auto_increment	select,insert,update,references	
-NULL	test	t1_c	goaledatta	2		NO	char	2	2	NULL	NULL	latin1	latin1_swedish_ci	char(2)	PRI		select,insert,update,references	
-NULL	test	t1_c	maturegarbagefa	3		NO	varchar	32	32	NULL	NULL	latin1	latin1_swedish_ci	varchar(32)	PRI		select,insert,update,references	
+TABLE_CATALOG	TABLE_SCHEMA	TABLE_NAME	COLUMN_NAME	ORDINAL_POSITION	COLUMN_DEFAULT	IS_NULLABLE	DATA_TYPE	CHARACTER_MAXIMUM_LENGTH	CHARACTER_OCTET_LENGTH	NUMERIC_PRECISION	NUMERIC_SCALE	CHARACTER_SET_NAME	COLLATION_NAME	COLUMN_TYPE	COLUMN_KEY	EXTRA	PRIVILEGES	COLUMN_COMMENT	STORAGE	FORMAT
+NULL	test	t1_c	capgoaledatta	1	NULL	NO	mediumint	NULL	NULL	7	0	NULL	NULL	mediumint(5)
unsigned	PRI	auto_increment	select,insert,update,references		Default	Default
+NULL	test	t1_c	goaledatta	2		NO	char	2	2	NULL	NULL	latin1	latin1_swedish_ci	char(2)	PRI		select,insert,update,references		Default	Default
+NULL	test	t1_c	maturegarbagefa	3		NO	varchar	32	32	NULL	NULL	latin1	latin1_swedish_ci	varchar(32)	PRI		select,insert,update,references		Default	Default
 select count(*) from t1;
 count(*)
 5
Index: telco-6.2/storage/ndb/tools/restore/Restore.cpp
===================================================================
--- telco-6.2.orig/storage/ndb/tools/restore/Restore.cpp	2007-11-09 11:33:59.859245082
+1100
+++ telco-6.2/storage/ndb/tools/restore/Restore.cpp	2007-11-09 13:31:46.205825356 +1100
@@ -761,6 +761,7 @@ RestoreDataIterator::getNextTuple(int  &
 BackupFile::BackupFile(void (* _free_data_callback)()) 
   : free_data_callback(_free_data_callback)
 {
+  memset(&m_file,0,sizeof(m_file));
   m_file.file = -1;
   m_path[0] = 0;
   m_fileName[0] = 0;
@@ -792,6 +793,7 @@ BackupFile::openFile(){
 
   info.setLevel(254);
   info << "Opening file '" << m_fileName << "'\n";
+  memset(&m_file,0,sizeof(m_file));
   int r= azopen(&m_file,m_fileName, O_RDONLY);
 
   if (r==0)

-- 
Stewart Smith
Thread
[patch 00/11] WL4081 NDB Compressed LCP and Backupstewart9 Nov
  • [patch 04/11] WL4081: read compressed backup filesstewart9 Nov
  • [patch 03/11] WL4081: Add compressed file support to AsyncFile (azio) and support compressed backups.stewart9 Nov
  • [patch 01/11] WL4081: Copy azio for NDBstewart9 Nov
  • [patch 05/11] WL4081: add support for *storing* compressed LCPstewart9 Nov
  • [patch 07/11] WL4081 Futz with mtr ndb config to enable compressed lcp, backup and O_DIRECTstewart9 Nov
  • [patch 08/11] WL4081 Allow use of direct IO (O_DIRECT) with aziostewart9 Nov
  • [patch 10/11] Fix mem leak in mgmapi report eventstewart9 Nov
  • [patch 11/11] Massive azio bugfix (mainly for O_DIRECT) and error reportingstewart9 Nov
  • [patch 06/11] WL4081: Add support to AsyncFile for reading zlib compressed files.stewart9 Nov
  • [patch 09/11] Cleanup AsyncFile, make modular and nice to readstewart9 Nov
Re: [patch 10/11] Fix mem leak in mgmapi report eventStewart Smith12 Nov
Re: [patch 08/11] WL4081 Allow use of direct IO (O_DIRECT) withazioStewart Smith12 Nov
Re: [patch 07/11] WL4081 Futz with mtr ndb config to enablecompressed lcp, backup and O_DIRECTStewart Smith12 Nov
Re: [patch 06/11] WL4081: Add support to AsyncFile for readingzlib compressed files.Stewart Smith12 Nov
Re: [patch 09/11] Cleanup AsyncFile, make modular and nice to readStewart Smith12 Nov
Re: [patch 08/11] WL4081 Allow use of direct IO (O_DIRECT)with azioStewart Smith12 Nov