Move all platform-specific AsyncFile functionality out into separate classes.
Generic functionality stays in AsyncFile; POSIX-specific code goes in PosixAsyncFile.
In the future, we will have an azioAsyncFile (and maybe a mysysAsyncFile) and can
have a specific number of each instantiated in the kernel. NDBFS can then decide
which AsyncFile should be used for a given file - e.g. we can keep the number
of azioAsyncFiles to a minimum.
---
storage/ndb/include/kernel/signaldata/FsOpenReq.hpp | 1
storage/ndb/include/kernel/signaldata/FsReadWriteReq.hpp | 1
storage/ndb/src/kernel/blocks/Makefile.am | 4
storage/ndb/src/kernel/blocks/ndbfs/AsyncFile.cpp | 1130 ---------------
storage/ndb/src/kernel/blocks/ndbfs/AsyncFile.hpp | 192 +-
storage/ndb/src/kernel/blocks/ndbfs/Ndbfs.cpp | 6
storage/ndb/src/kernel/blocks/ndbfs/PosixAsyncFile.cpp | 767 ++++++++++
storage/ndb/src/kernel/blocks/ndbfs/PosixAsyncFile.hpp | 72
storage/ndb/src/kernel/blocks/ndbfs/Win32AsyncFile.cpp | 334 ++++
storage/ndb/src/kernel/blocks/ndbfs/Win32AsyncFile.hpp | 83 +
storage/ndb/src/kernel/vm/SimulatedBlock.hpp | 1
11 files changed, 1413 insertions(+), 1178 deletions(-)
Index: telco-6.2/storage/ndb/src/kernel/blocks/ndbfs/AsyncFile.cpp
===================================================================
--- telco-6.2.orig/storage/ndb/src/kernel/blocks/ndbfs/AsyncFile.cpp 2007-10-24
16:40:00.920491920 +1000
+++ telco-6.2/storage/ndb/src/kernel/blocks/ndbfs/AsyncFile.cpp 2007-10-26
16:24:50.498403593 +1000
@@ -17,11 +17,8 @@
#include <my_sys.h>
#include <my_pthread.h>
-#ifdef HAVE_XFS_XFS_H
-#include <xfs/xfs.h>
-#endif
-
#include "AsyncFile.hpp"
+#include "PosixAsyncFile.hpp"
#include <ErrorHandlingMacros.hpp>
#include <kernel_types.h>
@@ -31,40 +28,6 @@
#include <signaldata/FsOpenReq.hpp>
#include <signaldata/FsReadWriteReq.hpp>
-// use this to test broken pread code
-//#define HAVE_BROKEN_PREAD
-
-#ifdef HAVE_BROKEN_PREAD
-#undef HAVE_PWRITE
-#undef HAVE_PREAD
-#endif
-
-#if defined NDB_WIN32
-#else
-// For readv and writev
-#include <sys/uio.h>
-#endif
-
-#ifndef NDB_WIN32
-#include <dirent.h>
-#endif
-
-// Use this define if you want printouts from AsyncFile class
-#define DEBUG_ASYNCFILE
-
-#ifdef DEBUG_ASYNCFILE
-#include <NdbOut.hpp>
-#define DEBUG(x) x
-#define PRINT_ERRORANDFLAGS(f) printErrorAndFlags(f)
-void printErrorAndFlags(Uint32 used_flags);
-#else
-#define DEBUG(x)
-#define PRINT_ERRORANDFLAGS(f)
-#endif
-
-// Define the size of the write buffer (for each thread)
-#define WRITEBUFFERSIZE 262144
-
const char *actionName[] = {
"open",
"close",
@@ -82,30 +45,24 @@ static int numAsyncFiles = 0;
extern "C" void * runAsyncFile(void* arg)
{
- ((AsyncFile*)arg)->run();
+ ((PosixAsyncFile*)arg)->run();
return (NULL);
}
+
AsyncFile::AsyncFile(SimulatedBlock& fs) :
theFileName(),
-#ifdef NDB_WIN32
- hFile(INVALID_HANDLE_VALUE),
-#else
- theFd(-1),
-#endif
theReportTo(0),
- use_gz(0),
theMemoryChannelPtr(NULL),
m_fs(fs)
{
- memset(&azf,0,sizeof(azf));
m_page_ptr.setNull();
m_current_request= m_last_request= 0;
- m_open_flags = 0;
+ m_auto_sync_freq = 0;
}
void
-AsyncFile::doStart()
+AsyncFile::doStart()
{
// Stacksize for filesystem threads
// An 8k stack should be enough
@@ -128,18 +85,18 @@ AsyncFile::doStart()
ERROR_SET(fatal, NDBD_EXIT_MEMALLOC, "","Could not allocate file system thread");
NdbCondition_Wait(theStartConditionPtr,
- theStartMutexPtr);
+ theStartMutexPtr);
NdbMutex_Unlock(theStartMutexPtr);
NdbMutex_Destroy(theStartMutexPtr);
NdbCondition_Destroy(theStartConditionPtr);
}
-AsyncFile::~AsyncFile()
+void AsyncFile::shutdown()
{
void *status;
Request request;
request.action = Request::end;
- theMemoryChannelPtr->writeChannel( &request );
+ this->theMemoryChannelPtr->writeChannel( &request );
NdbThread_WaitFor(theThreadPtr, &status);
NdbThread_Destroy(&theThreadPtr);
delete theMemoryChannelPtr;
@@ -151,50 +108,40 @@ AsyncFile::reportTo( MemoryChannel<Reque
theReportTo = reportTo;
}
-void AsyncFile::execute(Request* request)
+void AsyncFile::execute(Request* request)
{
theMemoryChannelPtr->writeChannel( request );
}
+int AsyncFile::init()
+{
+ // Create write buffer for bigger writes
+ theWriteBufferSize = WRITEBUFFERSIZE;
+ theWriteBuffer = (char *) ndbd_malloc(theWriteBufferSize);
+
+ return 0;
+}
+
void
AsyncFile::run()
{
Request *request;
+
// Create theMemoryChannel in the thread that will wait for it
NdbMutex_Lock(theStartMutexPtr);
theMemoryChannelPtr = new MemoryChannel<Request>();
theStartFlag = true;
- // Create write buffer for bigger writes
- theWriteBufferSize = WRITEBUFFERSIZE;
- theWriteBufferUnaligned = (char *) ndbd_malloc(theWriteBufferSize +
- NDB_O_DIRECT_WRITE_ALIGNMENT-1);
- theWriteBuffer = (char *)
- (((UintPtr)theWriteBufferUnaligned + NDB_O_DIRECT_WRITE_ALIGNMENT - 1) &
- ~(UintPtr)(NDB_O_DIRECT_WRITE_ALIGNMENT - 1));
-
- azfBufferUnaligned= (Byte*)ndbd_malloc((AZ_BUFSIZE_READ+AZ_BUFSIZE_WRITE)
- +NDB_O_DIRECT_WRITE_ALIGNMENT-1);
-
- azf.inbuf= (Byte*)(((UintPtr)azfBufferUnaligned
- + NDB_O_DIRECT_WRITE_ALIGNMENT - 1) &
- ~(UintPtr)(NDB_O_DIRECT_WRITE_ALIGNMENT - 1));
-
- azf.outbuf= azf.inbuf + AZ_BUFSIZE_READ;
- az_mempool.size = az_mempool.mfree = az_inflate_mem_size()+az_deflate_mem_size();
-
- ndbout_c("NDBFS/AsyncFile: Allocating %d for In/Deflate buffer",az_mempool.size);
- az_mempool.mem = (char*) ndbd_malloc(az_mempool.size);
-
- azf.stream.opaque= &az_mempool;
+ int r= this->init();
NdbMutex_Unlock(theStartMutexPtr);
NdbCondition_Signal(theStartConditionPtr);
-
- if (!theWriteBuffer) {
- DEBUG(ndbout_c("AsyncFile::writeReq, Failed allocating write buffer"));
+
+ if(r!=0)
+ {
+ DEBUG(ndbout_c("AsyncFile::init() failed"));
return;
- }//if
+ }
while (1) {
request = theMemoryChannelPtr->readChannel();
@@ -250,7 +197,7 @@ AsyncFile::run()
rmrfReq(request, (char*)theFileName.c_str(), request->par.rmrf.own_directory);
break;
case Request:: end:
- if (theFd > 0)
+ if (isOpen())
closeReq(request);
endReq();
return;
@@ -261,517 +208,12 @@ AsyncFile::run()
}//switch
m_last_request= request;
m_current_request= 0;
-
+
// No need to signal as ndbfs only uses tryRead
theReportTo->writeChannelNoSignal(request);
}//while
}//AsyncFile::run()
-#ifdef O_DIRECT
-static char g_odirect_readbuf[2*GLOBAL_PAGE_SIZE -1];
-#endif
-
-int
-AsyncFile::check_odirect_write(Uint32 flags, int& new_flags, int mode)
-{
- assert(new_flags & (O_CREAT | O_TRUNC));
-#ifdef O_DIRECT
- int ret;
- char * bufptr = (char*)((UintPtr(g_odirect_readbuf)+(GLOBAL_PAGE_SIZE - 1)) &
~(GLOBAL_PAGE_SIZE - 1));
- while (((ret = ::write(theFd, bufptr, GLOBAL_PAGE_SIZE)) == -1) &&
- (errno == EINTR));
- if (ret == -1)
- {
- new_flags &= ~O_DIRECT;
- ndbout_c("%s Failed to write using O_DIRECT, disabling",
- theFileName.c_str());
- }
-
- close(theFd);
- theFd = ::open(theFileName.c_str(), new_flags, mode);
- if (theFd == -1)
- return errno;
-#endif
-
- return 0;
-}
-
-int
-AsyncFile::check_odirect_read(Uint32 flags, int &new_flags, int mode)
-{
-#ifdef O_DIRECT
- int ret;
- char * bufptr = (char*)((UintPtr(g_odirect_readbuf)+(GLOBAL_PAGE_SIZE - 1)) &
~(GLOBAL_PAGE_SIZE - 1));
- while (((ret = ::read(theFd, bufptr, GLOBAL_PAGE_SIZE)) == -1) &&
- (errno == EINTR));
- if (ret == -1)
- {
- ndbout_c("%s Failed to read using O_DIRECT, disabling",
- theFileName.c_str());
- goto reopen;
- }
-
- if(lseek(theFd, 0, SEEK_SET) != 0)
- {
- return errno;
- }
-
- if ((flags & FsOpenReq::OM_CHECK_SIZE) == 0)
- {
- struct stat buf;
- if ((fstat(theFd, &buf) == -1))
- {
- return errno;
- }
- else if ((buf.st_size % GLOBAL_PAGE_SIZE) != 0)
- {
- ndbout_c("%s filesize not a multiple of %d, disabling O_DIRECT",
- theFileName.c_str(), GLOBAL_PAGE_SIZE);
- goto reopen;
- }
- }
-
- return 0;
-
-reopen:
- close(theFd);
- new_flags &= ~O_DIRECT;
- theFd = ::open(theFileName.c_str(), new_flags, mode);
- if (theFd == -1)
- return errno;
-#endif
- return 0;
-}
-
-void AsyncFile::openReq(Request* request)
-{
- m_auto_sync_freq = 0;
- m_write_wo_sync = 0;
- m_open_flags = request->par.open.flags;
-
- // for open.flags, see signal FSOPENREQ
-#ifdef NDB_WIN32
- DWORD dwCreationDisposition;
- DWORD dwDesiredAccess = 0;
- DWORD dwShareMode = FILE_SHARE_READ | FILE_SHARE_WRITE;
- DWORD dwFlagsAndAttributes = FILE_ATTRIBUTE_NORMAL | FILE_FLAG_RANDOM_ACCESS |
FILE_FLAG_NO_BUFFERING;
- Uint32 flags = request->par.open.flags;
-
- // Convert file open flags from Solaris to Windows
- if ((flags & FsOpenReq::OM_CREATE) && (flags &
FsOpenReq::OM_TRUNCATE)){
- dwCreationDisposition = CREATE_ALWAYS;
- } else if (flags & FsOpenReq::OM_TRUNCATE){
- dwCreationDisposition = TRUNCATE_EXISTING;
- } else if (flags & FsOpenReq::OM_CREATE){
- dwCreationDisposition = CREATE_NEW;
- } else {
- dwCreationDisposition = OPEN_EXISTING;
- }
-
- switch(flags & 3){
- case FsOpenReq::OM_READONLY:
- dwDesiredAccess = GENERIC_READ;
- break;
- case FsOpenReq::OM_WRITEONLY:
- dwDesiredAccess = GENERIC_WRITE;
- break;
- case FsOpenReq::OM_READWRITE:
- dwDesiredAccess = GENERIC_READ | GENERIC_WRITE;
- break;
- default:
- request->error = 1000;
- break;
- return;
- }
-
- hFile = CreateFile(theFileName.c_str(), dwDesiredAccess, dwShareMode,
- 0, dwCreationDisposition, dwFlagsAndAttributes, 0);
-
- if(INVALID_HANDLE_VALUE == hFile) {
- request->error = GetLastError();
- if(((ERROR_PATH_NOT_FOUND == request->error) || (ERROR_INVALID_NAME ==
request->error))
- && (flags & FsOpenReq::OM_CREATE)) {
- createDirectories();
- hFile = CreateFile(theFileName.c_str(), dwDesiredAccess, dwShareMode,
- 0, dwCreationDisposition, dwFlagsAndAttributes, 0);
-
- if(INVALID_HANDLE_VALUE == hFile)
- request->error = GetLastError();
- else
- request->error = 0;
-
- return;
- }
- }
- else {
- request->error = 0;
- return;
- }
-#else
- Uint32 flags = request->par.open.flags;
- int new_flags = 0;
-
- // Convert file open flags from Solaris to Liux
- if (flags & FsOpenReq::OM_CREATE)
- {
- new_flags |= O_CREAT;
- }
-
- if (flags & FsOpenReq::OM_TRUNCATE){
-#if 0
- if(Global_unlinkO_CREAT){
- unlink(theFileName.c_str());
- } else
-#endif
- new_flags |= O_TRUNC;
- }
-
- if (flags & FsOpenReq::OM_AUTOSYNC)
- {
- m_auto_sync_freq = request->par.open.auto_sync_size;
- }
-
- if (flags & FsOpenReq::OM_APPEND){
- new_flags |= O_APPEND;
- }
-
- if (flags & FsOpenReq::OM_DIRECT)
-#ifdef O_DIRECT
- {
- new_flags |= O_DIRECT;
- }
-#endif
-
- if ((flags & FsOpenReq::OM_SYNC) && ! (flags & FsOpenReq::OM_INIT))
- {
-#ifdef O_SYNC
- new_flags |= O_SYNC;
-#endif
- }
-
- const char * rw = "";
- switch(flags & 0x3){
- case FsOpenReq::OM_READONLY:
- rw = "r";
- new_flags |= O_RDONLY;
- break;
- case FsOpenReq::OM_WRITEONLY:
- rw = "w";
- new_flags |= O_WRONLY;
- break;
- case FsOpenReq::OM_READWRITE:
- rw = "rw";
- new_flags |= O_RDWR;
- break;
- default:
- request->error = 1000;
- break;
- return;
- }
- if(flags & FsOpenReq::OM_GZ)
- use_gz= 1;
-
- // allow for user to choose any permissionsa with umask
- const int mode = S_IRUSR | S_IWUSR |
- S_IRGRP | S_IWGRP |
- S_IROTH | S_IWOTH;
- if (flags & FsOpenReq::OM_CREATE_IF_NONE)
- {
- Uint32 tmp_flags = new_flags;
-#ifdef O_DIRECT
- tmp_flags &= ~O_DIRECT;
-#endif
- if ((theFd = ::open(theFileName.c_str(), tmp_flags, mode)) != -1)
- {
- close(theFd);
- request->error = FsRef::fsErrFileExists;
- return;
- }
- new_flags |= O_CREAT;
- }
-
-no_odirect:
- theFd = ::open(theFileName.c_str(), new_flags, mode);
- if (-1 == theFd)
- {
- ndbout_c("1 ERROR opening %s",theFileName.c_str());
- PRINT_ERRORANDFLAGS(new_flags);
- if ((errno == ENOENT) && (new_flags & O_CREAT))
- {
- createDirectories();
- theFd = ::open(theFileName.c_str(), new_flags, mode);
- if (-1 == theFd)
- {
-#ifdef O_DIRECT
- if (new_flags & O_DIRECT)
- {
- new_flags &= ~O_DIRECT;
- goto no_odirect;
- }
-#endif
- ndbout_c("2 ERROR opening %s",theFileName.c_str());
- PRINT_ERRORANDFLAGS(new_flags);
- request->error = errno;
- return;
- }
- }
-#ifdef O_DIRECT
- else if (new_flags & O_DIRECT)
- {
- new_flags &= ~O_DIRECT;
- goto no_odirect;
- }
-#endif
- else
- {
- request->error = errno;
- return;
- }
- }
-
- if (flags & FsOpenReq::OM_CHECK_SIZE)
- {
- struct stat buf;
- if ((fstat(theFd, &buf) == -1))
- {
- request->error = errno;
- }
- else if((Uint64)buf.st_size != request->par.open.file_size)
- {
- request->error = FsRef::fsErrInvalidFileSize;
- }
- if (request->error)
- return;
- }
-
- if (flags & FsOpenReq::OM_INIT)
- {
- off_t off = 0;
- const off_t sz = request->par.open.file_size;
- Uint32 tmp[sizeof(SignalHeader)+25];
- Signal * signal = (Signal*)(&tmp[0]);
- FsReadWriteReq* req = (FsReadWriteReq*)signal->getDataPtrSend();
-
- Uint32 index = 0;
- Uint32 block = refToBlock(request->theUserReference);
-
-#ifdef HAVE_XFS_XFS_H
- if(platform_test_xfs_fd(theFd))
- {
- ndbout_c("Using xfsctl(XFS_IOC_RESVSP64) to allocate disk space");
- xfs_flock64_t fl;
- fl.l_whence= 0;
- fl.l_start= 0;
- fl.l_len= (off64_t)sz;
- if(xfsctl(NULL, theFd, XFS_IOC_RESVSP64, &fl) < 0)
- ndbout_c("failed to optimally allocate disk space");
- }
-#endif
-#ifdef HAVE_POSIX_FALLOCATE
- posix_fallocate(theFd, 0, sz);
-#endif
-
- while(off < sz)
- {
- req->filePointer = 0; // DATA 0
- req->userPointer = request->theUserPointer; // DATA 2
- req->numberOfPages = 1; // DATA 5
- req->varIndex = index++;
- req->data.pageData[0] = m_page_ptr.i;
-
- m_fs.EXECUTE_DIRECT(block, GSN_FSWRITEREQ, signal,
- FsReadWriteReq::FixedLength + 1);
- retry:
- Uint32 size = request->par.open.page_size;
- char* buf = (char*)m_page_ptr.p;
- while(size > 0){
- int n;
- if(use_gz)
- n= azwrite(&azf,buf,size);
- else
- n= write(theFd, buf, size);
- if(n == -1 && errno == EINTR)
- {
- continue;
- }
- if(n == -1 || n == 0)
- {
- ndbout_c("azwrite|write returned %d: errno: %d my_errno: %d",n,errno,my_errno);
- break;
- }
- size -= n;
- buf += n;
- }
- if(size != 0)
- {
- int err = errno;
-#ifdef O_DIRECT
- if ((new_flags & O_DIRECT) && off == 0)
- {
- ndbout_c("error on first write(%d), disable O_DIRECT", err);
- new_flags &= ~O_DIRECT;
- close(theFd);
- theFd = ::open(theFileName.c_str(), new_flags, mode);
- if (theFd != -1)
- goto retry;
- }
-#endif
- close(theFd);
- unlink(theFileName.c_str());
- request->error = err;
- return;
- }
- off += request->par.open.page_size;
- }
- if(lseek(theFd, 0, SEEK_SET) != 0)
- request->error = errno;
- }
- else if (flags & FsOpenReq::OM_DIRECT)
- {
-#ifdef O_DIRECT
- if (flags & (FsOpenReq::OM_TRUNCATE | FsOpenReq::OM_CREATE))
- {
- request->error = check_odirect_write(flags, new_flags, mode);
- }
- else
- {
- request->error = check_odirect_read(flags, new_flags, mode);
- }
-
- if (request->error)
- return;
-#endif
- }
-#ifdef VM_TRACE
- if (flags & FsOpenReq::OM_DIRECT)
- {
-#ifdef O_DIRECT
- ndbout_c("%s %s O_DIRECT: %d",
- theFileName.c_str(), rw,
- !!(new_flags & O_DIRECT));
-#else
- ndbout_c("%s %s O_DIRECT: 0",
- theFileName.c_str(), rw);
-#endif
- }
-#endif
- if ((flags & FsOpenReq::OM_SYNC) && (flags & FsOpenReq::OM_INIT))
- {
-#ifdef O_SYNC
- /**
- * reopen file with O_SYNC
- */
- close(theFd);
- new_flags &= ~(O_CREAT | O_TRUNC);
- new_flags |= O_SYNC;
- theFd = ::open(theFileName.c_str(), new_flags, mode);
- if (theFd == -1)
- {
- request->error = errno;
- }
-#endif
- }
-#endif
- if(use_gz)
- if(azdopen(&azf, theFd, new_flags) < 1)
- {
- ndbout_c("Stewart's brain broke");
- abort();
- }
-}
-
-int
-AsyncFile::readBuffer(Request* req, char * buf, size_t size, off_t offset){
- int return_value;
- req->par.readWrite.pages[0].size = 0;
-#ifdef NDB_WIN32
- DWORD dwSFP = SetFilePointer(hFile, offset, 0, FILE_BEGIN);
- if(dwSFP != offset) {
- return GetLastError();
- }
-#elif ! defined(HAVE_PREAD)
- off_t seek_val;
- if(!use_gz)
- {
- while((seek_val= lseek(theFd, offset, SEEK_SET)) == (off_t)-1
- && errno == EINTR);
- if(seek_val == (off_t)-1)
- {
- return errno;
- }
- }
-#endif
- off_t seek_val;
- if(use_gz)
- {
- while((seek_val= azseek(&azf, offset, SEEK_SET)) == (off_t)-1
- && errno == EINTR);
- if(seek_val == (off_t)-1)
- {
- return errno;
- }
- }
-
- int error;
-
- while (size > 0) {
- size_t bytes_read = 0;
-
-#ifdef NDB_WIN32
- DWORD dwBytesRead;
- BOOL bRead = ReadFile(hFile,
- buf,
- size,
- &dwBytesRead,
- 0);
- if(!bRead){
- return GetLastError();
- }
- bytes_read = dwBytesRead;
-#elif ! defined(HAVE_PREAD)
- if(use_gz)
- return_value = azread(&azf, buf, size, &error);
- else
- return_value = ::read(theFd, buf, size);
-#else // UNIX
- if(!use_gz)
- return_value = ::pread(theFd, buf, size, offset);
- else
- return_value = azread(&azf, buf, size, &error);
-#endif
-#ifndef NDB_WIN32
- if (return_value == -1 && errno == EINTR) {
- DEBUG(ndbout_c("EINTR in read"));
- continue;
- } else if (return_value == -1){
- return errno;
- } else {
- bytes_read = return_value;
- }
-#endif
-
- req->par.readWrite.pages[0].size += bytes_read;
- if(bytes_read == 0){
- if(req->action == Request::readPartial)
- {
- return 0;
- }
- DEBUG(ndbout_c("Read underflow %d %d\n %x\n%d %d",
- size, offset, buf, bytes_read, return_value));
- return ERR_ReadUnderflow;
- }
-
- if(bytes_read != size){
- DEBUG(ndbout_c("Warning partial read %d != %d",
- bytes_read, size));
- }
-
- buf += bytes_read;
- size -= bytes_read;
- offset += bytes_read;
- }
- return 0;
-}
void
AsyncFile::readReq( Request * request)
@@ -780,7 +222,7 @@ AsyncFile::readReq( Request * request)
off_t offset = request->par.readWrite.pages[i].offset;
size_t size = request->par.readWrite.pages[i].size;
char * buf = request->par.readWrite.pages[i].buf;
-
+
int err = readBuffer(request, buf, size, offset);
if(err != 0){
request->error = err;
@@ -792,84 +234,8 @@ AsyncFile::readReq( Request * request)
void
AsyncFile::readvReq( Request * request)
{
-#if ! defined(HAVE_PREAD)
readReq(request);
return;
-#elif defined NDB_WIN32
- // ReadFileScatter?
- readReq(request);
- return;
-#else
- int return_value;
- int length = 0;
- struct iovec iov[20]; // the parameter in the signal restricts this to 20 deep
- for(int i=0; i < request->par.readWrite.numberOfPages ; i++) {
- iov[i].iov_base= request->par.readWrite.pages[i].buf;
- iov[i].iov_len= request->par.readWrite.pages[i].size;
- length = length + iov[i].iov_len;
- }
- lseek( theFd, request->par.readWrite.pages[0].offset, SEEK_SET );
- return_value = ::readv(theFd, iov, request->par.readWrite.numberOfPages);
- if (return_value == -1) {
- request->error = errno;
- return;
- } else if (return_value != length) {
- request->error = 1011;
- return;
- }
-#endif
-}
-
-int
-AsyncFile::extendfile(Request* request) {
-#if ! defined(HAVE_PWRITE)
- // Find max size of this file in this request
- int maxOffset = 0;
- int maxSize = 0;
- for(int i=0; i < request->par.readWrite.numberOfPages ; i++) {
- if (request->par.readWrite.pages[i].offset > maxOffset) {
- maxOffset = request->par.readWrite.pages[i].offset;
- maxSize = request->par.readWrite.pages[i].size;
- }
- }
- DEBUG(ndbout_c("extendfile: maxOffset=%d, size=%d", maxOffset, maxSize));
-
- // Allocate a buffer and fill it with zeros
- void* pbuf = ndbd_malloc(maxSize);
- memset(pbuf, 0, maxSize);
- for (int p = 0; p <= maxOffset; p = p + maxSize) {
- int return_value;
- return_value = lseek(theFd,
- p,
- SEEK_SET);
- if((return_value == -1 ) || (return_value != p)) {
- ndbd_free(pbuf,maxSize);
- return -1;
- }
- if(use_gz)
- return_value = azwrite(&azf,
- pbuf,
- maxSize);
- else
- return_value = ::write(theFd,
- pbuf,
- maxSize);
- if ((return_value == -1) || (return_value != maxSize)) {
- ndbout_c("NDBFS ERROR during extendFile");
- ndbd_free(pbuf,maxSize);
- return -1;
- }
- }
- ndbd_free(pbuf,maxSize);
-
- DEBUG(ndbout_c("extendfile: \"%s\" OK!", theFileName.c_str()));
- return 0;
-#else
- request = request;
- DEBUG(ndbout_c("no pwrite"));
- abort();
- return -1;
-#endif
}
void
@@ -882,15 +248,15 @@ AsyncFile::writeReq( Request * request)
int totsize = 0;
off_t offset = request->par.readWrite.pages[page_num].offset;
char* bufptr = theWriteBuffer;
-
+
write_not_complete = false;
if (request->par.readWrite.numberOfPages > 1) {
off_t page_offset = offset;
// Multiple page write, copy to buffer for one write
for(int i=page_num; i < request->par.readWrite.numberOfPages; i++) {
- memcpy(bufptr,
- request->par.readWrite.pages[i].buf,
+ memcpy(bufptr,
+ request->par.readWrite.pages[i].buf,
request->par.readWrite.pages[i].size);
bufptr += request->par.readWrite.pages[i].size;
totsize += request->par.readWrite.pages[i].size;
@@ -915,7 +281,7 @@ AsyncFile::writeReq( Request * request)
page_offset += request->par.readWrite.pages[i].size;
}
bufptr = theWriteBuffer;
- } else {
+ } else {
// One page write, write page directly
bufptr = request->par.readWrite.pages[0].buf;
totsize = request->par.readWrite.pages[0].size;
@@ -932,89 +298,6 @@ AsyncFile::writeReq( Request * request)
}
}
-int
-AsyncFile::writeBuffer(const char * buf, size_t size, off_t offset,
- size_t chunk_size)
-{
- size_t bytes_to_write = chunk_size;
- int return_value;
-
- m_write_wo_sync += size;
-
-#ifdef NDB_WIN32
- DWORD dwSFP = SetFilePointer(hFile, offset, 0, FILE_BEGIN);
- if(dwSFP != offset) {
- return GetLastError();
- }
-#elif ! defined(HAVE_PWRITE)
- off_t seek_val;
- while((seek_val= lseek(theFd, offset, SEEK_SET)) == (off_t)-1
- && errno == EINTR);
- if(seek_val == (off_t)-1)
- {
- return errno;
- }
-#endif
-
- while (size > 0) {
- if (size < bytes_to_write){
- // We are at the last chunk
- bytes_to_write = size;
- }
- size_t bytes_written = 0;
-
-#ifdef NDB_WIN32
- DWORD dwWritten;
- BOOL bWrite = WriteFile(hFile, buf, bytes_to_write, &dwWritten, 0);
- if(!bWrite) {
- return GetLastError();
- }
- bytes_written = dwWritten;
- if (bytes_written != bytes_to_write) {
- DEBUG(ndbout_c("Warning partial write %d != %d", bytes_written, bytes_to_write));
- }
-
-#elif ! defined(HAVE_PWRITE)
- if(use_gz)
- return_value= azwrite(&azf, buf, bytes_to_write);
- else
- return_value = ::write(theFd, buf, bytes_to_write);
-#else // UNIX
- if(use_gz)
- return_value= azwrite(&azf, buf, bytes_to_write);
- else
- return_value = ::pwrite(theFd, buf, bytes_to_write, offset);
-#endif
-#ifndef NDB_WIN32
- if (return_value == -1 && errno == EINTR) {
- bytes_written = 0;
- DEBUG(ndbout_c("EINTR in write"));
- } else if (return_value == -1){
- if(use_gz)
- return my_errno;
- return errno;
- } else {
- bytes_written = return_value;
-
- if(bytes_written == 0){
- DEBUG(ndbout_c("no bytes written"));
- abort();
- }
-
- if(bytes_written != bytes_to_write){
- DEBUG(ndbout_c("Warning partial write %d != %d",
- bytes_written, bytes_to_write));
- }
- }
-#endif
-
- buf += bytes_written;
- size -= bytes_written;
- offset += bytes_written;
- }
- return 0;
-}
-
void
AsyncFile::writevReq( Request * request)
{
@@ -1022,263 +305,35 @@ AsyncFile::writevReq( Request * request)
writeReq(request);
}
-
+/*
void
AsyncFile::closeReq(Request * request)
{
- if (m_open_flags & (
+ // FIXME: Stewart should check all callers why this is here.
+/* if (m_open_flags & (
FsOpenReq::OM_WRITEONLY |
FsOpenReq::OM_READWRITE |
FsOpenReq::OM_APPEND )) {
syncReq(request);
}
-#ifdef NDB_WIN32
- if(!CloseHandle(hFile)) {
- request->error = GetLastError();
- }
- hFile = INVALID_HANDLE_VALUE;
-#else
- int r;
- if(use_gz)
- r= azclose(&azf);
- else
- ::close(theFd);
- use_gz= 0;
- Byte *a,*b;
- a= azf.inbuf;
- b= azf.outbuf;
- memset(&azf,0,sizeof(azf));
- azf.inbuf= a;
- azf.outbuf= b;
- azf.stream.opaque = (void*)&az_mempool;
-
- if (-1 == r) {
-#ifndef DBUG_OFF
- if (theFd == -1) {
- DEBUG(ndbout_c("close on fd = -1"));
- abort();
- }
-#endif
- request->error = errno;
- }
- theFd = -1;
-#endif
-}
-
-bool AsyncFile::isOpen(){
-#ifdef NDB_WIN32
- return (hFile != INVALID_HANDLE_VALUE);
-#else
- return (theFd != -1);
-#endif
-}
-
+*/
+//}
-void
+/*void
AsyncFile::syncReq(Request * request)
{
+ // FIXME: Stewart to find out why these are here
if(m_auto_sync_freq && m_write_wo_sync == 0){
return;
}
-#ifdef NDB_WIN32
- if(!FlushFileBuffers(hFile)) {
- request->error = GetLastError();
- return;
- }
-#else
- if (-1 == ::fsync(theFd)){
- request->error = errno;
- return;
- }
-#endif
+ // do sync
m_write_wo_sync = 0;
-}
-
-void
-AsyncFile::appendReq(Request * request){
-
- const char * buf = request->par.append.buf;
- Uint32 size = request->par.append.size;
-
- m_write_wo_sync += size;
-
-#ifdef NDB_WIN32
- DWORD dwWritten = 0;
- while(size > 0){
- if(!WriteFile(hFile, buf, size, &dwWritten, 0)){
- request->error = GetLastError();
- return ;
- }
-
- buf += dwWritten;
- size -= dwWritten;
- }
-#else
- while(size > 0){
- int n;
- if(use_gz)
- n= azwrite(&azf,buf,size);
- else
- n= write(theFd, buf, size);
- if(n == -1 && errno == EINTR){
- continue;
- }
- if(n == -1){
- if(use_gz)
- request->error = my_errno;
- else
- request->error = errno;
- return;
- }
- if(n == 0){
- DEBUG(ndbout_c("append with n=0"));
- abort();
- }
- size -= n;
- buf += n;
- }
-#endif
-
- if(m_auto_sync_freq && m_write_wo_sync > m_auto_sync_freq){
- syncReq(request);
- }
-}
-
-void
-AsyncFile::removeReq(Request * request)
-{
-#ifdef NDB_WIN32
- if(!DeleteFile(theFileName.c_str())) {
- request->error = GetLastError();
- }
-#else
- if (-1 == ::remove(theFileName.c_str())) {
- request->error = errno;
-
- }
-#endif
-}
-
-void
-AsyncFile::rmrfReq(Request * request, char * path, bool removePath){
- Uint32 path_len = strlen(path);
- Uint32 path_max_copy = PATH_MAX - path_len;
- char* path_add = &path[path_len];
-#ifndef NDB_WIN32
- if(!request->par.rmrf.directory){
- // Remove file
- if(unlink((const char *)path) != 0 && errno != ENOENT)
- request->error = errno;
- return;
- }
- // Remove directory
- DIR* dirp = opendir((const char *)path);
- if(dirp == 0){
- if(errno != ENOENT)
- request->error = errno;
- return;
- }
- struct dirent * dp;
- while ((dp = readdir(dirp)) != NULL){
- if ((strcmp(".", dp->d_name) != 0) && (strcmp("..", dp->d_name) != 0))
{
- BaseString::snprintf(path_add, (size_t)path_max_copy, "%s%s",
- DIR_SEPARATOR, dp->d_name);
- if(remove((const char*)path) == 0){
- path[path_len] = 0;
- continue;
- }
-
- rmrfReq(request, path, true);
- path[path_len] = 0;
- if(request->error != 0){
- closedir(dirp);
- return;
- }
- }
- }
- closedir(dirp);
- if(removePath && rmdir((const char *)path) != 0){
- request->error = errno;
- }
- return;
-#else
-
- if(!request->par.rmrf.directory){
- // Remove file
- if(!DeleteFile(path)){
- DWORD dwError = GetLastError();
- if(dwError!=ERROR_FILE_NOT_FOUND)
- request->error = dwError;
- }
- return;
- }
-
- strcat(path, "\\*");
- WIN32_FIND_DATA ffd;
- HANDLE hFindFile = FindFirstFile(path, &ffd);
- path[path_len] = 0;
- if(INVALID_HANDLE_VALUE==hFindFile){
- DWORD dwError = GetLastError();
- if(dwError!=ERROR_PATH_NOT_FOUND)
- request->error = dwError;
- return;
- }
-
- do {
- if(0!=strcmp(".", ffd.cFileName) && 0!=strcmp("..", ffd.cFileName)){
- strcat(path, "\\");
- strcat(path, ffd.cFileName);
- if(DeleteFile(path)) {
- path[path_len] = 0;
- continue;
- }//if
-
- rmrfReq(request, path, true);
- path[path_len] = 0;
- if(request->error != 0){
- FindClose(hFindFile);
- return;
- }
- }
- } while(FindNextFile(hFindFile, &ffd));
-
- FindClose(hFindFile);
-
- if(removePath && !RemoveDirectory(path))
- request->error = GetLastError();
-
-#endif
-}
+ }*/
void AsyncFile::endReq()
{
- // Thread is ended with return
- if (theWriteBufferUnaligned)
- ndbd_free(theWriteBufferUnaligned, theWriteBufferSize);
-
- if (azfBufferUnaligned)
- ndbd_free(azfBufferUnaligned, (AZ_BUFSIZE_READ*AZ_BUFSIZE_WRITE)
- +NDB_O_DIRECT_WRITE_ALIGNMENT-1);
-}
-
-
-void AsyncFile::createDirectories()
-{
- char* tmp;
- const char * name = theFileName.c_str();
- const char * base = theFileName.get_base_name();
- while((tmp = (char *)strstr(base, DIR_SEPARATOR)))
- {
- char t = tmp[0];
- tmp[0] = 0;
-#ifdef NDB_WIN32
- CreateDirectory(name, 0);
-#else
- mkdir(name, S_IRUSR | S_IWUSR | S_IXUSR | S_IXGRP | S_IRGRP);
-#endif
- tmp[0] = t;
- base = tmp + sizeof(DIR_SEPARATOR);
- }
+ if (theWriteBuffer)
+ ndbd_free(theWriteBuffer, theWriteBufferSize);
}
#ifdef DEBUG_ASYNCFILE
@@ -1286,95 +341,8 @@ void printErrorAndFlags(Uint32 used_flag
char buf[255];
sprintf(buf, "PEAF: errno=%d \"", errno);
- switch(errno) {
- case EACCES:
- strcat(buf, "EACCES");
- break;
- case EDQUOT:
- strcat(buf, "EDQUOT");
- break;
- case EEXIST :
- strcat(buf, "EEXIST");
- break;
- case EINTR :
- strcat(buf, "EINTR");
- break;
- case EFAULT :
- strcat(buf, "EFAULT");
- break;
- case EIO :
- strcat(buf, "EIO");
- break;
- case EISDIR :
- strcat(buf, "EISDIR");
- break;
- case ELOOP :
- strcat(buf, "ELOOP");
- break;
- case EMFILE :
- strcat(buf, "EMFILE");
- break;
- case ENFILE :
- strcat(buf, "ENFILE");
- break;
- case ENOENT :
- strcat(buf, "ENOENT ");
- break;
- case ENOSPC :
- strcat(buf, "ENOSPC");
- break;
- case ENOTDIR :
- strcat(buf, "ENOTDIR");
- break;
- case ENXIO :
- strcat(buf, "ENXIO");
- break;
- case EOPNOTSUPP:
- strcat(buf, "EOPNOTSUPP");
- break;
- case EMULTIHOP :
- strcat(buf, "EMULTIHOP");
- break;
- case ENOLINK :
- strcat(buf, "ENOLINK");
- break;
- case ENOSR :
- strcat(buf, "ENOSR");
- break;
- case EOVERFLOW :
- strcat(buf, "EOVERFLOW");
- break;
- case EROFS :
- strcat(buf, "EROFS");
- break;
- case EAGAIN :
- strcat(buf, "EAGAIN");
- break;
- case EINVAL :
- strcat(buf, "EINVAL");
- break;
- case ENOMEM :
- strcat(buf, "ENOMEM");
- break;
- case ETXTBSY :
- strcat(buf, "ETXTBSY");
- break;
- case ENAMETOOLONG:
- strcat(buf, "ENAMETOOLONG");
- break;
- case EBADF:
- strcat(buf, "EBADF");
- break;
- case ESPIPE:
- strcat(buf, "ESPIPE");
- break;
- case ESTALE:
- strcat(buf, "ESTALE");
- break;
- default:
- strcat(buf, "EOTHER");
- break;
- }
+ strcat(buf, strerror(errno));
+
strcat(buf, "\" ");
strcat(buf, " flags: ");
switch(used_flags & 3){
@@ -1421,7 +389,7 @@ void printErrorAndFlags(Uint32 used_flag
NdbOut&
operator<<(NdbOut& out, const Request& req)
{
- out << "[ Request: file: " << hex << req.file
+ out << "[ Request: file: " << hex << req.file
<< " userRef: " << hex << req.theUserReference
<< " userData: " << dec << req.theUserPointer
<< " theFilePointer: " << req.theFilePointer
@@ -1436,19 +404,19 @@ operator<<(NdbOut& out, const Request& r
case Request::closeRemove:
out << "closeRemove";
break;
- case Request::read: // Allways leave readv directly after
+ case Request::read: // Allways leave readv directly after
out << "read";
break;
case Request::readv:
out << "readv";
break;
- case Request::write:// Allways leave writev directly after
+ case Request::write:// Allways leave writev directly after
out << "write";
break;
case Request::writev:
out << "writev";
break;
- case Request::writeSync:// Allways leave writevSync directly after
+ case Request::writeSync:// Allways leave writevSync directly after
out << "writeSync";
break;
// writeSync because SimblockAsyncFileSystem depends on it
Index: telco-6.2/storage/ndb/src/kernel/blocks/ndbfs/AsyncFile.hpp
===================================================================
--- telco-6.2.orig/storage/ndb/src/kernel/blocks/ndbfs/AsyncFile.hpp 2007-10-24
16:03:20.312559583 +1000
+++ telco-6.2/storage/ndb/src/kernel/blocks/ndbfs/AsyncFile.hpp 2007-10-24
16:40:02.920590018 +1000
@@ -16,51 +16,13 @@
#ifndef AsyncFile_H
#define AsyncFile_H
-//===========================================================================
-//
-// .DESCRIPTION
-// Asynchronous file, All actions are executed concurrently with other
-// activity of the process.
-// Because all action are performed in a seperated thread the result of
-// of a action is send back tru a memory channel.
-// For the asyncronise notivication of a finished request all the calls
-// have a request as paramater, the user can use the userData pointer
-// to add information it needs when the request is send back.
-//
-//
-// .TYPICAL USE:
-// Writing or reading data to/from disk concurrently to other activities.
-//
-//===========================================================================
-//=============================================================================
-//
-// .PUBLIC
-//
-//=============================================================================
-///////////////////////////////////////////////////////////////////////////////
-//
-// AsyncFile( );
-// Description:
-// Initialisation of the class.
-// Parameters:
-// -
-///////////////////////////////////////////////////////////////////////////////
-//
-// ~AsyncFile( );
-// Description:
-// Tell the thread to stop and wait for it to return
-// Parameters:
-// -
-///////////////////////////////////////////////////////////////////////////////
-//
-// doStart( );
-// Description:
-// Spawns the new thread.
-// Parameters:
-// Base path of filesystem
-//
-///////////////////////////////////////////////////////////////////////////////
-//
+/**
+ AsyncFile
+
+ All file operations executed in thread-per-file, away from the DB threads.
+ */
+
+
// void execute(Request *request);
// Description:
// performens the requered action.
@@ -79,9 +41,9 @@
// return values are stored in the request error field:
// error= return state of the action, UNIX error see man open/errno
// userData= is untouched can be used be user.
-//
-///////////////////////////////////////////////////////////////////////////////
-//
+
+
+
// void reportTo( MemoryChannel<Request> *reportTo );
// Description:
// set the channel where the file must report the result of the
@@ -90,14 +52,26 @@
// reportTo: the memory channel to use use MemoryChannelMultipleWriter
// if more
// than one file uses this channel to report back.
-//
-///////////////////////////////////////////////////////////////////////////////
#include <kernel_types.h>
#include "MemoryChannel.hpp"
#include "Filename.hpp"
-#include <azlib.h>
+// Use this define if you want printouts from AsyncFile class
+#define DEBUG_ASYNCFILE
+
+#ifdef DEBUG_ASYNCFILE
+#include <NdbOut.hpp>
+#define DEBUG(x) x
+#define PRINT_ERRORANDFLAGS(f) printErrorAndFlags(f)
+void printErrorAndFlags(Uint32 used_flags);
+#else
+#define DEBUG(x)
+#define PRINT_ERRORANDFLAGS(f)
+#endif
+
+// Define the size of the write buffer (for each thread)
+#define WRITEBUFFERSIZE 262144
const int ERR_ReadUnderflow = 1000;
@@ -185,71 +159,101 @@ class AsyncFile
friend class Ndbfs;
public:
AsyncFile(SimulatedBlock& fs);
- ~AsyncFile();
-
+
void reportTo( MemoryChannel<Request> *reportTo );
-
+
void execute( Request* request );
-
- void doStart();
+
+ virtual void doStart();
+
+ virtual void shutdown();
+
// its a thread so its always running
- void run();
+ virtual void run();
- bool isOpen();
+ virtual bool isOpen() = 0;
Filename theFileName;
Request *m_current_request, *m_last_request;
private:
-
- void openReq(Request *request);
- void readReq(Request *request);
- void readvReq(Request *request);
- void writeReq(Request *request);
- void writevReq(Request *request);
-
- void closeReq(Request *request);
- void syncReq(Request *request);
- void removeReq(Request *request);
- void appendReq(Request *request);
- void rmrfReq(Request *request, char * path, bool removePath);
- void endReq();
-
- int readBuffer(Request*, char * buf, size_t size, off_t offset);
- int writeBuffer(const char * buf, size_t size, off_t offset,
- size_t chunk_size = WRITECHUNK);
-
- int extendfile(Request* request);
- void createDirectories();
-
-#ifdef NDB_WIN32
- HANDLE hFile;
-#else
- int theFd;
-#endif
- Uint32 m_open_flags; // OM_ flags from request to open file
+ /**
+ * Implementers of AsyncFile interface
+ * should implement the following
+ */
+
+ /**
+ * init()
+ *
+ * Initialise buffers etc. After init(), ready to execute()
+ * Called with theStartMutexPtr held.
+ */
+ virtual int init();
+
+ /**
+ * openReq() - open a file.
+ */
+ virtual void openReq(Request *request) = 0;
+
+ /**
+ * readBuffer - read into buffer
+ */
+ virtual int readBuffer(Request*, char * buf, size_t size, off_t offset)=0;
+
+ /**
+ * writeBuffer() - write into file
+ */
+ virtual int writeBuffer(const char * buf, size_t size, off_t offset,
+ size_t chunk_size = WRITECHUNK)=0;
+
+
+ virtual void closeReq(Request *request)=0;
+ virtual void syncReq(Request *request)=0;
+ virtual void removeReq(Request *request)=0;
+ virtual void appendReq(Request *request)=0;
+ virtual void rmrfReq(Request *request, char * path, bool removePath)=0;
+ virtual void createDirectories()=0;
+
+ /**
+ * Unlikely to need to implement these. readvReq for iovec
+ */
+ virtual void readReq(Request *request);
+ virtual void readvReq(Request *request);
+
+ /**
+ * Unlikely to need to implement these, writeBuffer likely sufficient.
+ * writevReq for iovec (not yet used)
+ */
+ virtual void writeReq(Request *request);
+ virtual void writevReq(Request *request);
+
+ /**
+ * endReq()
+ *
+ * Inverse to ::init(). Cleans up thread before it exits.
+ */
+ virtual void endReq();
- int use_gz;
- azio_stream azf;
- struct az_alloc_rec az_mempool;
+private:
+ /**
+ * (end of what implementors need)
+ */
MemoryChannel<Request> *theReportTo;
MemoryChannel<Request>* theMemoryChannelPtr;
-
+
struct NdbThread* theThreadPtr;
NdbMutex* theStartMutexPtr;
NdbCondition* theStartConditionPtr;
bool theStartFlag;
+
+protected:
int theWriteBufferSize;
char* theWriteBuffer;
- void* theWriteBufferUnaligned;
- void* azfBufferUnaligned;
-
+
size_t m_write_wo_sync; // Writes wo/ sync
size_t m_auto_sync_freq; // Auto sync freq in bytes
- int check_odirect_read(Uint32 flags, int&new_flags, int mode);
- int check_odirect_write(Uint32 flags, int&new_flags, int mode);
public:
SimulatedBlock& m_fs;
Ptr<GlobalPage> m_page_ptr;
Index: telco-6.2/storage/ndb/src/kernel/blocks/ndbfs/PosixAsyncFile.cpp
===================================================================
--- /dev/null 1970-01-01 00:00:00.000000000 +0000
+++ telco-6.2/storage/ndb/src/kernel/blocks/ndbfs/PosixAsyncFile.cpp 2007-11-09
11:33:59.859245082 +1100
@@ -0,0 +1,767 @@
+/* Copyright (C) 2003 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+#include <ndb_global.h>
+#include <my_sys.h>
+#include <my_pthread.h>
+
+#ifdef HAVE_XFS_XFS_H
+#include <xfs/xfs.h>
+#endif
+
+#include "AsyncFile.hpp"
+#include "PosixAsyncFile.hpp"
+
+#include <ErrorHandlingMacros.hpp>
+#include <kernel_types.h>
+#include <ndbd_malloc.hpp>
+#include <NdbThread.h>
+#include <signaldata/FsRef.hpp>
+#include <signaldata/FsOpenReq.hpp>
+#include <signaldata/FsReadWriteReq.hpp>
+
+// use this to test broken pread code
+//#define HAVE_BROKEN_PREAD
+
+#ifdef HAVE_BROKEN_PREAD
+#undef HAVE_PWRITE
+#undef HAVE_PREAD
+#endif
+
+// For readv and writev
+#include <sys/uio.h>
+
+#include <dirent.h>
+
+/**
+ * Construct a PosixAsyncFile that is not yet attached to any file.
+ *
+ * Every member owned by this class is put into a defined "empty" state:
+ * endReq() tests theWriteBufferUnaligned and azfBufferUnaligned before
+ * freeing them, so they must not start out holding indeterminate values.
+ * The buffers themselves are allocated later, in init().
+ */
+PosixAsyncFile::PosixAsyncFile(SimulatedBlock& fs) :
+  AsyncFile(fs),
+  theFd(-1),
+  m_open_flags(0),
+  use_gz(0),
+  theWriteBufferUnaligned(0),
+  azfBufferUnaligned(0)
+{
+  memset(&azf,0,sizeof(azf));
+  memset(&az_mempool,0,sizeof(az_mempool));
+}
+
+/**
+ * init() - allocate the per-file buffers.
+ *
+ * Allocates the aligned write buffer, the aligned azio in/out buffers and
+ * the inflate/deflate memory pool, and wires the latter two into azf.
+ *
+ * Each allocation is checked before its result is used.
+ *
+ * @return 0 on success, -1 if any allocation failed.
+ */
+int PosixAsyncFile::init()
+{
+  // Create write buffer for bigger writes
+  theWriteBufferSize = WRITEBUFFERSIZE;
+  theWriteBufferUnaligned = (char *) ndbd_malloc(theWriteBufferSize +
+                                                 NDB_O_DIRECT_WRITE_ALIGNMENT-1);
+  if (!theWriteBufferUnaligned) {
+    theWriteBuffer = 0;
+    DEBUG(ndbout_c("AsyncFile::writeReq, Failed allocating write buffer"));
+    return -1;
+  }//if
+
+  // Round the raw pointer up to the next NDB_O_DIRECT_WRITE_ALIGNMENT boundary
+  theWriteBuffer = (char *)
+    (((UintPtr)theWriteBufferUnaligned + NDB_O_DIRECT_WRITE_ALIGNMENT - 1) &
+     ~(UintPtr)(NDB_O_DIRECT_WRITE_ALIGNMENT - 1));
+
+  azfBufferUnaligned= (Byte*)ndbd_malloc((AZ_BUFSIZE_READ+AZ_BUFSIZE_WRITE)
+                                         +NDB_O_DIRECT_WRITE_ALIGNMENT-1);
+  if (!azfBufferUnaligned) {
+    return -1;
+  }//if
+
+  // One allocation holds both azio buffers: inbuf first, outbuf after it
+  azf.inbuf= (Byte*)(((UintPtr)azfBufferUnaligned
+                      + NDB_O_DIRECT_WRITE_ALIGNMENT - 1) &
+                     ~(UintPtr)(NDB_O_DIRECT_WRITE_ALIGNMENT - 1));
+
+  azf.outbuf= azf.inbuf + AZ_BUFSIZE_READ;
+
+  az_mempool.size = az_mempool.mfree = az_inflate_mem_size()+az_deflate_mem_size();
+
+  ndbout_c("NDBFS/AsyncFile: Allocating %d for In/Deflate buffer",az_mempool.size);
+  az_mempool.mem = (char*) ndbd_malloc(az_mempool.size);
+  if (!az_mempool.mem) {
+    return -1;
+  }//if
+
+  azf.stream.opaque= &az_mempool;
+
+  return 0;
+}//PosixAsyncFile::init()
+
+// Scratch area used by check_odirect_write() and check_odirect_read() for
+// their one-page probe I/O. It is 2*GLOBAL_PAGE_SIZE-1 bytes long so that a
+// GLOBAL_PAGE_SIZE-aligned region of GLOBAL_PAGE_SIZE bytes always fits
+// inside it, wherever the array itself happens to be placed.
+#ifdef O_DIRECT
+static char g_odirect_readbuf[2*GLOBAL_PAGE_SIZE -1];
+#endif
+
+/**
+ * check_odirect_write() - probe whether writing through theFd works.
+ *
+ * When O_DIRECT is available, one page from the aligned scratch buffer is
+ * written (retrying on EINTR). If that fails, O_DIRECT is cleared from
+ * new_flags. In both cases the file is then closed and reopened with
+ * new_flags.
+ *
+ * @return 0 on success, or errno if the reopen failed.
+ */
+int PosixAsyncFile::check_odirect_write(Uint32 flags, int& new_flags, int mode)
+{
+  assert(new_flags & (O_CREAT | O_TRUNC));
+#ifdef O_DIRECT
+  char * probe = (char*)((UintPtr(g_odirect_readbuf)+(GLOBAL_PAGE_SIZE - 1)) & ~(GLOBAL_PAGE_SIZE - 1));
+
+  int rc;
+  do
+  {
+    rc = ::write(theFd, probe, GLOBAL_PAGE_SIZE);
+  } while ((rc == -1) && (errno == EINTR));
+
+  if (rc == -1)
+  {
+    new_flags &= ~O_DIRECT;
+    ndbout_c("%s Failed to write using O_DIRECT, disabling",
+             theFileName.c_str());
+  }
+
+  // Reopen unconditionally, with whatever new_flags now contains
+  close(theFd);
+  theFd = ::open(theFileName.c_str(), new_flags, mode);
+  if (theFd == -1)
+  {
+    return errno;
+  }
+#endif
+
+  return 0;
+}
+
+/**
+ * check_odirect_read() - probe whether reading through theFd works.
+ *
+ * When O_DIRECT is available, one page is read into the aligned scratch
+ * buffer (retrying on EINTR) and the file position is rewound. Unless
+ * OM_CHECK_SIZE is set, the file size must also be a multiple of
+ * GLOBAL_PAGE_SIZE. If the read fails or the size does not fit, the file
+ * is closed and reopened with O_DIRECT cleared from new_flags.
+ *
+ * @return 0 on success, or errno from a failing lseek/fstat/open.
+ */
+int PosixAsyncFile::check_odirect_read(Uint32 flags, int &new_flags, int mode)
+{
+#ifdef O_DIRECT
+  char * probe = (char*)((UintPtr(g_odirect_readbuf)+(GLOBAL_PAGE_SIZE - 1)) & ~(GLOBAL_PAGE_SIZE - 1));
+
+  int rc;
+  do
+  {
+    rc = ::read(theFd, probe, GLOBAL_PAGE_SIZE);
+  } while ((rc == -1) && (errno == EINTR));
+
+  bool keep_odirect = true;
+  if (rc == -1)
+  {
+    ndbout_c("%s Failed to read using O_DIRECT, disabling",
+             theFileName.c_str());
+    keep_odirect = false;
+  }
+  else
+  {
+    // Undo the effect of the probe read on the file position
+    if (lseek(theFd, 0, SEEK_SET) != 0)
+    {
+      return errno;
+    }
+
+    if ((flags & FsOpenReq::OM_CHECK_SIZE) == 0)
+    {
+      struct stat st;
+      if (fstat(theFd, &st) == -1)
+      {
+        return errno;
+      }
+      if ((st.st_size % GLOBAL_PAGE_SIZE) != 0)
+      {
+        ndbout_c("%s filesize not a multiple of %d, disabling O_DIRECT",
+                 theFileName.c_str(), GLOBAL_PAGE_SIZE);
+        keep_odirect = false;
+      }
+    }
+  }
+
+  if (keep_odirect)
+  {
+    return 0;
+  }
+
+  // Fall back: reopen the same file without O_DIRECT
+  close(theFd);
+  new_flags &= ~O_DIRECT;
+  theFd = ::open(theFileName.c_str(), new_flags, mode);
+  if (theFd == -1)
+  {
+    return errno;
+  }
+#endif
+  return 0;
+}
+
+/**
+ * openReq() - open the file named by theFileName.
+ *
+ * Translates the OM_ flags in request->par.open.flags into open(2) flags
+ * and opens the file, creating missing directories when O_CREAT is set and
+ * retrying without O_DIRECT when an O_DIRECT open fails. Afterwards it
+ * optionally:
+ *  - verifies the file size (OM_CHECK_SIZE),
+ *  - writes the initial contents page by page (OM_INIT),
+ *  - probes that O_DIRECT I/O works (OM_DIRECT without OM_INIT),
+ *  - reopens with O_SYNC (OM_SYNC together with OM_INIT),
+ *  - attaches the azio stream (OM_GZ).
+ *
+ * On failure request->error is set to an errno value, an FsRef error code,
+ * or 1000 for an invalid access mode, and the function returns.
+ */
+void PosixAsyncFile::openReq(Request *request)
+{
+  m_auto_sync_freq = 0;
+  m_write_wo_sync = 0;
+  m_open_flags = request->par.open.flags;
+
+  // for open.flags, see signal FSOPENREQ
+  Uint32 flags = request->par.open.flags;
+  int new_flags = 0;
+
+  // Translate the FsOpenReq OM_ flags into open(2) O_ flags
+  if (flags & FsOpenReq::OM_CREATE)
+  {
+    new_flags |= O_CREAT;
+  }
+
+  if (flags & FsOpenReq::OM_TRUNCATE){
+    new_flags |= O_TRUNC;
+  }
+
+  if (flags & FsOpenReq::OM_AUTOSYNC)
+  {
+    m_auto_sync_freq = request->par.open.auto_sync_size;
+  }
+
+  if (flags & FsOpenReq::OM_APPEND){
+    new_flags |= O_APPEND;
+  }
+
+#ifdef O_DIRECT
+  // The whole test lives inside the #ifdef. With the "if" outside it, a
+  // platform lacking O_DIRECT would make the OM_SYNC statement below the
+  // body of this "if".
+  if (flags & FsOpenReq::OM_DIRECT)
+  {
+    new_flags |= O_DIRECT;
+  }
+#endif
+
+  if ((flags & FsOpenReq::OM_SYNC) && ! (flags & FsOpenReq::OM_INIT))
+  {
+#ifdef O_SYNC
+    new_flags |= O_SYNC;
+#endif
+  }
+
+  const char * rw = "";
+  switch(flags & 0x3){
+  case FsOpenReq::OM_READONLY:
+    rw = "r";
+    new_flags |= O_RDONLY;
+    break;
+  case FsOpenReq::OM_WRITEONLY:
+    rw = "w";
+    new_flags |= O_WRONLY;
+    break;
+  case FsOpenReq::OM_READWRITE:
+    rw = "rw";
+    new_flags |= O_RDWR;
+    break;
+  default:
+    // Invalid access mode: report it and do not go on to open the file
+    request->error = 1000;
+    return;
+  }
+  if(flags & FsOpenReq::OM_GZ)
+    use_gz= 1;
+
+  // allow for user to choose any permissions with umask
+  const int mode = S_IRUSR | S_IWUSR |
+                   S_IRGRP | S_IWGRP |
+                   S_IROTH | S_IWOTH;
+  if (flags & FsOpenReq::OM_CREATE_IF_NONE)
+  {
+    Uint32 tmp_flags = new_flags;
+#ifdef O_DIRECT
+    tmp_flags &= ~O_DIRECT;
+#endif
+    if ((theFd = ::open(theFileName.c_str(), tmp_flags, mode)) != -1)
+    {
+      close(theFd);
+      theFd = -1; // do not keep a closed descriptor around
+      request->error = FsRef::fsErrFileExists;
+      return;
+    }
+    new_flags |= O_CREAT;
+  }
+
+no_odirect:
+  theFd = ::open(theFileName.c_str(), new_flags, mode);
+  if (-1 == theFd)
+  {
+    // Capture errno first: the output calls below may overwrite it
+    const int open_errno = errno;
+    ndbout_c("1 ERROR opening %s",theFileName.c_str());
+    PRINT_ERRORANDFLAGS(new_flags);
+    if ((open_errno == ENOENT) && (new_flags & O_CREAT))
+    {
+      createDirectories();
+      theFd = ::open(theFileName.c_str(), new_flags, mode);
+      if (-1 == theFd)
+      {
+        const int retry_errno = errno;
+#ifdef O_DIRECT
+        if (new_flags & O_DIRECT)
+        {
+          new_flags &= ~O_DIRECT;
+          goto no_odirect;
+        }
+#endif
+        ndbout_c("2 ERROR opening %s",theFileName.c_str());
+        PRINT_ERRORANDFLAGS(new_flags);
+        request->error = retry_errno;
+        return;
+      }
+    }
+#ifdef O_DIRECT
+    else if (new_flags & O_DIRECT)
+    {
+      new_flags &= ~O_DIRECT;
+      goto no_odirect;
+    }
+#endif
+    else
+    {
+      request->error = open_errno;
+      return;
+    }
+  }
+
+  if (flags & FsOpenReq::OM_CHECK_SIZE)
+  {
+    struct stat buf;
+    if ((fstat(theFd, &buf) == -1))
+    {
+      request->error = errno;
+    }
+    else if((Uint64)buf.st_size != request->par.open.file_size)
+    {
+      request->error = FsRef::fsErrInvalidFileSize;
+    }
+    if (request->error)
+      return;
+  }
+
+  if (flags & FsOpenReq::OM_INIT)
+  {
+    off_t off = 0;
+    const off_t sz = request->par.open.file_size;
+    Uint32 tmp[sizeof(SignalHeader)+25];
+    Signal * signal = (Signal*)(&tmp[0]);
+    FsReadWriteReq* req = (FsReadWriteReq*)signal->getDataPtrSend();
+
+    Uint32 index = 0;
+    Uint32 block = refToBlock(request->theUserReference);
+
+#ifdef HAVE_XFS_XFS_H
+    if(platform_test_xfs_fd(theFd))
+    {
+      ndbout_c("Using xfsctl(XFS_IOC_RESVSP64) to allocate disk space");
+      xfs_flock64_t fl;
+      fl.l_whence= 0;
+      fl.l_start= 0;
+      fl.l_len= (off64_t)sz;
+      if(xfsctl(NULL, theFd, XFS_IOC_RESVSP64, &fl) < 0)
+        ndbout_c("failed to optimally allocate disk space");
+    }
+#endif
+#ifdef HAVE_POSIX_FALLOCATE
+    posix_fallocate(theFd, 0, sz);
+#endif
+
+    while(off < sz)
+    {
+      // Ask the owning block to fill m_page_ptr with the next page of data
+      req->filePointer = 0;          // DATA 0
+      req->userPointer = request->theUserPointer;          // DATA 2
+      req->numberOfPages = 1;        // DATA 5
+      req->varIndex = index++;
+      req->data.pageData[0] = m_page_ptr.i;
+
+      m_fs.EXECUTE_DIRECT(block, GSN_FSWRITEREQ, signal,
+                          FsReadWriteReq::FixedLength + 1);
+  retry:
+      Uint32 size = request->par.open.page_size;
+      char* buf = (char*)m_page_ptr.p;
+      while(size > 0){
+        int n;
+        if(use_gz)
+          n= azwrite(&azf,buf,size);
+        else
+          n= write(theFd, buf, size);
+        if(n == -1 && errno == EINTR)
+        {
+          continue;
+        }
+        if(n == -1 || n == 0)
+        {
+          ndbout_c("azwrite|write returned %d: errno: %d my_errno: %d",n,errno,my_errno);
+          break;
+        }
+        size -= n;
+        buf += n;
+      }
+      if(size != 0)
+      {
+        int err = errno;
+#ifdef O_DIRECT
+        // A failure on the very first page may be O_DIRECT not working
+        // here: retry that page once through a plain descriptor.
+        if ((new_flags & O_DIRECT) && off == 0)
+        {
+          ndbout_c("error on first write(%d), disable O_DIRECT", err);
+          new_flags &= ~O_DIRECT;
+          close(theFd);
+          theFd = ::open(theFileName.c_str(), new_flags, mode);
+          if (theFd != -1)
+            goto retry;
+        }
+#endif
+        if (theFd != -1)
+          close(theFd);
+        theFd = -1; // do not keep a closed descriptor around
+        unlink(theFileName.c_str());
+        request->error = err;
+        return;
+      }
+      off += request->par.open.page_size;
+    }
+    if(lseek(theFd, 0, SEEK_SET) != 0)
+      request->error = errno;
+  }
+  else if (flags & FsOpenReq::OM_DIRECT)
+  {
+#ifdef O_DIRECT
+    if (flags & (FsOpenReq::OM_TRUNCATE | FsOpenReq::OM_CREATE))
+    {
+      request->error = check_odirect_write(flags, new_flags, mode);
+    }
+    else
+    {
+      request->error = check_odirect_read(flags, new_flags, mode);
+    }
+
+    if (request->error)
+      return;
+#endif
+  }
+#ifdef VM_TRACE
+  if (flags & FsOpenReq::OM_DIRECT)
+  {
+#ifdef O_DIRECT
+    ndbout_c("%s %s O_DIRECT: %d",
+             theFileName.c_str(), rw,
+             !!(new_flags & O_DIRECT));
+#else
+    ndbout_c("%s %s O_DIRECT: 0",
+             theFileName.c_str(), rw);
+#endif
+  }
+#endif
+  if ((flags & FsOpenReq::OM_SYNC) && (flags & FsOpenReq::OM_INIT))
+  {
+#ifdef O_SYNC
+    /**
+     * reopen file with O_SYNC
+     */
+    close(theFd);
+    new_flags &= ~(O_CREAT | O_TRUNC);
+    new_flags |= O_SYNC;
+    theFd = ::open(theFileName.c_str(), new_flags, mode);
+    if (theFd == -1)
+    {
+      // Nothing is open any more; do not go on to attach the azio stream
+      request->error = errno;
+      return;
+    }
+#endif
+  }
+  if(use_gz)
+  {
+    if(azdopen(&azf, theFd, new_flags) < 1)
+    {
+      ndbout_c("Stewart's brain broke");
+      abort();
+    }
+  }
+}
+
+/**
+ * readBuffer() - read size bytes starting at offset into buf.
+ *
+ * Uses pread() when HAVE_PREAD is defined, otherwise lseek()+read(); gz
+ * files are positioned with azseek() and read with azread(). The number of
+ * bytes read is accumulated in req->par.readWrite.pages[0].size.
+ *
+ * @return 0 on success (including a short Request::readPartial read),
+ *         ERR_ReadUnderflow if end of file is hit on any other request,
+ *         or errno on a failing seek/read.
+ */
+int PosixAsyncFile::readBuffer(Request *req, char *buf,
+                               size_t size, off_t offset)
+{
+  int return_value;
+  // Declared once for both seek paths below; with two declarations the
+  // function does not compile when HAVE_PREAD is undefined.
+  off_t seek_val;
+  req->par.readWrite.pages[0].size = 0;
+#if ! defined(HAVE_PREAD)
+  if(!use_gz)
+  {
+    while((seek_val= lseek(theFd, offset, SEEK_SET)) == (off_t)-1
+          && errno == EINTR);
+    if(seek_val == (off_t)-1)
+    {
+      return errno;
+    }
+  }
+#endif
+  if(use_gz)
+  {
+    while((seek_val= azseek(&azf, offset, SEEK_SET)) == (off_t)-1
+          && errno == EINTR);
+    if(seek_val == (off_t)-1)
+    {
+      return errno;
+    }
+  }
+
+  int error;
+
+  while (size > 0) {
+    size_t bytes_read = 0;
+
+#if ! defined(HAVE_PREAD)
+    if(use_gz)
+      return_value = azread(&azf, buf, size, &error);
+    else
+      return_value = ::read(theFd, buf, size);
+#else // UNIX
+    if(!use_gz)
+      return_value = ::pread(theFd, buf, size, offset);
+    else
+      return_value = azread(&azf, buf, size, &error);
+#endif
+    if (return_value == -1 && errno == EINTR) {
+      DEBUG(ndbout_c("EINTR in read"));
+      continue;
+    } else if (return_value == -1){
+      return errno;
+    } else {
+      bytes_read = return_value;
+    }
+
+    req->par.readWrite.pages[0].size += bytes_read;
+    if(bytes_read == 0){
+      // End of file: acceptable only for a partial-read request
+      if(req->action == Request::readPartial)
+      {
+        return 0;
+      }
+      DEBUG(ndbout_c("Read underflow %d %d\n %x\n%d %d",
+                     size, offset, buf, bytes_read, return_value));
+      return ERR_ReadUnderflow;
+    }
+
+    if(bytes_read != size){
+      DEBUG(ndbout_c("Warning partial read %d != %d on %s",
+                     bytes_read, size, theFileName.c_str()));
+    }
+
+    buf += bytes_read;
+    size -= bytes_read;
+    offset += bytes_read;
+  }
+  return 0;
+}
+
+/**
+ * readvReq() - read all pages of the request with a single readv().
+ *
+ * Without HAVE_PREAD this simply delegates to readReq(). Otherwise the
+ * file is positioned at the offset of the first page and the pages are
+ * filled in order.
+ *
+ * request->error is set to errno if the seek or the read fails, and to
+ * 1011 if fewer bytes than the sum of the page sizes were read.
+ */
+void PosixAsyncFile::readvReq(Request *request)
+{
+#if ! defined(HAVE_PREAD)
+  readReq(request);
+  return;
+#else
+  int return_value;
+  int length = 0;
+  struct iovec iov[20]; // the parameter in the signal restricts this to 20 deep
+  for(int i=0; i < request->par.readWrite.numberOfPages ; i++) {
+    iov[i].iov_base= request->par.readWrite.pages[i].buf;
+    iov[i].iov_len= request->par.readWrite.pages[i].size;
+    length = length + iov[i].iov_len;
+  }
+  // A failed seek must not be ignored: readv() would then read from
+  // whatever position the descriptor happens to be at.
+  if (lseek( theFd, request->par.readWrite.pages[0].offset, SEEK_SET )
+      == (off_t)-1) {
+    request->error = errno;
+    return;
+  }
+  return_value = ::readv(theFd, iov, request->par.readWrite.numberOfPages);
+  if (return_value == -1) {
+    request->error = errno;
+    return;
+  } else if (return_value != length) {
+    request->error = 1011;
+    return;
+  }
+#endif
+}
+
+/**
+ * writeBuffer() - write size bytes from buf at offset, chunk_size bytes
+ * at a time.
+ *
+ * Uses pwrite() when HAVE_PWRITE is defined, otherwise lseek()+write();
+ * gz files go through azwrite(). The byte count is added to
+ * m_write_wo_sync up front. A write that reports zero bytes aborts.
+ *
+ * @return 0 on success, otherwise errno (my_errno for gz files).
+ */
+int PosixAsyncFile::writeBuffer(const char *buf, size_t size, off_t offset,
+                                size_t chunk_size)
+{
+  size_t chunk = chunk_size;
+
+  m_write_wo_sync += size;
+
+#if ! defined(HAVE_PWRITE)
+  off_t seek_val;
+  do
+  {
+    seek_val= lseek(theFd, offset, SEEK_SET);
+  } while (seek_val == (off_t)-1 && errno == EINTR);
+  if(seek_val == (off_t)-1)
+  {
+    return errno;
+  }
+#endif
+
+  while (size > 0) {
+    // The final chunk may be shorter than chunk_size
+    if (chunk > size){
+      chunk = size;
+    }
+
+    int rc;
+    if(use_gz)
+      rc= azwrite(&azf, buf, chunk);
+    else
+#if ! defined(HAVE_PWRITE)
+      rc = ::write(theFd, buf, chunk);
+#else // UNIX
+      rc = ::pwrite(theFd, buf, chunk, offset);
+#endif
+
+    if (rc == -1) {
+      if (errno == EINTR) {
+        // Interrupted before anything was written: try the same chunk again
+        DEBUG(ndbout_c("EINTR in write"));
+        continue;
+      }
+      if(use_gz)
+        return my_errno;
+      return errno;
+    }
+
+    const size_t written = rc;
+    if(written == 0){
+      DEBUG(ndbout_c("no bytes written"));
+      abort();
+    }
+
+    if(written != chunk){
+      DEBUG(ndbout_c("Warning partial write %d != %d",
+                     written, chunk));
+    }
+
+    buf += written;
+    size -= written;
+    offset += written;
+  }
+  return 0;
+}
+
+/**
+ * closeReq() - sync (if the file was opened for writing) and close it.
+ *
+ * gz files are closed through azclose(), others through close(). The azio
+ * state is then reset while its buffer pointers and memory pool are kept,
+ * and theFd is set back to -1. request->error is set to errno if the
+ * close call fails.
+ */
+void PosixAsyncFile::closeReq(Request *request)
+{
+  if (m_open_flags & (
+      FsOpenReq::OM_WRITEONLY |
+      FsOpenReq::OM_READWRITE |
+      FsOpenReq::OM_APPEND )) {
+    syncReq(request);
+  }
+  int r;
+  if(use_gz)
+    r= azclose(&azf);
+  else
+    r= ::close(theFd); // the result is tested below, so it must be stored
+  use_gz= 0;
+
+  // Wipe the azio stream but keep the buffers and pool set up by init()
+  Byte *a,*b;
+  a= azf.inbuf;
+  b= azf.outbuf;
+  memset(&azf,0,sizeof(azf));
+  azf.inbuf= a;
+  azf.outbuf= b;
+  azf.stream.opaque = (void*)&az_mempool;
+
+  if (-1 == r) {
+#ifndef DBUG_OFF
+    if (theFd == -1) {
+      DEBUG(ndbout_c("close on fd = -1"));
+      abort();
+    }
+#endif
+    request->error = errno;
+  }
+  theFd = -1;
+}
+
+/**
+ * isOpen() - true while theFd holds something other than the -1 sentinel.
+ */
+bool PosixAsyncFile::isOpen()
+{
+  const bool has_descriptor = (theFd != -1);
+  return has_descriptor;
+}
+
+
+/**
+ * syncReq() - fsync() the file.
+ *
+ * Skipped when auto-sync is enabled and nothing has been written since the
+ * last sync. On success the unsynced-byte counter is reset; on failure
+ * request->error is set to errno and the counter is left alone.
+ */
+void PosixAsyncFile::syncReq(Request *request)
+{
+  const bool nothing_pending = m_auto_sync_freq && (m_write_wo_sync == 0);
+  if (!nothing_pending)
+  {
+    if (::fsync(theFd) == -1)
+    {
+      request->error = errno;
+    }
+    else
+    {
+      m_write_wo_sync = 0;
+    }
+  }
+}
+
+/**
+ * appendReq() - write request->par.append.buf at the current position.
+ *
+ * Retries on EINTR and continues after short writes until everything is
+ * written. A write that reports zero bytes aborts. On error
+ * request->error is set to errno (my_errno for gz files). When auto-sync
+ * is enabled and more than m_auto_sync_freq bytes are unsynced, the file
+ * is synced afterwards.
+ */
+void PosixAsyncFile::appendReq(Request *request)
+{
+  const char * cursor = request->par.append.buf;
+  Uint32 remaining = request->par.append.size;
+
+  m_write_wo_sync += remaining;
+
+  while(remaining > 0){
+    int n;
+    if(use_gz)
+      n= azwrite(&azf,cursor,remaining);
+    else
+      n= write(theFd, cursor, remaining);
+
+    if(n == -1){
+      if(errno == EINTR){
+        continue;
+      }
+      if(use_gz)
+        request->error = my_errno;
+      else
+        request->error = errno;
+      return;
+    }
+    if(n == 0){
+      DEBUG(ndbout_c("append with n=0"));
+      abort();
+    }
+    cursor += n;
+    remaining -= n;
+  }
+
+  const bool sync_due = m_auto_sync_freq && m_write_wo_sync > m_auto_sync_freq;
+  if(sync_due){
+    syncReq(request);
+  }
+}
+
+/**
+ * removeReq() - remove the file named by theFileName; on failure
+ * request->error is set to errno.
+ */
+void PosixAsyncFile::removeReq(Request *request)
+{
+  const int rc = ::remove(theFileName.c_str());
+  if (rc == -1)
+  {
+    request->error = errno;
+  }
+}
+
+/**
+ * rmrfReq() - remove a file, or a directory tree, rooted at path.
+ *
+ * path is used as scratch space: entry names are appended to it during
+ * the walk and it is truncated back to its original length afterwards.
+ * A missing file or directory (ENOENT) is not treated as an error. The
+ * directory itself is removed only when removePath is true. The walk
+ * stops at the first error, which is reported in request->error.
+ */
+void PosixAsyncFile::rmrfReq(Request *request, char *path, bool removePath)
+{
+  const Uint32 base_len = strlen(path);
+  const Uint32 room = PATH_MAX - base_len;
+  char* const tail = &path[base_len];
+
+  if(!request->par.rmrf.directory){
+    // Remove file
+    if(unlink((const char *)path) != 0 && errno != ENOENT)
+      request->error = errno;
+    return;
+  }
+
+  // Remove directory
+  DIR* dir = opendir((const char *)path);
+  if(dir == 0){
+    if(errno != ENOENT)
+      request->error = errno;
+    return;
+  }
+
+  for (struct dirent * entry = readdir(dir); entry != NULL;
+       entry = readdir(dir))
+  {
+    if ((strcmp(".", entry->d_name) == 0) || (strcmp("..", entry->d_name) == 0))
+    {
+      continue;
+    }
+
+    BaseString::snprintf(tail, (size_t)room, "%s%s",
+                         DIR_SEPARATOR, entry->d_name);
+
+    // remove() takes care of files and empty directories; anything else
+    // is handled by recursing into it.
+    const bool removed = (remove((const char*)path) == 0);
+    if (!removed)
+    {
+      rmrfReq(request, path, true);
+    }
+    path[base_len] = 0;
+
+    if (!removed && request->error != 0)
+    {
+      closedir(dir);
+      return;
+    }
+  }
+  closedir(dir);
+
+  if(removePath && rmdir((const char *)path) != 0){
+    request->error = errno;
+  }
+  return;
+}
+
+/**
+ * endReq() - release the buffers allocated by init().
+ *
+ * The sizes handed to ndbd_free() are the sizes that init() handed to
+ * ndbd_malloc(): alignment padding included, and READ+WRITE (not
+ * READ*WRITE) for the azio buffers. The inflate/deflate pool allocated in
+ * init() is released here as well. Pointers are cleared afterwards so
+ * that a second call is harmless.
+ */
+void PosixAsyncFile::endReq()
+{
+  // Thread is ended with return
+  if (theWriteBufferUnaligned)
+  {
+    ndbd_free(theWriteBufferUnaligned,
+              theWriteBufferSize + NDB_O_DIRECT_WRITE_ALIGNMENT-1);
+    theWriteBufferUnaligned = 0;
+  }
+
+  if (azfBufferUnaligned)
+  {
+    ndbd_free(azfBufferUnaligned, (AZ_BUFSIZE_READ+AZ_BUFSIZE_WRITE)
+              +NDB_O_DIRECT_WRITE_ALIGNMENT-1);
+    azfBufferUnaligned = 0;
+  }
+
+  if (az_mempool.mem)
+  {
+    ndbd_free(az_mempool.mem, az_mempool.size);
+    az_mempool.mem = 0;
+  }
+}
+
+
+/**
+ * createDirectories() - create every directory on the way to the file.
+ *
+ * Starting at the base name of theFileName, the path is cut at each
+ * DIR_SEPARATOR in turn, mkdir() is called on the prefix, and the
+ * separator is put back. The result of mkdir() is not checked.
+ */
+void PosixAsyncFile::createDirectories()
+{
+  const char * full_path = theFileName.c_str();
+  const char * scan_from = theFileName.get_base_name();
+
+  for (;;)
+  {
+    char* sep = (char *)strstr(scan_from, DIR_SEPARATOR);
+    if (!sep)
+    {
+      break;
+    }
+
+    // Temporarily terminate the string at the separator
+    const char saved = sep[0];
+    sep[0] = 0;
+    mkdir(full_path, S_IRUSR | S_IWUSR | S_IXUSR | S_IXGRP | S_IRGRP);
+    sep[0] = saved;
+
+    scan_from = sep + sizeof(DIR_SEPARATOR);
+  }
+}
Index: telco-6.2/storage/ndb/src/kernel/blocks/ndbfs/PosixAsyncFile.hpp
===================================================================
--- /dev/null 1970-01-01 00:00:00.000000000 +0000
+++ telco-6.2/storage/ndb/src/kernel/blocks/ndbfs/PosixAsyncFile.hpp 2007-10-24
16:40:02.920590018 +1000
@@ -0,0 +1,72 @@
+/* Copyright (C) 2003 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+#ifndef PosixAsyncFile_H
+#define PosixAsyncFile_H
+
+/**
+ * POSIX implementation of AsyncFile
+ *
+ * Also does direct IO, preallocation.
+ */
+
+#include <azlib.h>
+
+/**
+ * PosixAsyncFile - AsyncFile implemented on POSIX file descriptors, with
+ * optional azio (gz) streams.
+ *
+ * The unsynced-write counter (m_write_wo_sync) and the auto-sync frequency
+ * (m_auto_sync_freq) are the protected members inherited from AsyncFile.
+ * They are deliberately not redeclared here: a second copy in this class
+ * would hide the base members, so this class and the base class would
+ * each read and update a different pair of variables.
+ */
+class PosixAsyncFile : public AsyncFile
+{
+  friend class Ndbfs;
+public:
+  PosixAsyncFile(SimulatedBlock& fs);
+
+  virtual int init();
+
+  virtual bool isOpen();
+
+  virtual void openReq(Request *request);
+  virtual void readvReq(Request *request);
+
+  virtual void closeReq(Request *request);
+  virtual void syncReq(Request *request);
+  virtual void removeReq(Request *request);
+  virtual void appendReq(Request *request);
+  virtual void rmrfReq(Request *request, char * path, bool removePath);
+  virtual void endReq();
+
+  virtual int readBuffer(Request*, char * buf, size_t size, off_t offset);
+  virtual int writeBuffer(const char * buf, size_t size, off_t offset,
+                          size_t chunk_size = WRITECHUNK);
+
+  virtual void createDirectories();
+
+private:
+  int theFd;                      // -1 while no file is open
+
+  Uint32 m_open_flags;            // OM_ flags from request to open file
+
+  int use_gz;                     // non-zero: I/O goes through azf
+  azio_stream azf;
+  struct az_alloc_rec az_mempool; // memory pool handed to azf.stream
+
+  void* theWriteBufferUnaligned;  // raw allocation behind theWriteBuffer
+  void* azfBufferUnaligned;       // raw allocation behind azf.inbuf/outbuf
+
+  int check_odirect_read(Uint32 flags, int&new_flags, int mode);
+  int check_odirect_write(Uint32 flags, int&new_flags, int mode);
+};
+
+#endif
Index: telco-6.2/storage/ndb/src/kernel/blocks/ndbfs/Win32AsyncFile.cpp
===================================================================
--- /dev/null 1970-01-01 00:00:00.000000000 +0000
+++ telco-6.2/storage/ndb/src/kernel/blocks/ndbfs/Win32AsyncFile.cpp 2007-10-24
16:40:02.920590018 +1000
@@ -0,0 +1,334 @@
+/* Copyright (C) 2003 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+#include <ndb_global.h>
+#include <my_sys.h>
+#include <my_pthread.h>
+
+#include "Win32AsyncFile.hpp"
+
+#include <ErrorHandlingMacros.hpp>
+#include <kernel_types.h>
+#include <ndbd_malloc.hpp>
+#include <NdbThread.h>
+#include <signaldata/FsRef.hpp>
+#include <signaldata/FsOpenReq.hpp>
+#include <signaldata/FsReadWriteReq.hpp>
+
+/**
+ * Construct a Win32AsyncFile that is not yet attached to any file.
+ *
+ * The AsyncFile base is constructed explicitly with the owning block, and
+ * hFile starts out as INVALID_HANDLE_VALUE, the value isOpen() treats as
+ * "not open".
+ */
+Win32AsyncFile::Win32AsyncFile(SimulatedBlock& fs) :
+  AsyncFile(fs),
+  hFile(INVALID_HANDLE_VALUE)
+{
+}
+
+// Destructor: intentionally empty, nothing is released here.
+Win32AsyncFile::~Win32AsyncFile()
+{
+
+}
+
+/**
+ * openReq() - open the file named by theFileName with CreateFile().
+ *
+ * The OM_CREATE / OM_TRUNCATE flags select the creation disposition and
+ * the low two bits of the flags select the access mode. If the first
+ * attempt fails because the path does not exist and OM_CREATE is set, the
+ * directories are created and the open is retried once.
+ *
+ * request->error is 0 on success, the GetLastError() value on failure, or
+ * 1000 for an invalid access mode.
+ */
+void Win32AsyncFile::openReq(Request* request)
+{
+  m_auto_sync_freq = 0;
+  m_write_wo_sync = 0;
+  m_open_flags = request->par.open.flags;
+
+  // for open.flags, see signal FSOPENREQ
+  DWORD dwCreationDisposition;
+  DWORD dwDesiredAccess = 0;
+  DWORD dwShareMode = FILE_SHARE_READ | FILE_SHARE_WRITE;
+  DWORD dwFlagsAndAttributes = FILE_ATTRIBUTE_NORMAL | FILE_FLAG_RANDOM_ACCESS |
+    FILE_FLAG_NO_BUFFERING;
+  Uint32 flags = request->par.open.flags;
+
+  // Translate the FsOpenReq OM_ flags into CreateFile() arguments
+  if ((flags & FsOpenReq::OM_CREATE) && (flags & FsOpenReq::OM_TRUNCATE)){
+    dwCreationDisposition = CREATE_ALWAYS;
+  } else if (flags & FsOpenReq::OM_TRUNCATE){
+    dwCreationDisposition = TRUNCATE_EXISTING;
+  } else if (flags & FsOpenReq::OM_CREATE){
+    dwCreationDisposition = CREATE_NEW;
+  } else {
+    dwCreationDisposition = OPEN_EXISTING;
+  }
+
+  switch(flags & 3){
+  case FsOpenReq::OM_READONLY:
+    dwDesiredAccess = GENERIC_READ;
+    break;
+  case FsOpenReq::OM_WRITEONLY:
+    dwDesiredAccess = GENERIC_WRITE;
+    break;
+  case FsOpenReq::OM_READWRITE:
+    dwDesiredAccess = GENERIC_READ | GENERIC_WRITE;
+    break;
+  default:
+    // Invalid access mode: report it and do not go on to open the file
+    request->error = 1000;
+    return;
+  }
+
+  hFile = CreateFile(theFileName.c_str(), dwDesiredAccess, dwShareMode,
+                     0, dwCreationDisposition, dwFlagsAndAttributes, 0);
+
+  if(INVALID_HANDLE_VALUE == hFile) {
+    request->error = GetLastError();
+    if(((ERROR_PATH_NOT_FOUND == request->error) ||
+        (ERROR_INVALID_NAME == request->error))
+       && (flags & FsOpenReq::OM_CREATE)) {
+      // The directory part of the path is missing: create it and retry once
+      createDirectories();
+      hFile = CreateFile(theFileName.c_str(), dwDesiredAccess, dwShareMode,
+                         0, dwCreationDisposition, dwFlagsAndAttributes, 0);
+
+      if(INVALID_HANDLE_VALUE == hFile)
+        request->error = GetLastError();
+      else
+        request->error = 0;
+
+      return;
+    }
+  }
+  else {
+    request->error = 0;
+    return;
+  }
+}
+
+/**
+ * readBuffer() - read size bytes starting at offset into buf.
+ *
+ * Positions hFile with SetFilePointer() and then calls ReadFile() until
+ * everything has been read. The number of bytes read is accumulated in
+ * req->par.readWrite.pages[0].size.
+ *
+ * @return 0 on success (including a short Request::readPartial read),
+ *         ERR_ReadUnderflow if end of file is hit on any other request,
+ *         or the GetLastError()/errno value of a failing call.
+ */
+int
+Win32AsyncFile::readBuffer(Request* req, char * buf, size_t size, off_t offset){
+  // Printed by the underflow message below, so it must always hold a value
+  int return_value = 0;
+  req->par.readWrite.pages[0].size = 0;
+
+  DWORD dwSFP = SetFilePointer(hFile, offset, 0, FILE_BEGIN);
+  if(dwSFP != offset) {
+    return GetLastError();
+  }
+
+  off_t seek_val;
+  if(use_gz)
+  {
+    while((seek_val= azseek(&azf, offset, SEEK_SET)) == (off_t)-1
+          && errno == EINTR);
+    if(seek_val == (off_t)-1)
+    {
+      return errno;
+    }
+  }
+
+  while (size > 0) {
+    size_t bytes_read = 0;
+
+    DWORD dwBytesRead;
+    BOOL bRead = ReadFile(hFile,
+                          buf,
+                          size,
+                          &dwBytesRead,
+                          0);
+    if(!bRead){
+      return GetLastError();
+    }
+    bytes_read = dwBytesRead;
+    return_value = (int)dwBytesRead;
+
+    req->par.readWrite.pages[0].size += bytes_read;
+    if(bytes_read == 0){
+      // End of file: acceptable only for a partial-read request
+      if(req->action == Request::readPartial)
+      {
+        return 0;
+      }
+      DEBUG(ndbout_c("Read underflow %d %d\n %x\n%d %d",
+                     size, offset, buf, bytes_read, return_value));
+      return ERR_ReadUnderflow;
+    }
+
+    if(bytes_read != size){
+      DEBUG(ndbout_c("Warning partial read %d != %d",
+                     bytes_read, size));
+    }
+
+    buf += bytes_read;
+    size -= bytes_read;
+    offset += bytes_read;
+  }
+  return 0;
+}
+
+/**
+ * writeBuffer() - write size bytes from buf at offset, chunk_size bytes
+ * at a time.
+ *
+ * The byte count is added to m_write_wo_sync up front; hFile is positioned
+ * with SetFilePointer() and the data is written with WriteFile().
+ *
+ * @return 0 on success, otherwise the GetLastError() value.
+ */
+int
+Win32AsyncFile::writeBuffer(const char * buf, size_t size, off_t offset,
+                            size_t chunk_size)
+{
+  m_write_wo_sync += size;
+
+  const DWORD new_pos = SetFilePointer(hFile, offset, 0, FILE_BEGIN);
+  if(new_pos != offset) {
+    return GetLastError();
+  }
+
+  size_t chunk = chunk_size;
+  while (size > 0) {
+    // The final chunk may be shorter than chunk_size
+    if (chunk > size){
+      chunk = size;
+    }
+
+    DWORD dwWritten;
+    if(!WriteFile(hFile, buf, chunk, &dwWritten, 0)) {
+      return GetLastError();
+    }
+
+    const size_t written = dwWritten;
+    if (written != chunk) {
+      DEBUG(ndbout_c("Warning partial write %d != %d", written, chunk));
+    }
+
+    buf += written;
+    size -= written;
+    offset += written;
+  }
+  return 0;
+}
+
+/**
+ * closeReq() - sync (if the file was opened for writing) and close hFile.
+ *
+ * request->error is set to the GetLastError() value if CloseHandle()
+ * fails. hFile is reset to INVALID_HANDLE_VALUE in either case.
+ */
+void
+Win32AsyncFile::closeReq(Request * request)
+{
+  const Uint32 write_modes = FsOpenReq::OM_WRITEONLY |
+                             FsOpenReq::OM_READWRITE |
+                             FsOpenReq::OM_APPEND;
+  if (m_open_flags & write_modes) {
+    syncReq(request);
+  }
+
+  const BOOL closed = CloseHandle(hFile);
+  if(!closed) {
+    request->error = GetLastError();
+  }
+  hFile = INVALID_HANDLE_VALUE;
+}
+
+/**
+ * isOpen() - true while hFile holds something other than
+ * INVALID_HANDLE_VALUE.
+ */
+bool Win32AsyncFile::isOpen()
+{
+  const bool has_handle = (hFile != INVALID_HANDLE_VALUE);
+  return has_handle;
+}
+
+
+/**
+ * syncReq() - flush the file with FlushFileBuffers().
+ *
+ * Skipped when auto-sync is enabled and nothing has been written since the
+ * last sync. On success the unsynced-byte counter is reset; on failure
+ * request->error is set to the GetLastError() value and the counter is
+ * left alone.
+ */
+void
+Win32AsyncFile::syncReq(Request * request)
+{
+  const bool nothing_pending = m_auto_sync_freq && (m_write_wo_sync == 0);
+  if (!nothing_pending)
+  {
+    if(FlushFileBuffers(hFile)) {
+      m_write_wo_sync = 0;
+    } else {
+      request->error = GetLastError();
+    }
+  }
+}
+
+/**
+ * appendReq() - write request->par.append.buf at the current position.
+ *
+ * WriteFile() is called until everything has been written. On failure
+ * request->error is set to the GetLastError() value. When auto-sync is
+ * enabled and more than m_auto_sync_freq bytes are unsynced, the file is
+ * synced afterwards.
+ */
+void
+Win32AsyncFile::appendReq(Request * request){
+
+  const char * cursor = request->par.append.buf;
+  Uint32 remaining = request->par.append.size;
+
+  m_write_wo_sync += remaining;
+
+  for (DWORD dwWritten = 0; remaining > 0;
+       cursor += dwWritten, remaining -= dwWritten)
+  {
+    if(!WriteFile(hFile, cursor, remaining, &dwWritten, 0)){
+      request->error = GetLastError();
+      return ;
+    }
+  }
+
+  const bool sync_due = m_auto_sync_freq && m_write_wo_sync > m_auto_sync_freq;
+  if(sync_due){
+    syncReq(request);
+  }
+}
+
+/**
+ * removeReq() - delete the file named by theFileName; on failure
+ * request->error is set to the GetLastError() value.
+ */
+void
+Win32AsyncFile::removeReq(Request * request)
+{
+  const BOOL deleted = DeleteFile(theFileName.c_str());
+  if(!deleted) {
+    request->error = GetLastError();
+  }
+}
+
+// rmrfReq() - remove a file, or a directory tree, rooted at path.
+//
+// path is used as scratch space: a search pattern and entry names are
+// appended to it, and it is truncated back to path_len afterwards. A
+// missing file (ERROR_FILE_NOT_FOUND) or directory (ERROR_PATH_NOT_FOUND)
+// is not reported as an error. The directory itself is removed only when
+// removePath is true. The walk stops at the first error, which is
+// reported in request->error.
+//
+// NOTE(review): path_max_copy and path_add are computed but never used
+// below; the strcat() calls append to path without checking how much room
+// is left. Confirm that callers always pass a buffer large enough for the
+// deepest path.
+void
+Win32AsyncFile::rmrfReq(Request * request, char * path, bool removePath){
+ Uint32 path_len = strlen(path);
+ Uint32 path_max_copy = PATH_MAX - path_len;
+ char* path_add = &path[path_len];
+
+ if(!request->par.rmrf.directory){
+ // Remove file
+ if(!DeleteFile(path)){
+ DWORD dwError = GetLastError();
+ if(dwError!=ERROR_FILE_NOT_FOUND)
+ request->error = dwError;
+ }
+ return;
+ }
+
+ // Build "<path>\*" to enumerate the directory, then restore path
+ strcat(path, "\\*");
+ WIN32_FIND_DATA ffd;
+ HANDLE hFindFile = FindFirstFile(path, &ffd);
+ path[path_len] = 0;
+ if(INVALID_HANDLE_VALUE==hFindFile){
+ DWORD dwError = GetLastError();
+ if(dwError!=ERROR_PATH_NOT_FOUND)
+ request->error = dwError;
+ return;
+ }
+
+ do {
+ if(0!=strcmp(".", ffd.cFileName) && 0!=strcmp("..", ffd.cFileName)){
+ strcat(path, "\\");
+ strcat(path, ffd.cFileName);
+ // Try to delete the entry as a file first; if that fails, recurse
+ if(DeleteFile(path)) {
+ path[path_len] = 0;
+ continue;
+ }//if
+
+ rmrfReq(request, path, true);
+ path[path_len] = 0;
+ if(request->error != 0){
+ FindClose(hFindFile);
+ return;
+ }
+ }
+ } while(FindNextFile(hFindFile, &ffd));
+
+ FindClose(hFindFile);
+
+ if(removePath && !RemoveDirectory(path))
+ request->error = GetLastError();
+
+}
+
+/**
+ * createDirectories() - create every directory on the way to the file.
+ *
+ * Starting at the base name of theFileName, the path is cut at each
+ * DIR_SEPARATOR in turn, CreateDirectory() is called on the prefix, and
+ * the separator is put back. The result of CreateDirectory() is not
+ * checked.
+ */
+void Win32AsyncFile::createDirectories()
+{
+  const char * full_path = theFileName.c_str();
+  const char * scan_from = theFileName.get_base_name();
+
+  for (;;)
+  {
+    char* sep = (char *)strstr(scan_from, DIR_SEPARATOR);
+    if (!sep)
+    {
+      break;
+    }
+
+    // Temporarily terminate the string at the separator
+    const char saved = sep[0];
+    sep[0] = 0;
+    CreateDirectory(full_path, 0);
+    sep[0] = saved;
+
+    scan_from = sep + sizeof(DIR_SEPARATOR);
+  }
+}
Index: telco-6.2/storage/ndb/src/kernel/blocks/ndbfs/Win32AsyncFile.hpp
===================================================================
--- /dev/null 1970-01-01 00:00:00.000000000 +0000
+++ telco-6.2/storage/ndb/src/kernel/blocks/ndbfs/Win32AsyncFile.hpp 2007-10-24
16:40:02.920590018 +1000
@@ -0,0 +1,83 @@
+/* Copyright (C) 2003 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+#ifndef Win32AsyncFile_H
+#define Win32AsyncFile_H
+
+/**
+ * Win32 Implementation of AsyncFile interface
+ */
+
+#include <kernel_types.h>
+#include "MemoryChannel.hpp"
+#include "Filename.hpp"
+
+#include <azlib.h>
+
+const int ERR_ReadUnderflow = 1000;
+
+const int WRITECHUNK = 262144;
+
+class AsyncFile;
+
+// Win32 implementation of the AsyncFile interface.
+// NOTE(review): AsyncFile is only forward-declared in this header, so its full
+// definition must already be visible wherever this header is included.
+class Win32AsyncFile : public AsyncFile
+{
+ friend class Ndbfs;
+public:
+ Win32AsyncFile(SimulatedBlock& fs);
+ ~Win32AsyncFile();
+
+ void reportTo( MemoryChannel<Request> *reportTo );
+ void execute( Request* request );
+
+ void doStart();
+ // it's a thread so it's always running
+ void run();
+
+ bool isOpen();
+
+ Filename theFileName; // name removeReq() deletes and createDirectories() walks
+ Request *m_current_request, *m_last_request;
+private:
+ // The *Req methods below all take the Request they operate on
+ void openReq(Request *request);
+ void readReq(Request *request);
+ void readvReq(Request *request);
+ void writeReq(Request *request);
+ void writevReq(Request *request);
+
+ void closeReq(Request *request);
+ void syncReq(Request *request); // also run when m_write_wo_sync > m_auto_sync_freq
+ void removeReq(Request *request); // DeleteFile(theFileName); failure -> request->error
+ void appendReq(Request *request);
+ void rmrfReq(Request *request, char * path, bool removePath); // recursive delete
+
+ int readBuffer(Request*, char * buf, size_t size, off_t offset);
+ int writeBuffer(const char * buf, size_t size, off_t offset,
+ size_t chunk_size = WRITECHUNK); // WRITECHUNK is 262144
+
+ int extendfile(Request* request);
+ void createDirectories(); // CreateDirectory() for theFileName cut at each DIR_SEPARATOR
+
+ HANDLE hFile;
+ Uint32 m_open_flags; // OM_ flags from request to open file
+ size_t m_write_wo_sync; // Writes wo/ sync
+ size_t m_auto_sync_freq; // Auto sync freq in bytes; 0 skips the automatic syncReq()
+};
+
+#endif
Index: telco-6.2/storage/ndb/src/kernel/blocks/Makefile.am
===================================================================
--- telco-6.2.orig/storage/ndb/src/kernel/blocks/Makefile.am 2007-10-24 16:03:20.312559583
+1000
+++ telco-6.2/storage/ndb/src/kernel/blocks/Makefile.am 2007-10-24 16:40:02.920590018
+1000
@@ -41,7 +41,9 @@ libblocks_a_SOURCES = tsman.cpp lgman.cp
dbtup/DbtupDebug.cpp dbtup/DbtupScan.cpp \
dbtup/DbtupDiskAlloc.cpp dbtup/DbtupVarAlloc.cpp \
dbtup/tuppage.cpp dbtup/Undo_buffer.cpp \
- ndbfs/AsyncFile.cpp ndbfs/Ndbfs.cpp ndbfs/VoidFs.cpp \
+ ndbfs/PosixAsyncFile.cpp ndbfs/AsyncFile.cpp \
+ ndbfs/Ndbfs.cpp \
+ ndbfs/VoidFs.cpp \
ndbfs/Filename.cpp ndbfs/CircularIndex.cpp \
ndbcntr/NdbcntrInit.cpp ndbcntr/NdbcntrSysTable.cpp ndbcntr/NdbcntrMain.cpp \
qmgr/QmgrInit.cpp qmgr/QmgrMain.cpp \
Index: telco-6.2/storage/ndb/src/kernel/blocks/ndbfs/Ndbfs.cpp
===================================================================
--- telco-6.2.orig/storage/ndb/src/kernel/blocks/ndbfs/Ndbfs.cpp 2007-10-24
16:03:20.312559583 +1000
+++ telco-6.2/storage/ndb/src/kernel/blocks/ndbfs/Ndbfs.cpp 2007-10-24 16:40:02.920590018
+1000
@@ -17,6 +17,7 @@
#include "Ndbfs.hpp"
#include "AsyncFile.hpp"
+#include "PosixAsyncFile.hpp"
#include "Filename.hpp"
#include <signaldata/FsOpenReq.hpp>
@@ -80,7 +81,8 @@ Ndbfs::~Ndbfs()
// the thread it has created
for (unsigned i = 0; i < theFiles.size(); i++){
AsyncFile* file = theFiles[i];
- delete file;
+ file->shutdown();
+ delete file;
theFiles[i] = NULL;
}//for
theFiles.clear();
@@ -661,7 +663,7 @@ Ndbfs::createAsyncFile(){
ERROR_SET(fatal, NDBD_EXIT_AFS_MAXOPEN,""," Ndbfs::createAsyncFile");
}
- AsyncFile* file = new AsyncFile(* this);
+ AsyncFile* file = new PosixAsyncFile(* this);
file->doStart();
// Put the file in list of all files
Index: telco-6.2/storage/ndb/include/kernel/signaldata/FsOpenReq.hpp
===================================================================
--- telco-6.2.orig/storage/ndb/include/kernel/signaldata/FsOpenReq.hpp 2007-10-24
16:03:20.812584107 +1000
+++ telco-6.2/storage/ndb/include/kernel/signaldata/FsOpenReq.hpp 2007-10-24
16:40:02.920590018 +1000
@@ -29,6 +29,7 @@ class FsOpenReq {
*/
friend class Ndbfs; // Reciver
friend class AsyncFile; // Uses FsOpenReq to decode file open flags
+ friend class PosixAsyncFile; // FIXME
friend class Filename;
friend class VoidFs;
Index: telco-6.2/storage/ndb/include/kernel/signaldata/FsReadWriteReq.hpp
===================================================================
--- telco-6.2.orig/storage/ndb/include/kernel/signaldata/FsReadWriteReq.hpp 2007-10-24
16:03:20.812584107 +1000
+++ telco-6.2/storage/ndb/include/kernel/signaldata/FsReadWriteReq.hpp 2007-10-24
16:40:02.920590018 +1000
@@ -35,6 +35,7 @@ class FsReadWriteReq {
friend class Ndbfs;
friend class VoidFs;
friend class AsyncFile;
+ friend class PosixAsyncFile; // FIXME
/**
* Sender(s)
Index: telco-6.2/storage/ndb/src/kernel/vm/SimulatedBlock.hpp
===================================================================
--- telco-6.2.orig/storage/ndb/src/kernel/vm/SimulatedBlock.hpp 2007-10-24
16:03:20.312559583 +1000
+++ telco-6.2/storage/ndb/src/kernel/vm/SimulatedBlock.hpp 2007-10-24 16:40:02.920590018
+1000
@@ -91,6 +91,7 @@ class SimulatedBlock {
friend class SafeCounterManager;
friend struct UpgradeStartup;
friend class AsyncFile;
+ friend class PosixAsyncFile; // FIXME
friend class Pgman;
friend class Page_cache_client;
friend class Lgman;
--
Stewart Smith