Change azio to either posix_memalign allocate in and out buffers OR
allow user to allocate these. Flag in azio_stream should be where
padding was so no additional memory used.
This enables O_DIRECT usage with azio if aligned buffers are allocated
(by default, the azio allocation - posix_memalign - does this).
Also fiddles with ZLIB internals to get the correct buffer sizes to allocate
for deflate and inflate.
---
storage/ndb/include/util/azlib.h | 24 ++---
storage/ndb/src/common/util/azio.c | 89 +++++++++++++++++++---
storage/ndb/src/kernel/blocks/ndbfs/AsyncFile.cpp | 38 ++++++++-
storage/ndb/src/kernel/blocks/ndbfs/AsyncFile.hpp | 2
zlib/inflate.h | 5 +
5 files changed, 134 insertions(+), 24 deletions(-)
Index: telco-6.2/storage/ndb/include/util/azlib.h
===================================================================
--- telco-6.2.orig/storage/ndb/include/util/azlib.h 2007-10-19 12:13:17.635229612 +1000
+++ telco-6.2/storage/ndb/include/util/azlib.h 2007-10-19 14:47:37.589398879 +1000
@@ -43,16 +43,9 @@ extern "C" {
#endif
/* Start of MySQL Specific Information */
-/*
- ulonglong + ulonglong + ulonglong + ulonglong + uchar
-*/
-#define AZMETA_BUFFER_SIZE sizeof(unsigned long long) \
- + sizeof(unsigned long long) + sizeof(unsigned long long) + sizeof(unsigned long long)
\
- + sizeof(unsigned int) + sizeof(unsigned int) \
- + sizeof(unsigned int) + sizeof(unsigned int) \
- + sizeof(unsigned char)
#define AZHEADER_SIZE 29
+#define AZMETA_BUFFER_SIZE 512-AZHEADER_SIZE
#define AZ_MAGIC_POS 0
#define AZ_VERSION_POS 1
@@ -83,6 +76,14 @@ extern "C" {
#define AZ_STATE_SAVED 2
#define AZ_STATE_CRASHED 3
+size_t az_inflate_mem_size();
+size_t az_deflate_mem_size();
+struct az_alloc_rec {
+ size_t size;
+ size_t mfree;
+ char *mem;
+};
+
/*
The 'zlib' compression library provides in-memory compression and
decompression functions, including integrity checks of the uncompressed
@@ -199,18 +200,18 @@ extern "C" {
#define AZ_BUFSIZE_READ 32768
#define AZ_BUFSIZE_WRITE 16384
-
typedef struct azio_stream {
z_stream stream;
int z_err; /* error code for last stream operation */
int z_eof; /* set if end of input file */
File file; /* .gz file */
- Byte inbuf[AZ_BUFSIZE_READ]; /* input buffer */
- Byte outbuf[AZ_BUFSIZE_WRITE]; /* output buffer */
+ Byte *inbuf; /* input buffer */
+ Byte *outbuf; /* output buffer */
uLong crc; /* crc32 of uncompressed data */
char *msg; /* error message */
int transparent; /* 1 if input file is not a .gz file */
char mode; /* 'w' or 'r' */
+ char bufalloced; /* true if azio allocated buffers */
my_off_t start; /* start of compressed data in file (header skipped) */
my_off_t in; /* bytes into deflate or inflate */
my_off_t out; /* bytes out of deflate or inflate */
@@ -233,7 +234,6 @@ typedef struct azio_stream {
} azio_stream;
/* basic functions */
-
extern int azopen(azio_stream *s, const char *path, int Flags);
/*
Opens a gzip (.gz) file for reading or writing. The mode parameter
Index: telco-6.2/storage/ndb/src/kernel/blocks/ndbfs/AsyncFile.cpp
===================================================================
--- telco-6.2.orig/storage/ndb/src/kernel/blocks/ndbfs/AsyncFile.cpp 2007-10-19
12:13:17.635229612 +1000
+++ telco-6.2/storage/ndb/src/kernel/blocks/ndbfs/AsyncFile.cpp 2007-10-19
16:21:52.866771006 +1000
@@ -172,6 +172,22 @@ AsyncFile::run()
(((UintPtr)theWriteBufferUnaligned + NDB_O_DIRECT_WRITE_ALIGNMENT - 1) &
~(UintPtr)(NDB_O_DIRECT_WRITE_ALIGNMENT - 1));
+ azfBufferUnaligned= (Byte*)ndbd_malloc((AZ_BUFSIZE_READ+AZ_BUFSIZE_WRITE)
+ +NDB_O_DIRECT_WRITE_ALIGNMENT-1);
+
+ azf.inbuf= (Byte*)(((UintPtr)azfBufferUnaligned
+ + NDB_O_DIRECT_WRITE_ALIGNMENT - 1) &
+ ~(UintPtr)(NDB_O_DIRECT_WRITE_ALIGNMENT - 1));
+
+ azf.outbuf= azf.inbuf + AZ_BUFSIZE_READ;
+
+ az_mempool.size = az_mempool.mfree = az_inflate_mem_size()+az_deflate_mem_size();
+
+ ndbout_c("NDBFS/AsyncFile: Allocating %d for In/Deflate buffer",az_mempool.size);
+ az_mempool.mem = (char*) ndbd_malloc(az_mempool.size);
+
+ azf.stream.opaque= &az_mempool;
+
NdbMutex_Unlock(theStartMutexPtr);
NdbCondition_Signal(theStartConditionPtr);
@@ -580,6 +596,7 @@ no_odirect:
}
if(n == -1 || n == 0)
{
+ ndbout_c("azwrite|write returned %d: errno: %d my_errno: %d",n,errno,my_errno);
break;
}
size -= n;
@@ -656,7 +673,7 @@ no_odirect:
}
#endif
if(use_gz)
- if(!azdopen(&azf, theFd, new_flags))
+ if(azdopen(&azf, theFd, new_flags) < 1)
{
ndbout_c("Stewart's brain broke");
abort();
@@ -838,6 +855,7 @@ AsyncFile::extendfile(Request* request)
pbuf,
maxSize);
if ((return_value == -1) || (return_value != maxSize)) {
+ ndbout_c("NDBFS ERROR during extendFile");
ndbd_free(pbuf,maxSize);
return -1;
}
@@ -972,6 +990,8 @@ AsyncFile::writeBuffer(const char * buf,
bytes_written = 0;
DEBUG(ndbout_c("EINTR in write"));
} else if (return_value == -1){
+ if(use_gz)
+ return my_errno;
return errno;
} else {
bytes_written = return_value;
@@ -1024,7 +1044,14 @@ AsyncFile::closeReq(Request * request)
else
::close(theFd);
use_gz= 0;
+ Byte *a,*b;
+ a= azf.inbuf;
+ b= azf.outbuf;
memset(&azf,0,sizeof(azf));
+ azf.inbuf= a;
+ azf.outbuf= b;
+ azf.stream.opaque = (void*)&az_mempool;
+
if (-1 == r) {
#ifndef DBUG_OFF
if (theFd == -1) {
@@ -1097,7 +1124,10 @@ AsyncFile::appendReq(Request * request){
continue;
}
if(n == -1){
- request->error = errno;
+ if(use_gz)
+ request->error = my_errno;
+ else
+ request->error = errno;
return;
}
if(n == 0){
@@ -1225,6 +1255,10 @@ void AsyncFile::endReq()
// Thread is ended with return
if (theWriteBufferUnaligned)
ndbd_free(theWriteBufferUnaligned, theWriteBufferSize);
+
+ if (azfBufferUnaligned)
+ ndbd_free(azfBufferUnaligned, (AZ_BUFSIZE_READ*AZ_BUFSIZE_WRITE)
+ +NDB_O_DIRECT_WRITE_ALIGNMENT-1);
}
Index: telco-6.2/storage/ndb/src/kernel/blocks/ndbfs/AsyncFile.hpp
===================================================================
--- telco-6.2.orig/storage/ndb/src/kernel/blocks/ndbfs/AsyncFile.hpp 2007-10-19
12:13:17.635229612 +1000
+++ telco-6.2/storage/ndb/src/kernel/blocks/ndbfs/AsyncFile.hpp 2007-10-19
14:28:29.033066097 +1000
@@ -231,6 +231,7 @@ private:
int use_gz;
azio_stream azf;
+ struct az_alloc_rec az_mempool;
MemoryChannel<Request> *theReportTo;
MemoryChannel<Request>* theMemoryChannelPtr;
@@ -242,6 +243,7 @@ private:
int theWriteBufferSize;
char* theWriteBuffer;
void* theWriteBufferUnaligned;
+ void* azfBufferUnaligned;
size_t m_write_wo_sync; // Writes wo/ sync
size_t m_auto_sync_freq; // Auto sync freq in bytes
Index: telco-6.2/storage/ndb/src/common/util/azio.c
===================================================================
--- telco-6.2.orig/storage/ndb/src/common/util/azio.c 2007-10-19 12:13:17.635229612 +1000
+++ telco-6.2/storage/ndb/src/common/util/azio.c 2007-10-19 17:20:03.037952036 +1000
@@ -3,18 +3,32 @@
-Brian Aker
*/
+/*
+ * This version has been hacked around by Stewart Smith to get the following:
+ * - Direct IO
+ */
+
/* gzio.c -- IO on .gz files
* Copyright (C) 1995-2005 Jean-loup Gailly.
* For conditions of distribution and use, see copyright notice in zlib.h
*
*/
-/* @(#) $Id$ */
+/**
+ * This is a casual hack to do static memory allocation
+ * (needed by NDB)
+ */
+#include "../../../../zlib/zutil.h"
+#include "../../../../zlib/zconf.h"
+#include "../../../../zlib/inftrees.h"
+#include "../../../../zlib/inflate.h"
+#include "../../../../zlib/deflate.h"
#include "azlib.h"
#include <stdio.h>
#include <string.h>
+#include <stdlib.h>
static int const gz_magic[2] = {0x1f, 0x8b}; /* gzip magic header */
static int const az_magic[3] = {0xfe, 0x03, 0x01}; /* az magic header */
@@ -27,6 +41,8 @@ static int const az_magic[3] = {0xfe, 0x
#define COMMENT 0x10 /* bit 4 set: file comment present */
#define RESERVED 0xE0 /* bits 5..7: reserved */
+#define AZ_MEMLEVEL 8
+
int az_open(azio_stream *s, const char *path, int Flags, File fd);
int do_flush(azio_stream *file, int flush);
int get_byte(azio_stream *s);
@@ -37,6 +53,40 @@ void putLong(File file, uLong x);
uLong getLong(azio_stream *s);
void read_header(azio_stream *s, unsigned char *buffer);
+size_t az_inflate_mem_size()
+{
+ return sizeof(struct inflate_state)
+ + ((1U << MAX_WBITS)*sizeof(unsigned char));
+}
+
+size_t az_deflate_mem_size()
+{
+ return sizeof(deflate_state)
+ + ((1U << MAX_WBITS)*(2*sizeof(Byte)))
+ + ((1U << MAX_WBITS)*sizeof(Pos))
+ + ((1U << (AZ_MEMLEVEL+7))*sizeof(Pos))
+ + ((1U << (AZ_MEMLEVEL+6))*(sizeof(ush)+2));
+}
+
+voidpf az_alloc(voidpf opaque, uInt items, uInt size)
+{
+ struct az_alloc_rec *r = (struct az_alloc_rec*)opaque;
+
+ if((items * size) > r->mfree)
+ abort();
+
+ r->mfree -= items*size;
+
+ return (r->mem + r->size) - r->mfree;
+}
+void az_free(voidpf opaque, voidpf address)
+{
+ // Oh how we hack.
+ struct az_alloc_rec *r = (struct az_alloc_rec*)opaque;
+ r->mfree = r->size;
+}
+
+
/* ===========================================================================
Opens a gzip (.gz) file for reading or writing. The mode parameter
is as in fopen ("rb" or "wb"). The file is given either by file descriptor
@@ -52,9 +102,20 @@ int az_open (azio_stream *s, const char
int level = Z_DEFAULT_COMPRESSION; /* compression level */
int strategy = Z_DEFAULT_STRATEGY; /* compression strategy */
- s->stream.zalloc = (alloc_func)0;
- s->stream.zfree = (free_func)0;
- s->stream.opaque = (voidpf)0;
+ s->stream.zalloc = (alloc_func)az_alloc;
+ s->stream.zfree = (free_func)az_free;
+// s->stream.opaque = (voidpf)r;
+ s->bufalloced = 0;
+ if(!s->inbuf)
+ {
+ err= posix_memalign(&(s->inbuf),512,AZ_BUFSIZE_READ);
+ if(err)
+ return err;
+ err= posix_memalign(&(s->outbuf),512,AZ_BUFSIZE_WRITE);
+ if(err)
+ return err;
+ s->bufalloced = 1;
+ }
memset(s->inbuf, 0, AZ_BUFSIZE_READ);
memset(s->outbuf, 0, AZ_BUFSIZE_WRITE);
s->stream.next_in = s->inbuf;
@@ -84,7 +145,7 @@ int az_open (azio_stream *s, const char
if (s->mode == 'w')
{
err = deflateInit2(&(s->stream), level,
- Z_DEFLATED, -MAX_WBITS, 8, strategy);
+ Z_DEFLATED, -MAX_WBITS, AZ_MEMLEVEL, strategy);
/* windowBits is passed < 0 to suppress zlib header */
s->stream.next_out = s->outbuf;
@@ -139,10 +200,9 @@ int az_open (azio_stream *s, const char
}
else if (s->mode == 'w')
{
- uchar buffer[AZHEADER_SIZE + AZMETA_BUFFER_SIZE];
- my_pread(s->file, buffer, AZHEADER_SIZE + AZMETA_BUFFER_SIZE, 0,
+ my_pread(s->file, s->inbuf, AZHEADER_SIZE + AZMETA_BUFFER_SIZE, 0,
MYF(0));
- read_header(s, buffer); /* skip the .az header */
+ read_header(s, s->inbuf); /* skip the .az header */
my_seek(s->file, 0, MY_SEEK_END, MYF(0));
}
else
@@ -156,7 +216,7 @@ int az_open (azio_stream *s, const char
void write_header(azio_stream *s)
{
- char buffer[AZHEADER_SIZE + AZMETA_BUFFER_SIZE];
+ char *buffer= (char*)s->outbuf;
char *ptr= buffer;
s->block_size= AZ_BUFSIZE_WRITE;
@@ -378,6 +438,12 @@ int destroy (s)
if (s->z_err < 0) err = s->z_err;
+ if(s->bufalloced)
+ {
+ free(s->inbuf);
+ free(s->outbuf);
+ }
+
return err;
}
@@ -569,14 +635,17 @@ int do_flush (azio_stream *s, int flush)
{
len = AZ_BUFSIZE_WRITE - s->stream.avail_out;
+ memset(s->outbuf+len,0,s->stream.avail_out);
+
if (len != 0)
{
s->check_point= my_tell(s->file, MYF(0));
- if ((uInt)my_write(s->file, (uchar *)s->outbuf, len, MYF(0)) != len)
+ if ((uInt)my_write(s->file, (uchar *)s->outbuf, AZ_BUFSIZE_WRITE, MYF(0)) !=
len)
{
s->z_err = Z_ERRNO;
return Z_ERRNO;
}
+ my_seek(s->file, s->stream.avail_out, SEEK_CUR, MYF(0));
s->stream.next_out = s->outbuf;
s->stream.avail_out = AZ_BUFSIZE_WRITE;
}
Index: telco-6.2/zlib/inflate.h
===================================================================
--- telco-6.2.orig/zlib/inflate.h 2007-10-19 14:28:49.534071601 +1000
+++ telco-6.2/zlib/inflate.h 2007-10-19 14:29:25.535837366 +1000
@@ -16,6 +16,9 @@
# define GUNZIP
#endif
+#ifndef __ZLIB_INFLATE_H__
+#define __ZLIB_INFLATE_H__
+
/* Possible inflate modes between inflate() calls */
typedef enum {
HEAD, /* i: waiting for magic header */
@@ -113,3 +116,5 @@ struct inflate_state {
unsigned short work[288]; /* work area for code table building */
code codes[ENOUGH]; /* space for code tables */
};
+
+#endif
--
Stewart Smith