List:Commits« Previous MessageNext Message »
From:stewart Date:November 9 2007 3:29am
Subject:[patch 08/11] WL4081 Allow use of direct IO (O_DIRECT) with azio
View as plain text  
Change azio to either posix_memalign allocate in and out buffers OR
allow user to allocate these. Flag in azio_stream should be where
padding was so no additional memory used.

This enables O_DIRECT usage with azio if aligned buffers are allocated
(by default, the azio allocation - posix_memalign - does this).

Also fiddles with ZLIB internals to get the correct buffer sizes to allocate
for deflate and inflate.

---
 storage/ndb/include/util/azlib.h                  |   24 ++---
 storage/ndb/src/common/util/azio.c                |   89 +++++++++++++++++++---
 storage/ndb/src/kernel/blocks/ndbfs/AsyncFile.cpp |   38 ++++++++-
 storage/ndb/src/kernel/blocks/ndbfs/AsyncFile.hpp |    2 
 zlib/inflate.h                                    |    5 +
 5 files changed, 134 insertions(+), 24 deletions(-)

Index: telco-6.2/storage/ndb/include/util/azlib.h
===================================================================
--- telco-6.2.orig/storage/ndb/include/util/azlib.h	2007-10-19 12:13:17.635229612 +1000
+++ telco-6.2/storage/ndb/include/util/azlib.h	2007-10-19 14:47:37.589398879 +1000
@@ -43,16 +43,9 @@ extern "C" {
 #endif
 /* Start of MySQL Specific Information */
 
-/*
-  ulonglong + ulonglong + ulonglong + ulonglong + uchar
-*/
-#define AZMETA_BUFFER_SIZE sizeof(unsigned long long) \
-  + sizeof(unsigned long long) + sizeof(unsigned long long) + sizeof(unsigned long long)
\
-  + sizeof(unsigned int) + sizeof(unsigned int) \
-  + sizeof(unsigned int) + sizeof(unsigned int) \
-  + sizeof(unsigned char)
 
 #define AZHEADER_SIZE 29
+#define AZMETA_BUFFER_SIZE 512-AZHEADER_SIZE
 
 #define AZ_MAGIC_POS 0
 #define AZ_VERSION_POS 1
@@ -83,6 +76,14 @@ extern "C" {
 #define AZ_STATE_SAVED 2
 #define AZ_STATE_CRASHED 3
 
+size_t az_inflate_mem_size();
+size_t az_deflate_mem_size();
+struct az_alloc_rec {
+  size_t size;
+  size_t mfree;
+  char *mem;
+};
+
 /*
      The 'zlib' compression library provides in-memory compression and
   decompression functions, including integrity checks of the uncompressed
@@ -199,18 +200,18 @@ extern "C" {
 #define AZ_BUFSIZE_READ 32768
 #define AZ_BUFSIZE_WRITE 16384
 
-
 typedef struct azio_stream {
   z_stream stream;
   int      z_err;   /* error code for last stream operation */
   int      z_eof;   /* set if end of input file */
   File     file;   /* .gz file */
-  Byte     inbuf[AZ_BUFSIZE_READ];  /* input buffer */
-  Byte     outbuf[AZ_BUFSIZE_WRITE]; /* output buffer */
+  Byte     *inbuf;  /* input buffer */
+  Byte     *outbuf; /* output buffer */
   uLong    crc;     /* crc32 of uncompressed data */
   char     *msg;    /* error message */
   int      transparent; /* 1 if input file is not a .gz file */
   char     mode;    /* 'w' or 'r' */
+  char     bufalloced; /* true if azio allocated buffers */
   my_off_t  start;   /* start of compressed data in file (header skipped) */
   my_off_t  in;      /* bytes into deflate or inflate */
   my_off_t  out;     /* bytes out of deflate or inflate */
@@ -233,7 +234,6 @@ typedef struct azio_stream {
 } azio_stream;
 
                         /* basic functions */
-
 extern int azopen(azio_stream *s, const char *path, int Flags);
 /*
      Opens a gzip (.gz) file for reading or writing. The mode parameter
Index: telco-6.2/storage/ndb/src/kernel/blocks/ndbfs/AsyncFile.cpp
===================================================================
--- telco-6.2.orig/storage/ndb/src/kernel/blocks/ndbfs/AsyncFile.cpp	2007-10-19
12:13:17.635229612 +1000
+++ telco-6.2/storage/ndb/src/kernel/blocks/ndbfs/AsyncFile.cpp	2007-10-19
16:21:52.866771006 +1000
@@ -172,6 +172,22 @@ AsyncFile::run()
     (((UintPtr)theWriteBufferUnaligned + NDB_O_DIRECT_WRITE_ALIGNMENT - 1) &
      ~(UintPtr)(NDB_O_DIRECT_WRITE_ALIGNMENT - 1));
 
+  azfBufferUnaligned= (Byte*)ndbd_malloc((AZ_BUFSIZE_READ+AZ_BUFSIZE_WRITE)
+                                         +NDB_O_DIRECT_WRITE_ALIGNMENT-1);
+
+  azf.inbuf= (Byte*)(((UintPtr)azfBufferUnaligned
+                      + NDB_O_DIRECT_WRITE_ALIGNMENT - 1) &
+                     ~(UintPtr)(NDB_O_DIRECT_WRITE_ALIGNMENT - 1));
+
+  azf.outbuf= azf.inbuf + AZ_BUFSIZE_READ;
+
+  az_mempool.size = az_mempool.mfree = az_inflate_mem_size()+az_deflate_mem_size();
+
+  ndbout_c("NDBFS/AsyncFile: Allocating %d for In/Deflate buffer",az_mempool.size);
+  az_mempool.mem = (char*) ndbd_malloc(az_mempool.size);
+
+  azf.stream.opaque= &az_mempool;
+
   NdbMutex_Unlock(theStartMutexPtr);
   NdbCondition_Signal(theStartConditionPtr);
   
@@ -580,6 +596,7 @@ no_odirect:
 	}
 	if(n == -1 || n == 0)
 	{
+          ndbout_c("azwrite|write returned %d: errno: %d my_errno: %d",n,errno,my_errno);
 	  break;
 	}
 	size -= n;
@@ -656,7 +673,7 @@ no_odirect:
   }
 #endif
   if(use_gz)
-    if(!azdopen(&azf, theFd, new_flags))
+    if(azdopen(&azf, theFd, new_flags) < 1)
     {
       ndbout_c("Stewart's brain broke");
       abort();
@@ -838,6 +855,7 @@ AsyncFile::extendfile(Request* request) 
                              pbuf,
                              maxSize);
     if ((return_value == -1) || (return_value != maxSize)) {
+      ndbout_c("NDBFS ERROR during extendFile");
       ndbd_free(pbuf,maxSize);
       return -1;
     }
@@ -972,6 +990,8 @@ AsyncFile::writeBuffer(const char * buf,
       bytes_written = 0;
       DEBUG(ndbout_c("EINTR in write"));
     } else if (return_value == -1){
+      if(use_gz)
+        return my_errno;
       return errno;
     } else {
       bytes_written = return_value;
@@ -1024,7 +1044,14 @@ AsyncFile::closeReq(Request * request)
   else
     ::close(theFd);
   use_gz= 0;
+  Byte *a,*b;
+  a= azf.inbuf;
+  b= azf.outbuf;
   memset(&azf,0,sizeof(azf));
+  azf.inbuf= a;
+  azf.outbuf= b;
+  azf.stream.opaque = (void*)&az_mempool;
+
   if (-1 == r) {
 #ifndef DBUG_OFF
     if (theFd == -1) {
@@ -1097,7 +1124,10 @@ AsyncFile::appendReq(Request * request){
       continue;
     }
     if(n == -1){
-      request->error = errno;
+      if(use_gz)
+        request->error = my_errno;
+      else
+        request->error = errno;
       return;
     }
     if(n == 0){
@@ -1225,6 +1255,10 @@ void AsyncFile::endReq()
   // Thread is ended with return
   if (theWriteBufferUnaligned)
     ndbd_free(theWriteBufferUnaligned, theWriteBufferSize);
+
+  if (azfBufferUnaligned)
+    ndbd_free(azfBufferUnaligned, (AZ_BUFSIZE_READ*AZ_BUFSIZE_WRITE)
+              +NDB_O_DIRECT_WRITE_ALIGNMENT-1);
 }
 
 
Index: telco-6.2/storage/ndb/src/kernel/blocks/ndbfs/AsyncFile.hpp
===================================================================
--- telco-6.2.orig/storage/ndb/src/kernel/blocks/ndbfs/AsyncFile.hpp	2007-10-19
12:13:17.635229612 +1000
+++ telco-6.2/storage/ndb/src/kernel/blocks/ndbfs/AsyncFile.hpp	2007-10-19
14:28:29.033066097 +1000
@@ -231,6 +231,7 @@ private:
 
   int use_gz;
   azio_stream azf;
+  struct az_alloc_rec az_mempool;
 
   MemoryChannel<Request> *theReportTo;
   MemoryChannel<Request>* theMemoryChannelPtr;
@@ -242,6 +243,7 @@ private:
   int theWriteBufferSize;
   char* theWriteBuffer;
   void* theWriteBufferUnaligned;
+  void* azfBufferUnaligned;
   
   size_t m_write_wo_sync;  // Writes wo/ sync
   size_t m_auto_sync_freq; // Auto sync freq in bytes
Index: telco-6.2/storage/ndb/src/common/util/azio.c
===================================================================
--- telco-6.2.orig/storage/ndb/src/common/util/azio.c	2007-10-19 12:13:17.635229612 +1000
+++ telco-6.2/storage/ndb/src/common/util/azio.c	2007-10-19 17:20:03.037952036 +1000
@@ -3,18 +3,32 @@
     -Brian Aker
 */
 
+/*
+ * This version has been hacked around by Stewart Smith to get the following:
+ * - Direct IO
+ */
+
 /* gzio.c -- IO on .gz files
  * Copyright (C) 1995-2005 Jean-loup Gailly.
  * For conditions of distribution and use, see copyright notice in zlib.h
  *
  */
 
-/* @(#) $Id$ */
+/**
+ * This is a casual hack to do static memory allocation
+ * (needed by NDB)
+ */
+#include "../../../../zlib/zutil.h"
+#include "../../../../zlib/zconf.h"
+#include "../../../../zlib/inftrees.h"
+#include "../../../../zlib/inflate.h"
+#include "../../../../zlib/deflate.h"
 
 #include "azlib.h"
 
 #include <stdio.h>
 #include <string.h>
+#include <stdlib.h>
 
 static int const gz_magic[2] = {0x1f, 0x8b}; /* gzip magic header */
 static int const az_magic[3] = {0xfe, 0x03, 0x01}; /* az magic header */
@@ -27,6 +41,8 @@ static int const az_magic[3] = {0xfe, 0x
 #define COMMENT      0x10 /* bit 4 set: file comment present */
 #define RESERVED     0xE0 /* bits 5..7: reserved */
 
+#define AZ_MEMLEVEL 8
+
 int az_open(azio_stream *s, const char *path, int Flags, File  fd);
 int do_flush(azio_stream *file, int flush);
 int    get_byte(azio_stream *s);
@@ -37,6 +53,40 @@ void putLong(File file, uLong x);
 uLong  getLong(azio_stream *s);
 void read_header(azio_stream *s, unsigned char *buffer);
 
+size_t az_inflate_mem_size()
+{
+  return sizeof(struct inflate_state)
+    + ((1U << MAX_WBITS)*sizeof(unsigned char));
+}
+
+size_t az_deflate_mem_size()
+{
+  return sizeof(deflate_state)
+    + ((1U << MAX_WBITS)*(2*sizeof(Byte)))
+    + ((1U << MAX_WBITS)*sizeof(Pos))
+    + ((1U << (AZ_MEMLEVEL+7))*sizeof(Pos))
+    + ((1U << (AZ_MEMLEVEL+6))*(sizeof(ush)+2));
+}
+
+voidpf az_alloc(voidpf opaque, uInt items, uInt size)
+{
+  struct az_alloc_rec *r = (struct az_alloc_rec*)opaque;
+
+  if((items * size) > r->mfree)
+    abort();
+
+  r->mfree -= items*size;
+
+  return (r->mem + r->size) - r->mfree;
+}
+void az_free(voidpf opaque, voidpf address)
+{
+  // Oh how we hack.
+  struct az_alloc_rec *r = (struct az_alloc_rec*)opaque;
+  r->mfree = r->size;
+}
+
+
 /* ===========================================================================
   Opens a gzip (.gz) file for reading or writing. The mode parameter
   is as in fopen ("rb" or "wb"). The file is given either by file descriptor
@@ -52,9 +102,20 @@ int az_open (azio_stream *s, const char 
   int level = Z_DEFAULT_COMPRESSION; /* compression level */
   int strategy = Z_DEFAULT_STRATEGY; /* compression strategy */
 
-  s->stream.zalloc = (alloc_func)0;
-  s->stream.zfree = (free_func)0;
-  s->stream.opaque = (voidpf)0;
+  s->stream.zalloc = (alloc_func)az_alloc;
+  s->stream.zfree = (free_func)az_free;
+//  s->stream.opaque = (voidpf)r;
+  s->bufalloced = 0;
+  if(!s->inbuf)
+  {
+    err= posix_memalign(&(s->inbuf),512,AZ_BUFSIZE_READ);
+    if(err)
+      return err;
+    err= posix_memalign(&(s->outbuf),512,AZ_BUFSIZE_WRITE);
+    if(err)
+      return err;
+    s->bufalloced = 1;
+  }
   memset(s->inbuf, 0, AZ_BUFSIZE_READ);
   memset(s->outbuf, 0, AZ_BUFSIZE_WRITE);
   s->stream.next_in = s->inbuf;
@@ -84,7 +145,7 @@ int az_open (azio_stream *s, const char 
   if (s->mode == 'w') 
   {
     err = deflateInit2(&(s->stream), level,
-                       Z_DEFLATED, -MAX_WBITS, 8, strategy);
+                       Z_DEFLATED, -MAX_WBITS, AZ_MEMLEVEL, strategy);
     /* windowBits is passed < 0 to suppress zlib header */
 
     s->stream.next_out = s->outbuf;
@@ -139,10 +200,9 @@ int az_open (azio_stream *s, const char 
   }
   else if (s->mode == 'w') 
   {
-    uchar buffer[AZHEADER_SIZE + AZMETA_BUFFER_SIZE];
-    my_pread(s->file, buffer, AZHEADER_SIZE + AZMETA_BUFFER_SIZE, 0,
+    my_pread(s->file, s->inbuf, AZHEADER_SIZE + AZMETA_BUFFER_SIZE, 0,
              MYF(0));
-    read_header(s, buffer); /* skip the .az header */
+    read_header(s, s->inbuf); /* skip the .az header */
     my_seek(s->file, 0, MY_SEEK_END, MYF(0));
   }
   else
@@ -156,7 +216,7 @@ int az_open (azio_stream *s, const char 
 
 void write_header(azio_stream *s)
 {
-  char buffer[AZHEADER_SIZE + AZMETA_BUFFER_SIZE];
+  char *buffer= (char*)s->outbuf;
   char *ptr= buffer;
 
   s->block_size= AZ_BUFSIZE_WRITE;
@@ -378,6 +438,12 @@ int destroy (s)
 
   if (s->z_err < 0) err = s->z_err;
 
+  if(s->bufalloced)
+  {
+    free(s->inbuf);
+    free(s->outbuf);
+  }
+
   return err;
 }
 
@@ -569,14 +635,17 @@ int do_flush (azio_stream *s, int flush)
   {
     len = AZ_BUFSIZE_WRITE - s->stream.avail_out;
 
+    memset(s->outbuf+len,0,s->stream.avail_out);
+
     if (len != 0) 
     {
       s->check_point= my_tell(s->file, MYF(0));
-      if ((uInt)my_write(s->file, (uchar *)s->outbuf, len, MYF(0)) != len) 
+      if ((uInt)my_write(s->file, (uchar *)s->outbuf, AZ_BUFSIZE_WRITE, MYF(0)) !=
len) 
       {
         s->z_err = Z_ERRNO;
         return Z_ERRNO;
       }
+      my_seek(s->file, s->stream.avail_out, SEEK_CUR, MYF(0));
       s->stream.next_out = s->outbuf;
       s->stream.avail_out = AZ_BUFSIZE_WRITE;
     }
Index: telco-6.2/zlib/inflate.h
===================================================================
--- telco-6.2.orig/zlib/inflate.h	2007-10-19 14:28:49.534071601 +1000
+++ telco-6.2/zlib/inflate.h	2007-10-19 14:29:25.535837366 +1000
@@ -16,6 +16,9 @@
 #  define GUNZIP
 #endif
 
+#ifndef __ZLIB_INFLATE_H__
+#define __ZLIB_INFLATE_H__
+
 /* Possible inflate modes between inflate() calls */
 typedef enum {
     HEAD,       /* i: waiting for magic header */
@@ -113,3 +116,5 @@ struct inflate_state {
     unsigned short work[288];   /* work area for code table building */
     code codes[ENOUGH];         /* space for code tables */
 };
+
+#endif

-- 
Stewart Smith
Thread
[patch 00/11] WL4081 NDB Compressed LCP and Backupstewart9 Nov
  • [patch 04/11] WL4081: read compressed backup filesstewart9 Nov
  • [patch 03/11] WL4081: Add compressed file support to AsyncFile (azio) and support compressed backups.stewart9 Nov
  • [patch 01/11] WL4081: Copy azio for NDBstewart9 Nov
  • [patch 05/11] WL4081: add support for *storing* compressed LCPstewart9 Nov
  • [patch 07/11] WL4081 Futz with mtr ndb config to enable compressed lcp, backup and O_DIRECTstewart9 Nov
  • [patch 08/11] WL4081 Allow use of direct IO (O_DIRECT) with aziostewart9 Nov
  • [patch 10/11] Fix mem leak in mgmapi report eventstewart9 Nov
  • [patch 11/11] Massive azio bugfix (mainly for O_DIRECT) and error reportingstewart9 Nov
  • [patch 06/11] WL4081: Add support to AsyncFile for reading zlib compressed files.stewart9 Nov
  • [patch 09/11] Cleanup AsyncFile, make modular and nice to readstewart9 Nov
Re: [patch 10/11] Fix mem leak in mgmapi report eventStewart Smith12 Nov
Re: [patch 08/11] WL4081 Allow use of direct IO (O_DIRECT) withazioStewart Smith12 Nov
Re: [patch 07/11] WL4081 Futz with mtr ndb config to enablecompressed lcp, backup and O_DIRECTStewart Smith12 Nov
Re: [patch 06/11] WL4081: Add support to AsyncFile for readingzlib compressed files.Stewart Smith12 Nov
Re: [patch 09/11] Cleanup AsyncFile, make modular and nice to readStewart Smith12 Nov
Re: [patch 08/11] WL4081 Allow use of direct IO (O_DIRECT)with azioStewart Smith12 Nov