#At file:///FC/MYSQL/wa-2008-BZR/mysql-6.0-falcon-team/
2808 Kelly Long 2008-09-02
Version 2 for:
Page Cache high concurrency Get Free Buffer. WL4479
Page cache lock per hash bucket. WL4480
Page cache hash buckets are a power of two. WL4481
traceFile is a compile-time decision, so the run-time checks for it are removed.
modified:
storage/falcon/Cache.cpp
storage/falcon/Cache.h
=== modified file 'storage/falcon/Cache.cpp'
--- a/storage/falcon/Cache.cpp 2008-08-22 06:47:40 +0000
+++ b/storage/falcon/Cache.cpp 2008-09-02 16:08:11 +0000
@@ -50,9 +50,11 @@
extern uint falcon_io_threads;
//#define STOP_PAGE 55
-#define TRACE_FILE "cache.trace"
+//#define CACHE_TRACE_FILE "cache.trace"
+#ifdef CACHE_TRACE_FILE
static FILE *traceFile;
+#endif // CACHE_TRACE_FILE
static const uint64 cacheHunkSize = 1024 * 1024 * 128;
static const int ASYNC_BUFFER_SIZE = 1024000;
@@ -68,20 +70,46 @@ static const char THIS_FILE[]=__FILE__;
Cache::Cache(Database *db, int pageSz, int hashSz, int numBuffers)
{
- //openTraceFile();
+ openTraceFile();
database = db;
panicShutdown = false;
pageSize = pageSz;
- hashSize = hashSz;
+
+ unsigned int highBit;
+ for (highBit=0x01; highBit < hashSz; highBit= highBit << 1) { }
+
+ // if there are more than 4096 buckets then lets round down
+ // else lets round up
+ if (highBit >= 0x00001000) {
+ // use power of two rounded down
+ hashSize = highBit << 1;
+ } else {
+ // use power of two rounded up
+ hashSize = highBit;
+ }
+
+ hashMask = hashSize - 1;
numberBuffers = numBuffers;
upperFraction = numberBuffers / 4;
bufferAge = 0;
firstDirty = NULL;
lastDirty = NULL;
- numberDirtyPages = 0;
pageWriter = NULL;
- hashTable = new Bdb* [hashSz];
+ hashTable = new Bdb* [hashSize];
memset (hashTable, 0, sizeof (Bdb*) * hashSize);
+#ifdef DEBUG_SYNC_HASH_TABLE_SIZE
+ syncHashTable = new SyncObject [DEBUG_SYNC_HASH_TABLE_SIZE];
+ for (int loop = 0; loop < DEBUG_SYNC_HASH_TABLE_SIZE; loop ++)
+ syncHashTable[loop].setName("Cache::syncHashTable");
+#else /* DEBUG_SYNC_HASH_TABLE_SIZE */
+ syncHashTable = new SyncObject [hashSize];
+ for (int loop = 0; loop < hashSize; loop ++)
+ {
+ char tmpName[128];
+ snprintf(tmpName,120,"Cache::syncHashTable[%d]",loop);
+ syncHashTable[loop].setName(tmpName);
+ }
+#endif /* DEBUG_SYNC_HASH_TABLE_SIZE */
sectorCache = new SectorCache(sectorCacheSize / SECTOR_BUFFER_SIZE, pageSize);
uint64 n = ((uint64) pageSize * numberBuffers + cacheHunkSize - 1) / cacheHunkSize;
@@ -103,6 +131,7 @@ Cache::Cache(Database *db, int pageSz, i
try
{
+ // non-protected access to bdbs,endBdbs is OK during initialization
bdbs = new Bdb [numberBuffers];
endBdbs = bdbs + numberBuffers;
int remaining = 0;
@@ -121,6 +150,7 @@ Cache::Cache(Database *db, int pageSz, i
}
bdb->cache = this;
+ // non-protected access to bufferQueue is OK during initialization
bufferQueue.append(bdb);
bdb->buffer = (Page*) stuff;
stuff += pageSize;
@@ -146,10 +176,11 @@ Cache::Cache(Database *db, int pageSz, i
Cache::~Cache()
{
- if (traceFile)
- closeTraceFile();
+
+ closeTraceFile();
delete [] hashTable;
+ delete [] syncHashTable;
delete [] bdbs;
delete [] ioThreads;
delete flushBitmap;
@@ -167,15 +198,12 @@ Cache::~Cache()
Bdb* Cache::probePage(Dbb *dbb, int32 pageNumber)
{
ASSERT (pageNumber >= 0);
- Sync sync (&syncObject, "Cache::probePage");
- sync.lock (Shared);
- Bdb *bdb = findBdb(dbb, pageNumber);
+ Bdb *bdb;
+ /* If we already have a buffer for this, we're done */
+ bdb = lockFindBdbIncrementUseCount(dbb, pageNumber);
if (bdb)
{
- bdb->incrementUseCount(ADD_HISTORY);
- sync.unlock();
-
if (bdb->buffer->pageType == PAGE_free)
{
bdb->decrementUseCount(REL_HISTORY);
@@ -192,15 +220,56 @@ Bdb* Cache::probePage(Dbb *dbb, int32 pa
return NULL;
}
-Bdb* Cache::findBdb(Dbb* dbb, int32 pageNumber)
+Bdb* Cache::findBdb(Dbb* dbb, int32 pageNumber, int slot)
+{
+ for (Bdb *bdb = hashTable [slot]; bdb; bdb = bdb->hash)
{
- for (Bdb *bdb = hashTable [pageNumber % hashSize]; bdb; bdb = bdb->hash)
if (bdb->pageNumber == pageNumber && bdb->dbb == dbb)
+ {
return bdb;
+ }
+ }
return NULL;
}
+Bdb* Cache::findBdb(Dbb* dbb, int32 pageNumber)
+{
+ return (findBdb(dbb, pageNumber, PAGENUM_2_SLOT(pageNumber)));
+}
+
+Bdb* Cache::lockFindBdbIncrementUseCount(Dbb* dbb, int32 pageNumber)
+{
+ int slot = PAGENUM_2_SLOT(pageNumber);
+ Sync lockHash (&syncHashTable[PAGENUM_2_LOCK_INDEX(pageNumber, slot)],
"Cache::lockFindBdbIncrementUseCount");
+ lockHash.lock (Shared);
+ Bdb *bdb;
+
+ bdb = findBdb(dbb, pageNumber, slot);
+ if (bdb != NULL)
+ bdb->incrementUseCount(ADD_HISTORY);
+
+ lockHash.unlock();
+ return bdb;
+}
+
+Bdb* Cache::lockFindBdbIncrementUseCount(int32 pageNumber, int slot)
+{
+ Sync lockHash (&syncHashTable[PAGENUM_2_LOCK_INDEX(pageNumber, slot)],
"Cache::lockFindBdbIncrementUseCount");
+ lockHash.lock (Shared);
+
+ for (Bdb *bdb = hashTable [slot]; bdb; bdb = bdb->hash)
+ if (bdb->pageNumber == pageNumber)
+ {
+ bdb->incrementUseCount(ADD_HISTORY);
+ lockHash.unlock();
+ return bdb;
+ }
+
+ lockHash.unlock();
+ return NULL;
+}
+
Bdb* Cache::fetchPage(Dbb *dbb, int32 pageNumber, PageType pageType, LockType lockType)
{
if (panicShutdown)
@@ -217,52 +286,45 @@ Bdb* Cache::fetchPage(Dbb *dbb, int32 pa
#endif
ASSERT (pageNumber >= 0);
- int slot = pageNumber % hashSize;
- LockType actual = lockType;
- Sync sync (&syncObject, "Cache::fetchPage");
- sync.lock (Shared);
- int hit = 0;
-
- /* If we already have a buffer for this go, we're done */
-
Bdb *bdb;
- for (bdb = hashTable [slot]; bdb; bdb = bdb->hash)
- if (bdb->pageNumber == pageNumber && bdb->dbb == dbb)
- {
- //syncObject.validateShared("Cache::fetchPage");
- bdb->incrementUseCount(ADD_HISTORY);
- sync.unlock();
- bdb->addRef(lockType COMMA_ADD_HISTORY);
- bdb->decrementUseCount(REL_HISTORY);
- hit = 1;
- break;
- }
-
+ /* If we already have a buffer for this, we're done */
+ bdb = lockFindBdbIncrementUseCount(dbb, pageNumber);
if (!bdb)
{
- sync.unlock();
- actual = Exclusive;
- sync.lock(Exclusive);
-
- for (bdb = hashTable [slot]; bdb; bdb = bdb->hash)
- if (bdb->pageNumber == pageNumber && bdb->dbb == dbb)
- {
- //syncObject.validateExclusive("Cache::fetchPage (retry)");
- bdb->incrementUseCount(ADD_HISTORY);
- sync.unlock();
- bdb->addRef(lockType COMMA_ADD_HISTORY);
- bdb->decrementUseCount(REL_HISTORY);
- hit = 2;
- break;
- }
+ // get getFreeBuffer() locks a hash bucket to remove the candidate bdb
+ // if we locked out hash bucket before the call then we could have
+ // a deadlock
+ // thus we get the free buffer before we lock the hash bucket we will
+ // be inserting into. This avoids a dead lock but generates a race
+ // we take care of the race by reversing the getFreeBuffer() work
+ // when we lose the race
+ Bdb *bdbAvailable;
+ int slot = PAGENUM_2_SLOT(pageNumber);
+ Sync lockHash (&syncHashTable[PAGENUM_2_LOCK_INDEX(pageNumber, slot)],
"Cache::fetchPage");
+
+ bdbAvailable = getFreeBuffer();
+ /* assume we'll be inserting this new BDB. Set new page number. */
+ bdbAvailable->addRef (Exclusive COMMA_ADD_HISTORY);
+ bdbAvailable->decrementUseCount(REL_HISTORY);
+
+ bdbAvailable->hash = hashTable [slot];
+ bdbAvailable->pageNumber = pageNumber;
+ bdbAvailable->dbb = dbb;
+#ifdef COLLECT_BDB_HISTORY
+ bdbAvailable->initHistory();
+#endif
+ lockHash.lock(Exclusive);
+ bdb = findBdb(dbb, pageNumber, slot);
if (!bdb)
{
- bdb = findBuffer(dbb, pageNumber, actual);
- moveToHead(bdb);
- sync.unlock();
+ // we won the race so lets use the free bdb
+ // relink into hash table
+ hashTable [slot] = bdbAvailable;
+ lockHash.unlock();
+ bdb = bdbAvailable;
#ifdef STOP_PAGE
if (bdb->pageNumber == STOP_PAGE)
Log::debug("reading page %d/%d\n", bdb->pageNumber, dbb->tableSpaceId);
@@ -278,9 +340,31 @@ Bdb* Cache::fetchPage(Dbb *dbb, int32 pa
#ifdef HAVE_PAGE_NUMBER
ASSERT(bdb->buffer->pageNumber == pageNumber);
#endif
- if (actual != lockType)
+ if (Exclusive != lockType)
bdb->downGrade(lockType);
}
+ else
+ {
+ // lost a race. put our available back to useable
+ // side effect, bdbAvailable will have to age again before we re-use it.
+ bdbAvailable->hash = NULL;
+ bdbAvailable->pageNumber = -1;
+ bdbAvailable->dbb = NULL;
+ bdbAvailable->release();
+
+ //syncObject.validateExclusive("Cache::fetchPage (retry)");
+ bdb->incrementUseCount(ADD_HISTORY);
+ lockHash.unlock();
+ bdb->addRef(lockType COMMA_ADD_HISTORY);
+ bdb->decrementUseCount(REL_HISTORY);
+ moveToHead(bdb);
+ }
+ }
+ else
+ {
+ bdb->addRef(lockType COMMA_ADD_HISTORY);
+ bdb->decrementUseCount(REL_HISTORY);
+ moveToHead(bdb);
}
Page *page = bdb->buffer;
@@ -302,14 +386,6 @@ Bdb* Cache::fetchPage(Dbb *dbb, int32 pa
bdb->pageNumber, dbb->tableSpaceId, pageType, page->pageType);
}
- // If buffer has moved out of the upper "fraction" of the LRU queue, move it back up
-
- if (bdb->age < bufferAge - (uint64) upperFraction)
- {
- sync.lock (Exclusive);
- moveToHead (bdb);
- }
-
ASSERT (bdb->pageNumber == pageNumber);
ASSERT (bdb->dbb == dbb);
ASSERT (bdb->useCount > 0);
@@ -319,9 +395,7 @@ Bdb* Cache::fetchPage(Dbb *dbb, int32 pa
Bdb* Cache::fakePage(Dbb *dbb, int32 pageNumber, PageType type, TransId transId)
{
- Sync sync(&syncObject, "Cache::fakePage");
- sync.lock(Exclusive);
- int slot = pageNumber % hashSize;
+ Bdb *bdb;
#ifdef STOP_PAGE
if (pageNumber == STOP_PAGE)
@@ -329,33 +403,72 @@ Bdb* Cache::fakePage(Dbb *dbb, int32 pag
#endif
/* If we already have a buffer for this, we're done */
+ bdb = lockFindBdbIncrementUseCount(dbb, pageNumber);
+ if (!bdb)
+ {
+ // get getFreeBuffer() locks a hash bucket to remove the candidate bdb
+ // if we locked out hash bucket before the call then we could have
+ // a deadlock
+ // thus we get the free buffer before we lock the hash bucket we will
+ // be inserting into. This avoids a dead lock but generates a race
+ // we take care of the race by reversing the getFreeBuffer() work
+ // when we lose the race
+ Bdb *bdbAvailable;
+ int slot = PAGENUM_2_SLOT(pageNumber);
+ Sync lockHash (&syncHashTable[PAGENUM_2_LOCK_INDEX(pageNumber, slot)],
"Cache::fetchPage");
+
+ bdbAvailable = getFreeBuffer();
+ /* assume we'll be inserting this new BDB. Set new page number. */
+ bdbAvailable->addRef (Exclusive COMMA_ADD_HISTORY);
+ bdbAvailable->decrementUseCount(REL_HISTORY);
+
+ bdbAvailable->hash = hashTable [slot];
+ bdbAvailable->pageNumber = pageNumber;
+ bdbAvailable->dbb = dbb;
+#ifdef COLLECT_BDB_HISTORY
+ bdbAvailable->initHistory();
+#endif
- Bdb *bdb;
+ lockHash.lock(Exclusive);
+ bdb = findBdb(dbb, pageNumber, slot);
+ if (!bdb)
+ {
+ // we won the race so lets use the free bdb
+ // relink into hash table
+ hashTable [slot] = bdbAvailable;
+ lockHash.unlock();
- for (bdb = hashTable [slot]; bdb; bdb = bdb->hash)
- if (bdb->pageNumber == pageNumber && bdb->dbb == dbb)
+ bdb = bdbAvailable;
+ }
+ else
{
- if (bdb->syncObject.isLocked())
- {
- // The pageWriter may still be cleaning up this freed page with a shared lock
- ASSERT(bdb->buffer->pageType == PAGE_free);
- ASSERT(bdb->syncObject.getState() >= 0);
- }
-
+ // lost a race. put our available back to useable
+ // side effect, bdbAvailable will have to age again before we re-use it.
+ bdbAvailable->hash = NULL;
+ bdbAvailable->pageNumber = -1;
+ bdbAvailable->dbb = NULL;
+ bdbAvailable->release();
+
+ //syncObject.validateExclusive("Cache::fetchPage (retry)");
+ bdb->incrementUseCount(ADD_HISTORY);
+ lockHash.unlock();
bdb->addRef(Exclusive COMMA_ADD_HISTORY);
-
- break;
+ bdb->decrementUseCount(REL_HISTORY);
+ moveToHead(bdb);
}
-
- if (!bdb)
- bdb = findBuffer(dbb, pageNumber, Exclusive);
+ }
+ else
+ {
+ bdb->addRef(Exclusive COMMA_ADD_HISTORY);
+ bdb->decrementUseCount(REL_HISTORY);
+ moveToHead(bdb);
+ }
if (!dbb->isReadOnly)
bdb->mark(transId);
memset(bdb->buffer, 0, pageSize);
bdb->setPageHeader(type);
- moveToHead(bdb);
return bdb;
}
@@ -363,32 +476,31 @@ Bdb* Cache::fakePage(Dbb *dbb, int32 pag
void Cache::flush(int64 arg)
{
Sync flushLock(&syncFlush, "Cache::flush(1)");
- Sync sync(&syncDirty, "Cache::flush(2)");
+ Sync dirtyLock(&syncDirty, "Cache::flush(2)");
flushLock.lock(Exclusive);
if (flushing)
return;
syncWait.lock(NULL, Exclusive);
- sync.lock(Shared);
//Log::debug(%d: "Initiating flush\n", dbb->deltaTime);
flushArg = arg;
flushPages = 0;
physicalWrites = 0;
+ dirtyLock.lock(Shared);
for (Bdb *bdb = firstDirty; bdb; bdb = bdb->nextDirty)
{
bdb->flushIt = true;
flushBitmap->set(bdb->pageNumber);
++flushPages;
}
+ dirtyLock.unlock();
- if (traceFile)
- analyzeFlush();
+ analyzeFlush();
flushStart = database->timestamp;
flushing = true;
- sync.unlock();
flushLock.unlock();
for (int n = 0; n < numberIoThreads; ++n)
@@ -398,69 +510,115 @@ void Cache::flush(int64 arg)
void Cache::moveToHead(Bdb * bdb)
{
+ // If buffer has moved out of the upper "fraction" of the LRU queue, move it back up
+ // non-protected access to age is harmless since it is fuzzy anyway
+ if (bdb->age < bufferAge - (uint64) upperFraction)
+ {
+ Sync bufferQueueLock (&bufferQueue.syncObject, "Cache::moveToHead");
+
+ bufferQueueLock.lock (Exclusive);
+ bdb->age = bufferAge++;
+ bufferQueue.remove(bdb);
+ bufferQueue.prepend(bdb);
+ //validateUnique (bdb);
+ }
+}
+
+void Cache::moveToHeadAlreadyLocked(Bdb * bdb)
+{
bdb->age = bufferAge++;
bufferQueue.remove(bdb);
bufferQueue.prepend(bdb);
//validateUnique (bdb);
}
-Bdb* Cache::findBuffer(Dbb *dbb, int pageNumber, LockType lockType)
+Bdb* Cache::getFreeBuffer(void)
{
- //syncObject.validateExclusive("Cache::findBuffer");
- int slot = pageNumber % hashSize;
- Sync sync(&syncDirty, "Cache::findBuffer");
-
- /* Find least recently used, not-in-use buffer */
-
+ Sync bufferQueueLock (&bufferQueue.syncObject, "Cache::getFreeBuffer");
+ unsigned int count;
Bdb *bdb;
// Find a candidate BDB.
-
for (;;)
{
- for (bdb = bufferQueue.last; bdb; bdb = bdb->prior)
- if (bdb->useCount == 0)
+ bufferQueueLock.lock (Exclusive);
+ // find a candidate that is NOT in use and NOT dirty and in the tail fraction of the
LRU
+ for (count = 0, bdb = bufferQueue.last; bdb ; bdb = bdb->prior, count++)
+ {
+ if (count >= upperFraction)
+ {
+ bdb = NULL;
break;
+ }
+ if (bdb->useCount == 0)
+ {
+ if (!bdb->isDirty)
+ {
+ bdb->incrementUseCount(REL_HISTORY);
+ moveToHeadAlreadyLocked(bdb);
+ break;
+ }
+ }
+ else
+ {
+ // get this one out of the way so we don't search it every time
+ moveToHeadAlreadyLocked(bdb,count);
+ }
+ }
+ if (!bdb)
+ // find a candidate that is NOT in use, could be dirty
+ for (bdb = bufferQueue.last; bdb; bdb = bdb->prior)
+ if (bdb->useCount == 0)
+ {
+ bdb->incrementUseCount(REL_HISTORY);
+ moveToHeadAlreadyLocked(bdb);
+ break;
+ }
+ bufferQueueLock.unlock();
if (!bdb)
throw SQLError(RUNTIME_ERROR, "buffer pool is exhausted\n");
- if (!bdb->isDirty)
- break;
-
- writePage (bdb, WRITE_TYPE_REUSE);
- }
-
- /* Unlink its old incarnation from the page/hash table */
+ if (bdb->pageNumber >= 0)
+ {
+ int slotRemove = PAGENUM_2_SLOT(bdb->pageNumber);
+ Sync lockHashRemove (&syncHashTable[PAGENUM_2_LOCK_INDEX(bdb->pageNumber,
slotRemove)], "Cache::getFreeBuffer");
+ lockHashRemove.lock(Exclusive);
- if (bdb->pageNumber >= 0)
- for (Bdb **ptr = hashTable + bdb->pageNumber % hashSize;; ptr =
&(*ptr)->hash)
- if (*ptr == bdb)
+ if (bdb->useCount != 1)
{
- *ptr = bdb->hash;
- break;
+ // we lost a race try again
+ bdb->decrementUseCount(REL_HISTORY);
+ lockHashRemove.unlock();
+ continue;
}
- else
- ASSERT (*ptr);
-
- bdb->addRef (lockType COMMA_ADD_HISTORY);
- /* Set new page number and relink into hash table */
+ if (bdb->isDirty)
+ writePage (bdb, WRITE_TYPE_REUSE);
- bdb->hash = hashTable [slot];
- hashTable [slot] = bdb;
- bdb->pageNumber = pageNumber;
- bdb->dbb = dbb;
+ /* Unlink its old incarnation from the page/hash table */
+ for (Bdb **ptr = hashTable + PAGENUM_2_SLOT(bdb->pageNumber) ;; ptr =
&(*ptr)->hash)
+ if (*ptr == bdb)
+ {
+ *ptr = bdb->hash;
+ break;
+ }
+ else
+ ASSERT (*ptr);
+ }
-#ifdef COLLECT_BDB_HISTORY
- bdb->initHistory();
-#endif
+ break;
+ }
return bdb;
}
void Cache::validate()
{
+ //Sync bufferQueueLock (&bufferQueue.syncObject, "Cache::validate");
+
+ //bufferQueueLock.lock (Shared);
+ // non-protected access to bufferQueue is DANGEROUS...
for (Bdb *bdb = bufferQueue.last; bdb; bdb = bdb->prior)
{
//IndexPage *page = (IndexPage*) bdb->buffer;
@@ -470,8 +628,8 @@ void Cache::validate()
void Cache::markDirty(Bdb *bdb)
{
- Sync sync (&syncDirty, "Cache::markDirty");
- sync.lock (Exclusive);
+ Sync dirtyLock (&syncDirty, "Cache::markDirty");
+ dirtyLock.lock (Exclusive);
bdb->nextDirty = NULL;
bdb->priorDirty = lastDirty;
@@ -481,14 +639,13 @@ void Cache::markDirty(Bdb *bdb)
firstDirty = bdb;
lastDirty = bdb;
- ++numberDirtyPages;
//validateUnique (bdb);
}
void Cache::markClean(Bdb *bdb)
{
- Sync sync (&syncDirty, "Cache::markClean");
- sync.lock (Exclusive);
+ Sync dirtyLock (&syncDirty, "Cache::markClean");
+ dirtyLock.lock (Exclusive);
/***
if (bdb->flushIt)
@@ -496,7 +653,6 @@ void Cache::markClean(Bdb *bdb)
***/
bdb->flushIt = false;
- --numberDirtyPages;
if (bdb == lastDirty)
lastDirty = bdb->priorDirty;
@@ -600,8 +756,8 @@ void Cache::writePage(Bdb *bdb, int type
if (dbb->shadows)
{
- Sync sync (&dbb->syncClone, "Cache::writePage(2)");
- sync.lock (Shared);
+ Sync cloneLock (&dbb->syncClone, "Cache::writePage(2)");
+ cloneLock.lock (Shared);
for (DatabaseCopy *shadow = dbb->shadows; shadow; shadow = shadow->next)
shadow->rewritePage(bdb);
@@ -610,14 +766,14 @@ void Cache::writePage(Bdb *bdb, int type
void Cache::analyze(Stream *stream)
{
- Sync sync (&syncDirty, "Cache::analyze");
- sync.lock (Shared);
+ Sync dirtyLock (&syncDirty, "Cache::analyze");
int inUse = 0;
int dirty = 0;
int dirtyList = 0;
int total = 0;
Bdb *bdb;
+ // non-protected access to bdbs,endBdbs is DANGEROUS...
for (bdb = bdbs; bdb < endBdbs; ++bdb)
{
++total;
@@ -629,8 +785,10 @@ void Cache::analyze(Stream *stream)
++inUse;
}
+ dirtyLock.lock (Shared);
for (bdb = firstDirty; bdb; bdb = bdb->nextDirty)
++dirtyList;
+ dirtyLock.unlock();
stream->format ("Cache: %d pages, %d in use, %d dirty, %d in dirty chain\n",
total, inUse, dirty, dirtyList);
@@ -638,17 +796,18 @@ void Cache::analyze(Stream *stream)
void Cache::validateUnique(Bdb *target)
{
- int slot = target->pageNumber % hashSize;
+ int slot = PAGENUM_2_SLOT(target->pageNumber);
+ // WARNING: unlocked walk of hash table.... DANGEROUS
for (Bdb *bdb = hashTable [slot]; bdb; bdb = bdb->hash)
ASSERT (bdb == target || !(bdb->pageNumber == target->pageNumber &&
bdb->dbb == target->dbb));
}
void Cache::freePage(Dbb *dbb, int32 pageNumber)
{
- Sync sync (&syncObject, "Cache::freePage");
- sync.lock (Shared);
- int slot = pageNumber % hashSize;
+ int slot = PAGENUM_2_SLOT(pageNumber);
+ Sync lockHash (&syncHashTable[PAGENUM_2_LOCK_INDEX(pageNumber, slot)],
"Cache::freePage");
+ lockHash.lock(Shared);
// If page exists in cache (usual case), clean it up
@@ -657,7 +816,7 @@ void Cache::freePage(Dbb *dbb, int32 pag
{
if (bdb->isDirty)
{
- sync.unlock();
+ lockHash.unlock();
markClean (bdb);
}
@@ -685,12 +844,16 @@ void Cache::flush(Dbb *dbb)
bool Cache::hasDirtyPages(Dbb *dbb)
{
- Sync sync (&syncDirty, "Cache::hasDirtyPages");
- sync.lock (Shared);
+ Sync dirtyLock (&syncDirty, "Cache::hasDirtyPages");
+ dirtyLock.lock (Shared);
for (Bdb *bdb = firstDirty; bdb; bdb = bdb->nextDirty)
if (bdb->dbb == dbb)
+ {
+ dirtyLock.unlock();
return true;
+ }
+ dirtyLock.unlock();
return false;
}
@@ -717,26 +880,16 @@ Bdb* Cache::trialFetch(Dbb* dbb, int32 p
}
ASSERT (pageNumber >= 0);
- int slot = pageNumber % hashSize;
- Sync sync (&syncObject, "Cache::trialFetch");
- sync.lock (Shared);
- int hit = 0;
-
- /* If we already have a buffer for this go, we're done */
-
Bdb *bdb;
- for (bdb = hashTable [slot]; bdb; bdb = bdb->hash)
- if (bdb->pageNumber == pageNumber && bdb->dbb == dbb)
- {
- //syncObject.validateShared("Cache::trialFetch");
- bdb->incrementUseCount(ADD_HISTORY);
- sync.unlock();
- bdb->addRef(lockType COMMA_ADD_HISTORY);
- bdb->decrementUseCount(REL_HISTORY);
- hit = 1;
- break;
- }
+ /* If we already have a buffer for this, we're done */
+ bdb = lockFindBdbIncrementUseCount(dbb, pageNumber);
+ if (bdb)
+ {
+ bdb->addRef(lockType COMMA_ADD_HISTORY);
+ bdb->decrementUseCount(REL_HISTORY);
+ moveToHead(bdb);
+ }
return bdb;
}
@@ -764,10 +917,9 @@ void Cache::ioThread(void* arg)
void Cache::ioThread(void)
{
- Sync syncThread(&syncThreads, "Cache::ioThread(1)");
+ Sync syncThread(&syncThreads, "Cache::ioThread");
syncThread.lock(Shared);
- Sync flushLock(&syncFlush, "Cache::ioThread(2)");
- Sync sync(&syncObject, "Cache::ioThread(3)");
+ Sync flushLock(&syncFlush, "Cache::ioThread");
Priority priority(database->ioScheduler);
Thread *thread = Thread::getThread("Cache::ioThread");
UCHAR *rawBuffer = new UCHAR[ASYNC_BUFFER_SIZE];
@@ -781,137 +933,135 @@ void Cache::ioThread(void)
{
int32 pageNumber = flushBitmap->nextSet(0);
int count;
- Dbb *dbb;
if (pageNumber >= 0)
{
- int slot = pageNumber % hashSize;
+ Bdb *bdb;
+ Dbb *dbb;
+ int slot = PAGENUM_2_SLOT(pageNumber);
bool hit = false;
Bdb *bdbList = NULL;
UCHAR *p = buffer;
- sync.lock(Shared);
-
- // Look for a page to flush. Then get all his friends
- for (Bdb *bdb = hashTable[slot]; bdb; bdb = bdb->hash)
- if (bdb->pageNumber == pageNumber && bdb->flushIt &&
bdb->isDirty)
+ // Look for the page to flush.
+ bdb = lockFindBdbIncrementUseCount(pageNumber, slot);
+ if (bdb && bdb->flushIt && bdb->isDirty)
+ {
+ hit = true;
+ count = 0;
+ dbb = bdb->dbb;
+
+ flushBitmap->clear(pageNumber);
+
+ // get all his friends
+ while (p < end)
{
- hit = true;
- count = 0;
- dbb = bdb->dbb;
+ ++count;
+ bdb->addRef(Shared COMMA_ADD_HISTORY);
- if (!bdb->hash)
- flushBitmap->clear(pageNumber);
+ bdb->syncWrite.lock(NULL, Exclusive);
+ bdb->ioThreadNext = bdbList;
+ bdbList = bdb;
- while (p < end)
- {
- ++count;
- bdb->incrementUseCount(ADD_HISTORY);
- sync.unlock();
- bdb->addRef(Shared COMMA_ADD_HISTORY);
- if (falcon_use_sectorcache)
- sectorCache->writePage(bdb);
-
- bdb->syncWrite.lock(NULL, Exclusive);
- bdb->ioThreadNext = bdbList;
- bdbList = bdb;
-
- //ASSERT(!(bdb->flags & BDB_write_pending));
- //bdb->flags |= BDB_write_pending;
- memcpy(p, bdb->buffer, pageSize);
- p += pageSize;
- bdb->flushIt = false;
- markClean(bdb);
- bdb->isDirty = false;
- bdb->release(REL_HISTORY);
- sync.lock(Shared);
-
- if ( !(bdb = findBdb(dbb, bdb->pageNumber + 1)) )
- break;
-
- if (!bdb->isDirty && !continueWrite(bdb))
- break;
- }
+ //ASSERT(!(bdb->flags & BDB_write_pending));
+ //bdb->flags |= BDB_write_pending;
+ memcpy(p, bdb->buffer, pageSize);
+ p += pageSize;
+ bdb->flushIt = false;
+ markClean(bdb);
+ bdb->isDirty = false;
+ bdb->release(REL_HISTORY);
- if (sync.state != None)
- sync.unlock();
-
- flushLock.unlock();
- //Log::debug(" %d Writing %s %d pages: %d - %d\n", thread->threadId, (const
char*) dbb->fileName, count, pageNumber, pageNumber + count - 1);
- int length = p - buffer;
- priority.schedule(PRIORITY_LOW);
+ bdb = lockFindBdbIncrementUseCount(dbb, bdb->pageNumber + 1);
+ if (!bdb)
+ break;
- try
+ if (!bdb->isDirty && !continueWrite(bdb))
{
- priority.schedule(PRIORITY_LOW);
- dbb->writePages(pageNumber, length, buffer, WRITE_TYPE_FLUSH);
+ bdb->decrementUseCount(REL_HISTORY);
+ break;
}
- catch (SQLException& exception)
+ }
+
+ flushLock.unlock();
+ //Log::debug(" %d Writing %s %d pages: %d - %d\n", thread->threadId, (const char*)
dbb->fileName, count, pageNumber, pageNumber + count - 1);
+ int length = p - buffer;
+ priority.schedule(PRIORITY_LOW);
+
+ try
+ {
+ priority.schedule(PRIORITY_LOW);
+ dbb->writePages(pageNumber, length, buffer, WRITE_TYPE_FLUSH);
+ }
+ catch (SQLException& exception)
+ {
+ priority.finished();
+
+ if (exception.getSqlcode() != DEVICE_FULL)
+ throw;
+
+ database->setIOError(&exception);
+
+ for (bool error = true; error;)
{
- priority.finished();
-
- if (exception.getSqlcode() != DEVICE_FULL)
- throw;
-
- database->setIOError(&exception);
-
- for (bool error = true; error;)
+ if (thread->shutdownInProgress)
{
- if (thread->shutdownInProgress)
- {
- Bdb *next;
+ Bdb *next;
- for (bdb = bdbList; bdb; bdb = next)
- {
- //bdb->flags &= ~BDB_write_pending;
- next = bdb->ioThreadNext;
- bdb->syncWrite.unlock();
- bdb->decrementUseCount(REL_HISTORY);
- }
-
- return;
- }
-
- thread->sleep(1000);
-
- try
+ for (bdb = bdbList; bdb; bdb = next)
{
- priority.schedule(PRIORITY_LOW);
- dbb->writePages(pageNumber, length, buffer, WRITE_TYPE_FLUSH);
- error = false;
- database->clearIOError();
+ //bdb->flags &= ~BDB_write_pending;
+ next = bdb->ioThreadNext;
+ bdb->syncWrite.unlock();
+ bdb->decrementUseCount(REL_HISTORY);
}
- catch (SQLException& exception2)
- {
- priority.finished();
- if (exception2.getSqlcode() != DEVICE_FULL)
- throw;
- }
+ return;
+ }
+
+ thread->sleep(1000);
+
+ try
+ {
+ priority.schedule(PRIORITY_LOW);
+ dbb->writePages(pageNumber, length, buffer, WRITE_TYPE_FLUSH);
+ error = false;
+ database->clearIOError();
+ }
+ catch (SQLException& exception2)
+ {
+ priority.finished();
+
+ if (exception2.getSqlcode() != DEVICE_FULL)
+ throw;
}
}
+ }
- priority.finished();
- Bdb *next;
+ priority.finished();
+ Bdb *next;
- for (bdb = bdbList; bdb; bdb = next)
- {
- //ASSERT(bdb->flags & BDB_write_pending);
- //bdb->flags &= ~BDB_write_pending;
- next = bdb->ioThreadNext;
- bdb->syncWrite.unlock();
- bdb->decrementUseCount(REL_HISTORY);
- }
-
- flushLock.lock(Exclusive);
- ++physicalWrites;
-
- break;
+ for (bdb = bdbList; bdb; bdb = next)
+ {
+ //ASSERT(bdb->flags & BDB_write_pending);
+ //bdb->flags &= ~BDB_write_pending;
+ next = bdb->ioThreadNext;
+ bdb->syncWrite.unlock();
+ bdb->decrementUseCount(REL_HISTORY);
}
+
+ flushLock.lock(Exclusive);
+ ++physicalWrites;
+
+ }
+ else
+ {
+ if (bdb)
+ bdb->decrementUseCount(REL_HISTORY);
+ }
if (!hit)
{
- sync.unlock();
flushBitmap->clear(pageNumber);
}
}
@@ -922,7 +1072,9 @@ void Cache::ioThread(void)
int writes = physicalWrites;
int pages = flushPages;
int delta = (int) (database->timestamp - flushStart);
+ int64 callbackArg = flushArg;
flushing = false;
+ flushArg = 0;
flushLock.unlock();
syncWait.unlock();
@@ -930,7 +1082,8 @@ void Cache::ioThread(void)
Log::log(LogInfo, "%d: Cache flush: %d pages, %d writes in %d seconds (%d pps)\n",
database->deltaTime, pages, writes, delta, pages / MAX(delta, 1));
- database->pageCacheFlushed(flushArg);
+ if (callbackArg != 0)
+ database->pageCacheFlushed(callbackArg);
}
else
flushLock.unlock();
@@ -940,8 +1093,8 @@ void Cache::ioThread(void)
thread->sleep();
flushLock.lock(Exclusive);
- }
}
+ } // for ever
delete [] rawBuffer;
}
@@ -954,11 +1107,12 @@ bool Cache::continueWrite(Bdb* startingB
for (int32 pageNumber = startingBdb->pageNumber + 1, end = pageNumber+ 5; pageNumber
< end; ++pageNumber)
{
- Bdb *bdb = findBdb(dbb, pageNumber);
+ Bdb *bdb;
if (dirty > clean)
return true;
-
+
+ bdb = lockFindBdbIncrementUseCount(dbb, pageNumber);
if (!bdb)
return dirty >= clean;
@@ -966,6 +1120,7 @@ bool Cache::continueWrite(Bdb* startingB
++dirty;
else
++clean;
+ bdb->decrementUseCount(REL_HISTORY);
}
return (dirty >= clean);
@@ -995,15 +1150,18 @@ void Cache::shutdownThreads(void)
ioThreads[n] = 0;
}
- Sync sync(&syncThreads, "Cache::shutdownThreads");
- sync.lock(Exclusive);
+ Sync lockThreads(&syncThreads, "Cache::shutdownThreads");
+ lockThreads.lock(Exclusive);
}
+#ifdef CACHE_TRACE_FILE
void Cache::analyzeFlush(void)
{
Dbb *dbb = NULL;
Bdb *bdb;
+ Sync dirtyLock (&syncDirty, "Cache::hasDirtyPages");
+ dirtyLock.lock (Shared);
for (bdb = firstDirty; bdb; bdb = bdb->nextDirty)
if (bdb->dbb->tableSpaceId == 1)
{
@@ -1011,6 +1169,7 @@ void Cache::analyzeFlush(void)
break;
}
+ dirtyLock.unlock();
if (!dbb)
return;
@@ -1018,16 +1177,19 @@ void Cache::analyzeFlush(void)
fprintf(traceFile, "-------- time %d -------\n", database->deltaTime);
for (int pageNumber = 0; (pageNumber = flushBitmap->nextSet(pageNumber)) >= 0;)
+ // non-protected access to hash table via findBdb()!
if ( (bdb = findBdb(dbb, pageNumber)) )
{
int start = pageNumber;
int type = bdb->buffer->pageType;
+ // non-protected access to hash table via findBdb()!
for (; (bdb = findBdb(dbb, ++pageNumber)) && bdb->flushIt;)
;
fprintf(traceFile, " %d flushed: %d to %d, first type %d\n", pageNumber - start,
start, pageNumber - 1, type);
+ // non-protected access to hash table via findBdb()!
for (int max = pageNumber + 5; pageNumber < max && (bdb = findBdb(dbb,
pageNumber)) && !bdb->flushIt; ++pageNumber)
{
if (bdb->isDirty)
@@ -1044,27 +1206,39 @@ void Cache::analyzeFlush(void)
void Cache::openTraceFile(void)
{
-#ifdef TRACE_FILE
if (traceFile)
closeTraceFile();
- traceFile = fopen(TRACE_FILE, "w");
-#endif
+ traceFile = fopen(TRACE_FILE, "a+");
+ fprintf(traceFile, "Starting\n");
+//KEL
+// setvbuf(traceFile, (char *) NULL, _IOLBF, 0);
+
}
void Cache::closeTraceFile(void)
{
-#ifdef TRACE_FILE
if (traceFile)
{
fclose(traceFile);
traceFile = NULL;
}
-#endif
}
+#else // CACHE_TRACE_FILE
+void Cache::analyzeFlush(void)
+{
+}
+void Cache::openTraceFile(void)
+{
+}
+void Cache::closeTraceFile(void)
+{
+}
+#endif // CACHE_TRACE_FILE
void Cache::flushWait(void)
{
- Sync sync(&syncWait, "Cache::flushWait");
- sync.lock(Shared);
+ Sync waitLock(&syncWait, "Cache::flushWait");
+ waitLock.lock(Exclusive);
}
+
=== modified file 'storage/falcon/Cache.h'
--- a/storage/falcon/Cache.h 2008-08-22 06:47:40 +0000
+++ b/storage/falcon/Cache.h 2008-09-02 16:08:11 +0000
@@ -28,6 +28,17 @@
#include "SyncObject.h"
#include "Queue.h"
+// uncomment DEBUG_SYNC_HASH_TABLE_SIZE to cause more contention and test for race
conditions
+//#define DEBUG_SYNC_HASH_TABLE_SIZE (0x01 << 1)
+#ifdef DEBUG_SYNC_HASH_TABLE_SIZE
+# define DEBUG_SYNC_HASH_TABLE_MASK (DEBUG_SYNC_HASH_TABLE_SIZE - 1)
+# define PAGENUM_2_LOCK_INDEX(_pgnum, _slot) ((_pgnum) & DEBUG_SYNC_HASH_TABLE_MASK)
+#else /* DEBUG_SYNC_HASH_TABLE_SIZE */
+# define PAGENUM_2_LOCK_INDEX(_pgnum, _slot) ((_slot))
+#endif /* DEBUG_SYNC_HASH_TABLE_SIZE */
+
+#define PAGENUM_2_SLOT(_pgnum) ((_pgnum) & hashMask)
+
class Bdb;
class Dbb;
class PageWriter;
@@ -54,7 +65,6 @@ public:
void markClean (Bdb *bdb);
void markDirty (Bdb *bdb);
void validate();
- void moveToHead (Bdb *bdb);
void flush(int64 arg);
void validateCache(void);
void syncFile(Dbb *dbb, const char *text);
@@ -83,14 +93,20 @@ public:
bool flushing;
protected:
- Bdb* findBuffer (Dbb *dbb, int pageNumber, LockType lockType);
+ void moveToHead (Bdb *bdb);
+ void moveToHeadAlreadyLocked (Bdb *bdb);
+ Bdb* getFreeBuffer(void);
+ Bdb* findBdb(Dbb* dbb, int32 pageNumber, int slot);
Bdb* findBdb(Dbb* dbb, int32 pageNumber);
+ Bdb* lockFindBdbIncrementUseCount(Dbb* dbb, int32 pageNumber);
+ Bdb* lockFindBdbIncrementUseCount(int32 pageNumber, int slot);
int64 flushArg;
Bdb *bdbs;
Bdb *endBdbs;
Queue<Bdb> bufferQueue;
Bdb **hashTable;
+ SyncObject *syncHashTable;
Bdb *firstDirty;
Bdb *lastDirty;
Bitmap *flushBitmap;
@@ -105,12 +121,13 @@ protected:
int flushPages;
int physicalWrites;
int hashSize;
+ unsigned int hashMask;
int pageSize;
- int upperFraction;
+ unsigned int upperFraction;
int numberHunks;
- int numberDirtyPages;
int numberIoThreads;
- volatile int bufferAge;
+ volatile uint64 bufferAge;
+
public:
void flushWait(void);
};