2793 Kelly Long 2008-08-21
Improve page cache at higher concurrency.
WL4479: Falcon Engine - Page Cache: high concurrency Get Free Buffer
WL4480: Falcon Engine - Page Cache: hash table lock per bucket
WL4481: Falcon Engine - Make hash table sizes power of two
modified:
storage/falcon/Cache.cpp
storage/falcon/Cache.h
2792 Olav Sandstaa 2008-08-21
Fix for Bug#38766 Falcon hangs during start-up if compiled for 64 bit SPARC
The wrong CompareAndChange routine was used in interlockedAdd. Replaced the routine for
exchanging pointers with the routine for exchanging integer values. This problem resulted
in errors on big-endian 64-bit machines.
modified:
storage/falcon/Interlock.h
=== modified file 'storage/falcon/Cache.cpp'
--- a/storage/falcon/Cache.cpp 2008-07-24 08:45:03 +0000
+++ b/storage/falcon/Cache.cpp 2008-08-21 14:45:38 +0000
@@ -72,7 +72,21 @@ Cache::Cache(Database *db, int pageSz, i
database = db;
panicShutdown = false;
pageSize = pageSz;
- hashSize = hashSz;
+
+ unsigned int highBit;
+ for (highBit=0x01; highBit < hashSz; highBit= highBit << 1) { }
+
+ // if there are more than 4096 buckets then lets round down
+ // else lets round up
+ if (highBit >= 0x00001000) {
+ // KEL use power of two rounded down
+ hashSize = highBit << 1;
+ } else {
+ // KEL use power of two rounded up
+ hashSize = highBit;
+ }
+
+ hashMask = hashSize - 1;
numberBuffers = numBuffers;
upperFraction = numberBuffers / 4;
bufferAge = 0;
@@ -80,8 +94,21 @@ Cache::Cache(Database *db, int pageSz, i
lastDirty = NULL;
numberDirtyPages = 0;
pageWriter = NULL;
- hashTable = new Bdb* [hashSz];
+ hashTable = new Bdb* [hashSize];
memset (hashTable, 0, sizeof (Bdb*) * hashSize);
+#ifdef DEBUG_SYNC_HASH_TABLE_SIZE
+ syncHashTable = new SyncObject [DEBUG_SYNC_HASH_TABLE_SIZE];
+ for (int loop = 0; loop < DEBUG_SYNC_HASH_TABLE_SIZE; loop ++)
+ syncHashTable[loop].setName("Cache::syncHashTable");
+#else /* DEBUG_SYNC_HASH_TABLE_SIZE */
+ syncHashTable = new SyncObject [hashSize];
+ for (int loop = 0; loop < hashSize; loop ++)
+ {
+ char tmpName[128];
+ snprintf(tmpName,120,"Cache::syncHashTable[%d]",loop);
+ syncHashTable[loop].setName(tmpName);
+ }
+#endif /* DEBUG_SYNC_HASH_TABLE_SIZE */
sectorCache = new SectorCache(sectorCacheSize / SECTOR_BUFFER_SIZE, pageSize);
uint64 n = ((uint64) pageSize * numberBuffers + cacheHunkSize - 1) / cacheHunkSize;
@@ -103,6 +130,7 @@ Cache::Cache(Database *db, int pageSz, i
try
{
+ // non-protected access to bdbs,endBdbs is OK during initialization
bdbs = new Bdb [numberBuffers];
endBdbs = bdbs + numberBuffers;
int remaining = 0;
@@ -121,6 +149,7 @@ Cache::Cache(Database *db, int pageSz, i
}
bdb->cache = this;
+ // non-protected access to bufferQueue is OK during initialization
bufferQueue.append(bdb);
bdb->buffer = (Page*) stuff;
stuff += pageSize;
@@ -150,6 +179,7 @@ Cache::~Cache()
closeTraceFile();
delete [] hashTable;
+ delete [] syncHashTable;
delete [] bdbs;
delete [] ioThreads;
delete flushBitmap;
@@ -167,14 +197,16 @@ Cache::~Cache()
Bdb* Cache::probePage(Dbb *dbb, int32 pageNumber)
{
ASSERT (pageNumber >= 0);
- Sync sync (&syncObject, "Cache::probePage");
- sync.lock (Shared);
- Bdb *bdb = findBdb(dbb, pageNumber);
+ int slot = PAGENUM_2_SLOT(pageNumber);
+ Sync lockHash (&syncHashTable[PAGENUM_2_LOCK_INDEX(pageNumber, slot)],
"Cache::probePage");
+ lockHash.lock (Shared);
+ Bdb *bdb;
+ bdb = findBdb(dbb, pageNumber, slot);
if (bdb)
{
bdb->incrementUseCount(ADD_HISTORY);
- sync.unlock();
+ lockHash.unlock();
if (bdb->buffer->pageType == PAGE_free)
{
@@ -189,15 +221,57 @@ Bdb* Cache::probePage(Dbb *dbb, int32 pa
return bdb;
}
+ lockHash.unlock();
return NULL;
}
-Bdb* Cache::findBdb(Dbb* dbb, int32 pageNumber)
+Bdb* Cache::findBdb(Dbb* dbb, int32 pageNumber, int slot)
+{
+ for (Bdb *bdb = hashTable [slot]; bdb; bdb = bdb->hash)
{
- for (Bdb *bdb = hashTable [pageNumber % hashSize]; bdb; bdb = bdb->hash)
if (bdb->pageNumber == pageNumber && bdb->dbb == dbb)
+ {
+ return bdb;
+ }
+ }
+
+ return NULL;
+}
+
+Bdb* Cache::findBdb(Dbb* dbb, int32 pageNumber)
+{
+ return (findBdb(dbb, pageNumber, PAGENUM_2_SLOT(pageNumber)));
+}
+
+Bdb* Cache::lockFindBdbIncrementUseCount(Dbb* dbb, int32 pageNumber)
+{
+ int slot = PAGENUM_2_SLOT(pageNumber);
+ Sync lockHash (&syncHashTable[PAGENUM_2_LOCK_INDEX(pageNumber, slot)],
"Cache::lockFindBdbIncrementUseCount");
+ lockHash.lock (Shared);
+ Bdb *bdb;
+
+ bdb = findBdb(dbb, pageNumber, slot);
+ if (bdb != NULL)
+ bdb->incrementUseCount(ADD_HISTORY);
+
+ lockHash.unlock();
+ return bdb;
+}
+
+Bdb* Cache::lockFindBdbIncrementUseCount(int32 pageNumber, int slot)
+{
+ Sync lockHash (&syncHashTable[PAGENUM_2_LOCK_INDEX(pageNumber, slot)],
"Cache::lockFindBdbIncrementUseCount");
+ lockHash.lock (Shared);
+
+ for (Bdb *bdb = hashTable [slot]; bdb; bdb = bdb->hash)
+ if (bdb->pageNumber == pageNumber)
+ {
+ bdb->incrementUseCount(ADD_HISTORY);
+ lockHash.unlock();
return bdb;
+ }
+ lockHash.unlock();
return NULL;
}
@@ -217,51 +291,46 @@ Bdb* Cache::fetchPage(Dbb *dbb, int32 pa
#endif
ASSERT (pageNumber >= 0);
- int slot = pageNumber % hashSize;
- LockType actual = lockType;
- Sync sync (&syncObject, "Cache::fetchPage");
- sync.lock (Shared);
- int hit = 0;
-
- /* If we already have a buffer for this go, we're done */
+ int slot = PAGENUM_2_SLOT(pageNumber);
+ Sync lockHash (&syncHashTable[PAGENUM_2_LOCK_INDEX(pageNumber, slot)],
"Cache::fetchPage");
+ lockHash.lock (Shared);
Bdb *bdb;
- for (bdb = hashTable [slot]; bdb; bdb = bdb->hash)
- if (bdb->pageNumber == pageNumber && bdb->dbb == dbb)
+ bdb = findBdb(dbb, pageNumber, slot);
+ if (!bdb)
{
- //syncObject.validateShared("Cache::fetchPage");
- bdb->incrementUseCount(ADD_HISTORY);
- sync.unlock();
- bdb->addRef(lockType COMMA_ADD_HISTORY);
- bdb->decrementUseCount(REL_HISTORY);
- hit = 1;
- break;
- }
+ lockHash.unlock();
+ // get getFreeBuffer() locks a hash bucket to remove the candidate bdb
+ // if we locked out hash bucket before the call then we could have
+ // a deadlock
+ // thus we get the free buffer before we lock the hash bucket we will
+ // be inserting into. This avoids a dead lock but generates a race
+ // we take care of the race by reversing the getFreeBuffer() work
+ // when we lose the race
+ Bdb *bdbAvailable;
+ bdbAvailable = getFreeBuffer();
+ lockHash.lock(Exclusive);
+ bdb = findBdb(dbb, pageNumber, slot);
if (!bdb)
{
- sync.unlock();
- actual = Exclusive;
- sync.lock(Exclusive);
-
- for (bdb = hashTable [slot]; bdb; bdb = bdb->hash)
- if (bdb->pageNumber == pageNumber && bdb->dbb == dbb)
- {
- //syncObject.validateExclusive("Cache::fetchPage (retry)");
- bdb->incrementUseCount(ADD_HISTORY);
- sync.unlock();
- bdb->addRef(lockType COMMA_ADD_HISTORY);
- bdb->decrementUseCount(REL_HISTORY);
- hit = 2;
- break;
- }
+ // we won the race so lets use the free bdb
+ /* Set new page number and relink into hash table */
+ bdbAvailable->addRef (Exclusive COMMA_ADD_HISTORY);
+ bdbAvailable->decrementUseCount(REL_HISTORY);
+
+ bdbAvailable->hash = hashTable [slot];
+ hashTable [slot] = bdbAvailable;
+ bdbAvailable->pageNumber = pageNumber;
+ bdbAvailable->dbb = dbb;
- if (!bdb)
- {
- bdb = findBuffer(dbb, pageNumber, actual);
+#ifdef COLLECT_BDB_HISTORY
+ bdbAvailable->initHistory();
+#endif
+ bdb = bdbAvailable;
moveToHead(bdb);
- sync.unlock();
+ lockHash.unlock();
#ifdef STOP_PAGE
if (bdb->pageNumber == STOP_PAGE)
@@ -278,9 +347,30 @@ Bdb* Cache::fetchPage(Dbb *dbb, int32 pa
#ifdef HAVE_PAGE_NUMBER
ASSERT(bdb->buffer->pageNumber == pageNumber);
#endif
- if (actual != lockType)
+ if (Exclusive != lockType)
bdb->downGrade(lockType);
}
+ else
+ {
+ // lost a race. put our available back to useable
+ bdbAvailable->pageNumber = -1;
+ bdbAvailable->dbb = NULL;
+ bdbAvailable->decrementUseCount(REL_HISTORY);
+
+ //syncObject.validateExclusive("Cache::fetchPage (retry)");
+ bdb->incrementUseCount(ADD_HISTORY);
+ lockHash.unlock();
+ bdb->addRef(lockType COMMA_ADD_HISTORY);
+ bdb->decrementUseCount(REL_HISTORY);
+ }
+ }
+ else
+ {
+ //syncObject.validateShared("Cache::fetchPage");
+ bdb->incrementUseCount(ADD_HISTORY);
+ lockHash.unlock();
+ bdb->addRef(lockType COMMA_ADD_HISTORY);
+ bdb->decrementUseCount(REL_HISTORY);
}
Page *page = bdb->buffer;
@@ -304,9 +394,9 @@ Bdb* Cache::fetchPage(Dbb *dbb, int32 pa
// If buffer has moved out of the upper "fraction" of the LRU queue, move it back up
+ // non-protected access to age is harmless since it is fuzzy anyway
if (bdb->age < bufferAge - (uint64) upperFraction)
{
- sync.lock (Exclusive);
moveToHead (bdb);
}
@@ -319,9 +409,10 @@ Bdb* Cache::fetchPage(Dbb *dbb, int32 pa
Bdb* Cache::fakePage(Dbb *dbb, int32 pageNumber, PageType type, TransId transId)
{
- Sync sync(&syncObject, "Cache::fakePage");
- sync.lock(Exclusive);
- int slot = pageNumber % hashSize;
+ int slot = PAGENUM_2_SLOT(pageNumber);
+ Sync lockHash (&syncHashTable[PAGENUM_2_LOCK_INDEX(pageNumber, slot)],
"Cache::fakePage");
+ lockHash.lock(Exclusive);
+ Bdb *bdb;
#ifdef STOP_PAGE
if (pageNumber == STOP_PAGE)
@@ -330,25 +421,64 @@ Bdb* Cache::fakePage(Dbb *dbb, int32 pag
/* If we already have a buffer for this, we're done */
- Bdb *bdb;
-
- for (bdb = hashTable [slot]; bdb; bdb = bdb->hash)
- if (bdb->pageNumber == pageNumber && bdb->dbb == dbb)
+ bdb = findBdb(dbb, pageNumber, slot);
+ if (!bdb)
{
- if (bdb->syncObject.isLocked())
+ lockHash.unlock();
+ // get getFreeBuffer() locks a hash bucket to remove the candidate bdb
+ // if we locked out hash bucket before the call then we could have
+ // a deadlock
+ // thus we get the free buffer before we lock the hash bucket we will
+ // be inserting into. This avoids a dead lock but generates a race
+ // we take care of the race by reversing the getFreeBuffer() work
+ // when we lose the race
+ Bdb *bdbAvailable;
+ bdbAvailable = getFreeBuffer();
+ lockHash.lock(Exclusive);
+
+ bdb = findBdb(dbb, pageNumber, slot);
+ if (!bdb)
{
- // The pageWriter may still be cleaning up this freed page with a shared lock
- ASSERT(bdb->buffer->pageType == PAGE_free);
- ASSERT(bdb->syncObject.getState() >= 0);
- }
+ // we won the race so lets use the free bdb
+ /* Set new page number and relink into hash table */
+ bdbAvailable->addRef (Exclusive COMMA_ADD_HISTORY);
+ bdbAvailable->decrementUseCount(REL_HISTORY);
- bdb->addRef(Exclusive COMMA_ADD_HISTORY);
+ bdbAvailable->hash = hashTable [slot];
+ hashTable [slot] = bdbAvailable;
+ bdbAvailable->pageNumber = pageNumber;
+ bdbAvailable->dbb = dbb;
+
+#ifdef COLLECT_BDB_HISTORY
+ bdbAvailable->initHistory();
+#endif
+ bdb = bdbAvailable;
+ moveToHead(bdb);
+ lockHash.unlock();
- break;
}
+ else
+ {
+ // lost a race. put our available back to useable
+ bdbAvailable->pageNumber = -1;
+ bdbAvailable->dbb = NULL;
+ bdbAvailable->decrementUseCount(REL_HISTORY);
- if (!bdb)
- bdb = findBuffer(dbb, pageNumber, Exclusive);
+ //syncObject.validateExclusive("Cache::fakePage (retry)");
+ bdb->incrementUseCount(ADD_HISTORY);
+ lockHash.unlock();
+ bdb->addRef(Exclusive COMMA_ADD_HISTORY);
+ bdb->decrementUseCount(REL_HISTORY);
+ }
+ }
+ else
+ {
+ //syncObject.validateShared("Cache::fakePage");
+ bdb->incrementUseCount(ADD_HISTORY);
+ lockHash.unlock();
+ bdb->addRef(Exclusive COMMA_ADD_HISTORY);
+ bdb->decrementUseCount(REL_HISTORY);
+ }
if (!dbb->isReadOnly)
bdb->mark(transId);
@@ -363,14 +493,14 @@ Bdb* Cache::fakePage(Dbb *dbb, int32 pag
void Cache::flush(int64 arg)
{
Sync flushLock(&syncFlush, "Cache::flush(1)");
- Sync sync(&syncDirty, "Cache::flush(2)");
+ Sync dirtyLock(&syncDirty, "Cache::flush(2)");
flushLock.lock(Exclusive);
if (flushing)
return;
syncWait.lock(NULL, Exclusive);
- sync.lock(Shared);
+ dirtyLock.lock(Shared);
//Log::debug(%d: "Initiating flush\n", dbb->deltaTime);
flushArg = arg;
flushPages = 0;
@@ -388,7 +518,7 @@ void Cache::flush(int64 arg)
flushStart = database->timestamp;
flushing = true;
- sync.unlock();
+ dirtyLock.unlock();
flushLock.unlock();
for (int n = 0; n < numberIoThreads; ++n)
@@ -398,69 +528,92 @@ void Cache::flush(int64 arg)
void Cache::moveToHead(Bdb * bdb)
{
+ Sync bufferQueueLock (&bufferQueue.syncObject, "Cache::moveToHead");
+
+ bufferQueueLock.lock (Exclusive);
bdb->age = bufferAge++;
bufferQueue.remove(bdb);
bufferQueue.prepend(bdb);
//validateUnique (bdb);
}
-Bdb* Cache::findBuffer(Dbb *dbb, int pageNumber, LockType lockType)
+Bdb* Cache::getFreeBuffer(void)
{
- //syncObject.validateExclusive("Cache::findBuffer");
- int slot = pageNumber % hashSize;
- Sync sync(&syncDirty, "Cache::findBuffer");
-
- /* Find least recently used, not-in-use buffer */
-
+ Sync bufferQueueLock (&bufferQueue.syncObject, "Cache::getFreeBuffer");
+ unsigned int count;
Bdb *bdb;
// Find a candidate BDB.
-
for (;;)
{
- for (bdb = bufferQueue.last; bdb; bdb = bdb->prior)
+ bufferQueueLock.lock (Exclusive);
+ // find a candidate that is NOT in use and NOT dirty and in the tail fraction of the
LRU
+ for (count = 0, bdb = bufferQueue.last; bdb && count < upperFraction; bdb =
bdb->prior, count++)
if (bdb->useCount == 0)
- break;
+ {
+ if (!bdb->isDirty)
+ {
+ bdb->incrementUseCount(REL_HISTORY);
+ break;
+ }
+ }
+ else
+ {
+ moveToHead(bdb);
+ }
+ if (!bdb)
+ // find a candidate that is NOT in use, could be dirty
+ for (bdb = bufferQueue.last; bdb; bdb = bdb->prior)
+ if (bdb->useCount == 0)
+ {
+ bdb->incrementUseCount(REL_HISTORY);
+ break;
+ }
+ bufferQueueLock.unlock();
if (!bdb)
throw SQLError(RUNTIME_ERROR, "buffer pool is exhausted\n");
- if (!bdb->isDirty)
- break;
-
- writePage (bdb, WRITE_TYPE_REUSE);
- }
-
- /* Unlink its old incarnation from the page/hash table */
+ if (bdb->pageNumber >= 0)
+ {
+ int slotRemove = PAGENUM_2_SLOT(bdb->pageNumber);
+ Sync lockHashRemove (&syncHashTable[PAGENUM_2_LOCK_INDEX(bdb->pageNumber,
slotRemove)], "Cache::getFreeBuffer");
+ lockHashRemove.lock(Exclusive);
- if (bdb->pageNumber >= 0)
- for (Bdb **ptr = hashTable + bdb->pageNumber % hashSize;; ptr =
&(*ptr)->hash)
- if (*ptr == bdb)
+ if (bdb->useCount != 1)
{
- *ptr = bdb->hash;
- break;
+ // we lost a race try again
+ bdb->decrementUseCount(REL_HISTORY);
+ lockHashRemove.unlock();
+ continue;
}
- else
- ASSERT (*ptr);
- bdb->addRef (lockType COMMA_ADD_HISTORY);
-
- /* Set new page number and relink into hash table */
+ if (bdb->isDirty)
+ writePage (bdb, WRITE_TYPE_REUSE);
- bdb->hash = hashTable [slot];
- hashTable [slot] = bdb;
- bdb->pageNumber = pageNumber;
- bdb->dbb = dbb;
+ /* Unlink its old incarnation from the page/hash table */
+ for (Bdb **ptr = hashTable + PAGENUM_2_SLOT(bdb->pageNumber) ;; ptr =
&(*ptr)->hash)
+ if (*ptr == bdb)
+ {
+ *ptr = bdb->hash;
+ break;
+ }
+ else
+ ASSERT (*ptr);
+ }
-#ifdef COLLECT_BDB_HISTORY
- bdb->initHistory();
-#endif
+ break;
+ }
return bdb;
}
void Cache::validate()
{
+ //Sync bufferQueueLock (&bufferQueue.syncObject, "Cache::moveToHead");
+
+ //bufferQueueLock.lock (Shared);
+ // non-protected access to bufferQueue is DANGEROUS...
for (Bdb *bdb = bufferQueue.last; bdb; bdb = bdb->prior)
{
//IndexPage *page = (IndexPage*) bdb->buffer;
@@ -470,8 +623,8 @@ void Cache::validate()
void Cache::markDirty(Bdb *bdb)
{
- Sync sync (&syncDirty, "Cache::markDirty");
- sync.lock (Exclusive);
+ Sync dirtyLock (&syncDirty, "Cache::markDirty");
+ dirtyLock.lock (Exclusive);
bdb->nextDirty = NULL;
bdb->priorDirty = lastDirty;
@@ -487,8 +640,8 @@ void Cache::markDirty(Bdb *bdb)
void Cache::markClean(Bdb *bdb)
{
- Sync sync (&syncDirty, "Cache::markClean");
- sync.lock (Exclusive);
+ Sync dirtyLock (&syncDirty, "Cache::markClean");
+ dirtyLock.lock (Exclusive);
/***
if (bdb->flushIt)
@@ -600,8 +753,8 @@ void Cache::writePage(Bdb *bdb, int type
if (dbb->shadows)
{
- Sync sync (&dbb->syncClone, "Cache::writePage(2)");
- sync.lock (Shared);
+ Sync cloneLock (&dbb->syncClone, "Cache::writePage(2)");
+ cloneLock.lock (Shared);
for (DatabaseCopy *shadow = dbb->shadows; shadow; shadow = shadow->next)
shadow->rewritePage(bdb);
@@ -610,14 +763,15 @@ void Cache::writePage(Bdb *bdb, int type
void Cache::analyze(Stream *stream)
{
- Sync sync (&syncDirty, "Cache::analyze");
- sync.lock (Shared);
+ Sync dirtyLock (&syncDirty, "Cache::analyze");
+ dirtyLock.lock (Shared);
int inUse = 0;
int dirty = 0;
int dirtyList = 0;
int total = 0;
Bdb *bdb;
+ // non-protected access to bdbs,endBdbs is DANGEROUS...
for (bdb = bdbs; bdb < endBdbs; ++bdb)
{
++total;
@@ -638,17 +792,18 @@ void Cache::analyze(Stream *stream)
void Cache::validateUnique(Bdb *target)
{
- int slot = target->pageNumber % hashSize;
+ int slot = PAGENUM_2_SLOT(target->pageNumber);
+ // WARNING: unlocked walk of hash table.... DANGEROUS
for (Bdb *bdb = hashTable [slot]; bdb; bdb = bdb->hash)
ASSERT (bdb == target || !(bdb->pageNumber == target->pageNumber &&
bdb->dbb == target->dbb));
}
void Cache::freePage(Dbb *dbb, int32 pageNumber)
{
- Sync sync (&syncObject, "Cache::freePage");
- sync.lock (Shared);
- int slot = pageNumber % hashSize;
+ int slot = PAGENUM_2_SLOT(pageNumber);
+ Sync lockHash (&syncHashTable[PAGENUM_2_LOCK_INDEX(pageNumber, slot)],
"Cache::freePage");
+ lockHash.lock(Shared);
// If page exists in cache (usual case), clean it up
@@ -657,7 +812,7 @@ void Cache::freePage(Dbb *dbb, int32 pag
{
if (bdb->isDirty)
{
- sync.unlock();
+ lockHash.unlock();
markClean (bdb);
}
@@ -670,8 +825,8 @@ void Cache::flush(Dbb *dbb)
{
//Sync sync (&syncDirty, "Cache::flush(1)");
//sync.lock (Exclusive);
- Sync sync (&syncObject, "Cache::flush(3)");
- sync.lock (Shared);
+ Sync objectLock (&syncObject, "Cache::flush(3)");
+ objectLock.lock (Shared);
for (Bdb *bdb = bdbs; bdb < endBdbs; ++bdb)
if (bdb->dbb == dbb)
@@ -685,8 +840,8 @@ void Cache::flush(Dbb *dbb)
bool Cache::hasDirtyPages(Dbb *dbb)
{
- Sync sync (&syncDirty, "Cache::hasDirtyPages");
- sync.lock (Shared);
+ Sync dirtyLock (&syncDirty, "Cache::hasDirtyPages");
+ dirtyLock.lock (Shared);
for (Bdb *bdb = firstDirty; bdb; bdb = bdb->nextDirty)
if (bdb->dbb == dbb)
@@ -717,25 +872,21 @@ Bdb* Cache::trialFetch(Dbb* dbb, int32 p
}
ASSERT (pageNumber >= 0);
- int slot = pageNumber % hashSize;
- Sync sync (&syncObject, "Cache::trialFetch");
- sync.lock (Shared);
- int hit = 0;
+ int slot = PAGENUM_2_SLOT(pageNumber);
+ Sync lockHash (&syncHashTable[PAGENUM_2_LOCK_INDEX(pageNumber, slot)],
"Cache::trialFetch");
+ lockHash.lock(Shared);
+ Bdb *bdb;
/* If we already have a buffer for this go, we're done */
- Bdb *bdb;
-
- for (bdb = hashTable [slot]; bdb; bdb = bdb->hash)
- if (bdb->pageNumber == pageNumber && bdb->dbb == dbb)
+ bdb = findBdb(dbb, pageNumber, slot);
+ if (bdb)
{
//syncObject.validateShared("Cache::trialFetch");
bdb->incrementUseCount(ADD_HISTORY);
- sync.unlock();
+ lockHash.unlock();
bdb->addRef(lockType COMMA_ADD_HISTORY);
bdb->decrementUseCount(REL_HISTORY);
- hit = 1;
- break;
}
return bdb;
@@ -764,10 +915,9 @@ void Cache::ioThread(void* arg)
void Cache::ioThread(void)
{
- Sync syncThread(&syncThreads, "Cache::ioThread(1)");
+ Sync syncThread(&syncThreads, "Cache::ioThread");
syncThread.lock(Shared);
- Sync flushLock(&syncFlush, "Cache::ioThread(2)");
- Sync sync(&syncObject, "Cache::ioThread(3)");
+ Sync flushLock(&syncFlush, "Cache::ioThread");
Priority priority(database->ioScheduler);
Thread *thread = Thread::getThread("Cache::ioThread");
UCHAR *rawBuffer = new UCHAR[ASYNC_BUFFER_SIZE];
@@ -781,137 +931,135 @@ void Cache::ioThread(void)
{
int32 pageNumber = flushBitmap->nextSet(0);
int count;
- Dbb *dbb;
if (pageNumber >= 0)
{
- int slot = pageNumber % hashSize;
+ Bdb *bdb;
+ Dbb *dbb;
+ int slot = PAGENUM_2_SLOT(pageNumber);
bool hit = false;
Bdb *bdbList = NULL;
UCHAR *p = buffer;
- sync.lock(Shared);
-
- // Look for a page to flush. Then get all his friends
- for (Bdb *bdb = hashTable[slot]; bdb; bdb = bdb->hash)
- if (bdb->pageNumber == pageNumber && bdb->flushIt &&
bdb->isDirty)
+ // Look for the page to flush.
+ bdb = lockFindBdbIncrementUseCount(pageNumber, slot);
+ if (bdb && bdb->flushIt && bdb->isDirty)
+ {
+ hit = true;
+ count = 0;
+ dbb = bdb->dbb;
+
+ flushBitmap->clear(pageNumber);
+
+ // get all his friends
+ while (p < end)
{
- hit = true;
- count = 0;
- dbb = bdb->dbb;
+ ++count;
+ bdb->addRef(Shared COMMA_ADD_HISTORY);
- if (!bdb->hash)
- flushBitmap->clear(pageNumber);
+ bdb->syncWrite.lock(NULL, Exclusive);
+ bdb->ioThreadNext = bdbList;
+ bdbList = bdb;
- while (p < end)
- {
- ++count;
- bdb->incrementUseCount(ADD_HISTORY);
- sync.unlock();
- bdb->addRef(Shared COMMA_ADD_HISTORY);
- if (falcon_use_sectorcache)
- sectorCache->writePage(bdb);
-
- bdb->syncWrite.lock(NULL, Exclusive);
- bdb->ioThreadNext = bdbList;
- bdbList = bdb;
-
- //ASSERT(!(bdb->flags & BDB_write_pending));
- //bdb->flags |= BDB_write_pending;
- memcpy(p, bdb->buffer, pageSize);
- p += pageSize;
- bdb->flushIt = false;
- markClean(bdb);
- bdb->isDirty = false;
- bdb->release(REL_HISTORY);
- sync.lock(Shared);
-
- if ( !(bdb = findBdb(dbb, bdb->pageNumber + 1)) )
- break;
-
- if (!bdb->isDirty && !continueWrite(bdb))
- break;
- }
+ //ASSERT(!(bdb->flags & BDB_write_pending));
+ //bdb->flags |= BDB_write_pending;
+ memcpy(p, bdb->buffer, pageSize);
+ p += pageSize;
+ bdb->flushIt = false;
+ markClean(bdb);
+ bdb->isDirty = false;
+ bdb->release(REL_HISTORY);
- if (sync.state != None)
- sync.unlock();
-
- flushLock.unlock();
- //Log::debug(" %d Writing %s %d pages: %d - %d\n", thread->threadId, (const
char*) dbb->fileName, count, pageNumber, pageNumber + count - 1);
- int length = p - buffer;
- priority.schedule(PRIORITY_LOW);
+ bdb = lockFindBdbIncrementUseCount(dbb, bdb->pageNumber + 1);
+ if (!bdb)
+ break;
- try
+ if (!bdb->isDirty && !continueWrite(bdb))
{
- priority.schedule(PRIORITY_LOW);
- dbb->writePages(pageNumber, length, buffer, WRITE_TYPE_FLUSH);
+ bdb->decrementUseCount(REL_HISTORY);
+ break;
}
- catch (SQLException& exception)
+ }
+
+ flushLock.unlock();
+ //Log::debug(" %d Writing %s %d pages: %d - %d\n", thread->threadId, (const char*)
dbb->fileName, count, pageNumber, pageNumber + count - 1);
+ int length = p - buffer;
+ priority.schedule(PRIORITY_LOW);
+
+ try
+ {
+ priority.schedule(PRIORITY_LOW);
+ dbb->writePages(pageNumber, length, buffer, WRITE_TYPE_FLUSH);
+ }
+ catch (SQLException& exception)
+ {
+ priority.finished();
+
+ if (exception.getSqlcode() != DEVICE_FULL)
+ throw;
+
+ database->setIOError(&exception);
+
+ for (bool error = true; error;)
{
- priority.finished();
-
- if (exception.getSqlcode() != DEVICE_FULL)
- throw;
-
- database->setIOError(&exception);
-
- for (bool error = true; error;)
+ if (thread->shutdownInProgress)
{
- if (thread->shutdownInProgress)
- {
- Bdb *next;
+ Bdb *next;
- for (bdb = bdbList; bdb; bdb = next)
- {
- //bdb->flags &= ~BDB_write_pending;
- next = bdb->ioThreadNext;
- bdb->syncWrite.unlock();
- bdb->decrementUseCount(REL_HISTORY);
- }
-
- return;
- }
-
- thread->sleep(1000);
-
- try
+ for (bdb = bdbList; bdb; bdb = next)
{
- priority.schedule(PRIORITY_LOW);
- dbb->writePages(pageNumber, length, buffer, WRITE_TYPE_FLUSH);
- error = false;
- database->clearIOError();
+ //bdb->flags &= ~BDB_write_pending;
+ next = bdb->ioThreadNext;
+ bdb->syncWrite.unlock();
+ bdb->decrementUseCount(REL_HISTORY);
}
- catch (SQLException& exception2)
- {
- priority.finished();
- if (exception2.getSqlcode() != DEVICE_FULL)
- throw;
- }
+ return;
+ }
+
+ thread->sleep(1000);
+
+ try
+ {
+ priority.schedule(PRIORITY_LOW);
+ dbb->writePages(pageNumber, length, buffer, WRITE_TYPE_FLUSH);
+ error = false;
+ database->clearIOError();
+ }
+ catch (SQLException& exception2)
+ {
+ priority.finished();
+
+ if (exception2.getSqlcode() != DEVICE_FULL)
+ throw;
}
}
+ }
- priority.finished();
- Bdb *next;
+ priority.finished();
+ Bdb *next;
- for (bdb = bdbList; bdb; bdb = next)
- {
- //ASSERT(bdb->flags & BDB_write_pending);
- //bdb->flags &= ~BDB_write_pending;
- next = bdb->ioThreadNext;
- bdb->syncWrite.unlock();
- bdb->decrementUseCount(REL_HISTORY);
- }
-
- flushLock.lock(Exclusive);
- ++physicalWrites;
-
- break;
+ for (bdb = bdbList; bdb; bdb = next)
+ {
+ //ASSERT(bdb->flags & BDB_write_pending);
+ //bdb->flags &= ~BDB_write_pending;
+ next = bdb->ioThreadNext;
+ bdb->syncWrite.unlock();
+ bdb->decrementUseCount(REL_HISTORY);
}
+
+ flushLock.lock(Exclusive);
+ ++physicalWrites;
+
+ }
+ else
+ {
+ if (bdb)
+ bdb->decrementUseCount(REL_HISTORY);
+ }
if (!hit)
{
- sync.unlock();
flushBitmap->clear(pageNumber);
}
}
@@ -940,8 +1088,8 @@ void Cache::ioThread(void)
thread->sleep();
flushLock.lock(Exclusive);
- }
}
+ } // for ever
delete [] rawBuffer;
}
@@ -974,8 +1122,8 @@ bool Cache::continueWrite(Bdb* startingB
void Cache::shutdown(void)
{
shutdownThreads();
- Sync sync (&syncDirty, "Cache::shutdown");
- sync.lock (Exclusive);
+ Sync dirtyLock (&syncDirty, "Cache::shutdown");
+ dirtyLock.lock (Exclusive);
for (Bdb *bdb = firstDirty; bdb; bdb = bdb->nextDirty)
bdb->dbb->writePage(bdb, WRITE_TYPE_SHUTDOWN);
@@ -995,8 +1143,8 @@ void Cache::shutdownThreads(void)
ioThreads[n] = 0;
}
- Sync sync(&syncThreads, "Cache::shutdownThreads");
- sync.lock(Exclusive);
+ Sync lockThreads(&syncThreads, "Cache::shutdownThreads");
+ lockThreads.lock(Exclusive);
}
void Cache::analyzeFlush(void)
@@ -1048,7 +1196,8 @@ void Cache::openTraceFile(void)
if (traceFile)
closeTraceFile();
- traceFile = fopen(TRACE_FILE, "w");
+ traceFile = fopen(TRACE_FILE, "a+");
+ setlinebuf(traceFile);
#endif
}
@@ -1065,6 +1214,6 @@ void Cache::closeTraceFile(void)
void Cache::flushWait(void)
{
- Sync sync(&syncWait, "Cache::flushWait");
- sync.lock(Shared);
+ Sync waitLock(&syncWait, "Cache::flushWait");
+ waitLock.lock(Shared);
}
=== modified file 'storage/falcon/Cache.h'
--- a/storage/falcon/Cache.h 2008-06-06 19:20:10 +0000
+++ b/storage/falcon/Cache.h 2008-08-21 14:45:38 +0000
@@ -28,6 +28,17 @@
#include "SyncObject.h"
#include "Queue.h"
+// uncomment DEBUG_SYNC_HASH_TABLE_SIZE to cause more contention and test for race
conditions
+//#define DEBUG_SYNC_HASH_TABLE_SIZE (0x01 << 1)
+#ifdef DEBUG_SYNC_HASH_TABLE_SIZE
+# define DEBUG_SYNC_HASH_TABLE_MASK (DEBUG_SYNC_HASH_TABLE_SIZE - 1)
+# define PAGENUM_2_LOCK_INDEX(_pgnum, _slot) ((_pgnum) & DEBUG_SYNC_HASH_TABLE_MASK)
+#else /* DEBUG_SYNC_HASH_TABLE_SIZE */
+# define PAGENUM_2_LOCK_INDEX(_pgnum, _slot) ((_slot))
+#endif /* DEBUG_SYNC_HASH_TABLE_SIZE */
+
+#define PAGENUM_2_SLOT(_pgnum) ((_pgnum) & hashMask)
+
class Bdb;
class Dbb;
class PageWriter;
@@ -83,14 +94,18 @@ public:
bool flushing;
protected:
- Bdb* findBuffer (Dbb *dbb, int pageNumber, LockType lockType);
+ Bdb* getFreeBuffer(void);
+ Bdb* findBdb(Dbb* dbb, int32 pageNumber, int slot);
Bdb* findBdb(Dbb* dbb, int32 pageNumber);
+ Bdb* lockFindBdbIncrementUseCount(Dbb* dbb, int32 pageNumber);
+ Bdb* lockFindBdbIncrementUseCount(int32 pageNumber, int slot);
int64 flushArg;
Bdb *bdbs;
Bdb *endBdbs;
Queue<Bdb> bufferQueue;
Bdb **hashTable;
+ SyncObject *syncHashTable;
Bdb *firstDirty;
Bdb *lastDirty;
Bitmap *flushBitmap;
@@ -105,12 +120,13 @@ protected:
int flushPages;
int physicalWrites;
int hashSize;
+ unsigned int hashMask;
int pageSize;
- int upperFraction;
+ unsigned int upperFraction;
int numberHunks;
int numberDirtyPages;
int numberIoThreads;
- volatile int bufferAge;
+ volatile uint64 bufferAge;
public:
void flushWait(void);
};
| Thread |
|---|
| • bzr push into mysql-6.0-falcon branch (klong:2792 to 2793) Bug#38766 | Kelly Long | 21 Aug |