#At file:///FC/MYSQL/wa-2008-BZR/lcl/mysql-6.0-falcon-team-KL-Cache/
2786 Kelly Long 2008-08-18
Page cache lock per hash bucket. WL4480
Page cache hash buckets are a power of two. WL4481
modified:
storage/falcon/Cache.cpp
storage/falcon/Cache.h
per-file messages:
storage/falcon/Cache.h
Page cache lock per hash bucket. WL4480
Page cache hash buckets are a power of two. WL4481
=== modified file 'storage/falcon/Cache.cpp'
--- a/storage/falcon/Cache.cpp 2008-07-24 08:45:03 +0000
+++ b/storage/falcon/Cache.cpp 2008-08-18 13:47:57 +0000
@@ -72,7 +72,22 @@ Cache::Cache(Database *db, int pageSz, i
database = db;
panicShutdown = false;
pageSize = pageSz;
- hashSize = hashSz;
+ {
+ unsigned int highBit;
+ for (highBit=0x01; highBit < hashSz; highBit= highBit << 1) {
+ }
+
+ // if there are more than 4096 buckets then lets round down
+ // else lets round up
+ if (highBit >= 0x00001000) {
+ // KEL use power of two rounded down
+ hashSize = highBit << 1;
+ } else {
+ // KEL use power of two rounded up
+ hashSize = highBit;
+ }
+ }
+ hashMask = hashSize - 1;
numberBuffers = numBuffers;
upperFraction = numberBuffers / 4;
bufferAge = 0;
@@ -80,8 +95,21 @@ Cache::Cache(Database *db, int pageSz, i
lastDirty = NULL;
numberDirtyPages = 0;
pageWriter = NULL;
- hashTable = new Bdb* [hashSz];
+ hashTable = new Bdb* [hashSize];
memset (hashTable, 0, sizeof (Bdb*) * hashSize);
+#ifdef DEBUG_SYNC_HASH_TABLE_SIZE
+ syncHashTable = new SyncObject [DEBUG_SYNC_HASH_TABLE_SIZE];
+ for (int loop = 0; loop < DEBUG_SYNC_HASH_TABLE_SIZE; loop ++)
+ syncHashTable[loop].setName("Cache::syncHashTable");
+#else /* DEBUG_SYNC_HASH_TABLE_SIZE */
+ syncHashTable = new SyncObject [hashSize];
+ for (int loop = 0; loop < hashSize; loop ++)
+ {
+ char tmpName[128];
+ snprintf(tmpName,120,"Cache::syncHashTable[%d]",loop);
+ syncHashTable[loop].setName(tmpName);
+ }
+#endif /* DEBUG_SYNC_HASH_TABLE_SIZE */
sectorCache = new SectorCache(sectorCacheSize / SECTOR_BUFFER_SIZE, pageSize);
uint64 n = ((uint64) pageSize * numberBuffers + cacheHunkSize - 1) / cacheHunkSize;
@@ -103,6 +131,7 @@ Cache::Cache(Database *db, int pageSz, i
try
{
+ // non-protected access to bdbs,endBdbs is OK during initialization
bdbs = new Bdb [numberBuffers];
endBdbs = bdbs + numberBuffers;
int remaining = 0;
@@ -121,6 +150,7 @@ Cache::Cache(Database *db, int pageSz, i
}
bdb->cache = this;
+ // non-protected access to bufferQueue is OK during initialization
bufferQueue.append(bdb);
bdb->buffer = (Page*) stuff;
stuff += pageSize;
@@ -150,6 +180,7 @@ Cache::~Cache()
closeTraceFile();
delete [] hashTable;
+ delete [] syncHashTable;
delete [] bdbs;
delete [] ioThreads;
delete flushBitmap;
@@ -167,14 +198,16 @@ Cache::~Cache()
Bdb* Cache::probePage(Dbb *dbb, int32 pageNumber)
{
ASSERT (pageNumber >= 0);
- Sync sync (&syncObject, "Cache::probePage");
- sync.lock (Shared);
- Bdb *bdb = findBdb(dbb, pageNumber);
+ int slot = PAGENUM_2_SLOT(pageNumber);
+ Sync lockHash (&syncHashTable[PAGENUM_2_LOCK_INDEX(pageNumber, slot)], "Cache::probePage");
+ lockHash.lock (Shared);
+ Bdb *bdb;
+ bdb = findBdb(dbb, pageNumber, slot);
if (bdb)
{
bdb->incrementUseCount(ADD_HISTORY);
- sync.unlock();
+ lockHash.unlock();
if (bdb->buffer->pageType == PAGE_free)
{
@@ -189,15 +222,57 @@ Bdb* Cache::probePage(Dbb *dbb, int32 pa
return bdb;
}
+ lockHash.unlock();
return NULL;
}
-Bdb* Cache::findBdb(Dbb* dbb, int32 pageNumber)
+Bdb* Cache::findBdb(Dbb* dbb, int32 pageNumber, int slot)
+{
+ for (Bdb *bdb = hashTable [slot]; bdb; bdb = bdb->hash)
{
- for (Bdb *bdb = hashTable [pageNumber % hashSize]; bdb; bdb = bdb->hash)
if (bdb->pageNumber == pageNumber && bdb->dbb == dbb)
+ {
+ return bdb;
+ }
+ }
+
+ return NULL;
+}
+
+Bdb* Cache::findBdb(Dbb* dbb, int32 pageNumber)
+{
+ return (findBdb(dbb, pageNumber, PAGENUM_2_SLOT(pageNumber)));
+}
+
+Bdb* Cache::lockFindBdbIncrementUseCount(Dbb* dbb, int32 pageNumber)
+{
+ int slot = PAGENUM_2_SLOT(pageNumber);
+ Sync lockHash (&syncHashTable[PAGENUM_2_LOCK_INDEX(pageNumber, slot)], "Cache::lockFindBdbIncrementUseCount");
+ lockHash.lock (Shared);
+ Bdb *bdb;
+
+ bdb = findBdb(dbb, pageNumber, slot);
+ if (bdb != NULL)
+ bdb->incrementUseCount(ADD_HISTORY);
+
+ lockHash.unlock();
+ return bdb;
+}
+
+Bdb* Cache::lockFindBdbIncrementUseCount(int32 pageNumber, int slot)
+{
+ Sync lockHash (&syncHashTable[PAGENUM_2_LOCK_INDEX(pageNumber, slot)], "Cache::lockFindBdbIncrementUseCount");
+ lockHash.lock (Shared);
+
+ for (Bdb *bdb = hashTable [slot]; bdb; bdb = bdb->hash)
+ if (bdb->pageNumber == pageNumber)
+ {
+ bdb->incrementUseCount(ADD_HISTORY);
+ lockHash.unlock();
return bdb;
+ }
+ lockHash.unlock();
return NULL;
}
@@ -217,51 +292,46 @@ Bdb* Cache::fetchPage(Dbb *dbb, int32 pa
#endif
ASSERT (pageNumber >= 0);
- int slot = pageNumber % hashSize;
- LockType actual = lockType;
- Sync sync (&syncObject, "Cache::fetchPage");
- sync.lock (Shared);
- int hit = 0;
-
- /* If we already have a buffer for this go, we're done */
+ int slot = PAGENUM_2_SLOT(pageNumber);
+ Sync lockHash (&syncHashTable[PAGENUM_2_LOCK_INDEX(pageNumber, slot)], "Cache::fetchPage");
+ lockHash.lock (Shared);
Bdb *bdb;
- for (bdb = hashTable [slot]; bdb; bdb = bdb->hash)
- if (bdb->pageNumber == pageNumber && bdb->dbb == dbb)
+ bdb = findBdb(dbb, pageNumber, slot);
+ if (!bdb)
{
- //syncObject.validateShared("Cache::fetchPage");
- bdb->incrementUseCount(ADD_HISTORY);
- sync.unlock();
- bdb->addRef(lockType COMMA_ADD_HISTORY);
- bdb->decrementUseCount(REL_HISTORY);
- hit = 1;
- break;
- }
+ lockHash.unlock();
+ // get getFreeBuffer() locks a hash bucket to remove the candidate bdb
+ // if we locked out hash bucket before the call then we could have
+ // a deadlock
+ // thus we get the free buffer before we lock the hash bucket we will
+ // be inserting into. This avoids a dead lock but generates a race
+ // we take care of the race by reversing the getFreeBuffer() work
+ // when we lose the race
+ Bdb *bdbAvailable;
+ bdbAvailable = getFreeBuffer();
+ lockHash.lock(Exclusive);
+ bdb = findBdb(dbb, pageNumber, slot);
if (!bdb)
{
- sync.unlock();
- actual = Exclusive;
- sync.lock(Exclusive);
-
- for (bdb = hashTable [slot]; bdb; bdb = bdb->hash)
- if (bdb->pageNumber == pageNumber && bdb->dbb == dbb)
- {
- //syncObject.validateExclusive("Cache::fetchPage (retry)");
- bdb->incrementUseCount(ADD_HISTORY);
- sync.unlock();
- bdb->addRef(lockType COMMA_ADD_HISTORY);
- bdb->decrementUseCount(REL_HISTORY);
- hit = 2;
- break;
- }
+ // we won the race so lets use the free bdb
+ /* Set new page number and relink into hash table */
+ bdbAvailable->addRef (Exclusive COMMA_ADD_HISTORY);
+ bdbAvailable->decrementUseCount(REL_HISTORY);
+
+ bdbAvailable->hash = hashTable [slot];
+ hashTable [slot] = bdbAvailable;
+ bdbAvailable->pageNumber = pageNumber;
+ bdbAvailable->dbb = dbb;
- if (!bdb)
- {
- bdb = findBuffer(dbb, pageNumber, actual);
+#ifdef COLLECT_BDB_HISTORY
+ bdbAvailable->initHistory();
+#endif
+ bdb = bdbAvailable;
moveToHead(bdb);
- sync.unlock();
+ lockHash.unlock();
#ifdef STOP_PAGE
if (bdb->pageNumber == STOP_PAGE)
@@ -278,9 +348,30 @@ Bdb* Cache::fetchPage(Dbb *dbb, int32 pa
#ifdef HAVE_PAGE_NUMBER
ASSERT(bdb->buffer->pageNumber == pageNumber);
#endif
- if (actual != lockType)
+ if (Exclusive != lockType)
bdb->downGrade(lockType);
}
+ else
+ {
+ // lost a race. put our available back to useable
+ bdbAvailable->pageNumber = -1;
+ bdbAvailable->dbb = NULL;
+ bdbAvailable->decrementUseCount(REL_HISTORY);
+
+ //syncObject.validateExclusive("Cache::fetchPage (retry)");
+ bdb->incrementUseCount(ADD_HISTORY);
+ lockHash.unlock();
+ bdb->addRef(lockType COMMA_ADD_HISTORY);
+ bdb->decrementUseCount(REL_HISTORY);
+ }
+ }
+ else
+ {
+ //syncObject.validateShared("Cache::fetchPage");
+ bdb->incrementUseCount(ADD_HISTORY);
+ lockHash.unlock();
+ bdb->addRef(lockType COMMA_ADD_HISTORY);
+ bdb->decrementUseCount(REL_HISTORY);
}
Page *page = bdb->buffer;
@@ -304,9 +395,9 @@ Bdb* Cache::fetchPage(Dbb *dbb, int32 pa
// If buffer has moved out of the upper "fraction" of the LRU queue, move it back up
+ // non-protected access to age is harmless since it is fuzzy anyway
if (bdb->age < bufferAge - (uint64) upperFraction)
{
- sync.lock (Exclusive);
moveToHead (bdb);
}
@@ -319,9 +410,10 @@ Bdb* Cache::fetchPage(Dbb *dbb, int32 pa
Bdb* Cache::fakePage(Dbb *dbb, int32 pageNumber, PageType type, TransId transId)
{
- Sync sync(&syncObject, "Cache::fakePage");
- sync.lock(Exclusive);
- int slot = pageNumber % hashSize;
+ int slot = PAGENUM_2_SLOT(pageNumber);
+ Sync lockHash (&syncHashTable[PAGENUM_2_LOCK_INDEX(pageNumber, slot)], "Cache::fakePage");
+ lockHash.lock(Exclusive);
+ Bdb *bdb;
#ifdef STOP_PAGE
if (pageNumber == STOP_PAGE)
@@ -330,25 +422,64 @@ Bdb* Cache::fakePage(Dbb *dbb, int32 pag
/* If we already have a buffer for this, we're done */
- Bdb *bdb;
-
- for (bdb = hashTable [slot]; bdb; bdb = bdb->hash)
- if (bdb->pageNumber == pageNumber && bdb->dbb == dbb)
+ bdb = findBdb(dbb, pageNumber, slot);
+ if (!bdb)
{
- if (bdb->syncObject.isLocked())
+ lockHash.unlock();
+ // get getFreeBuffer() locks a hash bucket to remove the candidate bdb
+ // if we locked out hash bucket before the call then we could have
+ // a deadlock
+ // thus we get the free buffer before we lock the hash bucket we will
+ // be inserting into. This avoids a dead lock but generates a race
+ // we take care of the race by reversing the getFreeBuffer() work
+ // when we lose the race
+ Bdb *bdbAvailable;
+ bdbAvailable = getFreeBuffer();
+ lockHash.lock(Exclusive);
+
+ bdb = findBdb(dbb, pageNumber, slot);
+ if (!bdb)
{
- // The pageWriter may still be cleaning up this freed page with a shared lock
- ASSERT(bdb->buffer->pageType == PAGE_free);
- ASSERT(bdb->syncObject.getState() >= 0);
- }
+ // we won the race so lets use the free bdb
+ /* Set new page number and relink into hash table */
+ bdbAvailable->addRef (Exclusive COMMA_ADD_HISTORY);
+ bdbAvailable->decrementUseCount(REL_HISTORY);
- bdb->addRef(Exclusive COMMA_ADD_HISTORY);
+ bdbAvailable->hash = hashTable [slot];
+ hashTable [slot] = bdbAvailable;
+ bdbAvailable->pageNumber = pageNumber;
+ bdbAvailable->dbb = dbb;
+
+#ifdef COLLECT_BDB_HISTORY
+ bdbAvailable->initHistory();
+#endif
+ bdb = bdbAvailable;
+ moveToHead(bdb);
+ lockHash.unlock();
- break;
}
+ else
+ {
+ // lost a race. put our available back to useable
+ bdbAvailable->pageNumber = -1;
+ bdbAvailable->dbb = NULL;
+ bdbAvailable->decrementUseCount(REL_HISTORY);
- if (!bdb)
- bdb = findBuffer(dbb, pageNumber, Exclusive);
+ //syncObject.validateExclusive("Cache::fakePage (retry)");
+ bdb->incrementUseCount(ADD_HISTORY);
+ lockHash.unlock();
+ bdb->addRef(Exclusive COMMA_ADD_HISTORY);
+ bdb->decrementUseCount(REL_HISTORY);
+ }
+ }
+ else
+ {
+ //syncObject.validateShared("Cache::fakePage");
+ bdb->incrementUseCount(ADD_HISTORY);
+ lockHash.unlock();
+ bdb->addRef(Exclusive COMMA_ADD_HISTORY);
+ bdb->decrementUseCount(REL_HISTORY);
+ }
if (!dbb->isReadOnly)
bdb->mark(transId);
@@ -363,14 +494,14 @@ Bdb* Cache::fakePage(Dbb *dbb, int32 pag
void Cache::flush(int64 arg)
{
Sync flushLock(&syncFlush, "Cache::flush(1)");
- Sync sync(&syncDirty, "Cache::flush(2)");
+ Sync dirtyLock(&syncDirty, "Cache::flush(2)");
flushLock.lock(Exclusive);
if (flushing)
return;
syncWait.lock(NULL, Exclusive);
- sync.lock(Shared);
+ dirtyLock.lock(Shared);
//Log::debug(%d: "Initiating flush\n", dbb->deltaTime);
flushArg = arg;
flushPages = 0;
@@ -388,7 +519,7 @@ void Cache::flush(int64 arg)
flushStart = database->timestamp;
flushing = true;
- sync.unlock();
+ dirtyLock.unlock();
flushLock.unlock();
for (int n = 0; n < numberIoThreads; ++n)
@@ -398,69 +529,92 @@ void Cache::flush(int64 arg)
void Cache::moveToHead(Bdb * bdb)
{
+ Sync bufferQueueLock (&bufferQueue.syncObject, "Cache::moveToHead");
+
+ bufferQueueLock.lock (Exclusive);
bdb->age = bufferAge++;
bufferQueue.remove(bdb);
bufferQueue.prepend(bdb);
//validateUnique (bdb);
}
-Bdb* Cache::findBuffer(Dbb *dbb, int pageNumber, LockType lockType)
+Bdb* Cache::getFreeBuffer(void)
{
- //syncObject.validateExclusive("Cache::findBuffer");
- int slot = pageNumber % hashSize;
- Sync sync(&syncDirty, "Cache::findBuffer");
-
- /* Find least recently used, not-in-use buffer */
-
+ Sync bufferQueueLock (&bufferQueue.syncObject, "Cache::getFreeBuffer");
+ unsigned int count;
Bdb *bdb;
// Find a candidate BDB.
-
for (;;)
{
- for (bdb = bufferQueue.last; bdb; bdb = bdb->prior)
+ bufferQueueLock.lock (Exclusive);
+ // find a candidate that is NOT in use and NOT dirty and in the tail fraction of the LRU
+ for (count = 0, bdb = bufferQueue.last; bdb && count < upperFraction; bdb = bdb->prior, count++)
if (bdb->useCount == 0)
- break;
+ {
+ if (!bdb->isDirty)
+ {
+ bdb->incrementUseCount(REL_HISTORY);
+ break;
+ }
+ }
+ else
+ {
+ moveToHead(bdb);
+ }
+ if (!bdb)
+ // find a candidate that is NOT in use, could be dirty
+ for (bdb = bufferQueue.last; bdb; bdb = bdb->prior)
+ if (bdb->useCount == 0)
+ {
+ bdb->incrementUseCount(REL_HISTORY);
+ break;
+ }
+ bufferQueueLock.unlock();
if (!bdb)
throw SQLError(RUNTIME_ERROR, "buffer pool is exhausted\n");
- if (!bdb->isDirty)
- break;
-
- writePage (bdb, WRITE_TYPE_REUSE);
- }
-
- /* Unlink its old incarnation from the page/hash table */
+ if (bdb->pageNumber >= 0)
+ {
+ int slotRemove = PAGENUM_2_SLOT(bdb->pageNumber);
+ Sync lockHashRemove (&syncHashTable[PAGENUM_2_LOCK_INDEX(bdb->pageNumber, slotRemove)], "Cache::getFreeBuffer");
+ lockHashRemove.lock(Exclusive);
- if (bdb->pageNumber >= 0)
- for (Bdb **ptr = hashTable + bdb->pageNumber % hashSize;; ptr = &(*ptr)->hash)
- if (*ptr == bdb)
+ if (bdb->useCount != 1)
{
- *ptr = bdb->hash;
- break;
+ // we lost a race try again
+ bdb->decrementUseCount(REL_HISTORY);
+ lockHashRemove.unlock();
+ continue;
}
- else
- ASSERT (*ptr);
-
- bdb->addRef (lockType COMMA_ADD_HISTORY);
- /* Set new page number and relink into hash table */
+ if (bdb->isDirty)
+ writePage (bdb, WRITE_TYPE_REUSE);
- bdb->hash = hashTable [slot];
- hashTable [slot] = bdb;
- bdb->pageNumber = pageNumber;
- bdb->dbb = dbb;
+ /* Unlink its old incarnation from the page/hash table */
+ for (Bdb **ptr = hashTable + PAGENUM_2_SLOT(bdb->pageNumber) ;; ptr = &(*ptr)->hash)
+ if (*ptr == bdb)
+ {
+ *ptr = bdb->hash;
+ break;
+ }
+ else
+ ASSERT (*ptr);
+ }
-#ifdef COLLECT_BDB_HISTORY
- bdb->initHistory();
-#endif
+ break;
+ }
return bdb;
}
void Cache::validate()
{
+ //Sync bufferQueueLock (&bufferQueue.syncObject, "Cache::moveToHead");
+
+ //bufferQueueLock.lock (Shared);
+ // non-protected access to bufferQueue is DANGEROUS...
for (Bdb *bdb = bufferQueue.last; bdb; bdb = bdb->prior)
{
//IndexPage *page = (IndexPage*) bdb->buffer;
@@ -470,8 +624,8 @@ void Cache::validate()
void Cache::markDirty(Bdb *bdb)
{
- Sync sync (&syncDirty, "Cache::markDirty");
- sync.lock (Exclusive);
+ Sync dirtyLock (&syncDirty, "Cache::markDirty");
+ dirtyLock.lock (Exclusive);
bdb->nextDirty = NULL;
bdb->priorDirty = lastDirty;
@@ -487,8 +641,8 @@ void Cache::markDirty(Bdb *bdb)
void Cache::markClean(Bdb *bdb)
{
- Sync sync (&syncDirty, "Cache::markClean");
- sync.lock (Exclusive);
+ Sync dirtyLock (&syncDirty, "Cache::markClean");
+ dirtyLock.lock (Exclusive);
/***
if (bdb->flushIt)
@@ -600,8 +754,8 @@ void Cache::writePage(Bdb *bdb, int type
if (dbb->shadows)
{
- Sync sync (&dbb->syncClone, "Cache::writePage(2)");
- sync.lock (Shared);
+ Sync cloneLock (&dbb->syncClone, "Cache::writePage(2)");
+ cloneLock.lock (Shared);
for (DatabaseCopy *shadow = dbb->shadows; shadow; shadow = shadow->next)
shadow->rewritePage(bdb);
@@ -610,14 +764,15 @@ void Cache::writePage(Bdb *bdb, int type
void Cache::analyze(Stream *stream)
{
- Sync sync (&syncDirty, "Cache::analyze");
- sync.lock (Shared);
+ Sync dirtyLock (&syncDirty, "Cache::analyze");
+ dirtyLock.lock (Shared);
int inUse = 0;
int dirty = 0;
int dirtyList = 0;
int total = 0;
Bdb *bdb;
+ // non-protected access to bdbs,endBdbs is DANGEROUS...
for (bdb = bdbs; bdb < endBdbs; ++bdb)
{
++total;
@@ -638,17 +793,18 @@ void Cache::analyze(Stream *stream)
void Cache::validateUnique(Bdb *target)
{
- int slot = target->pageNumber % hashSize;
+ int slot = PAGENUM_2_SLOT(target->pageNumber);
+ // WARNING: unlocked walk of hash table.... DANGEROUS
for (Bdb *bdb = hashTable [slot]; bdb; bdb = bdb->hash)
ASSERT (bdb == target || !(bdb->pageNumber == target->pageNumber && bdb->dbb == target->dbb));
}
void Cache::freePage(Dbb *dbb, int32 pageNumber)
{
- Sync sync (&syncObject, "Cache::freePage");
- sync.lock (Shared);
- int slot = pageNumber % hashSize;
+ int slot = PAGENUM_2_SLOT(pageNumber);
+ Sync lockHash (&syncHashTable[PAGENUM_2_LOCK_INDEX(pageNumber, slot)], "Cache::freePage");
+ lockHash.lock(Shared);
// If page exists in cache (usual case), clean it up
@@ -657,7 +813,7 @@ void Cache::freePage(Dbb *dbb, int32 pag
{
if (bdb->isDirty)
{
- sync.unlock();
+ lockHash.unlock();
markClean (bdb);
}
@@ -670,8 +826,8 @@ void Cache::flush(Dbb *dbb)
{
//Sync sync (&syncDirty, "Cache::flush(1)");
//sync.lock (Exclusive);
- Sync sync (&syncObject, "Cache::flush(3)");
- sync.lock (Shared);
+ Sync objectLock (&syncObject, "Cache::flush(3)");
+ objectLock.lock (Shared);
for (Bdb *bdb = bdbs; bdb < endBdbs; ++bdb)
if (bdb->dbb == dbb)
@@ -685,8 +841,8 @@ void Cache::flush(Dbb *dbb)
bool Cache::hasDirtyPages(Dbb *dbb)
{
- Sync sync (&syncDirty, "Cache::hasDirtyPages");
- sync.lock (Shared);
+ Sync dirtyLock (&syncDirty, "Cache::hasDirtyPages");
+ dirtyLock.lock (Shared);
for (Bdb *bdb = firstDirty; bdb; bdb = bdb->nextDirty)
if (bdb->dbb == dbb)
@@ -717,25 +873,21 @@ Bdb* Cache::trialFetch(Dbb* dbb, int32 p
}
ASSERT (pageNumber >= 0);
- int slot = pageNumber % hashSize;
- Sync sync (&syncObject, "Cache::trialFetch");
- sync.lock (Shared);
- int hit = 0;
+ int slot = PAGENUM_2_SLOT(pageNumber);
+ Sync lockHash (&syncHashTable[PAGENUM_2_LOCK_INDEX(pageNumber, slot)], "Cache::trialFetch");
+ lockHash.lock(Shared);
+ Bdb *bdb;
/* If we already have a buffer for this go, we're done */
- Bdb *bdb;
-
- for (bdb = hashTable [slot]; bdb; bdb = bdb->hash)
- if (bdb->pageNumber == pageNumber && bdb->dbb == dbb)
+ bdb = findBdb(dbb, pageNumber, slot);
+ if (bdb)
{
//syncObject.validateShared("Cache::trialFetch");
bdb->incrementUseCount(ADD_HISTORY);
- sync.unlock();
+ lockHash.unlock();
bdb->addRef(lockType COMMA_ADD_HISTORY);
bdb->decrementUseCount(REL_HISTORY);
- hit = 1;
- break;
}
return bdb;
@@ -764,10 +916,9 @@ void Cache::ioThread(void* arg)
void Cache::ioThread(void)
{
- Sync syncThread(&syncThreads, "Cache::ioThread(1)");
+ Sync syncThread(&syncThreads, "Cache::ioThread");
syncThread.lock(Shared);
- Sync flushLock(&syncFlush, "Cache::ioThread(2)");
- Sync sync(&syncObject, "Cache::ioThread(3)");
+ Sync flushLock(&syncFlush, "Cache::ioThread");
Priority priority(database->ioScheduler);
Thread *thread = Thread::getThread("Cache::ioThread");
UCHAR *rawBuffer = new UCHAR[ASYNC_BUFFER_SIZE];
@@ -781,137 +932,135 @@ void Cache::ioThread(void)
{
int32 pageNumber = flushBitmap->nextSet(0);
int count;
- Dbb *dbb;
if (pageNumber >= 0)
{
- int slot = pageNumber % hashSize;
+ Bdb *bdb;
+ Dbb *dbb;
+ int slot = PAGENUM_2_SLOT(pageNumber);
bool hit = false;
Bdb *bdbList = NULL;
UCHAR *p = buffer;
- sync.lock(Shared);
-
- // Look for a page to flush. Then get all his friends
- for (Bdb *bdb = hashTable[slot]; bdb; bdb = bdb->hash)
- if (bdb->pageNumber == pageNumber && bdb->flushIt && bdb->isDirty)
+ // Look for the page to flush.
+ bdb = lockFindBdbIncrementUseCount(pageNumber, slot);
+ if (bdb && bdb->flushIt && bdb->isDirty)
+ {
+ hit = true;
+ count = 0;
+ dbb = bdb->dbb;
+
+ flushBitmap->clear(pageNumber);
+
+ // get all his friends
+ while (p < end)
{
- hit = true;
- count = 0;
- dbb = bdb->dbb;
+ ++count;
+ bdb->addRef(Shared COMMA_ADD_HISTORY);
- if (!bdb->hash)
- flushBitmap->clear(pageNumber);
+ bdb->syncWrite.lock(NULL, Exclusive);
+ bdb->ioThreadNext = bdbList;
+ bdbList = bdb;
- while (p < end)
- {
- ++count;
- bdb->incrementUseCount(ADD_HISTORY);
- sync.unlock();
- bdb->addRef(Shared COMMA_ADD_HISTORY);
- if (falcon_use_sectorcache)
- sectorCache->writePage(bdb);
-
- bdb->syncWrite.lock(NULL, Exclusive);
- bdb->ioThreadNext = bdbList;
- bdbList = bdb;
-
- //ASSERT(!(bdb->flags & BDB_write_pending));
- //bdb->flags |= BDB_write_pending;
- memcpy(p, bdb->buffer, pageSize);
- p += pageSize;
- bdb->flushIt = false;
- markClean(bdb);
- bdb->isDirty = false;
- bdb->release(REL_HISTORY);
- sync.lock(Shared);
-
- if ( !(bdb = findBdb(dbb, bdb->pageNumber + 1)) )
- break;
-
- if (!bdb->isDirty && !continueWrite(bdb))
- break;
- }
+ //ASSERT(!(bdb->flags & BDB_write_pending));
+ //bdb->flags |= BDB_write_pending;
+ memcpy(p, bdb->buffer, pageSize);
+ p += pageSize;
+ bdb->flushIt = false;
+ markClean(bdb);
+ bdb->isDirty = false;
+ bdb->release(REL_HISTORY);
- if (sync.state != None)
- sync.unlock();
-
- flushLock.unlock();
- //Log::debug(" %d Writing %s %d pages: %d - %d\n", thread->threadId, (const char*) dbb->fileName, count, pageNumber, pageNumber + count - 1);
- int length = p - buffer;
- priority.schedule(PRIORITY_LOW);
+ bdb = lockFindBdbIncrementUseCount(dbb, bdb->pageNumber + 1);
+ if (!bdb)
+ break;
- try
+ if (!bdb->isDirty && !continueWrite(bdb))
{
- priority.schedule(PRIORITY_LOW);
- dbb->writePages(pageNumber, length, buffer, WRITE_TYPE_FLUSH);
+ bdb->decrementUseCount(REL_HISTORY);
+ break;
}
- catch (SQLException& exception)
+ }
+
+ flushLock.unlock();
+ //Log::debug(" %d Writing %s %d pages: %d - %d\n", thread->threadId, (const char*) dbb->fileName, count, pageNumber, pageNumber + count - 1);
+ int length = p - buffer;
+ priority.schedule(PRIORITY_LOW);
+
+ try
+ {
+ priority.schedule(PRIORITY_LOW);
+ dbb->writePages(pageNumber, length, buffer, WRITE_TYPE_FLUSH);
+ }
+ catch (SQLException& exception)
+ {
+ priority.finished();
+
+ if (exception.getSqlcode() != DEVICE_FULL)
+ throw;
+
+ database->setIOError(&exception);
+
+ for (bool error = true; error;)
{
- priority.finished();
-
- if (exception.getSqlcode() != DEVICE_FULL)
- throw;
-
- database->setIOError(&exception);
-
- for (bool error = true; error;)
+ if (thread->shutdownInProgress)
{
- if (thread->shutdownInProgress)
- {
- Bdb *next;
+ Bdb *next;
- for (bdb = bdbList; bdb; bdb = next)
- {
- //bdb->flags &= ~BDB_write_pending;
- next = bdb->ioThreadNext;
- bdb->syncWrite.unlock();
- bdb->decrementUseCount(REL_HISTORY);
- }
-
- return;
- }
-
- thread->sleep(1000);
-
- try
+ for (bdb = bdbList; bdb; bdb = next)
{
- priority.schedule(PRIORITY_LOW);
- dbb->writePages(pageNumber, length, buffer, WRITE_TYPE_FLUSH);
- error = false;
- database->clearIOError();
+ //bdb->flags &= ~BDB_write_pending;
+ next = bdb->ioThreadNext;
+ bdb->syncWrite.unlock();
+ bdb->decrementUseCount(REL_HISTORY);
}
- catch (SQLException& exception2)
- {
- priority.finished();
- if (exception2.getSqlcode() != DEVICE_FULL)
- throw;
- }
+ return;
+ }
+
+ thread->sleep(1000);
+
+ try
+ {
+ priority.schedule(PRIORITY_LOW);
+ dbb->writePages(pageNumber, length, buffer, WRITE_TYPE_FLUSH);
+ error = false;
+ database->clearIOError();
+ }
+ catch (SQLException& exception2)
+ {
+ priority.finished();
+
+ if (exception2.getSqlcode() != DEVICE_FULL)
+ throw;
}
}
+ }
- priority.finished();
- Bdb *next;
+ priority.finished();
+ Bdb *next;
- for (bdb = bdbList; bdb; bdb = next)
- {
- //ASSERT(bdb->flags & BDB_write_pending);
- //bdb->flags &= ~BDB_write_pending;
- next = bdb->ioThreadNext;
- bdb->syncWrite.unlock();
- bdb->decrementUseCount(REL_HISTORY);
- }
-
- flushLock.lock(Exclusive);
- ++physicalWrites;
-
- break;
+ for (bdb = bdbList; bdb; bdb = next)
+ {
+ //ASSERT(bdb->flags & BDB_write_pending);
+ //bdb->flags &= ~BDB_write_pending;
+ next = bdb->ioThreadNext;
+ bdb->syncWrite.unlock();
+ bdb->decrementUseCount(REL_HISTORY);
}
+
+ flushLock.lock(Exclusive);
+ ++physicalWrites;
+
+ }
+ else
+ {
+ if (bdb)
+ bdb->decrementUseCount(REL_HISTORY);
+ }
if (!hit)
{
- sync.unlock();
flushBitmap->clear(pageNumber);
}
}
@@ -940,8 +1089,8 @@ void Cache::ioThread(void)
thread->sleep();
flushLock.lock(Exclusive);
- }
}
+ } // for ever
delete [] rawBuffer;
}
@@ -974,8 +1123,8 @@ bool Cache::continueWrite(Bdb* startingB
void Cache::shutdown(void)
{
shutdownThreads();
- Sync sync (&syncDirty, "Cache::shutdown");
- sync.lock (Exclusive);
+ Sync dirtyLock (&syncDirty, "Cache::shutdown");
+ dirtyLock.lock (Exclusive);
for (Bdb *bdb = firstDirty; bdb; bdb = bdb->nextDirty)
bdb->dbb->writePage(bdb, WRITE_TYPE_SHUTDOWN);
@@ -995,8 +1144,8 @@ void Cache::shutdownThreads(void)
ioThreads[n] = 0;
}
- Sync sync(&syncThreads, "Cache::shutdownThreads");
- sync.lock(Exclusive);
+ Sync lockThreads(&syncThreads, "Cache::shutdownThreads");
+ lockThreads.lock(Exclusive);
}
void Cache::analyzeFlush(void)
@@ -1048,7 +1197,8 @@ void Cache::openTraceFile(void)
if (traceFile)
closeTraceFile();
- traceFile = fopen(TRACE_FILE, "w");
+ traceFile = fopen(TRACE_FILE, "a+");
+ setlinebuf(traceFile);
#endif
}
@@ -1065,6 +1215,6 @@ void Cache::closeTraceFile(void)
void Cache::flushWait(void)
{
- Sync sync(&syncWait, "Cache::flushWait");
- sync.lock(Shared);
+ Sync waitLock(&syncWait, "Cache::flushWait");
+ waitLock.lock(Shared);
}
=== modified file 'storage/falcon/Cache.h'
--- a/storage/falcon/Cache.h 2008-06-06 19:20:10 +0000
+++ b/storage/falcon/Cache.h 2008-08-18 13:47:57 +0000
@@ -28,6 +28,17 @@
#include "SyncObject.h"
#include "Queue.h"
+// define DEBUG_SYNC_HASH_TABLE_SIZE to cause more contention and test for race conditions
+//#define DEBUG_SYNC_HASH_TABLE_SIZE (0x01 << 1)
+#ifdef DEBUG_SYNC_HASH_TABLE_SIZE
+# define DEBUG_SYNC_HASH_TABLE_MASK (DEBUG_SYNC_HASH_TABLE_SIZE - 1)
+# define PAGENUM_2_LOCK_INDEX(_pgnum, _slot) ((_pgnum) & DEBUG_SYNC_HASH_TABLE_MASK)
+#else /* DEBUG_SYNC_HASH_TABLE_SIZE */
+# define PAGENUM_2_LOCK_INDEX(_pgnum, _slot) ((_slot))
+#endif /* DEBUG_SYNC_HASH_TABLE_SIZE */
+
+#define PAGENUM_2_SLOT(_pgnum) ((_pgnum) & hashMask)
+
class Bdb;
class Dbb;
class PageWriter;
@@ -83,14 +94,18 @@ public:
bool flushing;
protected:
- Bdb* findBuffer (Dbb *dbb, int pageNumber, LockType lockType);
+ Bdb* getFreeBuffer(void);
+ Bdb* findBdb(Dbb* dbb, int32 pageNumber, int slot);
Bdb* findBdb(Dbb* dbb, int32 pageNumber);
+ Bdb* lockFindBdbIncrementUseCount(Dbb* dbb, int32 pageNumber);
+ Bdb* lockFindBdbIncrementUseCount(int32 pageNumber, int slot);
int64 flushArg;
Bdb *bdbs;
Bdb *endBdbs;
Queue<Bdb> bufferQueue;
Bdb **hashTable;
+ SyncObject *syncHashTable;
Bdb *firstDirty;
Bdb *lastDirty;
Bitmap *flushBitmap;
@@ -105,12 +120,13 @@ protected:
int flushPages;
int physicalWrites;
int hashSize;
+ unsigned int hashMask;
int pageSize;
- int upperFraction;
+ unsigned int upperFraction;
int numberHunks;
int numberDirtyPages;
int numberIoThreads;
- volatile int bufferAge;
+ volatile uint64 bufferAge;
public:
void flushWait(void);
};