List:Commits« Previous MessageNext Message »
From:Jonas Oreland Date:December 2 2008 1:11pm
Subject:bzr commit into mysql-5.1 branch (jonas:3148)
View as plain text  
#At file:///home/jonas/src/telco-6.4/

 3148 Jonas Oreland	2008-12-02
      ndb - file != thread (aka ThreadPool) for DD
        Make full split of file/thread
added:
  storage/ndb/src/kernel/blocks/ndbfs/AsyncIoThread.cpp
  storage/ndb/src/kernel/blocks/ndbfs/AsyncIoThread.hpp
modified:
  storage/ndb/include/kernel/ndb_limits.h
  storage/ndb/include/kernel/signaldata/FsOpenReq.hpp
  storage/ndb/include/mgmapi/mgmapi_config_parameters.h
  storage/ndb/src/kernel/blocks/Makefile.am
  storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
  storage/ndb/src/kernel/blocks/lgman.cpp
  storage/ndb/src/kernel/blocks/ndbcntr/NdbcntrMain.cpp
  storage/ndb/src/kernel/blocks/ndbfs/AsyncFile.cpp
  storage/ndb/src/kernel/blocks/ndbfs/AsyncFile.hpp
  storage/ndb/src/kernel/blocks/ndbfs/MemoryChannel.hpp
  storage/ndb/src/kernel/blocks/ndbfs/Ndbfs.cpp
  storage/ndb/src/kernel/blocks/ndbfs/Ndbfs.hpp
  storage/ndb/src/kernel/blocks/ndbfs/PosixAsyncFile.cpp
  storage/ndb/src/kernel/blocks/ndbfs/PosixAsyncFile.hpp
  storage/ndb/src/kernel/blocks/ndbfs/Win32AsyncFile.cpp
  storage/ndb/src/kernel/blocks/ndbfs/Win32AsyncFile.hpp
  storage/ndb/src/kernel/blocks/record_types.hpp
  storage/ndb/src/kernel/blocks/tsman.cpp
  storage/ndb/src/kernel/main.cpp
  storage/ndb/src/kernel/vm/ndbd_malloc_impl.hpp
  storage/ndb/src/mgmsrv/ConfigInfo.cpp

=== modified file 'storage/ndb/include/kernel/ndb_limits.h'
--- a/storage/ndb/include/kernel/ndb_limits.h	2008-11-13 15:22:59 +0000
+++ b/storage/ndb/include/kernel/ndb_limits.h	2008-12-02 13:10:49 +0000
@@ -187,4 +187,6 @@
 #define MAX_NDBMT_LQH_WORKERS 4
 #define MAX_NDBMT_LQH_THREADS 4
 
+#define NDB_FILE_BUFFER_SIZE (256*1024)
+
 #endif

=== modified file 'storage/ndb/include/kernel/signaldata/FsOpenReq.hpp'
--- a/storage/ndb/include/kernel/signaldata/FsOpenReq.hpp	2008-08-21 06:38:48 +0000
+++ b/storage/ndb/include/kernel/signaldata/FsOpenReq.hpp	2008-12-02 13:10:49 +0000
@@ -33,6 +33,7 @@ class FsOpenReq {
   friend class Win32AsyncFile; // FIXME
   friend class Filename;
   friend class VoidFs;
+  friend class AsyncIoThread;
 
   /**
    * Sender(s)
@@ -90,6 +91,8 @@ private:
   STATIC_CONST( OM_CHECK_SIZE     = 0x2000 );
   STATIC_CONST( OM_DIRECT         = 0x4000 );
   STATIC_CONST( OM_GZ             = 0x8000 );
+  STATIC_CONST( OM_THREAD_POOL    = 0x10000 );
+  STATIC_CONST( OM_WRITE_BUFFER   = 0x20000 );
   
   enum Suffixes {
     S_DATA = 0,

=== modified file 'storage/ndb/include/mgmapi/mgmapi_config_parameters.h'
--- a/storage/ndb/include/mgmapi/mgmapi_config_parameters.h	2008-11-19 11:01:17 +0000
+++ b/storage/ndb/include/mgmapi/mgmapi_config_parameters.h	2008-12-02 13:10:49 +0000
@@ -147,6 +147,7 @@
 #define CFG_NDBMT_LQH_WORKERS         188
 
 #define CFG_DB_INIT_REDO              189
+#define CFG_DB_THREAD_POOL            190
 
 #define CFG_DB_SGA                    198 /* super pool mem */
 #define CFG_DB_DATA_MEM_2             199 /* used in special build in 5.1 */

=== modified file 'storage/ndb/src/kernel/blocks/Makefile.am'
--- a/storage/ndb/src/kernel/blocks/Makefile.am	2008-11-16 15:29:16 +0000
+++ b/storage/ndb/src/kernel/blocks/Makefile.am	2008-12-02 13:10:49 +0000
@@ -41,6 +41,7 @@ libblocks_a_SOURCES = tsman.cpp lgman.cp
   dbtup/DbtupDebug.cpp dbtup/DbtupScan.cpp \
   dbtup/DbtupDiskAlloc.cpp dbtup/DbtupVarAlloc.cpp \
   dbtup/tuppage.cpp dbtup/Undo_buffer.cpp \
+  ndbfs/AsyncIoThread.cpp \
   ndbfs/PosixAsyncFile.cpp ndbfs/AsyncFile.cpp \
   ndbfs/Ndbfs.cpp \
   ndbfs/VoidFs.cpp \

=== modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp	2008-12-01 18:50:58 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp	2008-12-02 13:10:49 +0000
@@ -14228,7 +14228,7 @@ void Dblqh::openFileRw(Signal* signal, L
   signal->theData[3] = olfLogFilePtr.p->fileName[1];
   signal->theData[4] = olfLogFilePtr.p->fileName[2];
   signal->theData[5] = olfLogFilePtr.p->fileName[3];
-  signal->theData[6] = FsOpenReq::OM_READWRITE | FsOpenReq::OM_AUTOSYNC | FsOpenReq::OM_CHECK_SIZE;
+  signal->theData[6] = FsOpenReq::OM_READWRITE | FsOpenReq::OM_AUTOSYNC | FsOpenReq::OM_CHECK_SIZE | FsOpenReq::OM_WRITE_BUFFER;
   if (c_o_direct)
     signal->theData[6] |= FsOpenReq::OM_DIRECT;
   req->auto_sync_size = MAX_REDO_PAGES_WITHOUT_SYNCH * sizeof(LogPageRecord);
@@ -14254,7 +14254,7 @@ void Dblqh::openLogfileInit(Signal* sign
   signal->theData[3] = logFilePtr.p->fileName[1];
   signal->theData[4] = logFilePtr.p->fileName[2];
   signal->theData[5] = logFilePtr.p->fileName[3];
-  signal->theData[6] = FsOpenReq::OM_READWRITE | FsOpenReq::OM_TRUNCATE | FsOpenReq::OM_CREATE | FsOpenReq::OM_AUTOSYNC;
+  signal->theData[6] = FsOpenReq::OM_READWRITE | FsOpenReq::OM_TRUNCATE | FsOpenReq::OM_CREATE | FsOpenReq::OM_AUTOSYNC | FsOpenReq::OM_WRITE_BUFFER;
   if (c_o_direct)
     signal->theData[6] |= FsOpenReq::OM_DIRECT;
 
@@ -14358,7 +14358,7 @@ void Dblqh::openNextLogfile(Signal* sign
     signal->theData[3] = onlLogFilePtr.p->fileName[1];
     signal->theData[4] = onlLogFilePtr.p->fileName[2];
     signal->theData[5] = onlLogFilePtr.p->fileName[3];
-    signal->theData[6] = FsOpenReq::OM_READWRITE | FsOpenReq::OM_AUTOSYNC | FsOpenReq::OM_CHECK_SIZE;
+    signal->theData[6] = FsOpenReq::OM_READWRITE | FsOpenReq::OM_AUTOSYNC | FsOpenReq::OM_CHECK_SIZE | FsOpenReq::OM_WRITE_BUFFER;
     if (c_o_direct)
       signal->theData[6] |= FsOpenReq::OM_DIRECT;
     req->auto_sync_size = MAX_REDO_PAGES_WITHOUT_SYNCH * sizeof(LogPageRecord);

=== modified file 'storage/ndb/src/kernel/blocks/lgman.cpp'
--- a/storage/ndb/src/kernel/blocks/lgman.cpp	2008-12-01 18:03:48 +0000
+++ b/storage/ndb/src/kernel/blocks/lgman.cpp	2008-12-02 13:10:49 +0000
@@ -26,7 +26,6 @@
 #include <signaldata/SumaImpl.hpp>
 #include <signaldata/LgmanContinueB.hpp>
 #include <signaldata/GetTabInfo.hpp>
-#include "ndbfs/Ndbfs.hpp"
 #include "dbtup/Dbtup.hpp"
 
 #include <EventLogger.hpp>

=== modified file 'storage/ndb/src/kernel/blocks/ndbcntr/NdbcntrMain.cpp'
--- a/storage/ndb/src/kernel/blocks/ndbcntr/NdbcntrMain.cpp	2008-10-05 07:12:56 +0000
+++ b/storage/ndb/src/kernel/blocks/ndbcntr/NdbcntrMain.cpp	2008-12-02 13:10:49 +0000
@@ -98,8 +98,8 @@ static BlockInfo ALL_BLOCKS[] = { 
 static const Uint32 ALL_BLOCKS_SZ = sizeof(ALL_BLOCKS)/sizeof(BlockInfo);
 
 static BlockReference readConfigOrder[ALL_BLOCKS_SZ] = {
-  NDBFS_REF, // let it run first to make sure it can start the threads
   CMVMI_REF,
+  NDBFS_REF,
   DBINFO_REF,
   DBTUP_REF,
   DBACC_REF,

=== modified file 'storage/ndb/src/kernel/blocks/ndbfs/AsyncFile.cpp'
--- a/storage/ndb/src/kernel/blocks/ndbfs/AsyncFile.cpp	2008-11-06 10:17:49 +0000
+++ b/storage/ndb/src/kernel/blocks/ndbfs/AsyncFile.cpp	2008-12-02 13:10:49 +0000
@@ -27,203 +27,44 @@
 #include <signaldata/FsReadWriteReq.hpp>
 #include <Configuration.hpp>
 
-const char *actionName[] = {
-  "open",
-  "close",
-  "closeRemove",
-  "read",
-  "readv",
-  "write",
-  "writev",
-  "writeSync",
-  "writevSync",
-  "sync",
-  "end" };
-
-static int numAsyncFiles = 0;
-
-extern "C" void * runAsyncFile(void* arg)
-{
-  ((AsyncFile*)arg)->run();
-  return (NULL);
-}
-
-
 AsyncFile::AsyncFile(SimulatedBlock& fs) :
   theFileName(),
-  theReportTo(0),
-  theMemoryChannelPtr(NULL),
   m_fs(fs)
 {
-  m_page_ptr.setNull();
-  m_current_request= m_last_request= 0;
-  m_auto_sync_freq = 0;
-}
-
-struct NdbThread*
-AsyncFile::doStart()
-{
-  // Stacksize for filesystem threads
-  // An 8k stack should be enough
-  const NDB_THREAD_STACKSIZE stackSize = 8192;
-
-  char buf[16];
-  numAsyncFiles++;
-  BaseString::snprintf(buf, sizeof(buf), "AsyncFile%d", numAsyncFiles);
-
-  theStartMutexPtr = NdbMutex_Create();
-  theStartConditionPtr = NdbCondition_Create();
-  NdbMutex_Lock(theStartMutexPtr);
-  theStartFlag = false;
-
-  theThreadPtr = NdbThread_Create(runAsyncFile,
-                                  (void**)this,
-                                  stackSize,
-                                  (char*)&buf,
-                                  NDB_THREAD_PRIO_MEAN);
-
-  if (theThreadPtr == 0)
-  {
-    ERROR_SET(fatal, NDBD_EXIT_MEMALLOC,
-              "","Could not allocate file system thread");
-  }
-  NdbCondition_Wait(theStartConditionPtr,
-                    theStartMutexPtr);
-  NdbMutex_Unlock(theStartMutexPtr);
-  NdbMutex_Destroy(theStartMutexPtr);
-  NdbCondition_Destroy(theStartConditionPtr);
-
-  return theThreadPtr;
-}
+  m_thread = 0;
 
-void AsyncFile::shutdown()
-{
-  void *status;
-  Request request;
-  request.action = Request::end;
-  this->theMemoryChannelPtr->writeChannel( &request );
-  NdbThread_WaitFor(theThreadPtr, &status);
-  NdbThread_Destroy(&theThreadPtr);
-  delete theMemoryChannelPtr;
+  m_page_cnt = 0;
+  m_page_ptr.setNull();
+  theWriteBuffer = 0;
+  theWriteBufferSize = 0;
 }
 
 void
-AsyncFile::reportTo( MemoryChannel<Request> *reportTo )
-{
-  theReportTo = reportTo;
-}
-
-void AsyncFile::execute(Request* request)
-{
-  theMemoryChannelPtr->writeChannel( request );
-}
-
-int AsyncFile::init()
+AsyncFile::attach(AsyncIoThread* thr)
 {
-  // Create write buffer for bigger writes
-  theWriteBufferSize = WRITEBUFFERSIZE;
-  theWriteBuffer = (char *) ndbd_malloc(theWriteBufferSize);
-
-  return 0;
+#if 0
+  ndbout_c("%p:%s attach to %p (m_thread: %p)", this, theFileName.c_str(), thr,
+             m_thread);
+#endif
+  assert(m_thread == 0);
+  m_thread = thr;
 }
 
 void
-AsyncFile::run()
+AsyncFile::detach(AsyncIoThread* thr)
 {
-  Request *request;
-
-  // Create theMemoryChannel in the thread that will wait for it
-  NdbMutex_Lock(theStartMutexPtr);
-  theMemoryChannelPtr = new MemoryChannel<Request>();
-  theStartFlag = true;
-
-  int r= this->init();
-
-  NdbMutex_Unlock(theStartMutexPtr);
-  NdbCondition_Signal(theStartConditionPtr);
-
-  if(r!=0)
-  {
-    DEBUG(ndbout_c("AsyncFile::init() failed"));
-    return;
-  }
-
-  while (1) {
-    request = theMemoryChannelPtr->readChannel();
-    if (!request) {
-      DEBUG(ndbout_c("Nothing read from Memory Channel in AsyncFile"));
-      endReq();
-      return;
-    }//if
-    m_current_request= request;
-    switch (request->action) {
-    case Request:: open:
-      openReq(request);
-      break;
-    case Request:: close:
-      closeReq(request);
-      break;
-    case Request:: closeRemove:
-      closeReq(request);
-      removeReq(request);
-      break;
-    case Request:: readPartial:
-    case Request:: read:
-      readReq(request);
-      break;
-    case Request:: readv:
-      readvReq(request);
-      break;
-    case Request:: write:
-      writeReq(request);
-      break;
-    case Request:: writev:
-      writevReq(request);
-      break;
-    case Request:: writeSync:
-      writeReq(request);
-      syncReq(request);
-      break;
-    case Request:: writevSync:
-      writevReq(request);
-      syncReq(request);
-      break;
-    case Request:: sync:
-      syncReq(request);
-      break;
-    case Request:: append:
-      appendReq(request);
-      break;
-    case Request:: append_synch:
-      appendReq(request);
-      syncReq(request);
-      break;
-    case Request::rmrf:
-      rmrfReq(request, (char*)theFileName.c_str(), request->par.rmrf.own_directory);
-      break;
-    case Request:: end:
-      if (isOpen())
-        closeReq(request);
-      endReq();
-      return;
-    default:
-      DEBUG(ndbout_c("Invalid Request"));
-      abort();
-      break;
-    }//switch
-    m_last_request= request;
-    m_current_request= 0;
-
-    // No need to signal as ndbfs only uses tryRead
-    theReportTo->writeChannelNoSignal(request);
-  }//while
-}//AsyncFile::run()
-
+#if 0
+  ndbout_c("%p:%s detach from %p", this, theFileName.c_str(), thr);
+#endif
+  assert(m_thread == thr);
+  m_thread = 0;
+}
 
 void
 AsyncFile::readReq( Request * request)
 {
-  for(int i = 0; i < request->par.readWrite.numberOfPages ; i++) {
+  for(int i = 0; i < request->par.readWrite.numberOfPages ; i++)
+  {
     off_t offset = request->par.readWrite.pages[i].offset;
     size_t size  = request->par.readWrite.pages[i].size;
     char * buf   = request->par.readWrite.pages[i].buf;
@@ -244,78 +85,92 @@ AsyncFile::readvReq( Request * request)
 }
 
 void
-AsyncFile::writeReq( Request * request)
+AsyncFile::writeReq(Request * request)
 {
-  int page_num = 0;
-  bool write_not_complete = true;
+  Uint32 cnt = request->par.readWrite.numberOfPages;
+  if (theWriteBuffer == 0 || cnt == 1)
+  {
+    for (Uint32 i = 0; i<cnt; i++)
+    {
+      int err = writeBuffer(request->par.readWrite.pages[i].buf,
+                            request->par.readWrite.pages[i].size,
+                            request->par.readWrite.pages[i].offset);
+      if (err)
+      {
+        request->error = err;
+        return;
+      }
+      goto done;
+    }
+  }
 
-  while(write_not_complete) {
-    int totsize = 0;
-    off_t offset = request->par.readWrite.pages[page_num].offset;
-    char* bufptr = theWriteBuffer;
-
-    write_not_complete = false;
-    if (request->par.readWrite.numberOfPages > 1) {
-      off_t page_offset = offset;
-
-      // Multiple page write, copy to buffer for one write
-      for(int i=page_num; i < request->par.readWrite.numberOfPages; i++) {
-        memcpy(bufptr,
-               request->par.readWrite.pages[i].buf,
-               request->par.readWrite.pages[i].size);
-        bufptr += request->par.readWrite.pages[i].size;
-        totsize += request->par.readWrite.pages[i].size;
-        if (((i + 1) < request->par.readWrite.numberOfPages)) {
-          // There are more pages to write
-          // Check that offsets are consequtive
-	  off_t tmp = page_offset + request->par.readWrite.pages[i].size;
-          if (tmp != request->par.readWrite.pages[i+1].offset) {
-            // Next page is not aligned with previous, not allowed
-            DEBUG(ndbout_c("Page offsets are not aligned"));
-            request->error = EINVAL;
-            return;
-          }
-          if ((unsigned)(totsize + request->par.readWrite.pages[i+1].size) > (unsigned)theWriteBufferSize) {
-            // We are not finished and the buffer is full
-            write_not_complete = true;
-            // Start again with next page
-            page_num = i + 1;
-            break;
+  {
+    int page_num = 0;
+    bool write_not_complete = true;
+
+    while(write_not_complete) {
+      int totsize = 0;
+      off_t offset = request->par.readWrite.pages[page_num].offset;
+      char* bufptr = theWriteBuffer;
+
+      write_not_complete = false;
+      if (request->par.readWrite.numberOfPages > 1) {
+        off_t page_offset = offset;
+
+        // Multiple page write, copy to buffer for one write
+        for(int i=page_num; i < request->par.readWrite.numberOfPages; i++) {
+          memcpy(bufptr,
+                 request->par.readWrite.pages[i].buf,
+                 request->par.readWrite.pages[i].size);
+          bufptr += request->par.readWrite.pages[i].size;
+          totsize += request->par.readWrite.pages[i].size;
+          if (((i + 1) < request->par.readWrite.numberOfPages)) {
+            // There are more pages to write
+            // Check that offsets are consequtive
+            off_t tmp = page_offset + request->par.readWrite.pages[i].size;
+            if (tmp != request->par.readWrite.pages[i+1].offset) {
+              // Next page is not aligned with previous, not allowed
+              DEBUG(ndbout_c("Page offsets are not aligned"));
+              request->error = EINVAL;
+              return;
+            }
+            if ((unsigned)(totsize + request->par.readWrite.pages[i+1].size) > (unsigned)theWriteBufferSize) {
+              // We are not finished and the buffer is full
+              write_not_complete = true;
+              // Start again with next page
+              page_num = i + 1;
+              break;
+            }
           }
+          page_offset += request->par.readWrite.pages[i].size;
         }
-        page_offset += request->par.readWrite.pages[i].size;
+        bufptr = theWriteBuffer;
+      } else {
+        // One page write, write page directly
+        bufptr = request->par.readWrite.pages[0].buf;
+        totsize = request->par.readWrite.pages[0].size;
       }
-      bufptr = theWriteBuffer;
-    } else {
-      // One page write, write page directly
-      bufptr = request->par.readWrite.pages[0].buf;
-      totsize = request->par.readWrite.pages[0].size;
-    }
-    int err = writeBuffer(bufptr, totsize, offset);
-    if(err != 0){
-      request->error = err;
-      return;
-    }
-  } // while(write_not_complete)
-
-  if(m_auto_sync_freq && m_write_wo_sync > m_auto_sync_freq){
+      int err = writeBuffer(bufptr, totsize, offset);
+      if(err != 0){
+        request->error = err;
+        return;
+      }
+    } // while(write_not_complete)
+  }
+done:
+  if(m_auto_sync_freq && m_write_wo_sync > m_auto_sync_freq)
+  {
     syncReq(request);
   }
 }
 
 void
-AsyncFile::writevReq( Request * request)
+AsyncFile::writevReq(Request * request)
 {
   // WriteFileGather on WIN32?
   writeReq(request);
 }
 
-void AsyncFile::endReq()
-{
-  if (theWriteBuffer)
-    ndbd_free(theWriteBuffer, theWriteBufferSize);
-}
-
 #ifdef DEBUG_ASYNCFILE
 void printErrorAndFlags(Uint32 used_flags) {
   char buf[255];

=== modified file 'storage/ndb/src/kernel/blocks/ndbfs/AsyncFile.hpp'
--- a/storage/ndb/src/kernel/blocks/ndbfs/AsyncFile.hpp	2008-11-18 10:28:03 +0000
+++ b/storage/ndb/src/kernel/blocks/ndbfs/AsyncFile.hpp	2008-12-02 13:10:49 +0000
@@ -16,166 +16,33 @@
 #ifndef AsyncFile_H
 #define AsyncFile_H
 
-/**
-   AsyncFile
-
-   All file operations executed in thread-per-file, away from the DB threads.
- */
-
-
-// void execute(Request *request);
-// Description:
-//   performens the requered action.
-// Parameters:
-//    request: request to be called when open is finished.
-//       action= open|close|read|write|sync
-//          if action is open then:
-//             par.open.flags= UNIX open flags, see man open
-//             par.open.name= name of the file to open
-//          if action is read or write then:
-//             par.readWrite.buf= user provided buffer to read/write 
-//             the data from/to
-//             par.readWrite.size= how many bytes must be read/written
-//             par.readWrite.offset= absolute offset in file in bytes
-// return:
-//    return values are stored in the request error field:
-//       error= return state of the action, UNIX error see man open/errno
-//       userData= is untouched can be used be user.
-
-
-
-// void reportTo( MemoryChannel<Request> *reportTo );
-// Description:
-//   set the channel where the file must report the result of the 
-//    actions back to.
-// Parameters:
-//    reportTo: the memory channel to use use MemoryChannelMultipleWriter 
-//              if more 
-//              than one file uses this channel to report back.
-
 #include <kernel_types.h>
-#include "MemoryChannel.hpp"
+#include "AsyncIoThread.hpp"
 #include "Filename.hpp"
 
-// Use this define if you want printouts from AsyncFile class
-//#define DEBUG_ASYNCFILE
-
-#ifdef DEBUG_ASYNCFILE
-#include <NdbOut.hpp>
-#define DEBUG(x) x
-#define PRINT_ERRORANDFLAGS(f) printErrorAndFlags(f)
-void printErrorAndFlags(Uint32 used_flags);
-#else
-#define DEBUG(x)
-#define PRINT_ERRORANDFLAGS(f)
-#endif
-
-// Define the size of the write buffer (for each thread)
-#define WRITEBUFFERSIZE 262144
-
-const int ERR_ReadUnderflow = 1000;
-
-const int WRITECHUNK = 262144;
-
-class AsyncFile;
-
-class Request
-{
-public:
-  Request() {}
-
-  enum Action {
-    open,
-    close,
-    closeRemove,
-    read,   // Allways leave readv directly after 
-            // read because SimblockAsyncFileSystem depends on it
-    readv,
-    write,// Allways leave writev directly after 
-	        // write because SimblockAsyncFileSystem depends on it
-    writev,
-    writeSync,// Allways leave writevSync directly after 
-    // writeSync because SimblockAsyncFileSystem depends on it
-    writevSync,
-    sync,
-    end,
-    append,
-    append_synch,
-    rmrf,
-    readPartial
-  };
-  Action action;
-  union {
-    struct {
-      Uint32 flags;
-      Uint32 page_size;
-      Uint64 file_size;
-      Uint32 auto_sync_size;
-    } open;
-    struct {
-      int numberOfPages;
-      struct{
-	char *buf;
-	size_t size;
-	off_t offset;
-      } pages[16];
-    } readWrite;
-    struct {
-      const char * buf;
-      size_t size;
-    } append;
-    struct {
-      bool directory;
-      bool own_directory;
-    } rmrf;
-  } par;
-  int error;
-  
-  void set(BlockReference userReference, 
-	   Uint32 userPointer,
-	   Uint16 filePointer);
-  BlockReference theUserReference;
-  Uint32 theUserPointer;
-  Uint16 theFilePointer;
-   // Information for open, needed if the first open action fails.
-  AsyncFile* file;
-  Uint32 theTrace;
-};
-
-NdbOut& operator <<(NdbOut&, const Request&);
-
-inline
-void 
-Request::set(BlockReference userReference, 
-	     Uint32 userPointer, Uint16 filePointer) 
-{
-  theUserReference= userReference;
-  theUserPointer= userPointer;
-  theFilePointer= filePointer;
-}
-
 class AsyncFile
 {
   friend class Ndbfs;
+  friend class AsyncIoThread;
+
 public:
   AsyncFile(SimulatedBlock& fs);
   virtual ~AsyncFile() {};
 
-  void reportTo( MemoryChannel<Request> *reportTo );
+  virtual int init() = 0;
 
-  void execute( Request* request );
-
-  virtual struct NdbThread* doStart();
-
-  virtual void shutdown();
-
-  // its a thread so its always running
-  virtual void run();
+  void reportTo( MemoryChannel<Request> *reportTo );
 
   virtual bool isOpen() = 0;
 
   Filename theFileName;
   Request *m_current_request, *m_last_request;
+
+  void set_buffer(Ptr<GlobalPage> ptr, Uint32 cnt);
+  bool has_buffer() const;
+  void clear_buffer(Ptr<GlobalPage> & ptr, Uint32 & cnt);
+
+  AsyncIoThread* getThread() const { return m_thread;}
 private:
 
   /**
@@ -184,14 +51,6 @@ private:
    */
 
   /**
-   * init()
-   *
-   * Initialise buffers etc. After init(), ready to execute()
-   * Called with theStartMutexPtr held.
-   */
-  virtual int init();
-
-  /**
    * openReq() - open a file.
    */
   virtual void openReq(Request *request) = 0;
@@ -204,15 +63,13 @@ private:
   /**
    * writeBuffer() - write into file
    */
-  virtual int writeBuffer(const char * buf, size_t size, off_t offset,
-		  size_t chunk_size = WRITECHUNK)=0;
-
+  virtual int writeBuffer(const char * buf, size_t size, off_t offset)=0;
 
   virtual void closeReq(Request *request)=0;
   virtual void syncReq(Request *request)=0;
   virtual void removeReq(Request *request)=0;
   virtual void appendReq(Request *request)=0;
-  virtual void rmrfReq(Request *request, char * path, bool removePath)=0;
+  virtual void rmrfReq(Request *request, const char * path, bool removePath)=0;
   virtual void createDirectories()=0;
 
   /**
@@ -229,38 +86,60 @@ protected:
   virtual void writeReq(Request *request);
   virtual void writevReq(Request *request);
 
-  /**
-   * endReq()
-   *
-   * Inverse to ::init(). Cleans up thread before it exits.
-   */
-  virtual void endReq();
-
 private:
-  /**
-   * (end of what implementors need)
-   */
+  void attach(AsyncIoThread* thr);
+  void detach(AsyncIoThread* thr);
 
-  MemoryChannel<Request> *theReportTo;
-  MemoryChannel<Request>* theMemoryChannelPtr;
-
-  struct NdbThread* theThreadPtr;
-  NdbMutex* theStartMutexPtr;
-  NdbCondition* theStartConditionPtr;
-  bool   theStartFlag;
+  AsyncIoThread* m_thread; // For bound files
 
 protected:
-  int theWriteBufferSize;
-  char* theWriteBuffer;
-
   size_t m_write_wo_sync;  // Writes wo/ sync
   size_t m_auto_sync_freq; // Auto sync freq in bytes
+  Uint32 m_open_flags;
 
-public:
-  SimulatedBlock& m_fs;
-
+  /**
+   * file buffers
+   */
   Uint32 m_page_cnt;
   Ptr<GlobalPage> m_page_ptr;
+
+  char* theWriteBuffer;
+  Uint32 theWriteBufferSize;
+
+public:
+  SimulatedBlock& m_fs;
 };
 
+inline
+void
+AsyncFile::set_buffer(Ptr<GlobalPage> ptr, Uint32 cnt)
+{
+  assert(!has_buffer());
+  m_page_ptr = ptr;
+  m_page_cnt = cnt;
+  theWriteBuffer = (char*)ptr.p;
+  theWriteBufferSize = cnt * sizeof(GlobalPage);
+}
+
+inline
+bool
+AsyncFile::has_buffer() const
+{
+  return m_page_cnt > 0;
+}
+
+inline
+void
+AsyncFile::clear_buffer(Ptr<GlobalPage> & ptr, Uint32 & cnt)
+{
+  assert(has_buffer());
+  ptr = m_page_ptr;
+  cnt = m_page_cnt;
+  m_page_cnt = 0;
+  m_page_ptr.setNull();
+  theWriteBuffer = 0;
+  theWriteBufferSize = 0;
+}
+
+
 #endif

=== added file 'storage/ndb/src/kernel/blocks/ndbfs/AsyncIoThread.cpp'
--- a/storage/ndb/src/kernel/blocks/ndbfs/AsyncIoThread.cpp	1970-01-01 00:00:00 +0000
+++ b/storage/ndb/src/kernel/blocks/ndbfs/AsyncIoThread.cpp	2008-12-02 13:10:49 +0000
@@ -0,0 +1,203 @@
+/* Copyright (C) 2003 MySQL AB
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
+
+#include <ndb_global.h>
+#include <my_sys.h>
+#include <my_pthread.h>
+
+#include "AsyncIoThread.hpp"
+#include "AsyncFile.hpp"
+#include <ErrorHandlingMacros.hpp>
+#include <kernel_types.h>
+#include <ndbd_malloc.hpp>
+#include <NdbThread.h>
+#include <signaldata/FsRef.hpp>
+#include <signaldata/FsOpenReq.hpp>
+#include <signaldata/FsReadWriteReq.hpp>
+#include <Configuration.hpp>
+#include "Ndbfs.hpp"
+
+AsyncIoThread::AsyncIoThread(class Ndbfs& fs, AsyncFile* file)
+  : m_fs(fs)
+{
+  m_current_file = file;
+  if (file)
+  {
+    theMemoryChannelPtr = &theMemoryChannel;
+  }
+  else
+  {
+    theMemoryChannelPtr = &m_fs.theToThreads;
+  }
+  theReportTo = &m_fs.theFromThreads;
+}
+
+static int numAsyncFiles = 0;
+
+extern "C"
+void *
+runAsyncIoThread(void* arg)
+{
+  ((AsyncIoThread*)arg)->run();
+  return (NULL);
+}
+
+
+struct NdbThread*
+AsyncIoThread::doStart()
+{
+  // Stacksize for filesystem threads
+  // An 8k stack should be enough
+  const NDB_THREAD_STACKSIZE stackSize = 8192;
+
+  char buf[16];
+  numAsyncFiles++;
+  BaseString::snprintf(buf, sizeof(buf), "AsyncIoThread%d", numAsyncFiles);
+
+  theStartMutexPtr = NdbMutex_Create();
+  theStartConditionPtr = NdbCondition_Create();
+  NdbMutex_Lock(theStartMutexPtr);
+  theStartFlag = false;
+
+  theThreadPtr = NdbThread_Create(runAsyncIoThread,
+                                  (void**)this,
+                                  stackSize,
+                                  buf,
+                                  NDB_THREAD_PRIO_MEAN);
+
+  if (theThreadPtr == 0)
+  {
+    ERROR_SET(fatal, NDBD_EXIT_MEMALLOC,
+              "","Could not allocate file system thread");
+  }
+
+  do
+  {
+    NdbCondition_Wait(theStartConditionPtr,
+                      theStartMutexPtr);
+  }
+  while (theStartFlag == false);
+
+  NdbMutex_Unlock(theStartMutexPtr);
+  NdbMutex_Destroy(theStartMutexPtr);
+  NdbCondition_Destroy(theStartConditionPtr);
+
+  return theThreadPtr;
+}
+
+void
+AsyncIoThread::shutdown()
+{
+  void *status;
+  Request request;
+  request.action = Request::end;
+  this->theMemoryChannelPtr->writeChannel( &request );
+  NdbThread_WaitFor(theThreadPtr, &status);
+  NdbThread_Destroy(&theThreadPtr);
+}
+
+void
+AsyncIoThread::dispatch(Request *request)
+{
+  assert(m_current_file);
+  assert(m_current_file->getThread() == this);
+  assert(theMemoryChannelPtr == &theMemoryChannel);
+  theMemoryChannelPtr->writeChannel(request);
+}
+
+void
+AsyncIoThread::run()
+{
+  Request *request;
+
+  // Create theMemoryChannel in the thread that will wait for it
+  NdbMutex_Lock(theStartMutexPtr);
+  theStartFlag = true;
+  NdbMutex_Unlock(theStartMutexPtr);
+  NdbCondition_Signal(theStartConditionPtr);
+
+  while (1)
+  {
+    request = theMemoryChannelPtr->readChannel();
+    if (!request || request->action == Request::end)
+    {
+      DEBUG(ndbout_c("Nothing read from Memory Channel in AsyncFile"));
+      theStartFlag = false;
+      return;
+    }//if
+
+    AsyncFile * file = request->file;
+    m_current_request= request;
+    switch (request->action) {
+    case Request::open:
+      file->openReq(request);
+      break;
+    case Request::close:
+      file->closeReq(request);
+      break;
+    case Request::closeRemove:
+      file->closeReq(request);
+      file->removeReq(request);
+      break;
+    case Request::readPartial:
+    case Request::read:
+      file->readReq(request);
+      break;
+    case Request::readv:
+      file->readvReq(request);
+      break;
+    case Request::write:
+      file->writeReq(request);
+      break;
+    case Request::writev:
+      file->writevReq(request);
+      break;
+    case Request::writeSync:
+      file->writeReq(request);
+      file->syncReq(request);
+      break;
+    case Request::writevSync:
+      file->writevReq(request);
+      file->syncReq(request);
+      break;
+    case Request::sync:
+      file->syncReq(request);
+      break;
+    case Request::append:
+      file->appendReq(request);
+      break;
+    case Request::append_synch:
+      file->appendReq(request);
+      file->syncReq(request);
+      break;
+    case Request::rmrf:
+      file->rmrfReq(request, file->theFileName.c_str(),
+                    request->par.rmrf.own_directory);
+      break;
+    case Request::end:
+      theStartFlag = false;
+      return;
+    default:
+      DEBUG(ndbout_c("Invalid Request"));
+      abort();
+      break;
+    }//switch
+    m_last_request = request;
+    m_current_request = 0;
+
+    // No need to signal as ndbfs only uses tryRead
+    theReportTo->writeChannelNoSignal(request);
+  }
+}

=== added file 'storage/ndb/src/kernel/blocks/ndbfs/AsyncIoThread.hpp'
--- a/storage/ndb/src/kernel/blocks/ndbfs/AsyncIoThread.hpp	1970-01-01 00:00:00 +0000
+++ b/storage/ndb/src/kernel/blocks/ndbfs/AsyncIoThread.hpp	2008-12-02 13:10:49 +0000
@@ -0,0 +1,151 @@
+/* Copyright (C) 2003 MySQL AB
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
+
+#ifndef AsyncIoThread_H
+#define AsyncIoThread_H
+
+#include <kernel_types.h>
+#include <Pool.hpp>
+#include "MemoryChannel.hpp"
+
+// Use this define if you want printouts from AsyncFile class
+//#define DEBUG_ASYNCFILE
+
+#ifdef DEBUG_ASYNCFILE
+#include <NdbOut.hpp>
+#define DEBUG(x) x
+#define PRINT_ERRORANDFLAGS(f) printErrorAndFlags(f)
+void printErrorAndFlags(Uint32 used_flags);
+#else
+#define DEBUG(x)
+#define PRINT_ERRORANDFLAGS(f)
+#endif
+
+const int ERR_ReadUnderflow = 1000;
+
+class AsyncFile;
+
+class Request
+{
+public:
+  Request() {}
+
+  enum Action {
+    open,
+    close,
+    closeRemove,
+    read,   // Allways leave readv directly after
+            // read because SimblockAsyncFileSystem depends on it
+    readv,
+    write,// Allways leave writev directly after
+	        // write because SimblockAsyncFileSystem depends on it
+    writev,
+    writeSync,// Allways leave writevSync directly after
+    // writeSync because SimblockAsyncFileSystem depends on it
+    writevSync,
+    sync,
+    end,
+    append,
+    append_synch,
+    rmrf,
+    readPartial
+  };
+  Action action;
+  union {
+    struct {
+      Uint32 flags;
+      Uint32 page_size;
+      Uint64 file_size;
+      Uint32 auto_sync_size;
+    } open;
+    struct {
+      int numberOfPages;
+      struct{
+	char *buf;
+	size_t size;
+	off_t offset;
+      } pages[16];
+    } readWrite;
+    struct {
+      const char * buf;
+      size_t size;
+    } append;
+    struct {
+      bool directory;
+      bool own_directory;
+    } rmrf;
+  } par;
+  int error;
+
+  void set(BlockReference userReference,
+	   Uint32 userPointer,
+	   Uint16 filePointer);
+  BlockReference theUserReference;
+  Uint32 theUserPointer;
+  Uint16 theFilePointer;
+   // Information for open, needed if the first open action fails.
+  AsyncFile* file;
+  Uint32 theTrace;
+};
+
+NdbOut& operator <<(NdbOut&, const Request&);
+
+inline
+void
+Request::set(BlockReference userReference,
+	     Uint32 userPointer, Uint16 filePointer)
+{
+  theUserReference= userReference;
+  theUserPointer= userPointer;
+  theFilePointer= filePointer;
+}
+
+class AsyncIoThread
+{
+  friend class Ndbfs;
+  friend class AsyncFile;
+public:
+  AsyncIoThread(class Ndbfs&, AsyncFile* file);
+  virtual ~AsyncIoThread() {};
+
+  struct NdbThread* doStart();
+  void shutdown();
+
+  // its a thread so its always running
+  void run();
+
+  /**
+   * Add a request to a thread,
+   *   should only be used with bound threads
+   */
+  void dispatch(Request*);
+
+  AsyncFile * m_current_file;
+  Request *m_current_request, *m_last_request;
+
+private:
+  Ndbfs & m_fs;
+
+  MemoryChannel<Request> *theReportTo;
+  MemoryChannel<Request> *theMemoryChannelPtr;
+  MemoryChannel<Request> theMemoryChannel; // If file-bound
+
+  bool   theStartFlag;
+  struct NdbThread* theThreadPtr;
+  NdbMutex* theStartMutexPtr;
+  NdbCondition* theStartConditionPtr;
+};
+
+#endif

=== modified file 'storage/ndb/src/kernel/blocks/ndbfs/MemoryChannel.hpp'
--- a/storage/ndb/src/kernel/blocks/ndbfs/MemoryChannel.hpp	2008-08-22 11:02:38 +0000
+++ b/storage/ndb/src/kernel/blocks/ndbfs/MemoryChannel.hpp	2008-12-02 13:10:49 +0000
@@ -131,8 +131,8 @@ template <class T> void MemoryChannel<T>
   if(full(theWriteIndex, theReadIndex) || theChannel == NULL) abort();
   theChannel[theWriteIndex]= t;
   ++theWriteIndex;
-  NdbMutex_Unlock(theMutexPtr);
   NdbCondition_Signal(theConditionPtr);
+  NdbMutex_Unlock(theMutexPtr);
 }
 
 template <class T> void MemoryChannel<T>::writeChannelNoSignal( T *t)

=== modified file 'storage/ndb/src/kernel/blocks/ndbfs/Ndbfs.cpp'
--- a/storage/ndb/src/kernel/blocks/ndbfs/Ndbfs.cpp	2008-11-18 10:28:03 +0000
+++ b/storage/ndb/src/kernel/blocks/ndbfs/Ndbfs.cpp	2008-12-02 13:10:49 +0000
@@ -53,7 +53,6 @@ int pageSize( const NewVARIABLE* baseAdd
    return (1 << log_psize);
 }
 
-
 Ndbfs::Ndbfs(Block_context& ctx) :
   SimulatedBlock(NDBFS, ctx),
   scanningInProgress(false),
@@ -80,16 +79,47 @@ Ndbfs::Ndbfs(Block_context& ctx) :
 
 Ndbfs::~Ndbfs()
 {
-  // Delete all files
-  // AsyncFile destuctor will take care of deleting
-  // the thread it has created
+  /**
+   * Stop all unbound threads
+   */
+
+  /**
+   * Post enought Request::end to saturate all unbound threads
+   */
+  Request request;
+  request.action = Request::end;
+  for (unsigned i = 0; i < theThreads.size(); i++)
+  {
+    theToThreads.writeChannel(&request);
+  }
+
+  for (unsigned i = 0; i < theThreads.size(); i++)
+  {
+    AsyncIoThread * thr = theThreads[i];
+    thr->shutdown();
+  }
+
+  /**
+   * delete all threads
+   */
+  for (unsigned i = 0; i < theThreads.size(); i++)
+  {
+    AsyncIoThread * thr = theThreads[i];
+    delete thr;
+    theThreads[i] = 0;
+  }
+  theThreads.clear();
+
+  /**
+   * Delete all files
+   */
   for (unsigned i = 0; i < theFiles.size(); i++){
     AsyncFile* file = theFiles[i];
-    file->shutdown();
     delete file;
     theFiles[i] = NULL;
   }//for
   theFiles.clear();
+
   if (theRequestPool)
     delete theRequestPool;
 }
@@ -114,12 +144,34 @@ Ndbfs::execREAD_CONFIG_REQ(Signal* signa
   m_maxFiles = 0;
   ndb_mgm_get_int_parameter(p, CFG_DB_MAX_OPEN_FILES, &m_maxFiles);
   Uint32 noIdleFiles = 27;
+
   ndb_mgm_get_int_parameter(p, CFG_DB_INITIAL_OPEN_FILES, &noIdleFiles);
   if (noIdleFiles > m_maxFiles && m_maxFiles != 0)
     m_maxFiles = noIdleFiles;
+
   // Create idle AsyncFiles
-  for (Uint32 i = 0; i < noIdleFiles; i++){
-    theIdleFiles.push_back(createAsyncFile());
+  for (Uint32 i = 0; i < noIdleFiles; i++)
+  {
+    theIdleBoundFiles.push_back(createAsyncFile(true /* bound */));
+  }
+
+  Uint32 threadpool = 8;
+  ndb_mgm_get_int_parameter(p, CFG_DB_THREAD_POOL, &threadpool);
+
+  // Create IoThreads
+  for (Uint32 i = 0; i < threadpool; i++)
+  {
+    AsyncIoThread * thr = createIoThread(0);
+    if (thr)
+    {
+      jam();
+      theThreads.push_back(thr);
+    }
+    else
+    {
+      jam();
+      break;
+    }
   }
 
   ReadConfigConf * conf = (ReadConfigConf*)signal->getDataPtrSend();
@@ -176,7 +228,15 @@ int 
 Ndbfs::forward( AsyncFile * file, Request* request)
 {
   jam();
-  file->execute(request);
+  AsyncIoThread* thr = file->getThread();
+  if (thr) // bound
+  {
+    thr->dispatch(request);
+  }
+  else
+  {
+    theToThreads.writeChannel(request);
+  }
   return 1;
 }
 
@@ -186,13 +246,27 @@ Ndbfs::execFSOPENREQ(Signal* signal)
   jamEntry();
   const FsOpenReq * const fsOpenReq = (FsOpenReq *)&signal->theData[0];
   const BlockReference userRef = fsOpenReq->userReference;
-  AsyncFile* file = getIdleFile();
+
+  bool bound = (fsOpenReq->fileFlags & FsOpenReq::OM_THREAD_POOL) == 0;
+  AsyncFile* file = getIdleFile(bound);
   ndbrequire(file != NULL);
   Filename::NameSpec spec(theFileSystemPath, theBackupFilePath);
 
   Uint32 userPointer = fsOpenReq->userPointer;
+
+  if(signal->getNoOfSections() == 0){
+    jam();
+    file->theFileName.set(spec, userRef, fsOpenReq->fileNumber);
+  } else {
+    jam();
+    SectionHandle handle(this, signal);
+    SegmentedSectionPtr ptr;
+    handle.getSection(ptr, FsOpenReq::FILENAME);
+    file->theFileName.set(spec, ptr, g_sectionSegmentPool);
+    releaseSections(handle);
+  }
   
-  if(fsOpenReq->fileFlags & FsOpenReq::OM_INIT)
+  if (fsOpenReq->fileFlags & FsOpenReq::OM_INIT)
   {
     jam();
     Uint32 cnt = 16; // 512k
@@ -211,9 +285,29 @@ Ndbfs::execFSOPENREQ(Signal* signal)
       return;
     }
     m_shared_page_pool.getPtr(page_ptr);
-    file->m_page_ptr = page_ptr;
-    file->m_page_cnt = cnt;
+    file->set_buffer(page_ptr, cnt);
   } 
+  else if (fsOpenReq->fileFlags & FsOpenReq::OM_WRITE_BUFFER)
+  {
+    jam();
+    Uint32 cnt = NDB_FILE_BUFFER_SIZE / GLOBAL_PAGE_SIZE; // 256k
+    Ptr<GlobalPage> page_ptr;
+    m_ctx.m_mm.alloc_pages(RT_FILE_BUFFER, &page_ptr.i, &cnt, 1);
+    if(cnt == 0)
+    {
+      file->m_page_ptr.setNull();
+      file->m_page_cnt = 0;
+
+      FsRef * const fsRef = (FsRef *)&signal->theData[0];
+      fsRef->userPointer  = userPointer;
+      fsRef->setErrorCode(fsRef->errorCode, FsRef::fsErrOutOfMemory);
+      fsRef->osErrorCode  = ~0; // Indicate local error
+      sendSignal(userRef, GSN_FSOPENREF, signal, 3, JBB);
+      return;
+    }
+    m_shared_page_pool.getPtr(page_ptr);
+    file->set_buffer(page_ptr, cnt);
+  }
   else
   {
     ndbassert(file->m_page_ptr.isNull());
@@ -221,20 +315,8 @@ Ndbfs::execFSOPENREQ(Signal* signal)
     file->m_page_cnt = 0;
   }
   
-  if(signal->getNoOfSections() == 0){
-    jam();
-    file->theFileName.set(spec, userRef, fsOpenReq->fileNumber);
-  } else {
-    jam();
-    SectionHandle handle(this, signal);
-    SegmentedSectionPtr ptr;
-    handle.getSection(ptr, FsOpenReq::FILENAME);
-    file->theFileName.set(spec, ptr, g_sectionSegmentPool);
-    releaseSections(handle);
-  }
-  file->reportTo(&theFromThreads);
   if (getenv("NDB_TRACE_OPEN"))
-    ndbout_c("open(%s)", file->theFileName.c_str());
+    ndbout_c("open(%s) bound: %u", file->theFileName.c_str(), bound);
   
   Request* request = theRequestPool->get();
   request->action = Request::open;
@@ -258,12 +340,11 @@ Ndbfs::execFSREMOVEREQ(Signal* signal)
   jamEntry();
   const FsRemoveReq * const req = (FsRemoveReq *)signal->getDataPtr();
   const BlockReference userRef = req->userReference;
-  AsyncFile* file = getIdleFile();
+  AsyncFile* file = getIdleFile(true);
   ndbrequire(file != NULL);
 
   Filename::NameSpec spec(theFileSystemPath, theBackupFilePath);
   file->theFileName.set(spec, userRef, req->fileNumber, req->directory);
-  file->reportTo(&theFromThreads);
   
   Request* request = theRequestPool->get();
   request->action = Request::rmrf;
@@ -303,6 +384,9 @@ Ndbfs::execFSCLOSEREQ(Signal * signal)
     return;
   }
 
+  if (getenv("NDB_TRACE_OPEN"))
+    ndbout_c("close(%s)", openFile->theFileName.c_str());
+
   Request *request = theRequestPool->get();
   if( fsCloseReq->getRemoveFileFlag(fsCloseReq->fileFlag) == true ) {
      jam();
@@ -669,10 +753,11 @@ Ndbfs::newId()
 }
 
 AsyncFile*
-Ndbfs::createAsyncFile(){
+Ndbfs::createAsyncFile(bool bound){
 
   // Check limit of open files
-  if (m_maxFiles !=0 && theFiles.size() ==  m_maxFiles) {
+  if (m_maxFiles !=0 && theFiles.size() ==  m_maxFiles)
+  {
     // Print info about all open files
     for (unsigned i = 0; i < theFiles.size(); i++){
       AsyncFile* file = theFiles[i];
@@ -687,29 +772,76 @@ Ndbfs::createAsyncFile(){
   AsyncFile* file = new PosixAsyncFile(* this);
 #endif
 
-  struct NdbThread* thr = file->doStart();
-  globalEmulatorData.theConfiguration->addThread(thr, NdbfsThread);
+  if (file->init())
+  {
+    ERROR_SET(fatal, NDBD_EXIT_AFS_MAXOPEN,""," Ndbfs::createAsyncFile");
+  }
 
-  // Put the file in list of all files
-  theFiles.push_back(file);
+  if (bound)
+  {
+    AsyncIoThread * thr = createIoThread(file);
+    theThreads.push_back(thr);
+    file->attach(thr);
 
 #ifdef VM_TRACE
-  infoEvent("NDBFS: Created new file thread %d", theFiles.size());
+    ndbout_c("NDBFS: Created new file thread %d", theFiles.size());
 #endif
+  }
+
+  theFiles.push_back(file);
   
   return file;
 }
 
+void
+Ndbfs::pushIdleFile(AsyncFile* file)
+{
+  if (file->getThread())
+  {
+    theIdleBoundFiles.push_back(file);
+  }
+  else
+  {
+    theIdleUnboundFiles.push_back(file);
+  }
+}
+
+AsyncIoThread*
+Ndbfs::createIoThread(AsyncFile* file)
+{
+  AsyncIoThread* thr = new AsyncIoThread(*this, file);
+
+  struct NdbThread* thrptr = thr->doStart();
+  globalEmulatorData.theConfiguration->addThread(thrptr, NdbfsThread);
+
+  return thr;
+}
+
 AsyncFile*
-Ndbfs::getIdleFile(){
-  AsyncFile* file;
-  if (theIdleFiles.size() > 0){
-    file = theIdleFiles[0];
-    theIdleFiles.erase(0);
-  } else {
-    file = createAsyncFile();
-  } 
-  return file;
+Ndbfs::getIdleFile(bool bound)
+{
+  if (bound)
+  {
+    Uint32 sz = theIdleBoundFiles.size();
+    if (sz)
+    {
+      AsyncFile* file = theIdleBoundFiles[sz - 1];
+      theIdleBoundFiles.erase(sz - 1);
+      return file;
+    }
+  }
+  else
+  {
+    Uint32 sz = theIdleUnboundFiles.size();
+    if (sz)
+    {
+      AsyncFile* file = theIdleUnboundFiles[sz - 1];
+      theIdleUnboundFiles.erase(sz - 1);
+      return file;
+    }
+  }
+
+  return createAsyncFile(bound);
 }
 
 
@@ -721,14 +853,28 @@ Ndbfs::report(Request * request, Signal*
   signal->setTrace(request->theTrace);
   const BlockReference ref = request->theUserReference;
 
-  if(!request->file->m_page_ptr.isNull())
+  if(request->file->has_buffer())
   {
-    assert(request->file->m_page_cnt > 0);
-    m_ctx.m_mm.release_pages(RT_DBTUP_PAGE, 
-                             request->file->m_page_ptr.i,
-                             request->file->m_page_cnt);
-    request->file->m_page_ptr.setNull();
-    request->file->m_page_cnt = 0;
+    Uint32 cnt;
+    Ptr<GlobalPage> ptr;
+    if (request->file->m_open_flags & FsOpenReq::OM_INIT)
+    {
+      jam();
+      request->file->clear_buffer(ptr, cnt);
+      m_ctx.m_mm.release_pages(RT_DBTUP_PAGE, ptr.i, cnt);
+    }
+    else if (request->file->m_open_flags & FsOpenReq::OM_WRITE_BUFFER)
+    {
+      jam();
+      if ((request->action == Request::open && request->error) ||
+          (request->action == Request::close ||
+           request->action == Request::closeRemove))
+      {
+        jam();
+        request->file->clear_buffer(ptr, cnt);
+        m_ctx.m_mm.release_pages(RT_FILE_BUFFER, ptr.i, cnt);
+      }
+    }
   }
   
   if (request->error) {
@@ -750,7 +896,7 @@ Ndbfs::report(Request * request, Signal*
     case Request:: open: {
       jam();
       // Put the file back in idle files list
-      theIdleFiles.push_back(request->file);  
+      pushIdleFile(request->file);
       sendSignal(ref, GSN_FSOPENREF, signal, FsRef::SignalLength, JBB);
       break;
     }
@@ -790,7 +936,7 @@ Ndbfs::report(Request * request, Signal*
     case Request::rmrf: {
       jam();
       // Put the file back in idle files list
-      theIdleFiles.push_back(request->file);  
+      pushIdleFile(request->file);
       sendSignal(ref, GSN_FSREMOVEREF, signal, FsRef::SignalLength, JBB);
       break;
     }
@@ -823,7 +969,7 @@ Ndbfs::report(Request * request, Signal*
       // removes the file from OpenFiles list
       theOpenFiles.erase(request->theFilePointer); 
       // Put the file in idle files list
-      theIdleFiles.push_back(request->file); 
+      pushIdleFile(request->file);
       sendSignal(ref, GSN_FSCLOSECONF, signal, 1, JBB);
       break;
     }
@@ -863,7 +1009,7 @@ Ndbfs::report(Request * request, Signal*
     case Request::rmrf: {
       jam();
       // Put the file in idle files list
-      theIdleFiles.push_back(request->file);            
+      pushIdleFile(request->file);
       sendSignal(ref, GSN_FSREMOVECONF, signal, 1, JBB);
       break;
     }
@@ -1055,9 +1201,10 @@ Ndbfs::execDUMP_STATE_ORD(Signal* signal
     infoEvent("NDBFS: Files: %d Open files: %d",
 	      theFiles.size(),
 	      theOpenFiles.size());
-    infoEvent(" Idle files: %d Max opened files: %d",
-	       theIdleFiles.size(),
-	       m_maxOpenedFiles);
+    infoEvent(" Idle files: (bound: %u unbound: %u) Max opened files: %d",
+              theIdleBoundFiles.size(),
+              theIdleUnboundFiles.size(),
+              m_maxOpenedFiles);
     infoEvent(" Max files: %d",
 	      m_maxFiles);
     infoEvent(" Requests: %d",
@@ -1084,10 +1231,16 @@ Ndbfs::execDUMP_STATE_ORD(Signal* signal
     return;
   }
   if(signal->theData[0] == DumpStateOrd::NdbfsDumpIdleFiles){
-    infoEvent("NDBFS: Dump idle files: %d", theIdleFiles.size());
+    infoEvent("NDBFS: Dump idle files: %d %u",
+              theIdleBoundFiles.size(), theIdleUnboundFiles.size());
     
-    for (unsigned i = 0; i < theIdleFiles.size(); i++){
-      AsyncFile* file = theIdleFiles[i];
+    for (unsigned i = 0; i < theIdleBoundFiles.size(); i++){
+      AsyncFile* file = theIdleBoundFiles[i];
+      infoEvent("%2d (0x%lx): %s", i, (long)file, file->isOpen()?"OPEN":"CLOSED");
+    }
+
+    for (unsigned i = 0; i < theIdleUnboundFiles.size(); i++){
+      AsyncFile* file = theIdleUnboundFiles[i];
       infoEvent("%2d (0x%lx): %s", i, (long)file, file->isOpen()?"OPEN":"CLOSED");
     }
     return;
@@ -1095,6 +1248,7 @@ Ndbfs::execDUMP_STATE_ORD(Signal* signal
 
   if(signal->theData[0] == 404)
   {
+#if 0
     ndbrequire(signal->getLength() == 2);
     Uint32 file= signal->theData[1];
     AsyncFile* openFile = theOpenFiles.find(file);
@@ -1115,6 +1269,7 @@ Ndbfs::execDUMP_STATE_ORD(Signal* signal
       AsyncFile* file = theFiles[i];
       ndbout_c("%2d (0x%lx): %s", i, (long) file, file->isOpen()?"OPEN":"CLOSED");
     }
+#endif
   }
 }//Ndbfs::execDUMP_STATE_ORD()
 
@@ -1132,6 +1287,7 @@ Ndbfs::get_filename(Uint32 fd) const
 BLOCK_FUNCTIONS(Ndbfs)
 
 template class Vector<AsyncFile*>;
+template class Vector<AsyncIoThread*>;
 template class Vector<OpenFiles::OpenFileItem>;
 template class MemoryChannel<Request>;
 template class Pool<Request>;

=== modified file 'storage/ndb/src/kernel/blocks/ndbfs/Ndbfs.hpp'
--- a/storage/ndb/src/kernel/blocks/ndbfs/Ndbfs.hpp	2006-12-23 19:20:40 +0000
+++ b/storage/ndb/src/kernel/blocks/ndbfs/Ndbfs.hpp	2008-12-02 13:10:49 +0000
@@ -22,21 +22,21 @@
 #include "AsyncFile.hpp"
 #include "OpenFiles.hpp"
 
-
+class AsyncIoThread;
 
 // Because one NDB Signal request can result in multiple requests to
 // AsyncFile one class must be made responsible to keep track
 // of all out standing request and when all are finished the result
 // must be reported to the sending block.
 
-
 class Ndbfs : public SimulatedBlock
 {
+  friend class AsyncIoThread;
 public:
   Ndbfs(Block_context&);
   virtual ~Ndbfs();
-
   virtual const char* get_filename(Uint32 fd) const;
+
 protected:
   BLOCK_DEFINES(Ndbfs);
 
@@ -69,17 +69,22 @@ private:
   Uint16 theLastId;
   BlockReference cownref;
 
-  // Communication from files 
+  // Communication from/to files
   MemoryChannel<Request> theFromThreads;
+  MemoryChannel<Request> theToThreads;
 
   Pool<Request>* theRequestPool;
 
-  AsyncFile* createAsyncFile();
-  AsyncFile* getIdleFile();
-
-  Vector<AsyncFile*> theFiles;     // List all created AsyncFiles
-  Vector<AsyncFile*> theIdleFiles; // List of idle AsyncFiles
-  OpenFiles theOpenFiles;          // List of open AsyncFiles
+  AsyncIoThread* createIoThread(AsyncFile* file);
+  AsyncFile* createAsyncFile(bool bound);
+  AsyncFile* getIdleFile(bool bound);
+  void pushIdleFile(AsyncFile*);
+
+  Vector<AsyncIoThread*> theThreads;// List of all created threads
+  Vector<AsyncFile*> theFiles;      // List all created AsyncFiles
+  Vector<AsyncFile*> theIdleBoundFiles;   // List of idle AsyncFiles
+  Vector<AsyncFile*> theIdleUnboundFiles; // List of idle AsyncFiles
+  OpenFiles theOpenFiles;           // List of open AsyncFiles
 
   BaseString theFileSystemPath;
   BaseString theBackupFilePath;

=== modified file 'storage/ndb/src/kernel/blocks/ndbfs/PosixAsyncFile.cpp'
--- a/storage/ndb/src/kernel/blocks/ndbfs/PosixAsyncFile.cpp	2008-11-19 11:01:17 +0000
+++ b/storage/ndb/src/kernel/blocks/ndbfs/PosixAsyncFile.cpp	2008-12-02 13:10:49 +0000
@@ -21,6 +21,7 @@
 #include <xfs/xfs.h>
 #endif
 
+#include "Ndbfs.hpp"
 #include "AsyncFile.hpp"
 #include "PosixAsyncFile.hpp"
 
@@ -34,17 +35,8 @@
 
 #include <NdbTick.h>
 
-// use this to test broken pread code
-//#define HAVE_BROKEN_PREAD
-
-#ifdef HAVE_BROKEN_PREAD
-#undef HAVE_PWRITE
-#undef HAVE_PREAD
-#endif
-
 // For readv and writev
 #include <sys/uio.h>
-
 #include <dirent.h>
 
 PosixAsyncFile::PosixAsyncFile(SimulatedBlock& fs) :
@@ -53,18 +45,12 @@ PosixAsyncFile::PosixAsyncFile(Simulated
   use_gz(0)
 {
   memset(&azf,0,sizeof(azf));
+  init_mutex();
 }
 
 int PosixAsyncFile::init()
 {
   // Create write buffer for bigger writes
-  theWriteBufferSize = WRITEBUFFERSIZE;
-  theWriteBufferUnaligned = (char *) ndbd_malloc(theWriteBufferSize +
-                                                 NDB_O_DIRECT_WRITE_ALIGNMENT-1);
-  theWriteBuffer = (char *)
-    (((UintPtr)theWriteBufferUnaligned + NDB_O_DIRECT_WRITE_ALIGNMENT - 1) &
-     ~(UintPtr)(NDB_O_DIRECT_WRITE_ALIGNMENT - 1));
-
   azfBufferUnaligned= (Byte*)ndbd_malloc((AZ_BUFSIZE_READ+AZ_BUFSIZE_WRITE)
                                          +NDB_O_DIRECT_WRITE_ALIGNMENT-1);
 
@@ -82,13 +68,8 @@ int PosixAsyncFile::init()
 
   azf.stream.opaque= &az_mempool;
 
-  if (!theWriteBuffer) {
-    DEBUG(ndbout_c("AsyncFile::writeReq, Failed allocating write buffer"));
-    return -1;
-  }//if
-
   return 0;
-}//AsyncFile::init()
+}
 
 #ifdef O_DIRECT
 static char g_odirect_readbuf[2*GLOBAL_PAGE_SIZE -1];
@@ -229,8 +210,14 @@ void PosixAsyncFile::openReq(Request *re
     break;
     return;
   }
-  if(flags & FsOpenReq::OM_GZ)
-    use_gz= 1;
+  if (flags & FsOpenReq::OM_GZ)
+  {
+    use_gz = 1;
+  }
+  else
+  {
+    use_gz = 0;
+  }
 
   // allow for user to choose any permissionsa with umask
   const int mode = S_IRUSR | S_IWUSR |
@@ -363,7 +350,8 @@ no_odirect:
   retry:
       off_t save_size = size;
       char* buf = (char*)m_page_ptr.p;
-      while(size > 0){
+      while(size > 0)
+      {
 #ifdef TRACE_INIT
         write_cnt++;
 #endif
@@ -504,8 +492,9 @@ int PosixAsyncFile::readBuffer(Request *
 {
   int return_value;
   req->par.readWrite.pages[0].size = 0;
-#if ! defined(HAVE_PREAD)
   off_t seek_val;
+#if ! defined(HAVE_PREAD)
+  FileGuard guard(this);
   if(!use_gz)
   {
     while((seek_val= lseek(theFd, offset, SEEK_SET)) == (off_t)-1
@@ -516,7 +505,6 @@ int PosixAsyncFile::readBuffer(Request *
     }
   }
 #endif
-  off_t seek_val;
   if(use_gz)
   {
     while((seek_val= azseek(&azf, offset, SEEK_SET)) == (off_t)-1
@@ -611,15 +599,16 @@ void PosixAsyncFile::readvReq(Request *r
 #endif
 }
 
-int PosixAsyncFile::writeBuffer(const char *buf, size_t size, off_t offset,
-                                size_t chunk_size)
+int PosixAsyncFile::writeBuffer(const char *buf, size_t size, off_t offset)
 {
+  size_t chunk_size = 256*1024;
   size_t bytes_to_write = chunk_size;
   int return_value;
 
   m_write_wo_sync += size;
 
 #if ! defined(HAVE_PWRITE)
+  FileGuard guard(this);
   off_t seek_val;
   while((seek_val= lseek(theFd, offset, SEEK_SET)) == (off_t)-1
 	&& errno == EINTR);
@@ -772,20 +761,23 @@ void PosixAsyncFile::removeReq(Request *
   }
 }
 
-void PosixAsyncFile::rmrfReq(Request *request, char *path, bool removePath)
+void
+PosixAsyncFile::rmrfReq(Request *request, const char *_path, bool removePath)
 {
+  char path[PATH_MAX];
+  strcpy(path, _path);
   Uint32 path_len = strlen(path);
   Uint32 path_max_copy = PATH_MAX - path_len;
   char* path_add = &path[path_len];
 
   if(!request->par.rmrf.directory){
     // Remove file
-    if(unlink((const char *)path) != 0 && errno != ENOENT)
+    if(unlink(path) != 0 && errno != ENOENT)
       request->error = errno;
     return;
   }
   // Remove directory
-  DIR* dirp = opendir((const char *)path);
+  DIR* dirp = opendir(path);
   if(dirp == 0){
     if(errno != ENOENT)
       request->error = errno;
@@ -816,12 +808,8 @@ void PosixAsyncFile::rmrfReq(Request *re
   return;
 }
 
-void PosixAsyncFile::endReq()
+PosixAsyncFile::~PosixAsyncFile()
 {
-  // Thread is ended with return
-  if (theWriteBufferUnaligned)
-    ndbd_free(theWriteBufferUnaligned, theWriteBufferSize);
-
   if (azfBufferUnaligned)
     ndbd_free(azfBufferUnaligned, (AZ_BUFSIZE_READ*AZ_BUFSIZE_WRITE)
               +NDB_O_DIRECT_WRITE_ALIGNMENT-1);
@@ -830,11 +818,10 @@ void PosixAsyncFile::endReq()
     ndbd_free(az_mempool.mem,az_mempool.size);
 
   az_mempool.mem = NULL;
-  theWriteBufferUnaligned = NULL;
   azfBufferUnaligned = NULL;
+  destroy_mutex();
 }
 
-
 void PosixAsyncFile::createDirectories()
 {
   char* tmp;

=== modified file 'storage/ndb/src/kernel/blocks/ndbfs/PosixAsyncFile.hpp'
--- a/storage/ndb/src/kernel/blocks/ndbfs/PosixAsyncFile.hpp	2007-11-15 00:30:00 +0000
+++ b/storage/ndb/src/kernel/blocks/ndbfs/PosixAsyncFile.hpp	2008-12-02 13:10:49 +0000
@@ -24,15 +24,26 @@
 
 #include <azlib.h>
 
+/**
+ * PREAD/PWRITE is needed to use file != thread
+ *   therefor it's defined/checked here
+ */
+#ifdef HAVE_BROKEN_PREAD
+#undef HAVE_PWRITE
+#undef HAVE_PREAD
+#elif defined (HAVE_PREAD)
+#define HAVE_PWRITE
+#endif
+
 class PosixAsyncFile : public AsyncFile
 {
   friend class Ndbfs;
 public:
   PosixAsyncFile(SimulatedBlock& fs);
+  virtual ~PosixAsyncFile();
 
-  int init();
-
-  bool isOpen();
+  virtual int init();
+  virtual bool isOpen();
 
   virtual void openReq(Request *request);
   virtual void readvReq(Request *request);
@@ -41,32 +52,59 @@ public:
   virtual void syncReq(Request *request);
   virtual void removeReq(Request *request);
   virtual void appendReq(Request *request);
-  virtual void rmrfReq(Request *request, char * path, bool removePath);
-  void endReq();
+  virtual void rmrfReq(Request *request, const char * path, bool removePath);
 
   virtual int readBuffer(Request*, char * buf, size_t size, off_t offset);
-  virtual int writeBuffer(const char * buf, size_t size, off_t offset,
-		  size_t chunk_size = WRITECHUNK);
+  virtual int writeBuffer(const char * buf, size_t size, off_t offset);
 
   virtual void createDirectories();
 
 private:
   int theFd;
 
-  Uint32 m_open_flags; // OM_ flags from request to open file
-
   int use_gz;
   azio_stream azf;
   struct az_alloc_rec az_mempool;
-
-  void* theWriteBufferUnaligned;
   void* azfBufferUnaligned;
 
-  size_t m_write_wo_sync;  // Writes wo/ sync
-  size_t m_auto_sync_freq; // Auto sync freq in bytes
-
   int check_odirect_read(Uint32 flags, int&new_flags, int mode);
   int check_odirect_write(Uint32 flags, int&new_flags, int mode);
+
+#ifndef HAVE_PREAD
+  struct FileGuard;
+  friend struct FileGuard;
+  NdbMutex * m_mutex;
+  void init_mutex() { m_mutex = NdbMutex_Create();}
+  void destroy_mutex() { NdbMutex_Destroy(m_mutex);}
+
+  /**
+   * If dont HAVE_PREAD and using file != thread
+   */
+  struct FileGuard
+  {
+    PosixAsyncFile* m_file;
+    FileGuard (PosixAsyncFile* file) : m_file(file) {
+      if (m_file->getThread() == 0)
+      {
+        NdbMutex_Lock(m_file->m_mutex);
+      }
+    }
+    ~FileGuard() {
+      if (m_file->getThread() == 0)
+      {
+        NdbMutex_Unlock(m_file->m_mutex);
+      }
+    }
+  };
+#else
+  void init_mutex() {}
+  void destroy_mutex() {}
+  struct FileGuard
+  {
+    FileGuard (PosixAsyncFile* file){}
+    ~FileGuard () {}
+  };
+#endif
 };
 
 #endif

=== modified file 'storage/ndb/src/kernel/blocks/ndbfs/Win32AsyncFile.cpp'
--- a/storage/ndb/src/kernel/blocks/ndbfs/Win32AsyncFile.cpp	2008-11-19 11:01:17 +0000
+++ b/storage/ndb/src/kernel/blocks/ndbfs/Win32AsyncFile.cpp	2008-12-02 13:10:49 +0000
@@ -171,7 +171,7 @@ void Win32AsyncFile::openReq(Request* re
                           );
       Uint32 size = request->par.open.page_size;
       char* buf = (char*)m_page_ptr.p;
-	  DWORD dwWritten;
+      DWORD dwWritten;
       while(size > 0){
 	BOOL bWrite= WriteFile(hFile, buf, size, &dwWritten, 0);
 	if(!bWrite || dwWritten!=size)
@@ -205,7 +205,8 @@ void Win32AsyncFile::openReq(Request* re
 }
 
 int
-Win32AsyncFile::readBuffer(Request* req, char * buf, size_t size, off_t offset){
+Win32AsyncFile::readBuffer(Request* req, char * buf, size_t size, off_t offset)
+{
   req->par.readWrite.pages[0].size = 0;
 
   while (size > 0) {
@@ -259,9 +260,9 @@ Win32AsyncFile::readBuffer(Request* req,
 }
 
 int
-Win32AsyncFile::writeBuffer(const char * buf, size_t size, off_t offset,
-		       size_t chunk_size)
+Win32AsyncFile::writeBuffer(const char * buf, size_t size, off_t offset)
 {
+  size_t chunk_size = 256 * 1024;
   size_t bytes_to_write = chunk_size;
 
   m_write_wo_sync += size;
@@ -365,7 +366,9 @@ Win32AsyncFile::removeReq(Request * requ
 }
 
 void
-Win32AsyncFile::rmrfReq(Request * request, char * path, bool removePath){
+Win32AsyncFile::rmrfReq(Request * request, const char * _path, bool removePath){
+  char path[PATH_MAX];
+  strcpy(path, _path);
   Uint32 path_len = strlen(path);
   Uint32 path_max_copy = PATH_MAX - path_len;
   char* path_add = &path[path_len];

=== modified file 'storage/ndb/src/kernel/blocks/ndbfs/Win32AsyncFile.hpp'
--- a/storage/ndb/src/kernel/blocks/ndbfs/Win32AsyncFile.hpp	2008-08-21 06:38:48 +0000
+++ b/storage/ndb/src/kernel/blocks/ndbfs/Win32AsyncFile.hpp	2008-12-02 13:10:49 +0000
@@ -30,32 +30,24 @@ class Win32AsyncFile : public AsyncFile
   friend class Ndbfs;
 public:
   Win32AsyncFile(SimulatedBlock& fs);
-  ~Win32AsyncFile();
+  virtual ~Win32AsyncFile();
 
-  void reportTo( MemoryChannel<Request> *reportTo );
+  virtual bool isOpen();
+  virtual void openReq(Request *request);
+  virtual void closeReq(Request *request);
+  virtual void syncReq(Request *request);
+  virtual void removeReq(Request *request);
+  virtual void appendReq(Request *request);
+  virtual void rmrfReq(Request *request, const char * path, bool removePath);
 
-  void execute( Request* request );
-
-  bool isOpen();
+  virtual int readBuffer(Request*, char * buf, size_t size, off_t offset);
+  virtual int writeBuffer(const char * buf, size_t size, off_t offset);
 
 private:
-
-  void openReq(Request *request);
-  void closeReq(Request *request);
-  void syncReq(Request *request);
-  void removeReq(Request *request);
-  void appendReq(Request *request);
-  void rmrfReq(Request *request, char * path, bool removePath);
-
-  int readBuffer(Request*, char * buf, size_t size, off_t offset);
-  int writeBuffer(const char * buf, size_t size, off_t offset,
-		  size_t chunk_size = WRITECHUNK);
-
   int extendfile(Request* request);
   void createDirectories();
 
   HANDLE hFile;
-  Uint32 m_open_flags; // OM_ flags from request to open file
 };
 
 #endif

=== modified file 'storage/ndb/src/kernel/blocks/record_types.hpp'
--- a/storage/ndb/src/kernel/blocks/record_types.hpp	2008-01-01 12:45:11 +0000
+++ b/storage/ndb/src/kernel/blocks/record_types.hpp	2008-12-02 13:10:49 +0000
@@ -45,6 +45,11 @@
 #define RG_JOBBUFFER            4
 
 /**
+ * File-thread buffers
+ */
+#define RG_FILE_BUFFERS         5
+
+/**
  * 
  */
 #define RG_RESERVED             0
@@ -70,4 +75,6 @@
 
 #define RT_JOB_BUFFER              MAKE_TID( 1, RG_JOBBUFFER)
 
+#define RT_FILE_BUFFER             MAKE_TID( 1, RG_FILE_BUFFERS)
+
 #endif

=== modified file 'storage/ndb/src/kernel/blocks/tsman.cpp'
--- a/storage/ndb/src/kernel/blocks/tsman.cpp	2008-12-01 18:03:48 +0000
+++ b/storage/ndb/src/kernel/blocks/tsman.cpp	2008-12-02 13:10:49 +0000
@@ -767,6 +767,7 @@ Tsman::open_file(Signal* signal, 
   req->fileFlags = 0;
   req->fileFlags |= FsOpenReq::OM_READWRITE;
   req->fileFlags |= FsOpenReq::OM_DIRECT;
+  req->fileFlags |= FsOpenReq::OM_THREAD_POOL;
   switch(requestInfo){
   case CreateFileImplReq::Create:
     req->fileFlags |= FsOpenReq::OM_CREATE_IF_NONE;

=== modified file 'storage/ndb/src/kernel/main.cpp'
--- a/storage/ndb/src/kernel/main.cpp	2008-12-01 18:04:19 +0000
+++ b/storage/ndb/src/kernel/main.cpp	2008-12-02 13:10:49 +0000
@@ -250,20 +250,34 @@ init_global_memory_manager(EmulatorData 
                         "config, exiting.");
     return -1;
   }
+
   if (tupmem)
   {
     Resource_limit rl;
     rl.m_min = tupmem;
     rl.m_max = tupmem;
-    rl.m_resource_id = 3;
+    rl.m_resource_id = RG_DATAMEM;
+    ed.m_mem_manager->set_resource_limit(rl);
+  }
+
+  Uint32 maxopen = 4 * 4; // 4 redo parts, max 4 files per part
+  Uint32 filebuffer = NDB_FILE_BUFFER_SIZE;
+  Uint32 filepages = (filebuffer / GLOBAL_PAGE_SIZE) * maxopen;
+
+  if (filepages)
+  {
+    Resource_limit rl;
+    rl.m_min = filepages;
+    rl.m_max = filepages;
+    rl.m_resource_id = RG_FILE_BUFFERS;
     ed.m_mem_manager->set_resource_limit(rl);
   }
 
-  if (shared_mem+tupmem)
+  if (shared_mem + tupmem + filepages)
   {
     Resource_limit rl;
     rl.m_min = 0;
-    rl.m_max = shared_mem + tupmem;
+    rl.m_max = shared_mem + tupmem + filepages;
     rl.m_resource_id = 0;
     ed.m_mem_manager->set_resource_limit(rl);
   }
@@ -280,7 +294,7 @@ init_global_memory_manager(EmulatorData 
     ndb_mgm_get_db_parameter_info(CFG_DB_SGA, &sga, &size);
 
     g_eventLogger->alert("Malloc (%lld bytes) for %s and %s failed, exiting",
-                         Uint64(shared_mem + tupmem) * 32768,
+                         Uint64(shared_mem + tupmem) * GLOBAL_PAGE_SIZE,
                          dm.m_name, sga.m_name);
     return -1;
   }

=== modified file 'storage/ndb/src/kernel/vm/ndbd_malloc_impl.hpp'
--- a/storage/ndb/src/kernel/vm/ndbd_malloc_impl.hpp	2008-02-08 14:35:31 +0000
+++ b/storage/ndb/src/kernel/vm/ndbd_malloc_impl.hpp	2008-12-02 13:10:49 +0000
@@ -84,7 +84,7 @@ public:
 private:
   void grow(Uint32 start, Uint32 cnt);
 
-#define XX_RL_COUNT 5
+#define XX_RL_COUNT 6
   /**
    * Return pointer to free page data on page
    */

=== modified file 'storage/ndb/src/mgmsrv/ConfigInfo.cpp'
--- a/storage/ndb/src/mgmsrv/ConfigInfo.cpp	2008-11-19 11:01:17 +0000
+++ b/storage/ndb/src/mgmsrv/ConfigInfo.cpp	2008-12-02 13:10:49 +0000
@@ -1074,6 +1074,18 @@ const ConfigInfo::ParamInfo ConfigInfo::
     0, 0 },
 
   {
+    CFG_DB_THREAD_POOL,
+    "ThreadPool",
+    DB_TOKEN,
+    "No of unbound threads for file access (currently only for DD)",
+    ConfigInfo::CI_USED,
+    false,
+    ConfigInfo::CI_INT,
+    "8",
+    "0",  
+    STR_VALUE(MAX_INT_RNIL) },
+
+  {
     CFG_DB_MAX_OPEN_FILES,
     "MaxNoOfOpenFiles",
     DB_TOKEN,

Thread
bzr commit into mysql-5.1 branch (jonas:3148) Jonas Oreland2 Dec