#At file:///home/jonas/src/telco-6.4/
3148 Jonas Oreland 2008-12-02
ndb - file != thread (aka ThreadPool) for DD
Make full split of file/thread
added:
storage/ndb/src/kernel/blocks/ndbfs/AsyncIoThread.cpp
storage/ndb/src/kernel/blocks/ndbfs/AsyncIoThread.hpp
modified:
storage/ndb/include/kernel/ndb_limits.h
storage/ndb/include/kernel/signaldata/FsOpenReq.hpp
storage/ndb/include/mgmapi/mgmapi_config_parameters.h
storage/ndb/src/kernel/blocks/Makefile.am
storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
storage/ndb/src/kernel/blocks/lgman.cpp
storage/ndb/src/kernel/blocks/ndbcntr/NdbcntrMain.cpp
storage/ndb/src/kernel/blocks/ndbfs/AsyncFile.cpp
storage/ndb/src/kernel/blocks/ndbfs/AsyncFile.hpp
storage/ndb/src/kernel/blocks/ndbfs/MemoryChannel.hpp
storage/ndb/src/kernel/blocks/ndbfs/Ndbfs.cpp
storage/ndb/src/kernel/blocks/ndbfs/Ndbfs.hpp
storage/ndb/src/kernel/blocks/ndbfs/PosixAsyncFile.cpp
storage/ndb/src/kernel/blocks/ndbfs/PosixAsyncFile.hpp
storage/ndb/src/kernel/blocks/ndbfs/Win32AsyncFile.cpp
storage/ndb/src/kernel/blocks/ndbfs/Win32AsyncFile.hpp
storage/ndb/src/kernel/blocks/record_types.hpp
storage/ndb/src/kernel/blocks/tsman.cpp
storage/ndb/src/kernel/main.cpp
storage/ndb/src/kernel/vm/ndbd_malloc_impl.hpp
storage/ndb/src/mgmsrv/ConfigInfo.cpp
=== modified file 'storage/ndb/include/kernel/ndb_limits.h'
--- a/storage/ndb/include/kernel/ndb_limits.h 2008-11-13 15:22:59 +0000
+++ b/storage/ndb/include/kernel/ndb_limits.h 2008-12-02 13:10:49 +0000
@@ -187,4 +187,6 @@
#define MAX_NDBMT_LQH_WORKERS 4
#define MAX_NDBMT_LQH_THREADS 4
+#define NDB_FILE_BUFFER_SIZE (256*1024)
+
#endif
=== modified file 'storage/ndb/include/kernel/signaldata/FsOpenReq.hpp'
--- a/storage/ndb/include/kernel/signaldata/FsOpenReq.hpp 2008-08-21 06:38:48 +0000
+++ b/storage/ndb/include/kernel/signaldata/FsOpenReq.hpp 2008-12-02 13:10:49 +0000
@@ -33,6 +33,7 @@ class FsOpenReq {
friend class Win32AsyncFile; // FIXME
friend class Filename;
friend class VoidFs;
+ friend class AsyncIoThread;
/**
* Sender(s)
@@ -90,6 +91,8 @@ private:
STATIC_CONST( OM_CHECK_SIZE = 0x2000 );
STATIC_CONST( OM_DIRECT = 0x4000 );
STATIC_CONST( OM_GZ = 0x8000 );
+ STATIC_CONST( OM_THREAD_POOL = 0x10000 );
+ STATIC_CONST( OM_WRITE_BUFFER = 0x20000 );
enum Suffixes {
S_DATA = 0,
=== modified file 'storage/ndb/include/mgmapi/mgmapi_config_parameters.h'
--- a/storage/ndb/include/mgmapi/mgmapi_config_parameters.h 2008-11-19 11:01:17 +0000
+++ b/storage/ndb/include/mgmapi/mgmapi_config_parameters.h 2008-12-02 13:10:49 +0000
@@ -147,6 +147,7 @@
#define CFG_NDBMT_LQH_WORKERS 188
#define CFG_DB_INIT_REDO 189
+#define CFG_DB_THREAD_POOL 190
#define CFG_DB_SGA 198 /* super pool mem */
#define CFG_DB_DATA_MEM_2 199 /* used in special build in 5.1 */
=== modified file 'storage/ndb/src/kernel/blocks/Makefile.am'
--- a/storage/ndb/src/kernel/blocks/Makefile.am 2008-11-16 15:29:16 +0000
+++ b/storage/ndb/src/kernel/blocks/Makefile.am 2008-12-02 13:10:49 +0000
@@ -41,6 +41,7 @@ libblocks_a_SOURCES = tsman.cpp lgman.cp
dbtup/DbtupDebug.cpp dbtup/DbtupScan.cpp \
dbtup/DbtupDiskAlloc.cpp dbtup/DbtupVarAlloc.cpp \
dbtup/tuppage.cpp dbtup/Undo_buffer.cpp \
+ ndbfs/AsyncIoThread.cpp \
ndbfs/PosixAsyncFile.cpp ndbfs/AsyncFile.cpp \
ndbfs/Ndbfs.cpp \
ndbfs/VoidFs.cpp \
=== modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp 2008-12-01 18:50:58 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp 2008-12-02 13:10:49 +0000
@@ -14228,7 +14228,7 @@ void Dblqh::openFileRw(Signal* signal, L
signal->theData[3] = olfLogFilePtr.p->fileName[1];
signal->theData[4] = olfLogFilePtr.p->fileName[2];
signal->theData[5] = olfLogFilePtr.p->fileName[3];
- signal->theData[6] = FsOpenReq::OM_READWRITE | FsOpenReq::OM_AUTOSYNC | FsOpenReq::OM_CHECK_SIZE;
+ signal->theData[6] = FsOpenReq::OM_READWRITE | FsOpenReq::OM_AUTOSYNC | FsOpenReq::OM_CHECK_SIZE | FsOpenReq::OM_WRITE_BUFFER;
if (c_o_direct)
signal->theData[6] |= FsOpenReq::OM_DIRECT;
req->auto_sync_size = MAX_REDO_PAGES_WITHOUT_SYNCH * sizeof(LogPageRecord);
@@ -14254,7 +14254,7 @@ void Dblqh::openLogfileInit(Signal* sign
signal->theData[3] = logFilePtr.p->fileName[1];
signal->theData[4] = logFilePtr.p->fileName[2];
signal->theData[5] = logFilePtr.p->fileName[3];
- signal->theData[6] = FsOpenReq::OM_READWRITE | FsOpenReq::OM_TRUNCATE | FsOpenReq::OM_CREATE | FsOpenReq::OM_AUTOSYNC;
+ signal->theData[6] = FsOpenReq::OM_READWRITE | FsOpenReq::OM_TRUNCATE | FsOpenReq::OM_CREATE | FsOpenReq::OM_AUTOSYNC | FsOpenReq::OM_WRITE_BUFFER;
if (c_o_direct)
signal->theData[6] |= FsOpenReq::OM_DIRECT;
@@ -14358,7 +14358,7 @@ void Dblqh::openNextLogfile(Signal* sign
signal->theData[3] = onlLogFilePtr.p->fileName[1];
signal->theData[4] = onlLogFilePtr.p->fileName[2];
signal->theData[5] = onlLogFilePtr.p->fileName[3];
- signal->theData[6] = FsOpenReq::OM_READWRITE | FsOpenReq::OM_AUTOSYNC | FsOpenReq::OM_CHECK_SIZE;
+ signal->theData[6] = FsOpenReq::OM_READWRITE | FsOpenReq::OM_AUTOSYNC | FsOpenReq::OM_CHECK_SIZE | FsOpenReq::OM_WRITE_BUFFER;
if (c_o_direct)
signal->theData[6] |= FsOpenReq::OM_DIRECT;
req->auto_sync_size = MAX_REDO_PAGES_WITHOUT_SYNCH * sizeof(LogPageRecord);
=== modified file 'storage/ndb/src/kernel/blocks/lgman.cpp'
--- a/storage/ndb/src/kernel/blocks/lgman.cpp 2008-12-01 18:03:48 +0000
+++ b/storage/ndb/src/kernel/blocks/lgman.cpp 2008-12-02 13:10:49 +0000
@@ -26,7 +26,6 @@
#include <signaldata/SumaImpl.hpp>
#include <signaldata/LgmanContinueB.hpp>
#include <signaldata/GetTabInfo.hpp>
-#include "ndbfs/Ndbfs.hpp"
#include "dbtup/Dbtup.hpp"
#include <EventLogger.hpp>
=== modified file 'storage/ndb/src/kernel/blocks/ndbcntr/NdbcntrMain.cpp'
--- a/storage/ndb/src/kernel/blocks/ndbcntr/NdbcntrMain.cpp 2008-10-05 07:12:56 +0000
+++ b/storage/ndb/src/kernel/blocks/ndbcntr/NdbcntrMain.cpp 2008-12-02 13:10:49 +0000
@@ -98,8 +98,8 @@ static BlockInfo ALL_BLOCKS[] = {
static const Uint32 ALL_BLOCKS_SZ = sizeof(ALL_BLOCKS)/sizeof(BlockInfo);
static BlockReference readConfigOrder[ALL_BLOCKS_SZ] = {
- NDBFS_REF, // let it run first to make sure it can start the threads
CMVMI_REF,
+ NDBFS_REF,
DBINFO_REF,
DBTUP_REF,
DBACC_REF,
=== modified file 'storage/ndb/src/kernel/blocks/ndbfs/AsyncFile.cpp'
--- a/storage/ndb/src/kernel/blocks/ndbfs/AsyncFile.cpp 2008-11-06 10:17:49 +0000
+++ b/storage/ndb/src/kernel/blocks/ndbfs/AsyncFile.cpp 2008-12-02 13:10:49 +0000
@@ -27,203 +27,44 @@
#include <signaldata/FsReadWriteReq.hpp>
#include <Configuration.hpp>
-const char *actionName[] = {
- "open",
- "close",
- "closeRemove",
- "read",
- "readv",
- "write",
- "writev",
- "writeSync",
- "writevSync",
- "sync",
- "end" };
-
-static int numAsyncFiles = 0;
-
-extern "C" void * runAsyncFile(void* arg)
-{
- ((AsyncFile*)arg)->run();
- return (NULL);
-}
-
-
AsyncFile::AsyncFile(SimulatedBlock& fs) :
theFileName(),
- theReportTo(0),
- theMemoryChannelPtr(NULL),
m_fs(fs)
{
- m_page_ptr.setNull();
- m_current_request= m_last_request= 0;
- m_auto_sync_freq = 0;
-}
-
-struct NdbThread*
-AsyncFile::doStart()
-{
- // Stacksize for filesystem threads
- // An 8k stack should be enough
- const NDB_THREAD_STACKSIZE stackSize = 8192;
-
- char buf[16];
- numAsyncFiles++;
- BaseString::snprintf(buf, sizeof(buf), "AsyncFile%d", numAsyncFiles);
-
- theStartMutexPtr = NdbMutex_Create();
- theStartConditionPtr = NdbCondition_Create();
- NdbMutex_Lock(theStartMutexPtr);
- theStartFlag = false;
-
- theThreadPtr = NdbThread_Create(runAsyncFile,
- (void**)this,
- stackSize,
- (char*)&buf,
- NDB_THREAD_PRIO_MEAN);
-
- if (theThreadPtr == 0)
- {
- ERROR_SET(fatal, NDBD_EXIT_MEMALLOC,
- "","Could not allocate file system thread");
- }
- NdbCondition_Wait(theStartConditionPtr,
- theStartMutexPtr);
- NdbMutex_Unlock(theStartMutexPtr);
- NdbMutex_Destroy(theStartMutexPtr);
- NdbCondition_Destroy(theStartConditionPtr);
-
- return theThreadPtr;
-}
+ m_thread = 0;
-void AsyncFile::shutdown()
-{
- void *status;
- Request request;
- request.action = Request::end;
- this->theMemoryChannelPtr->writeChannel( &request );
- NdbThread_WaitFor(theThreadPtr, &status);
- NdbThread_Destroy(&theThreadPtr);
- delete theMemoryChannelPtr;
+ m_page_cnt = 0;
+ m_page_ptr.setNull();
+ theWriteBuffer = 0;
+ theWriteBufferSize = 0;
}
void
-AsyncFile::reportTo( MemoryChannel<Request> *reportTo )
-{
- theReportTo = reportTo;
-}
-
-void AsyncFile::execute(Request* request)
-{
- theMemoryChannelPtr->writeChannel( request );
-}
-
-int AsyncFile::init()
+AsyncFile::attach(AsyncIoThread* thr)
{
- // Create write buffer for bigger writes
- theWriteBufferSize = WRITEBUFFERSIZE;
- theWriteBuffer = (char *) ndbd_malloc(theWriteBufferSize);
-
- return 0;
+#if 0
+ ndbout_c("%p:%s attach to %p (m_thread: %p)", this, theFileName.c_str(), thr,
+ m_thread);
+#endif
+ assert(m_thread == 0);
+ m_thread = thr;
}
void
-AsyncFile::run()
+AsyncFile::detach(AsyncIoThread* thr)
{
- Request *request;
-
- // Create theMemoryChannel in the thread that will wait for it
- NdbMutex_Lock(theStartMutexPtr);
- theMemoryChannelPtr = new MemoryChannel<Request>();
- theStartFlag = true;
-
- int r= this->init();
-
- NdbMutex_Unlock(theStartMutexPtr);
- NdbCondition_Signal(theStartConditionPtr);
-
- if(r!=0)
- {
- DEBUG(ndbout_c("AsyncFile::init() failed"));
- return;
- }
-
- while (1) {
- request = theMemoryChannelPtr->readChannel();
- if (!request) {
- DEBUG(ndbout_c("Nothing read from Memory Channel in AsyncFile"));
- endReq();
- return;
- }//if
- m_current_request= request;
- switch (request->action) {
- case Request:: open:
- openReq(request);
- break;
- case Request:: close:
- closeReq(request);
- break;
- case Request:: closeRemove:
- closeReq(request);
- removeReq(request);
- break;
- case Request:: readPartial:
- case Request:: read:
- readReq(request);
- break;
- case Request:: readv:
- readvReq(request);
- break;
- case Request:: write:
- writeReq(request);
- break;
- case Request:: writev:
- writevReq(request);
- break;
- case Request:: writeSync:
- writeReq(request);
- syncReq(request);
- break;
- case Request:: writevSync:
- writevReq(request);
- syncReq(request);
- break;
- case Request:: sync:
- syncReq(request);
- break;
- case Request:: append:
- appendReq(request);
- break;
- case Request:: append_synch:
- appendReq(request);
- syncReq(request);
- break;
- case Request::rmrf:
- rmrfReq(request, (char*)theFileName.c_str(), request->par.rmrf.own_directory);
- break;
- case Request:: end:
- if (isOpen())
- closeReq(request);
- endReq();
- return;
- default:
- DEBUG(ndbout_c("Invalid Request"));
- abort();
- break;
- }//switch
- m_last_request= request;
- m_current_request= 0;
-
- // No need to signal as ndbfs only uses tryRead
- theReportTo->writeChannelNoSignal(request);
- }//while
-}//AsyncFile::run()
-
+#if 0
+ ndbout_c("%p:%s detach from %p", this, theFileName.c_str(), thr);
+#endif
+ assert(m_thread == thr);
+ m_thread = 0;
+}
void
AsyncFile::readReq( Request * request)
{
- for(int i = 0; i < request->par.readWrite.numberOfPages ; i++) {
+ for(int i = 0; i < request->par.readWrite.numberOfPages ; i++)
+ {
off_t offset = request->par.readWrite.pages[i].offset;
size_t size = request->par.readWrite.pages[i].size;
char * buf = request->par.readWrite.pages[i].buf;
@@ -244,78 +85,92 @@ AsyncFile::readvReq( Request * request)
}
void
-AsyncFile::writeReq( Request * request)
+AsyncFile::writeReq(Request * request)
{
- int page_num = 0;
- bool write_not_complete = true;
+ Uint32 cnt = request->par.readWrite.numberOfPages;
+ if (theWriteBuffer == 0 || cnt == 1)
+ {
+ for (Uint32 i = 0; i<cnt; i++)
+ {
+ int err = writeBuffer(request->par.readWrite.pages[i].buf,
+ request->par.readWrite.pages[i].size,
+ request->par.readWrite.pages[i].offset);
+ if (err)
+ {
+ request->error = err;
+ return;
+ }
+ goto done;
+ }
+ }
- while(write_not_complete) {
- int totsize = 0;
- off_t offset = request->par.readWrite.pages[page_num].offset;
- char* bufptr = theWriteBuffer;
-
- write_not_complete = false;
- if (request->par.readWrite.numberOfPages > 1) {
- off_t page_offset = offset;
-
- // Multiple page write, copy to buffer for one write
- for(int i=page_num; i < request->par.readWrite.numberOfPages; i++) {
- memcpy(bufptr,
- request->par.readWrite.pages[i].buf,
- request->par.readWrite.pages[i].size);
- bufptr += request->par.readWrite.pages[i].size;
- totsize += request->par.readWrite.pages[i].size;
- if (((i + 1) < request->par.readWrite.numberOfPages)) {
- // There are more pages to write
- // Check that offsets are consequtive
- off_t tmp = page_offset + request->par.readWrite.pages[i].size;
- if (tmp != request->par.readWrite.pages[i+1].offset) {
- // Next page is not aligned with previous, not allowed
- DEBUG(ndbout_c("Page offsets are not aligned"));
- request->error = EINVAL;
- return;
- }
- if ((unsigned)(totsize + request->par.readWrite.pages[i+1].size) > (unsigned)theWriteBufferSize) {
- // We are not finished and the buffer is full
- write_not_complete = true;
- // Start again with next page
- page_num = i + 1;
- break;
+ {
+ int page_num = 0;
+ bool write_not_complete = true;
+
+ while(write_not_complete) {
+ int totsize = 0;
+ off_t offset = request->par.readWrite.pages[page_num].offset;
+ char* bufptr = theWriteBuffer;
+
+ write_not_complete = false;
+ if (request->par.readWrite.numberOfPages > 1) {
+ off_t page_offset = offset;
+
+ // Multiple page write, copy to buffer for one write
+ for(int i=page_num; i < request->par.readWrite.numberOfPages; i++) {
+ memcpy(bufptr,
+ request->par.readWrite.pages[i].buf,
+ request->par.readWrite.pages[i].size);
+ bufptr += request->par.readWrite.pages[i].size;
+ totsize += request->par.readWrite.pages[i].size;
+ if (((i + 1) < request->par.readWrite.numberOfPages)) {
+ // There are more pages to write
+ // Check that offsets are consequtive
+ off_t tmp = page_offset + request->par.readWrite.pages[i].size;
+ if (tmp != request->par.readWrite.pages[i+1].offset) {
+ // Next page is not aligned with previous, not allowed
+ DEBUG(ndbout_c("Page offsets are not aligned"));
+ request->error = EINVAL;
+ return;
+ }
+ if ((unsigned)(totsize + request->par.readWrite.pages[i+1].size) > (unsigned)theWriteBufferSize) {
+ // We are not finished and the buffer is full
+ write_not_complete = true;
+ // Start again with next page
+ page_num = i + 1;
+ break;
+ }
}
+ page_offset += request->par.readWrite.pages[i].size;
}
- page_offset += request->par.readWrite.pages[i].size;
+ bufptr = theWriteBuffer;
+ } else {
+ // One page write, write page directly
+ bufptr = request->par.readWrite.pages[0].buf;
+ totsize = request->par.readWrite.pages[0].size;
}
- bufptr = theWriteBuffer;
- } else {
- // One page write, write page directly
- bufptr = request->par.readWrite.pages[0].buf;
- totsize = request->par.readWrite.pages[0].size;
- }
- int err = writeBuffer(bufptr, totsize, offset);
- if(err != 0){
- request->error = err;
- return;
- }
- } // while(write_not_complete)
-
- if(m_auto_sync_freq && m_write_wo_sync > m_auto_sync_freq){
+ int err = writeBuffer(bufptr, totsize, offset);
+ if(err != 0){
+ request->error = err;
+ return;
+ }
+ } // while(write_not_complete)
+ }
+done:
+ if(m_auto_sync_freq && m_write_wo_sync > m_auto_sync_freq)
+ {
syncReq(request);
}
}
void
-AsyncFile::writevReq( Request * request)
+AsyncFile::writevReq(Request * request)
{
// WriteFileGather on WIN32?
writeReq(request);
}
-void AsyncFile::endReq()
-{
- if (theWriteBuffer)
- ndbd_free(theWriteBuffer, theWriteBufferSize);
-}
-
#ifdef DEBUG_ASYNCFILE
void printErrorAndFlags(Uint32 used_flags) {
char buf[255];
=== modified file 'storage/ndb/src/kernel/blocks/ndbfs/AsyncFile.hpp'
--- a/storage/ndb/src/kernel/blocks/ndbfs/AsyncFile.hpp 2008-11-18 10:28:03 +0000
+++ b/storage/ndb/src/kernel/blocks/ndbfs/AsyncFile.hpp 2008-12-02 13:10:49 +0000
@@ -16,166 +16,33 @@
#ifndef AsyncFile_H
#define AsyncFile_H
-/**
- AsyncFile
-
- All file operations executed in thread-per-file, away from the DB threads.
- */
-
-
-// void execute(Request *request);
-// Description:
-// performens the requered action.
-// Parameters:
-// request: request to be called when open is finished.
-// action= open|close|read|write|sync
-// if action is open then:
-// par.open.flags= UNIX open flags, see man open
-// par.open.name= name of the file to open
-// if action is read or write then:
-// par.readWrite.buf= user provided buffer to read/write
-// the data from/to
-// par.readWrite.size= how many bytes must be read/written
-// par.readWrite.offset= absolute offset in file in bytes
-// return:
-// return values are stored in the request error field:
-// error= return state of the action, UNIX error see man open/errno
-// userData= is untouched can be used be user.
-
-
-
-// void reportTo( MemoryChannel<Request> *reportTo );
-// Description:
-// set the channel where the file must report the result of the
-// actions back to.
-// Parameters:
-// reportTo: the memory channel to use use MemoryChannelMultipleWriter
-// if more
-// than one file uses this channel to report back.
-
#include <kernel_types.h>
-#include "MemoryChannel.hpp"
+#include "AsyncIoThread.hpp"
#include "Filename.hpp"
-// Use this define if you want printouts from AsyncFile class
-//#define DEBUG_ASYNCFILE
-
-#ifdef DEBUG_ASYNCFILE
-#include <NdbOut.hpp>
-#define DEBUG(x) x
-#define PRINT_ERRORANDFLAGS(f) printErrorAndFlags(f)
-void printErrorAndFlags(Uint32 used_flags);
-#else
-#define DEBUG(x)
-#define PRINT_ERRORANDFLAGS(f)
-#endif
-
-// Define the size of the write buffer (for each thread)
-#define WRITEBUFFERSIZE 262144
-
-const int ERR_ReadUnderflow = 1000;
-
-const int WRITECHUNK = 262144;
-
-class AsyncFile;
-
-class Request
-{
-public:
- Request() {}
-
- enum Action {
- open,
- close,
- closeRemove,
- read, // Allways leave readv directly after
- // read because SimblockAsyncFileSystem depends on it
- readv,
- write,// Allways leave writev directly after
- // write because SimblockAsyncFileSystem depends on it
- writev,
- writeSync,// Allways leave writevSync directly after
- // writeSync because SimblockAsyncFileSystem depends on it
- writevSync,
- sync,
- end,
- append,
- append_synch,
- rmrf,
- readPartial
- };
- Action action;
- union {
- struct {
- Uint32 flags;
- Uint32 page_size;
- Uint64 file_size;
- Uint32 auto_sync_size;
- } open;
- struct {
- int numberOfPages;
- struct{
- char *buf;
- size_t size;
- off_t offset;
- } pages[16];
- } readWrite;
- struct {
- const char * buf;
- size_t size;
- } append;
- struct {
- bool directory;
- bool own_directory;
- } rmrf;
- } par;
- int error;
-
- void set(BlockReference userReference,
- Uint32 userPointer,
- Uint16 filePointer);
- BlockReference theUserReference;
- Uint32 theUserPointer;
- Uint16 theFilePointer;
- // Information for open, needed if the first open action fails.
- AsyncFile* file;
- Uint32 theTrace;
-};
-
-NdbOut& operator <<(NdbOut&, const Request&);
-
-inline
-void
-Request::set(BlockReference userReference,
- Uint32 userPointer, Uint16 filePointer)
-{
- theUserReference= userReference;
- theUserPointer= userPointer;
- theFilePointer= filePointer;
-}
-
class AsyncFile
{
friend class Ndbfs;
+ friend class AsyncIoThread;
+
public:
AsyncFile(SimulatedBlock& fs);
virtual ~AsyncFile() {};
- void reportTo( MemoryChannel<Request> *reportTo );
+ virtual int init() = 0;
- void execute( Request* request );
-
- virtual struct NdbThread* doStart();
-
- virtual void shutdown();
-
- // its a thread so its always running
- virtual void run();
+ void reportTo( MemoryChannel<Request> *reportTo );
virtual bool isOpen() = 0;
Filename theFileName;
Request *m_current_request, *m_last_request;
+
+ void set_buffer(Ptr<GlobalPage> ptr, Uint32 cnt);
+ bool has_buffer() const;
+ void clear_buffer(Ptr<GlobalPage> & ptr, Uint32 & cnt);
+
+ AsyncIoThread* getThread() const { return m_thread;}
private:
/**
@@ -184,14 +51,6 @@ private:
*/
/**
- * init()
- *
- * Initialise buffers etc. After init(), ready to execute()
- * Called with theStartMutexPtr held.
- */
- virtual int init();
-
- /**
* openReq() - open a file.
*/
virtual void openReq(Request *request) = 0;
@@ -204,15 +63,13 @@ private:
/**
* writeBuffer() - write into file
*/
- virtual int writeBuffer(const char * buf, size_t size, off_t offset,
- size_t chunk_size = WRITECHUNK)=0;
-
+ virtual int writeBuffer(const char * buf, size_t size, off_t offset)=0;
virtual void closeReq(Request *request)=0;
virtual void syncReq(Request *request)=0;
virtual void removeReq(Request *request)=0;
virtual void appendReq(Request *request)=0;
- virtual void rmrfReq(Request *request, char * path, bool removePath)=0;
+ virtual void rmrfReq(Request *request, const char * path, bool removePath)=0;
virtual void createDirectories()=0;
/**
@@ -229,38 +86,60 @@ protected:
virtual void writeReq(Request *request);
virtual void writevReq(Request *request);
- /**
- * endReq()
- *
- * Inverse to ::init(). Cleans up thread before it exits.
- */
- virtual void endReq();
-
private:
- /**
- * (end of what implementors need)
- */
+ void attach(AsyncIoThread* thr);
+ void detach(AsyncIoThread* thr);
- MemoryChannel<Request> *theReportTo;
- MemoryChannel<Request>* theMemoryChannelPtr;
-
- struct NdbThread* theThreadPtr;
- NdbMutex* theStartMutexPtr;
- NdbCondition* theStartConditionPtr;
- bool theStartFlag;
+ AsyncIoThread* m_thread; // For bound files
protected:
- int theWriteBufferSize;
- char* theWriteBuffer;
-
size_t m_write_wo_sync; // Writes wo/ sync
size_t m_auto_sync_freq; // Auto sync freq in bytes
+ Uint32 m_open_flags;
-public:
- SimulatedBlock& m_fs;
-
+ /**
+ * file buffers
+ */
Uint32 m_page_cnt;
Ptr<GlobalPage> m_page_ptr;
+
+ char* theWriteBuffer;
+ Uint32 theWriteBufferSize;
+
+public:
+ SimulatedBlock& m_fs;
};
+inline
+void
+AsyncFile::set_buffer(Ptr<GlobalPage> ptr, Uint32 cnt)
+{
+ assert(!has_buffer());
+ m_page_ptr = ptr;
+ m_page_cnt = cnt;
+ theWriteBuffer = (char*)ptr.p;
+ theWriteBufferSize = cnt * sizeof(GlobalPage);
+}
+
+inline
+bool
+AsyncFile::has_buffer() const
+{
+ return m_page_cnt > 0;
+}
+
+inline
+void
+AsyncFile::clear_buffer(Ptr<GlobalPage> & ptr, Uint32 & cnt)
+{
+ assert(has_buffer());
+ ptr = m_page_ptr;
+ cnt = m_page_cnt;
+ m_page_cnt = 0;
+ m_page_ptr.setNull();
+ theWriteBuffer = 0;
+ theWriteBufferSize = 0;
+}
+
+
#endif
=== added file 'storage/ndb/src/kernel/blocks/ndbfs/AsyncIoThread.cpp'
--- a/storage/ndb/src/kernel/blocks/ndbfs/AsyncIoThread.cpp 1970-01-01 00:00:00 +0000
+++ b/storage/ndb/src/kernel/blocks/ndbfs/AsyncIoThread.cpp 2008-12-02 13:10:49 +0000
@@ -0,0 +1,203 @@
+/* Copyright (C) 2003 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+#include <ndb_global.h>
+#include <my_sys.h>
+#include <my_pthread.h>
+
+#include "AsyncIoThread.hpp"
+#include "AsyncFile.hpp"
+#include <ErrorHandlingMacros.hpp>
+#include <kernel_types.h>
+#include <ndbd_malloc.hpp>
+#include <NdbThread.h>
+#include <signaldata/FsRef.hpp>
+#include <signaldata/FsOpenReq.hpp>
+#include <signaldata/FsReadWriteReq.hpp>
+#include <Configuration.hpp>
+#include "Ndbfs.hpp"
+
+AsyncIoThread::AsyncIoThread(class Ndbfs& fs, AsyncFile* file)
+ : m_fs(fs)
+{
+ m_current_file = file;
+ if (file)
+ {
+ theMemoryChannelPtr = &theMemoryChannel;
+ }
+ else
+ {
+ theMemoryChannelPtr = &m_fs.theToThreads;
+ }
+ theReportTo = &m_fs.theFromThreads;
+}
+
+static int numAsyncFiles = 0;
+
+extern "C"
+void *
+runAsyncIoThread(void* arg)
+{
+ ((AsyncIoThread*)arg)->run();
+ return (NULL);
+}
+
+
+struct NdbThread*
+AsyncIoThread::doStart()
+{
+ // Stacksize for filesystem threads
+ // An 8k stack should be enough
+ const NDB_THREAD_STACKSIZE stackSize = 8192;
+
+ char buf[16];
+ numAsyncFiles++;
+ BaseString::snprintf(buf, sizeof(buf), "AsyncIoThread%d", numAsyncFiles);
+
+ theStartMutexPtr = NdbMutex_Create();
+ theStartConditionPtr = NdbCondition_Create();
+ NdbMutex_Lock(theStartMutexPtr);
+ theStartFlag = false;
+
+ theThreadPtr = NdbThread_Create(runAsyncIoThread,
+ (void**)this,
+ stackSize,
+ buf,
+ NDB_THREAD_PRIO_MEAN);
+
+ if (theThreadPtr == 0)
+ {
+ ERROR_SET(fatal, NDBD_EXIT_MEMALLOC,
+ "","Could not allocate file system thread");
+ }
+
+ do
+ {
+ NdbCondition_Wait(theStartConditionPtr,
+ theStartMutexPtr);
+ }
+ while (theStartFlag == false);
+
+ NdbMutex_Unlock(theStartMutexPtr);
+ NdbMutex_Destroy(theStartMutexPtr);
+ NdbCondition_Destroy(theStartConditionPtr);
+
+ return theThreadPtr;
+}
+
+void
+AsyncIoThread::shutdown()
+{
+ void *status;
+ Request request;
+ request.action = Request::end;
+ this->theMemoryChannelPtr->writeChannel( &request );
+ NdbThread_WaitFor(theThreadPtr, &status);
+ NdbThread_Destroy(&theThreadPtr);
+}
+
+void
+AsyncIoThread::dispatch(Request *request)
+{
+ assert(m_current_file);
+ assert(m_current_file->getThread() == this);
+ assert(theMemoryChannelPtr == &theMemoryChannel);
+ theMemoryChannelPtr->writeChannel(request);
+}
+
+void
+AsyncIoThread::run()
+{
+ Request *request;
+
+ // Create theMemoryChannel in the thread that will wait for it
+ NdbMutex_Lock(theStartMutexPtr);
+ theStartFlag = true;
+ NdbMutex_Unlock(theStartMutexPtr);
+ NdbCondition_Signal(theStartConditionPtr);
+
+ while (1)
+ {
+ request = theMemoryChannelPtr->readChannel();
+ if (!request || request->action == Request::end)
+ {
+ DEBUG(ndbout_c("Nothing read from Memory Channel in AsyncFile"));
+ theStartFlag = false;
+ return;
+ }//if
+
+ AsyncFile * file = request->file;
+ m_current_request= request;
+ switch (request->action) {
+ case Request::open:
+ file->openReq(request);
+ break;
+ case Request::close:
+ file->closeReq(request);
+ break;
+ case Request::closeRemove:
+ file->closeReq(request);
+ file->removeReq(request);
+ break;
+ case Request::readPartial:
+ case Request::read:
+ file->readReq(request);
+ break;
+ case Request::readv:
+ file->readvReq(request);
+ break;
+ case Request::write:
+ file->writeReq(request);
+ break;
+ case Request::writev:
+ file->writevReq(request);
+ break;
+ case Request::writeSync:
+ file->writeReq(request);
+ file->syncReq(request);
+ break;
+ case Request::writevSync:
+ file->writevReq(request);
+ file->syncReq(request);
+ break;
+ case Request::sync:
+ file->syncReq(request);
+ break;
+ case Request::append:
+ file->appendReq(request);
+ break;
+ case Request::append_synch:
+ file->appendReq(request);
+ file->syncReq(request);
+ break;
+ case Request::rmrf:
+ file->rmrfReq(request, file->theFileName.c_str(),
+ request->par.rmrf.own_directory);
+ break;
+ case Request::end:
+ theStartFlag = false;
+ return;
+ default:
+ DEBUG(ndbout_c("Invalid Request"));
+ abort();
+ break;
+ }//switch
+ m_last_request = request;
+ m_current_request = 0;
+
+ // No need to signal as ndbfs only uses tryRead
+ theReportTo->writeChannelNoSignal(request);
+ }
+}
=== added file 'storage/ndb/src/kernel/blocks/ndbfs/AsyncIoThread.hpp'
--- a/storage/ndb/src/kernel/blocks/ndbfs/AsyncIoThread.hpp 1970-01-01 00:00:00 +0000
+++ b/storage/ndb/src/kernel/blocks/ndbfs/AsyncIoThread.hpp 2008-12-02 13:10:49 +0000
@@ -0,0 +1,151 @@
+/* Copyright (C) 2003 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+#ifndef AsyncIoThread_H
+#define AsyncIoThread_H
+
+#include <kernel_types.h>
+#include <Pool.hpp>
+#include "MemoryChannel.hpp"
+
+// Use this define if you want printouts from AsyncFile class
+//#define DEBUG_ASYNCFILE
+
+#ifdef DEBUG_ASYNCFILE
+#include <NdbOut.hpp>
+#define DEBUG(x) x
+#define PRINT_ERRORANDFLAGS(f) printErrorAndFlags(f)
+void printErrorAndFlags(Uint32 used_flags);
+#else
+#define DEBUG(x)
+#define PRINT_ERRORANDFLAGS(f)
+#endif
+
+const int ERR_ReadUnderflow = 1000;
+
+class AsyncFile;
+
+class Request
+{
+public:
+ Request() {}
+
+ enum Action {
+ open,
+ close,
+ closeRemove,
+ read, // Allways leave readv directly after
+ // read because SimblockAsyncFileSystem depends on it
+ readv,
+ write,// Allways leave writev directly after
+ // write because SimblockAsyncFileSystem depends on it
+ writev,
+ writeSync,// Allways leave writevSync directly after
+ // writeSync because SimblockAsyncFileSystem depends on it
+ writevSync,
+ sync,
+ end,
+ append,
+ append_synch,
+ rmrf,
+ readPartial
+ };
+ Action action;
+ union {
+ struct {
+ Uint32 flags;
+ Uint32 page_size;
+ Uint64 file_size;
+ Uint32 auto_sync_size;
+ } open;
+ struct {
+ int numberOfPages;
+ struct{
+ char *buf;
+ size_t size;
+ off_t offset;
+ } pages[16];
+ } readWrite;
+ struct {
+ const char * buf;
+ size_t size;
+ } append;
+ struct {
+ bool directory;
+ bool own_directory;
+ } rmrf;
+ } par;
+ int error;
+
+ void set(BlockReference userReference,
+ Uint32 userPointer,
+ Uint16 filePointer);
+ BlockReference theUserReference;
+ Uint32 theUserPointer;
+ Uint16 theFilePointer;
+ // Information for open, needed if the first open action fails.
+ AsyncFile* file;
+ Uint32 theTrace;
+};
+
+NdbOut& operator <<(NdbOut&, const Request&);
+
+inline
+void
+Request::set(BlockReference userReference,
+ Uint32 userPointer, Uint16 filePointer)
+{
+ theUserReference= userReference;
+ theUserPointer= userPointer;
+ theFilePointer= filePointer;
+}
+
+class AsyncIoThread
+{
+ friend class Ndbfs;
+ friend class AsyncFile;
+public:
+ AsyncIoThread(class Ndbfs&, AsyncFile* file);
+ virtual ~AsyncIoThread() {};
+
+ struct NdbThread* doStart();
+ void shutdown();
+
+ // its a thread so its always running
+ void run();
+
+ /**
+ * Add a request to a thread,
+ * should only be used with bound threads
+ */
+ void dispatch(Request*);
+
+ AsyncFile * m_current_file;
+ Request *m_current_request, *m_last_request;
+
+private:
+ Ndbfs & m_fs;
+
+ MemoryChannel<Request> *theReportTo;
+ MemoryChannel<Request> *theMemoryChannelPtr;
+ MemoryChannel<Request> theMemoryChannel; // If file-bound
+
+ bool theStartFlag;
+ struct NdbThread* theThreadPtr;
+ NdbMutex* theStartMutexPtr;
+ NdbCondition* theStartConditionPtr;
+};
+
+#endif
=== modified file 'storage/ndb/src/kernel/blocks/ndbfs/MemoryChannel.hpp'
--- a/storage/ndb/src/kernel/blocks/ndbfs/MemoryChannel.hpp 2008-08-22 11:02:38 +0000
+++ b/storage/ndb/src/kernel/blocks/ndbfs/MemoryChannel.hpp 2008-12-02 13:10:49 +0000
@@ -131,8 +131,8 @@ template <class T> void MemoryChannel<T>
if(full(theWriteIndex, theReadIndex) || theChannel == NULL) abort();
theChannel[theWriteIndex]= t;
++theWriteIndex;
- NdbMutex_Unlock(theMutexPtr);
NdbCondition_Signal(theConditionPtr);
+ NdbMutex_Unlock(theMutexPtr);
}
template <class T> void MemoryChannel<T>::writeChannelNoSignal( T *t)
=== modified file 'storage/ndb/src/kernel/blocks/ndbfs/Ndbfs.cpp'
--- a/storage/ndb/src/kernel/blocks/ndbfs/Ndbfs.cpp 2008-11-18 10:28:03 +0000
+++ b/storage/ndb/src/kernel/blocks/ndbfs/Ndbfs.cpp 2008-12-02 13:10:49 +0000
@@ -53,7 +53,6 @@ int pageSize( const NewVARIABLE* baseAdd
return (1 << log_psize);
}
-
Ndbfs::Ndbfs(Block_context& ctx) :
SimulatedBlock(NDBFS, ctx),
scanningInProgress(false),
@@ -80,16 +79,47 @@ Ndbfs::Ndbfs(Block_context& ctx) :
Ndbfs::~Ndbfs()
{
- // Delete all files
- // AsyncFile destuctor will take care of deleting
- // the thread it has created
+ /**
+ * Stop all unbound threads
+ */
+
+ /**
+ * Post enought Request::end to saturate all unbound threads
+ */
+ Request request;
+ request.action = Request::end;
+ for (unsigned i = 0; i < theThreads.size(); i++)
+ {
+ theToThreads.writeChannel(&request);
+ }
+
+ for (unsigned i = 0; i < theThreads.size(); i++)
+ {
+ AsyncIoThread * thr = theThreads[i];
+ thr->shutdown();
+ }
+
+ /**
+ * delete all threads
+ */
+ for (unsigned i = 0; i < theThreads.size(); i++)
+ {
+ AsyncIoThread * thr = theThreads[i];
+ delete thr;
+ theThreads[i] = 0;
+ }
+ theThreads.clear();
+
+ /**
+ * Delete all files
+ */
for (unsigned i = 0; i < theFiles.size(); i++){
AsyncFile* file = theFiles[i];
- file->shutdown();
delete file;
theFiles[i] = NULL;
}//for
theFiles.clear();
+
if (theRequestPool)
delete theRequestPool;
}
@@ -114,12 +144,34 @@ Ndbfs::execREAD_CONFIG_REQ(Signal* signa
m_maxFiles = 0;
ndb_mgm_get_int_parameter(p, CFG_DB_MAX_OPEN_FILES, &m_maxFiles);
Uint32 noIdleFiles = 27;
+
ndb_mgm_get_int_parameter(p, CFG_DB_INITIAL_OPEN_FILES, &noIdleFiles);
if (noIdleFiles > m_maxFiles && m_maxFiles != 0)
m_maxFiles = noIdleFiles;
+
// Create idle AsyncFiles
- for (Uint32 i = 0; i < noIdleFiles; i++){
- theIdleFiles.push_back(createAsyncFile());
+ for (Uint32 i = 0; i < noIdleFiles; i++)
+ {
+ theIdleBoundFiles.push_back(createAsyncFile(true /* bound */));
+ }
+
+ Uint32 threadpool = 8;
+ ndb_mgm_get_int_parameter(p, CFG_DB_THREAD_POOL, &threadpool);
+
+ // Create IoThreads
+ for (Uint32 i = 0; i < threadpool; i++)
+ {
+ AsyncIoThread * thr = createIoThread(0);
+ if (thr)
+ {
+ jam();
+ theThreads.push_back(thr);
+ }
+ else
+ {
+ jam();
+ break;
+ }
}
ReadConfigConf * conf = (ReadConfigConf*)signal->getDataPtrSend();
@@ -176,7 +228,15 @@ int
Ndbfs::forward( AsyncFile * file, Request* request)
{
jam();
- file->execute(request);
+ AsyncIoThread* thr = file->getThread();
+ if (thr) // bound
+ {
+ thr->dispatch(request);
+ }
+ else
+ {
+ theToThreads.writeChannel(request);
+ }
return 1;
}
@@ -186,13 +246,27 @@ Ndbfs::execFSOPENREQ(Signal* signal)
jamEntry();
const FsOpenReq * const fsOpenReq = (FsOpenReq *)&signal->theData[0];
const BlockReference userRef = fsOpenReq->userReference;
- AsyncFile* file = getIdleFile();
+
+ bool bound = (fsOpenReq->fileFlags & FsOpenReq::OM_THREAD_POOL) == 0;
+ AsyncFile* file = getIdleFile(bound);
ndbrequire(file != NULL);
Filename::NameSpec spec(theFileSystemPath, theBackupFilePath);
Uint32 userPointer = fsOpenReq->userPointer;
+
+ if(signal->getNoOfSections() == 0){
+ jam();
+ file->theFileName.set(spec, userRef, fsOpenReq->fileNumber);
+ } else {
+ jam();
+ SectionHandle handle(this, signal);
+ SegmentedSectionPtr ptr;
+ handle.getSection(ptr, FsOpenReq::FILENAME);
+ file->theFileName.set(spec, ptr, g_sectionSegmentPool);
+ releaseSections(handle);
+ }
- if(fsOpenReq->fileFlags & FsOpenReq::OM_INIT)
+ if (fsOpenReq->fileFlags & FsOpenReq::OM_INIT)
{
jam();
Uint32 cnt = 16; // 512k
@@ -211,9 +285,29 @@ Ndbfs::execFSOPENREQ(Signal* signal)
return;
}
m_shared_page_pool.getPtr(page_ptr);
- file->m_page_ptr = page_ptr;
- file->m_page_cnt = cnt;
+ file->set_buffer(page_ptr, cnt);
}
+ else if (fsOpenReq->fileFlags & FsOpenReq::OM_WRITE_BUFFER)
+ {
+ jam();
+ Uint32 cnt = NDB_FILE_BUFFER_SIZE / GLOBAL_PAGE_SIZE; // 256k
+ Ptr<GlobalPage> page_ptr;
+ m_ctx.m_mm.alloc_pages(RT_FILE_BUFFER, &page_ptr.i, &cnt, 1);
+ if(cnt == 0)
+ {
+ file->m_page_ptr.setNull();
+ file->m_page_cnt = 0;
+
+ FsRef * const fsRef = (FsRef *)&signal->theData[0];
+ fsRef->userPointer = userPointer;
+ fsRef->setErrorCode(fsRef->errorCode, FsRef::fsErrOutOfMemory);
+ fsRef->osErrorCode = ~0; // Indicate local error
+ sendSignal(userRef, GSN_FSOPENREF, signal, 3, JBB);
+ return;
+ }
+ m_shared_page_pool.getPtr(page_ptr);
+ file->set_buffer(page_ptr, cnt);
+ }
else
{
ndbassert(file->m_page_ptr.isNull());
@@ -221,20 +315,8 @@ Ndbfs::execFSOPENREQ(Signal* signal)
file->m_page_cnt = 0;
}
- if(signal->getNoOfSections() == 0){
- jam();
- file->theFileName.set(spec, userRef, fsOpenReq->fileNumber);
- } else {
- jam();
- SectionHandle handle(this, signal);
- SegmentedSectionPtr ptr;
- handle.getSection(ptr, FsOpenReq::FILENAME);
- file->theFileName.set(spec, ptr, g_sectionSegmentPool);
- releaseSections(handle);
- }
- file->reportTo(&theFromThreads);
if (getenv("NDB_TRACE_OPEN"))
- ndbout_c("open(%s)", file->theFileName.c_str());
+ ndbout_c("open(%s) bound: %u", file->theFileName.c_str(), bound);
Request* request = theRequestPool->get();
request->action = Request::open;
@@ -258,12 +340,11 @@ Ndbfs::execFSREMOVEREQ(Signal* signal)
jamEntry();
const FsRemoveReq * const req = (FsRemoveReq *)signal->getDataPtr();
const BlockReference userRef = req->userReference;
- AsyncFile* file = getIdleFile();
+ AsyncFile* file = getIdleFile(true);
ndbrequire(file != NULL);
Filename::NameSpec spec(theFileSystemPath, theBackupFilePath);
file->theFileName.set(spec, userRef, req->fileNumber, req->directory);
- file->reportTo(&theFromThreads);
Request* request = theRequestPool->get();
request->action = Request::rmrf;
@@ -303,6 +384,9 @@ Ndbfs::execFSCLOSEREQ(Signal * signal)
return;
}
+ if (getenv("NDB_TRACE_OPEN"))
+ ndbout_c("close(%s)", openFile->theFileName.c_str());
+
Request *request = theRequestPool->get();
if( fsCloseReq->getRemoveFileFlag(fsCloseReq->fileFlag) == true ) {
jam();
@@ -669,10 +753,11 @@ Ndbfs::newId()
}
AsyncFile*
-Ndbfs::createAsyncFile(){
+Ndbfs::createAsyncFile(bool bound){
// Check limit of open files
- if (m_maxFiles !=0 && theFiles.size() == m_maxFiles) {
+ if (m_maxFiles !=0 && theFiles.size() == m_maxFiles)
+ {
// Print info about all open files
for (unsigned i = 0; i < theFiles.size(); i++){
AsyncFile* file = theFiles[i];
@@ -687,29 +772,76 @@ Ndbfs::createAsyncFile(){
AsyncFile* file = new PosixAsyncFile(* this);
#endif
- struct NdbThread* thr = file->doStart();
- globalEmulatorData.theConfiguration->addThread(thr, NdbfsThread);
+ if (file->init())
+ {
+ ERROR_SET(fatal, NDBD_EXIT_AFS_MAXOPEN,""," Ndbfs::createAsyncFile");
+ }
- // Put the file in list of all files
- theFiles.push_back(file);
+ if (bound)
+ {
+ AsyncIoThread * thr = createIoThread(file);
+ theThreads.push_back(thr);
+ file->attach(thr);
#ifdef VM_TRACE
- infoEvent("NDBFS: Created new file thread %d", theFiles.size());
+ ndbout_c("NDBFS: Created new file thread %d", theFiles.size());
#endif
+ }
+
+ theFiles.push_back(file);
return file;
}
+void
+Ndbfs::pushIdleFile(AsyncFile* file)
+{
+ if (file->getThread())
+ {
+ theIdleBoundFiles.push_back(file);
+ }
+ else
+ {
+ theIdleUnboundFiles.push_back(file);
+ }
+}
+
+AsyncIoThread*
+Ndbfs::createIoThread(AsyncFile* file)
+{
+ AsyncIoThread* thr = new AsyncIoThread(*this, file);
+
+ struct NdbThread* thrptr = thr->doStart();
+ globalEmulatorData.theConfiguration->addThread(thrptr, NdbfsThread);
+
+ return thr;
+}
+
AsyncFile*
-Ndbfs::getIdleFile(){
- AsyncFile* file;
- if (theIdleFiles.size() > 0){
- file = theIdleFiles[0];
- theIdleFiles.erase(0);
- } else {
- file = createAsyncFile();
- }
- return file;
+Ndbfs::getIdleFile(bool bound)
+{
+ if (bound)
+ {
+ Uint32 sz = theIdleBoundFiles.size();
+ if (sz)
+ {
+ AsyncFile* file = theIdleBoundFiles[sz - 1];
+ theIdleBoundFiles.erase(sz - 1);
+ return file;
+ }
+ }
+ else
+ {
+ Uint32 sz = theIdleUnboundFiles.size();
+ if (sz)
+ {
+ AsyncFile* file = theIdleUnboundFiles[sz - 1];
+ theIdleUnboundFiles.erase(sz - 1);
+ return file;
+ }
+ }
+
+ return createAsyncFile(bound);
}
@@ -721,14 +853,28 @@ Ndbfs::report(Request * request, Signal*
signal->setTrace(request->theTrace);
const BlockReference ref = request->theUserReference;
- if(!request->file->m_page_ptr.isNull())
+ if(request->file->has_buffer())
{
- assert(request->file->m_page_cnt > 0);
- m_ctx.m_mm.release_pages(RT_DBTUP_PAGE,
- request->file->m_page_ptr.i,
- request->file->m_page_cnt);
- request->file->m_page_ptr.setNull();
- request->file->m_page_cnt = 0;
+ Uint32 cnt;
+ Ptr<GlobalPage> ptr;
+ if (request->file->m_open_flags & FsOpenReq::OM_INIT)
+ {
+ jam();
+ request->file->clear_buffer(ptr, cnt);
+ m_ctx.m_mm.release_pages(RT_DBTUP_PAGE, ptr.i, cnt);
+ }
+ else if (request->file->m_open_flags & FsOpenReq::OM_WRITE_BUFFER)
+ {
+ jam();
+ if ((request->action == Request::open && request->error) ||
+ (request->action == Request::close ||
+ request->action == Request::closeRemove))
+ {
+ jam();
+ request->file->clear_buffer(ptr, cnt);
+ m_ctx.m_mm.release_pages(RT_FILE_BUFFER, ptr.i, cnt);
+ }
+ }
}
if (request->error) {
@@ -750,7 +896,7 @@ Ndbfs::report(Request * request, Signal*
case Request:: open: {
jam();
// Put the file back in idle files list
- theIdleFiles.push_back(request->file);
+ pushIdleFile(request->file);
sendSignal(ref, GSN_FSOPENREF, signal, FsRef::SignalLength, JBB);
break;
}
@@ -790,7 +936,7 @@ Ndbfs::report(Request * request, Signal*
case Request::rmrf: {
jam();
// Put the file back in idle files list
- theIdleFiles.push_back(request->file);
+ pushIdleFile(request->file);
sendSignal(ref, GSN_FSREMOVEREF, signal, FsRef::SignalLength, JBB);
break;
}
@@ -823,7 +969,7 @@ Ndbfs::report(Request * request, Signal*
// removes the file from OpenFiles list
theOpenFiles.erase(request->theFilePointer);
// Put the file in idle files list
- theIdleFiles.push_back(request->file);
+ pushIdleFile(request->file);
sendSignal(ref, GSN_FSCLOSECONF, signal, 1, JBB);
break;
}
@@ -863,7 +1009,7 @@ Ndbfs::report(Request * request, Signal*
case Request::rmrf: {
jam();
// Put the file in idle files list
- theIdleFiles.push_back(request->file);
+ pushIdleFile(request->file);
sendSignal(ref, GSN_FSREMOVECONF, signal, 1, JBB);
break;
}
@@ -1055,9 +1201,10 @@ Ndbfs::execDUMP_STATE_ORD(Signal* signal
infoEvent("NDBFS: Files: %d Open files: %d",
theFiles.size(),
theOpenFiles.size());
- infoEvent(" Idle files: %d Max opened files: %d",
- theIdleFiles.size(),
- m_maxOpenedFiles);
+ infoEvent(" Idle files: (bound: %u unbound: %u) Max opened files: %d",
+ theIdleBoundFiles.size(),
+ theIdleUnboundFiles.size(),
+ m_maxOpenedFiles);
infoEvent(" Max files: %d",
m_maxFiles);
infoEvent(" Requests: %d",
@@ -1084,10 +1231,16 @@ Ndbfs::execDUMP_STATE_ORD(Signal* signal
return;
}
if(signal->theData[0] == DumpStateOrd::NdbfsDumpIdleFiles){
- infoEvent("NDBFS: Dump idle files: %d", theIdleFiles.size());
+ infoEvent("NDBFS: Dump idle files: %d %u",
+ theIdleBoundFiles.size(), theIdleUnboundFiles.size());
- for (unsigned i = 0; i < theIdleFiles.size(); i++){
- AsyncFile* file = theIdleFiles[i];
+ for (unsigned i = 0; i < theIdleBoundFiles.size(); i++){
+ AsyncFile* file = theIdleBoundFiles[i];
+ infoEvent("%2d (0x%lx): %s", i, (long)file, file->isOpen()?"OPEN":"CLOSED");
+ }
+
+ for (unsigned i = 0; i < theIdleUnboundFiles.size(); i++){
+ AsyncFile* file = theIdleUnboundFiles[i];
infoEvent("%2d (0x%lx): %s", i, (long)file, file->isOpen()?"OPEN":"CLOSED");
}
return;
@@ -1095,6 +1248,7 @@ Ndbfs::execDUMP_STATE_ORD(Signal* signal
if(signal->theData[0] == 404)
{
+#if 0
ndbrequire(signal->getLength() == 2);
Uint32 file= signal->theData[1];
AsyncFile* openFile = theOpenFiles.find(file);
@@ -1115,6 +1269,7 @@ Ndbfs::execDUMP_STATE_ORD(Signal* signal
AsyncFile* file = theFiles[i];
ndbout_c("%2d (0x%lx): %s", i, (long) file, file->isOpen()?"OPEN":"CLOSED");
}
+#endif
}
}//Ndbfs::execDUMP_STATE_ORD()
@@ -1132,6 +1287,7 @@ Ndbfs::get_filename(Uint32 fd) const
BLOCK_FUNCTIONS(Ndbfs)
template class Vector<AsyncFile*>;
+template class Vector<AsyncIoThread*>;
template class Vector<OpenFiles::OpenFileItem>;
template class MemoryChannel<Request>;
template class Pool<Request>;
=== modified file 'storage/ndb/src/kernel/blocks/ndbfs/Ndbfs.hpp'
--- a/storage/ndb/src/kernel/blocks/ndbfs/Ndbfs.hpp 2006-12-23 19:20:40 +0000
+++ b/storage/ndb/src/kernel/blocks/ndbfs/Ndbfs.hpp 2008-12-02 13:10:49 +0000
@@ -22,21 +22,21 @@
#include "AsyncFile.hpp"
#include "OpenFiles.hpp"
-
+class AsyncIoThread;
// Because one NDB Signal request can result in multiple requests to
// AsyncFile one class must be made responsible to keep track
// of all out standing request and when all are finished the result
// must be reported to the sending block.
-
class Ndbfs : public SimulatedBlock
{
+ friend class AsyncIoThread;
public:
Ndbfs(Block_context&);
virtual ~Ndbfs();
-
virtual const char* get_filename(Uint32 fd) const;
+
protected:
BLOCK_DEFINES(Ndbfs);
@@ -69,17 +69,22 @@ private:
Uint16 theLastId;
BlockReference cownref;
- // Communication from files
+ // Communication from/to files
MemoryChannel<Request> theFromThreads;
+ MemoryChannel<Request> theToThreads;
Pool<Request>* theRequestPool;
- AsyncFile* createAsyncFile();
- AsyncFile* getIdleFile();
-
- Vector<AsyncFile*> theFiles; // List all created AsyncFiles
- Vector<AsyncFile*> theIdleFiles; // List of idle AsyncFiles
- OpenFiles theOpenFiles; // List of open AsyncFiles
+ AsyncIoThread* createIoThread(AsyncFile* file);
+ AsyncFile* createAsyncFile(bool bound);
+ AsyncFile* getIdleFile(bool bound);
+ void pushIdleFile(AsyncFile*);
+
+ Vector<AsyncIoThread*> theThreads;// List of all created threads
+ Vector<AsyncFile*> theFiles; // List all created AsyncFiles
+ Vector<AsyncFile*> theIdleBoundFiles; // List of idle AsyncFiles
+ Vector<AsyncFile*> theIdleUnboundFiles; // List of idle AsyncFiles
+ OpenFiles theOpenFiles; // List of open AsyncFiles
BaseString theFileSystemPath;
BaseString theBackupFilePath;
=== modified file 'storage/ndb/src/kernel/blocks/ndbfs/PosixAsyncFile.cpp'
--- a/storage/ndb/src/kernel/blocks/ndbfs/PosixAsyncFile.cpp 2008-11-19 11:01:17 +0000
+++ b/storage/ndb/src/kernel/blocks/ndbfs/PosixAsyncFile.cpp 2008-12-02 13:10:49 +0000
@@ -21,6 +21,7 @@
#include <xfs/xfs.h>
#endif
+#include "Ndbfs.hpp"
#include "AsyncFile.hpp"
#include "PosixAsyncFile.hpp"
@@ -34,17 +35,8 @@
#include <NdbTick.h>
-// use this to test broken pread code
-//#define HAVE_BROKEN_PREAD
-
-#ifdef HAVE_BROKEN_PREAD
-#undef HAVE_PWRITE
-#undef HAVE_PREAD
-#endif
-
// For readv and writev
#include <sys/uio.h>
-
#include <dirent.h>
PosixAsyncFile::PosixAsyncFile(SimulatedBlock& fs) :
@@ -53,18 +45,12 @@ PosixAsyncFile::PosixAsyncFile(Simulated
use_gz(0)
{
memset(&azf,0,sizeof(azf));
+ init_mutex();
}
int PosixAsyncFile::init()
{
// Create write buffer for bigger writes
- theWriteBufferSize = WRITEBUFFERSIZE;
- theWriteBufferUnaligned = (char *) ndbd_malloc(theWriteBufferSize +
- NDB_O_DIRECT_WRITE_ALIGNMENT-1);
- theWriteBuffer = (char *)
- (((UintPtr)theWriteBufferUnaligned + NDB_O_DIRECT_WRITE_ALIGNMENT - 1) &
- ~(UintPtr)(NDB_O_DIRECT_WRITE_ALIGNMENT - 1));
-
azfBufferUnaligned= (Byte*)ndbd_malloc((AZ_BUFSIZE_READ+AZ_BUFSIZE_WRITE)
+NDB_O_DIRECT_WRITE_ALIGNMENT-1);
@@ -82,13 +68,8 @@ int PosixAsyncFile::init()
azf.stream.opaque= &az_mempool;
- if (!theWriteBuffer) {
- DEBUG(ndbout_c("AsyncFile::writeReq, Failed allocating write buffer"));
- return -1;
- }//if
-
return 0;
-}//AsyncFile::init()
+}
#ifdef O_DIRECT
static char g_odirect_readbuf[2*GLOBAL_PAGE_SIZE -1];
@@ -229,8 +210,14 @@ void PosixAsyncFile::openReq(Request *re
break;
return;
}
- if(flags & FsOpenReq::OM_GZ)
- use_gz= 1;
+ if (flags & FsOpenReq::OM_GZ)
+ {
+ use_gz = 1;
+ }
+ else
+ {
+ use_gz = 0;
+ }
// allow for user to choose any permissionsa with umask
const int mode = S_IRUSR | S_IWUSR |
@@ -363,7 +350,8 @@ no_odirect:
retry:
off_t save_size = size;
char* buf = (char*)m_page_ptr.p;
- while(size > 0){
+ while(size > 0)
+ {
#ifdef TRACE_INIT
write_cnt++;
#endif
@@ -504,8 +492,9 @@ int PosixAsyncFile::readBuffer(Request *
{
int return_value;
req->par.readWrite.pages[0].size = 0;
-#if ! defined(HAVE_PREAD)
off_t seek_val;
+#if ! defined(HAVE_PREAD)
+ FileGuard guard(this);
if(!use_gz)
{
while((seek_val= lseek(theFd, offset, SEEK_SET)) == (off_t)-1
@@ -516,7 +505,6 @@ int PosixAsyncFile::readBuffer(Request *
}
}
#endif
- off_t seek_val;
if(use_gz)
{
while((seek_val= azseek(&azf, offset, SEEK_SET)) == (off_t)-1
@@ -611,15 +599,16 @@ void PosixAsyncFile::readvReq(Request *r
#endif
}
-int PosixAsyncFile::writeBuffer(const char *buf, size_t size, off_t offset,
- size_t chunk_size)
+int PosixAsyncFile::writeBuffer(const char *buf, size_t size, off_t offset)
{
+ size_t chunk_size = 256*1024;
size_t bytes_to_write = chunk_size;
int return_value;
m_write_wo_sync += size;
#if ! defined(HAVE_PWRITE)
+ FileGuard guard(this);
off_t seek_val;
while((seek_val= lseek(theFd, offset, SEEK_SET)) == (off_t)-1
&& errno == EINTR);
@@ -772,20 +761,23 @@ void PosixAsyncFile::removeReq(Request *
}
}
-void PosixAsyncFile::rmrfReq(Request *request, char *path, bool removePath)
+void
+PosixAsyncFile::rmrfReq(Request *request, const char *_path, bool removePath)
{
+ char path[PATH_MAX];
+ strcpy(path, _path);
Uint32 path_len = strlen(path);
Uint32 path_max_copy = PATH_MAX - path_len;
char* path_add = &path[path_len];
if(!request->par.rmrf.directory){
// Remove file
- if(unlink((const char *)path) != 0 && errno != ENOENT)
+ if(unlink(path) != 0 && errno != ENOENT)
request->error = errno;
return;
}
// Remove directory
- DIR* dirp = opendir((const char *)path);
+ DIR* dirp = opendir(path);
if(dirp == 0){
if(errno != ENOENT)
request->error = errno;
@@ -816,12 +808,8 @@ void PosixAsyncFile::rmrfReq(Request *re
return;
}
-void PosixAsyncFile::endReq()
+PosixAsyncFile::~PosixAsyncFile()
{
- // Thread is ended with return
- if (theWriteBufferUnaligned)
- ndbd_free(theWriteBufferUnaligned, theWriteBufferSize);
-
if (azfBufferUnaligned)
ndbd_free(azfBufferUnaligned, (AZ_BUFSIZE_READ*AZ_BUFSIZE_WRITE)
+NDB_O_DIRECT_WRITE_ALIGNMENT-1);
@@ -830,11 +818,10 @@ void PosixAsyncFile::endReq()
ndbd_free(az_mempool.mem,az_mempool.size);
az_mempool.mem = NULL;
- theWriteBufferUnaligned = NULL;
azfBufferUnaligned = NULL;
+ destroy_mutex();
}
-
void PosixAsyncFile::createDirectories()
{
char* tmp;
=== modified file 'storage/ndb/src/kernel/blocks/ndbfs/PosixAsyncFile.hpp'
--- a/storage/ndb/src/kernel/blocks/ndbfs/PosixAsyncFile.hpp 2007-11-15 00:30:00 +0000
+++ b/storage/ndb/src/kernel/blocks/ndbfs/PosixAsyncFile.hpp 2008-12-02 13:10:49 +0000
@@ -24,15 +24,26 @@
#include <azlib.h>
+/**
+ * PREAD/PWRITE is needed to use file != thread
+ * therefor it's defined/checked here
+ */
+#ifdef HAVE_BROKEN_PREAD
+#undef HAVE_PWRITE
+#undef HAVE_PREAD
+#elif defined (HAVE_PREAD)
+#define HAVE_PWRITE
+#endif
+
class PosixAsyncFile : public AsyncFile
{
friend class Ndbfs;
public:
PosixAsyncFile(SimulatedBlock& fs);
+ virtual ~PosixAsyncFile();
- int init();
-
- bool isOpen();
+ virtual int init();
+ virtual bool isOpen();
virtual void openReq(Request *request);
virtual void readvReq(Request *request);
@@ -41,32 +52,59 @@ public:
virtual void syncReq(Request *request);
virtual void removeReq(Request *request);
virtual void appendReq(Request *request);
- virtual void rmrfReq(Request *request, char * path, bool removePath);
- void endReq();
+ virtual void rmrfReq(Request *request, const char * path, bool removePath);
virtual int readBuffer(Request*, char * buf, size_t size, off_t offset);
- virtual int writeBuffer(const char * buf, size_t size, off_t offset,
- size_t chunk_size = WRITECHUNK);
+ virtual int writeBuffer(const char * buf, size_t size, off_t offset);
virtual void createDirectories();
private:
int theFd;
- Uint32 m_open_flags; // OM_ flags from request to open file
-
int use_gz;
azio_stream azf;
struct az_alloc_rec az_mempool;
-
- void* theWriteBufferUnaligned;
void* azfBufferUnaligned;
- size_t m_write_wo_sync; // Writes wo/ sync
- size_t m_auto_sync_freq; // Auto sync freq in bytes
-
int check_odirect_read(Uint32 flags, int&new_flags, int mode);
int check_odirect_write(Uint32 flags, int&new_flags, int mode);
+
+#ifndef HAVE_PREAD
+ struct FileGuard;
+ friend struct FileGuard;
+ NdbMutex * m_mutex;
+ void init_mutex() { m_mutex = NdbMutex_Create();}
+ void destroy_mutex() { NdbMutex_Destroy(m_mutex);}
+
+ /**
+ * If dont HAVE_PREAD and using file != thread
+ */
+ struct FileGuard
+ {
+ PosixAsyncFile* m_file;
+ FileGuard (PosixAsyncFile* file) : m_file(file) {
+ if (m_file->getThread() == 0)
+ {
+ NdbMutex_Lock(m_file->m_mutex);
+ }
+ }
+ ~FileGuard() {
+ if (m_file->getThread() == 0)
+ {
+ NdbMutex_Unlock(m_file->m_mutex);
+ }
+ }
+ };
+#else
+ void init_mutex() {}
+ void destroy_mutex() {}
+ struct FileGuard
+ {
+ FileGuard (PosixAsyncFile* file){}
+ ~FileGuard () {}
+ };
+#endif
};
#endif
=== modified file 'storage/ndb/src/kernel/blocks/ndbfs/Win32AsyncFile.cpp'
--- a/storage/ndb/src/kernel/blocks/ndbfs/Win32AsyncFile.cpp 2008-11-19 11:01:17 +0000
+++ b/storage/ndb/src/kernel/blocks/ndbfs/Win32AsyncFile.cpp 2008-12-02 13:10:49 +0000
@@ -171,7 +171,7 @@ void Win32AsyncFile::openReq(Request* re
);
Uint32 size = request->par.open.page_size;
char* buf = (char*)m_page_ptr.p;
- DWORD dwWritten;
+ DWORD dwWritten;
while(size > 0){
BOOL bWrite= WriteFile(hFile, buf, size, &dwWritten, 0);
if(!bWrite || dwWritten!=size)
@@ -205,7 +205,8 @@ void Win32AsyncFile::openReq(Request* re
}
int
-Win32AsyncFile::readBuffer(Request* req, char * buf, size_t size, off_t offset){
+Win32AsyncFile::readBuffer(Request* req, char * buf, size_t size, off_t offset)
+{
req->par.readWrite.pages[0].size = 0;
while (size > 0) {
@@ -259,9 +260,9 @@ Win32AsyncFile::readBuffer(Request* req,
}
int
-Win32AsyncFile::writeBuffer(const char * buf, size_t size, off_t offset,
- size_t chunk_size)
+Win32AsyncFile::writeBuffer(const char * buf, size_t size, off_t offset)
{
+ size_t chunk_size = 256 * 1024;
size_t bytes_to_write = chunk_size;
m_write_wo_sync += size;
@@ -365,7 +366,9 @@ Win32AsyncFile::removeReq(Request * requ
}
void
-Win32AsyncFile::rmrfReq(Request * request, char * path, bool removePath){
+Win32AsyncFile::rmrfReq(Request * request, const char * _path, bool removePath){
+ char path[PATH_MAX];
+ strcpy(path, _path);
Uint32 path_len = strlen(path);
Uint32 path_max_copy = PATH_MAX - path_len;
char* path_add = &path[path_len];
=== modified file 'storage/ndb/src/kernel/blocks/ndbfs/Win32AsyncFile.hpp'
--- a/storage/ndb/src/kernel/blocks/ndbfs/Win32AsyncFile.hpp 2008-08-21 06:38:48 +0000
+++ b/storage/ndb/src/kernel/blocks/ndbfs/Win32AsyncFile.hpp 2008-12-02 13:10:49 +0000
@@ -30,32 +30,24 @@ class Win32AsyncFile : public AsyncFile
friend class Ndbfs;
public:
Win32AsyncFile(SimulatedBlock& fs);
- ~Win32AsyncFile();
+ virtual ~Win32AsyncFile();
- void reportTo( MemoryChannel<Request> *reportTo );
+ virtual bool isOpen();
+ virtual void openReq(Request *request);
+ virtual void closeReq(Request *request);
+ virtual void syncReq(Request *request);
+ virtual void removeReq(Request *request);
+ virtual void appendReq(Request *request);
+ virtual void rmrfReq(Request *request, const char * path, bool removePath);
- void execute( Request* request );
-
- bool isOpen();
+ virtual int readBuffer(Request*, char * buf, size_t size, off_t offset);
+ virtual int writeBuffer(const char * buf, size_t size, off_t offset);
private:
-
- void openReq(Request *request);
- void closeReq(Request *request);
- void syncReq(Request *request);
- void removeReq(Request *request);
- void appendReq(Request *request);
- void rmrfReq(Request *request, char * path, bool removePath);
-
- int readBuffer(Request*, char * buf, size_t size, off_t offset);
- int writeBuffer(const char * buf, size_t size, off_t offset,
- size_t chunk_size = WRITECHUNK);
-
int extendfile(Request* request);
void createDirectories();
HANDLE hFile;
- Uint32 m_open_flags; // OM_ flags from request to open file
};
#endif
=== modified file 'storage/ndb/src/kernel/blocks/record_types.hpp'
--- a/storage/ndb/src/kernel/blocks/record_types.hpp 2008-01-01 12:45:11 +0000
+++ b/storage/ndb/src/kernel/blocks/record_types.hpp 2008-12-02 13:10:49 +0000
@@ -45,6 +45,11 @@
#define RG_JOBBUFFER 4
/**
+ * File-thread buffers
+ */
+#define RG_FILE_BUFFERS 5
+
+/**
*
*/
#define RG_RESERVED 0
@@ -70,4 +75,6 @@
#define RT_JOB_BUFFER MAKE_TID( 1, RG_JOBBUFFER)
+#define RT_FILE_BUFFER MAKE_TID( 1, RG_FILE_BUFFERS)
+
#endif
=== modified file 'storage/ndb/src/kernel/blocks/tsman.cpp'
--- a/storage/ndb/src/kernel/blocks/tsman.cpp 2008-12-01 18:03:48 +0000
+++ b/storage/ndb/src/kernel/blocks/tsman.cpp 2008-12-02 13:10:49 +0000
@@ -767,6 +767,7 @@ Tsman::open_file(Signal* signal,
req->fileFlags = 0;
req->fileFlags |= FsOpenReq::OM_READWRITE;
req->fileFlags |= FsOpenReq::OM_DIRECT;
+ req->fileFlags |= FsOpenReq::OM_THREAD_POOL;
switch(requestInfo){
case CreateFileImplReq::Create:
req->fileFlags |= FsOpenReq::OM_CREATE_IF_NONE;
=== modified file 'storage/ndb/src/kernel/main.cpp'
--- a/storage/ndb/src/kernel/main.cpp 2008-12-01 18:04:19 +0000
+++ b/storage/ndb/src/kernel/main.cpp 2008-12-02 13:10:49 +0000
@@ -250,20 +250,34 @@ init_global_memory_manager(EmulatorData
"config, exiting.");
return -1;
}
+
if (tupmem)
{
Resource_limit rl;
rl.m_min = tupmem;
rl.m_max = tupmem;
- rl.m_resource_id = 3;
+ rl.m_resource_id = RG_DATAMEM;
+ ed.m_mem_manager->set_resource_limit(rl);
+ }
+
+ Uint32 maxopen = 4 * 4; // 4 redo parts, max 4 files per part
+ Uint32 filebuffer = NDB_FILE_BUFFER_SIZE;
+ Uint32 filepages = (filebuffer / GLOBAL_PAGE_SIZE) * maxopen;
+
+ if (filepages)
+ {
+ Resource_limit rl;
+ rl.m_min = filepages;
+ rl.m_max = filepages;
+ rl.m_resource_id = RG_FILE_BUFFERS;
ed.m_mem_manager->set_resource_limit(rl);
}
- if (shared_mem+tupmem)
+ if (shared_mem + tupmem + filepages)
{
Resource_limit rl;
rl.m_min = 0;
- rl.m_max = shared_mem + tupmem;
+ rl.m_max = shared_mem + tupmem + filepages;
rl.m_resource_id = 0;
ed.m_mem_manager->set_resource_limit(rl);
}
@@ -280,7 +294,7 @@ init_global_memory_manager(EmulatorData
ndb_mgm_get_db_parameter_info(CFG_DB_SGA, &sga, &size);
g_eventLogger->alert("Malloc (%lld bytes) for %s and %s failed, exiting",
- Uint64(shared_mem + tupmem) * 32768,
+ Uint64(shared_mem + tupmem) * GLOBAL_PAGE_SIZE,
dm.m_name, sga.m_name);
return -1;
}
=== modified file 'storage/ndb/src/kernel/vm/ndbd_malloc_impl.hpp'
--- a/storage/ndb/src/kernel/vm/ndbd_malloc_impl.hpp 2008-02-08 14:35:31 +0000
+++ b/storage/ndb/src/kernel/vm/ndbd_malloc_impl.hpp 2008-12-02 13:10:49 +0000
@@ -84,7 +84,7 @@ public:
private:
void grow(Uint32 start, Uint32 cnt);
-#define XX_RL_COUNT 5
+#define XX_RL_COUNT 6
/**
* Return pointer to free page data on page
*/
=== modified file 'storage/ndb/src/mgmsrv/ConfigInfo.cpp'
--- a/storage/ndb/src/mgmsrv/ConfigInfo.cpp 2008-11-19 11:01:17 +0000
+++ b/storage/ndb/src/mgmsrv/ConfigInfo.cpp 2008-12-02 13:10:49 +0000
@@ -1074,6 +1074,18 @@ const ConfigInfo::ParamInfo ConfigInfo::
0, 0 },
{
+ CFG_DB_THREAD_POOL,
+ "ThreadPool",
+ DB_TOKEN,
+ "No of unbound threads for file access (currently only for DD)",
+ ConfigInfo::CI_USED,
+ false,
+ ConfigInfo::CI_INT,
+ "8",
+ "0",
+ STR_VALUE(MAX_INT_RNIL) },
+
+ {
CFG_DB_MAX_OPEN_FILES,
"MaxNoOfOpenFiles",
DB_TOKEN,
| Thread |
|---|
| • bzr commit into mysql-5.1 branch (jonas:3148) | Jonas Oreland | 2 Dec |