Kelly,
Please add to WL#4479 the reasoning behind moving a Bdb to the head age
group instead of stopping to do a writePage. (It is almost a no-brainer,
but we need to document it.) Please summarize again the conditions under
which a page in the oldest age group can still have a non-zero useCount. We
do not want to cover up a useCount leak, which will eventually fill up the
cache just as a memory leak would. It is very important to catch Bdbs that
never get released instead of just recycling them.
If you suspect that findBuffer does a moveToHead when the page cache is
large, then we need to know why that Bdb still has a non-zero useCount. You
can turn on BDB_HISTORY (the COLLECT_BDB_HISTORY define) to collect an array
of code locations in a Bdb's recent history. I implemented and used this
tool to find conditions where a useCount was incremented but not
decremented. It adds a little memory to each Bdb but very little extra code
path, so it should not affect performance much.
Another question: why is Dbb passed into findBdb? I am missing its use.
Kevin
>-----Original Message-----
>From: Kelly Long [mailto:klong@stripped]
>Sent: Tuesday, September 02, 2008 11:09 AM
>To: commits@stripped
>Subject: bzr commit into mysql-6.0-falcon branch (klong:2808)
>
>#At file:///FC/MYSQL/wa-2008-BZR/mysql-6.0-falcon-team/
>
> 2808 Kelly Long 2008-09-02
> Version 2 for:
> Page Cache high concurrency Get Free Buffer. WL4479
> Page cache lock per hash bucket. WL4480
> Page cache hash buckets are a power of two. WL4481
> traceFile is a compile time decision, thus remove the run time
>decisions for it.
>modified:
> storage/falcon/Cache.cpp
> storage/falcon/Cache.h
>
>=== modified file 'storage/falcon/Cache.cpp'
>--- a/storage/falcon/Cache.cpp 2008-08-22 06:47:40 +0000
>+++ b/storage/falcon/Cache.cpp 2008-09-02 16:08:11 +0000
>@@ -50,9 +50,11 @@
> extern uint falcon_io_threads;
>
> //#define STOP_PAGE 55
>-#define TRACE_FILE "cache.trace"
>+//#define CACHE_TRACE_FILE "cache.trace"
>
>+#ifdef CACHE_TRACE_FILE
> static FILE *traceFile;
>+#endif // CACHE_TRACE_FILE
>
> static const uint64 cacheHunkSize = 1024 * 1024 * 128;
> static const int ASYNC_BUFFER_SIZE = 1024000;
>@@ -68,20 +70,46 @@ static const char THIS_FILE[]=__FILE__;
>
> Cache::Cache(Database *db, int pageSz, int hashSz, int numBuffers)
> {
>- //openTraceFile();
>+ openTraceFile();
> database = db;
> panicShutdown = false;
> pageSize = pageSz;
>- hashSize = hashSz;
>+
>+ unsigned int highBit;
>+ for (highBit=0x01; highBit < hashSz; highBit= highBit << 1) { }
>+
>+ // if there are more than 4096 buckets then lets round down
>+ // else lets round up
>+ if (highBit >= 0x00001000) {
>+ // use power of two rounded down
>+ hashSize = highBit << 1;
>+ } else {
>+ // use power of two rounded up
>+ hashSize = highBit;
>+ }
>+
>+ hashMask = hashSize - 1;
> numberBuffers = numBuffers;
> upperFraction = numberBuffers / 4;
> bufferAge = 0;
> firstDirty = NULL;
> lastDirty = NULL;
>- numberDirtyPages = 0;
> pageWriter = NULL;
>- hashTable = new Bdb* [hashSz];
>+ hashTable = new Bdb* [hashSize];
> memset (hashTable, 0, sizeof (Bdb*) * hashSize);
>+#ifdef DEBUG_SYNC_HASH_TABLE_SIZE
>+ syncHashTable = new SyncObject [DEBUG_SYNC_HASH_TABLE_SIZE];
>+ for (int loop = 0; loop < DEBUG_SYNC_HASH_TABLE_SIZE; loop ++)
>+ syncHashTable[loop].setName("Cache::syncHashTable");
>+#else /* DEBUG_SYNC_HASH_TABLE_SIZE */
>+ syncHashTable = new SyncObject [hashSize];
>+ for (int loop = 0; loop < hashSize; loop ++)
>+ {
>+ char tmpName[128];
>+ snprintf(tmpName,120,"Cache::syncHashTable[%d]",loop);
>+ syncHashTable[loop].setName(tmpName);
>+ }
>+#endif /* DEBUG_SYNC_HASH_TABLE_SIZE */
> sectorCache = new SectorCache(sectorCacheSize / SECTOR_BUFFER_SIZE,
>pageSize);
>
> uint64 n = ((uint64) pageSize * numberBuffers + cacheHunkSize - 1) /
>cacheHunkSize;
>@@ -103,6 +131,7 @@ Cache::Cache(Database *db, int pageSz, i
>
> try
> {
>+ // non-protected access to bdbs,endBdbs is OK during
>initialization
> bdbs = new Bdb [numberBuffers];
> endBdbs = bdbs + numberBuffers;
> int remaining = 0;
>@@ -121,6 +150,7 @@ Cache::Cache(Database *db, int pageSz, i
> }
>
> bdb->cache = this;
>+ // non-protected access to bufferQueue is OK during
>initialization
> bufferQueue.append(bdb);
> bdb->buffer = (Page*) stuff;
> stuff += pageSize;
>@@ -146,10 +176,11 @@ Cache::Cache(Database *db, int pageSz, i
>
> Cache::~Cache()
> {
>- if (traceFile)
>- closeTraceFile();
>+
>+ closeTraceFile();
>
> delete [] hashTable;
>+ delete [] syncHashTable;
> delete [] bdbs;
> delete [] ioThreads;
> delete flushBitmap;
>@@ -167,15 +198,12 @@ Cache::~Cache()
> Bdb* Cache::probePage(Dbb *dbb, int32 pageNumber)
> {
> ASSERT (pageNumber >= 0);
>- Sync sync (&syncObject, "Cache::probePage");
>- sync.lock (Shared);
>- Bdb *bdb = findBdb(dbb, pageNumber);
>+ Bdb *bdb;
>
>+ /* If we already have a buffer for this, we're done */
>+ bdb = lockFindBdbIncrementUseCount(dbb, pageNumber);
> if (bdb)
> {
>- bdb->incrementUseCount(ADD_HISTORY);
>- sync.unlock();
>-
> if (bdb->buffer->pageType == PAGE_free)
> {
> bdb->decrementUseCount(REL_HISTORY);
>@@ -192,15 +220,56 @@ Bdb* Cache::probePage(Dbb *dbb, int32 pa
> return NULL;
> }
>
>-Bdb* Cache::findBdb(Dbb* dbb, int32 pageNumber)
>+Bdb* Cache::findBdb(Dbb* dbb, int32 pageNumber, int slot)
>+{
>+ for (Bdb *bdb = hashTable [slot]; bdb; bdb = bdb->hash)
> {
>- for (Bdb *bdb = hashTable [pageNumber % hashSize]; bdb; bdb = bdb-
>>hash)
> if (bdb->pageNumber == pageNumber && bdb->dbb == dbb)
>+ {
> return bdb;
>+ }
>+ }
>
> return NULL;
> }
>
>+Bdb* Cache::findBdb(Dbb* dbb, int32 pageNumber)
>+{
>+ return (findBdb(dbb, pageNumber, PAGENUM_2_SLOT(pageNumber)));
>+}
>+
>+Bdb* Cache::lockFindBdbIncrementUseCount(Dbb* dbb, int32 pageNumber)
>+{
>+ int slot = PAGENUM_2_SLOT(pageNumber);
>+ Sync lockHash (&syncHashTable[PAGENUM_2_LOCK_INDEX(pageNumber,
slot)],
>"Cache::lockFindBdbIncrementUseCount");
>+ lockHash.lock (Shared);
>+ Bdb *bdb;
>+
>+ bdb = findBdb(dbb, pageNumber, slot);
>+ if (bdb != NULL)
>+ bdb->incrementUseCount(ADD_HISTORY);
>+
>+ lockHash.unlock();
>+ return bdb;
>+}
>+
>+Bdb* Cache::lockFindBdbIncrementUseCount(int32 pageNumber, int slot)
>+{
>+ Sync lockHash (&syncHashTable[PAGENUM_2_LOCK_INDEX(pageNumber,
slot)],
>"Cache::lockFindBdbIncrementUseCount");
>+ lockHash.lock (Shared);
>+
>+ for (Bdb *bdb = hashTable [slot]; bdb; bdb = bdb->hash)
>+ if (bdb->pageNumber == pageNumber)
>+ {
>+ bdb->incrementUseCount(ADD_HISTORY);
>+ lockHash.unlock();
>+ return bdb;
>+ }
>+
>+ lockHash.unlock();
>+ return NULL;
>+}
>+
> Bdb* Cache::fetchPage(Dbb *dbb, int32 pageNumber, PageType pageType,
>LockType lockType)
> {
> if (panicShutdown)
>@@ -217,52 +286,45 @@ Bdb* Cache::fetchPage(Dbb *dbb, int32 pa
> #endif
>
> ASSERT (pageNumber >= 0);
>- int slot = pageNumber % hashSize;
>- LockType actual = lockType;
>- Sync sync (&syncObject, "Cache::fetchPage");
>- sync.lock (Shared);
>- int hit = 0;
>-
>- /* If we already have a buffer for this go, we're done */
>-
> Bdb *bdb;
>
>- for (bdb = hashTable [slot]; bdb; bdb = bdb->hash)
>- if (bdb->pageNumber == pageNumber && bdb->dbb == dbb)
>- {
>- //syncObject.validateShared("Cache::fetchPage");
>- bdb->incrementUseCount(ADD_HISTORY);
>- sync.unlock();
>- bdb->addRef(lockType COMMA_ADD_HISTORY);
>- bdb->decrementUseCount(REL_HISTORY);
>- hit = 1;
>- break;
>- }
>-
>+ /* If we already have a buffer for this, we're done */
>+ bdb = lockFindBdbIncrementUseCount(dbb, pageNumber);
> if (!bdb)
> {
>- sync.unlock();
>- actual = Exclusive;
>- sync.lock(Exclusive);
>-
>- for (bdb = hashTable [slot]; bdb; bdb = bdb->hash)
>- if (bdb->pageNumber == pageNumber && bdb->dbb ==
dbb)
>- {
>-
//syncObject.validateExclusive("Cache::fetchPage
>(retry)");
>- bdb->incrementUseCount(ADD_HISTORY);
>- sync.unlock();
>- bdb->addRef(lockType COMMA_ADD_HISTORY);
>- bdb->decrementUseCount(REL_HISTORY);
>- hit = 2;
>- break;
>- }
>+ // get getFreeBuffer() locks a hash bucket to remove the
>candidate bdb
>+ // if we locked out hash bucket before the call then we
could
>have
>+ // a deadlock
>+ // thus we get the free buffer before we lock the hash
bucket we
>will
>+ // be inserting into. This avoids a dead lock but generates
a
>race
>+ // we take care of the race by reversing the getFreeBuffer()
>work
>+ // when we lose the race
>+ Bdb *bdbAvailable;
>+ int slot = PAGENUM_2_SLOT(pageNumber);
>+ Sync lockHash
(&syncHashTable[PAGENUM_2_LOCK_INDEX(pageNumber,
>slot)], "Cache::fetchPage");
>+
>+ bdbAvailable = getFreeBuffer();
>+ /* assume we'll be inserting this new BDB. Set new page
number.
>*/
>+ bdbAvailable->addRef (Exclusive COMMA_ADD_HISTORY);
>+ bdbAvailable->decrementUseCount(REL_HISTORY);
>+
>+ bdbAvailable->hash = hashTable [slot];
>+ bdbAvailable->pageNumber = pageNumber;
>+ bdbAvailable->dbb = dbb;
>+#ifdef COLLECT_BDB_HISTORY
>+ bdbAvailable->initHistory();
>+#endif
>
>+ lockHash.lock(Exclusive);
>+ bdb = findBdb(dbb, pageNumber, slot);
> if (!bdb)
> {
>- bdb = findBuffer(dbb, pageNumber, actual);
>- moveToHead(bdb);
>- sync.unlock();
>+ // we won the race so lets use the free bdb
>+ // relink into hash table
>+ hashTable [slot] = bdbAvailable;
>+ lockHash.unlock();
>
>+ bdb = bdbAvailable;
> #ifdef STOP_PAGE
> if (bdb->pageNumber == STOP_PAGE)
> Log::debug("reading page %d/%d\n",
bdb->pageNumber,
>dbb->tableSpaceId);
>@@ -278,9 +340,31 @@ Bdb* Cache::fetchPage(Dbb *dbb, int32 pa
> #ifdef HAVE_PAGE_NUMBER
> ASSERT(bdb->buffer->pageNumber == pageNumber);
> #endif
>- if (actual != lockType)
>+ if (Exclusive != lockType)
> bdb->downGrade(lockType);
> }
>+ else
>+ {
>+ // lost a race. put our available back to useable
>+ // side effect, bdbAvailable will have to age again
>before we re-use it.
>+ bdbAvailable->hash = NULL;
>+ bdbAvailable->pageNumber = -1;
>+ bdbAvailable->dbb = NULL;
>+ bdbAvailable->release();
>+
>+ //syncObject.validateExclusive("Cache::fetchPage
>(retry)");
>+ bdb->incrementUseCount(ADD_HISTORY);
>+ lockHash.unlock();
>+ bdb->addRef(lockType COMMA_ADD_HISTORY);
>+ bdb->decrementUseCount(REL_HISTORY);
>+ moveToHead(bdb);
>+ }
>+ }
>+ else
>+ {
>+ bdb->addRef(lockType COMMA_ADD_HISTORY);
>+ bdb->decrementUseCount(REL_HISTORY);
>+ moveToHead(bdb);
> }
>
> Page *page = bdb->buffer;
>@@ -302,14 +386,6 @@ Bdb* Cache::fetchPage(Dbb *dbb, int32 pa
> bdb->pageNumber, dbb->tableSpaceId,
pageType,
>page->pageType);
> }
>
>- // If buffer has moved out of the upper "fraction" of the LRU queue,
>move it back up
>-
>- if (bdb->age < bufferAge - (uint64) upperFraction)
>- {
>- sync.lock (Exclusive);
>- moveToHead (bdb);
>- }
>-
> ASSERT (bdb->pageNumber == pageNumber);
> ASSERT (bdb->dbb == dbb);
> ASSERT (bdb->useCount > 0);
>@@ -319,9 +395,7 @@ Bdb* Cache::fetchPage(Dbb *dbb, int32 pa
>
> Bdb* Cache::fakePage(Dbb *dbb, int32 pageNumber, PageType type, TransId
>transId)
> {
>- Sync sync(&syncObject, "Cache::fakePage");
>- sync.lock(Exclusive);
>- int slot = pageNumber % hashSize;
>+ Bdb *bdb;
>
> #ifdef STOP_PAGE
> if (pageNumber == STOP_PAGE)
>@@ -329,33 +403,72 @@ Bdb* Cache::fakePage(Dbb *dbb, int32 pag
> #endif
>
> /* If we already have a buffer for this, we're done */
>+ bdb = lockFindBdbIncrementUseCount(dbb, pageNumber);
>+ if (!bdb)
>+ {
>+ // get getFreeBuffer() locks a hash bucket to remove the
>candidate bdb
>+ // if we locked out hash bucket before the call then we
could
>have
>+ // a deadlock
>+ // thus we get the free buffer before we lock the hash
bucket we
>will
>+ // be inserting into. This avoids a dead lock but generates
a
>race
>+ // we take care of the race by reversing the getFreeBuffer()
>work
>+ // when we lose the race
>+ Bdb *bdbAvailable;
>+ int slot = PAGENUM_2_SLOT(pageNumber);
>+ Sync lockHash
(&syncHashTable[PAGENUM_2_LOCK_INDEX(pageNumber,
>slot)], "Cache::fetchPage");
>+
>+ bdbAvailable = getFreeBuffer();
>+ /* assume we'll be inserting this new BDB. Set new page
number.
>*/
>+ bdbAvailable->addRef (Exclusive COMMA_ADD_HISTORY);
>+ bdbAvailable->decrementUseCount(REL_HISTORY);
>+
>+ bdbAvailable->hash = hashTable [slot];
>+ bdbAvailable->pageNumber = pageNumber;
>+ bdbAvailable->dbb = dbb;
>+#ifdef COLLECT_BDB_HISTORY
>+ bdbAvailable->initHistory();
>+#endif
>
>- Bdb *bdb;
>+ lockHash.lock(Exclusive);
>+ bdb = findBdb(dbb, pageNumber, slot);
>+ if (!bdb)
>+ {
>+ // we won the race so lets use the free bdb
>+ // relink into hash table
>+ hashTable [slot] = bdbAvailable;
>+ lockHash.unlock();
>
>- for (bdb = hashTable [slot]; bdb; bdb = bdb->hash)
>- if (bdb->pageNumber == pageNumber && bdb->dbb == dbb)
>+ bdb = bdbAvailable;
>+ }
>+ else
> {
>- if (bdb->syncObject.isLocked())
>- {
>- // The pageWriter may still be cleaning up
this
>freed page with a shared lock
>- ASSERT(bdb->buffer->pageType == PAGE_free);
>- ASSERT(bdb->syncObject.getState() >= 0);
>- }
>-
>+ // lost a race. put our available back to useable
>+ // side effect, bdbAvailable will have to age again
>before we re-use it.
>+ bdbAvailable->hash = NULL;
>+ bdbAvailable->pageNumber = -1;
>+ bdbAvailable->dbb = NULL;
>+ bdbAvailable->release();
>+
>+ //syncObject.validateExclusive("Cache::fetchPage
>(retry)");
>+ bdb->incrementUseCount(ADD_HISTORY);
>+ lockHash.unlock();
> bdb->addRef(Exclusive COMMA_ADD_HISTORY);
>-
>- break;
>+ bdb->decrementUseCount(REL_HISTORY);
>+ moveToHead(bdb);
> }
>-
>- if (!bdb)
>- bdb = findBuffer(dbb, pageNumber, Exclusive);
>+ }
>+ else
>+ {
>+ bdb->addRef(Exclusive COMMA_ADD_HISTORY);
>+ bdb->decrementUseCount(REL_HISTORY);
>+ moveToHead(bdb);
>+ }
>
> if (!dbb->isReadOnly)
> bdb->mark(transId);
>
> memset(bdb->buffer, 0, pageSize);
> bdb->setPageHeader(type);
>- moveToHead(bdb);
>
> return bdb;
> }
>@@ -363,32 +476,31 @@ Bdb* Cache::fakePage(Dbb *dbb, int32 pag
> void Cache::flush(int64 arg)
> {
> Sync flushLock(&syncFlush, "Cache::flush(1)");
>- Sync sync(&syncDirty, "Cache::flush(2)");
>+ Sync dirtyLock(&syncDirty, "Cache::flush(2)");
> flushLock.lock(Exclusive);
>
> if (flushing)
> return;
>
> syncWait.lock(NULL, Exclusive);
>- sync.lock(Shared);
> //Log::debug(%d: "Initiating flush\n", dbb->deltaTime);
> flushArg = arg;
> flushPages = 0;
> physicalWrites = 0;
>
>+ dirtyLock.lock(Shared);
> for (Bdb *bdb = firstDirty; bdb; bdb = bdb->nextDirty)
> {
> bdb->flushIt = true;
> flushBitmap->set(bdb->pageNumber);
> ++flushPages;
> }
>+ dirtyLock.unlock();
>
>- if (traceFile)
>- analyzeFlush();
>+ analyzeFlush();
>
> flushStart = database->timestamp;
> flushing = true;
>- sync.unlock();
> flushLock.unlock();
>
> for (int n = 0; n < numberIoThreads; ++n)
>@@ -398,69 +510,115 @@ void Cache::flush(int64 arg)
>
> void Cache::moveToHead(Bdb * bdb)
> {
>+ // If buffer has moved out of the upper "fraction" of the LRU queue,
>move it back up
>+ // non-protected access to age is harmless since it is fuzzy anyway
>+ if (bdb->age < bufferAge - (uint64) upperFraction)
>+ {
>+ Sync bufferQueueLock (&bufferQueue.syncObject,
>"Cache::moveToHead");
>+
>+ bufferQueueLock.lock (Exclusive);
>+ bdb->age = bufferAge++;
>+ bufferQueue.remove(bdb);
>+ bufferQueue.prepend(bdb);
>+ //validateUnique (bdb);
>+ }
>+}
>+
>+void Cache::moveToHeadAlreadyLocked(Bdb * bdb)
>+{
> bdb->age = bufferAge++;
> bufferQueue.remove(bdb);
> bufferQueue.prepend(bdb);
> //validateUnique (bdb);
> }
>
>-Bdb* Cache::findBuffer(Dbb *dbb, int pageNumber, LockType lockType)
>+Bdb* Cache::getFreeBuffer(void)
> {
>- //syncObject.validateExclusive("Cache::findBuffer");
>- int slot = pageNumber % hashSize;
>- Sync sync(&syncDirty, "Cache::findBuffer");
>-
>- /* Find least recently used, not-in-use buffer */
>-
>+ Sync bufferQueueLock (&bufferQueue.syncObject,
>"Cache::getFreeBuffer");
>+ unsigned int count;
> Bdb *bdb;
>
> // Find a candidate BDB.
>-
> for (;;)
> {
>- for (bdb = bufferQueue.last; bdb; bdb = bdb->prior)
>- if (bdb->useCount == 0)
>+ bufferQueueLock.lock (Exclusive);
>+ // find a candidate that is NOT in use and NOT dirty and in
the
>tail fraction of the LRU
>+ for (count = 0, bdb = bufferQueue.last; bdb ; bdb =
bdb->prior,
>count++)
>+ {
>+ if (count >= upperFraction)
>+ {
>+ bdb = NULL;
> break;
>+ }
>+ if (bdb->useCount == 0)
>+ {
>+ if (!bdb->isDirty)
>+ {
>+ bdb->incrementUseCount(REL_HISTORY);
>+ moveToHeadAlreadyLocked(bdb);
>+ break;
>+ }
>+ }
>+ else
>+ {
>+ // get this one out of the way so we
don't
>search it every time
>+ moveToHeadAlreadyLocked(bdb,count);
>+ }
>+ }
>+ if (!bdb)
>+ // find a candidate that is NOT in use, could be
dirty
>+ for (bdb = bufferQueue.last; bdb; bdb = bdb->prior)
>+ if (bdb->useCount == 0)
>+ {
>+ bdb->incrementUseCount(REL_HISTORY);
>+ moveToHeadAlreadyLocked(bdb);
>+ break;
>+ }
>+ bufferQueueLock.unlock();
>
> if (!bdb)
> throw SQLError(RUNTIME_ERROR, "buffer pool is
>exhausted\n");
>
>- if (!bdb->isDirty)
>- break;
>-
>- writePage (bdb, WRITE_TYPE_REUSE);
>- }
>-
>- /* Unlink its old incarnation from the page/hash table */
>+ if (bdb->pageNumber >= 0)
>+ {
>+ int slotRemove =
PAGENUM_2_SLOT(bdb->pageNumber);
>+ Sync lockHashRemove
>(&syncHashTable[PAGENUM_2_LOCK_INDEX(bdb->pageNumber, slotRemove)],
>"Cache::getFreeBuffer");
>+ lockHashRemove.lock(Exclusive);
>
>- if (bdb->pageNumber >= 0)
>- for (Bdb **ptr = hashTable + bdb->pageNumber % hashSize;;
ptr =
>&(*ptr)->hash)
>- if (*ptr == bdb)
>+ if (bdb->useCount != 1)
> {
>- *ptr = bdb->hash;
>- break;
>+ // we lost a race try again
>+ bdb->decrementUseCount(REL_HISTORY);
>+ lockHashRemove.unlock();
>+ continue;
> }
>- else
>- ASSERT (*ptr);
>-
>- bdb->addRef (lockType COMMA_ADD_HISTORY);
>
>- /* Set new page number and relink into hash table */
>+ if (bdb->isDirty)
>+ writePage (bdb, WRITE_TYPE_REUSE);
>
>- bdb->hash = hashTable [slot];
>- hashTable [slot] = bdb;
>- bdb->pageNumber = pageNumber;
>- bdb->dbb = dbb;
>+ /* Unlink its old incarnation from the page/hash
table */
>+ for (Bdb **ptr = hashTable + PAGENUM_2_SLOT(bdb-
>>pageNumber) ;; ptr = &(*ptr)->hash)
>+ if (*ptr == bdb)
>+ {
>+ *ptr = bdb->hash;
>+ break;
>+ }
>+ else
>+ ASSERT (*ptr);
>+ }
>
>-#ifdef COLLECT_BDB_HISTORY
>- bdb->initHistory();
>-#endif
>+ break;
>+ }
>
> return bdb;
> }
>
> void Cache::validate()
> {
>+ //Sync bufferQueueLock (&bufferQueue.syncObject, "Cache::validate");
>+
>+ //bufferQueueLock.lock (Shared);
>+ // non-protected access to bufferQueue is DANGEROUS...
> for (Bdb *bdb = bufferQueue.last; bdb; bdb = bdb->prior)
> {
> //IndexPage *page = (IndexPage*) bdb->buffer;
>@@ -470,8 +628,8 @@ void Cache::validate()
>
> void Cache::markDirty(Bdb *bdb)
> {
>- Sync sync (&syncDirty, "Cache::markDirty");
>- sync.lock (Exclusive);
>+ Sync dirtyLock (&syncDirty, "Cache::markDirty");
>+ dirtyLock.lock (Exclusive);
> bdb->nextDirty = NULL;
> bdb->priorDirty = lastDirty;
>
>@@ -481,14 +639,13 @@ void Cache::markDirty(Bdb *bdb)
> firstDirty = bdb;
>
> lastDirty = bdb;
>- ++numberDirtyPages;
> //validateUnique (bdb);
> }
>
> void Cache::markClean(Bdb *bdb)
> {
>- Sync sync (&syncDirty, "Cache::markClean");
>- sync.lock (Exclusive);
>+ Sync dirtyLock (&syncDirty, "Cache::markClean");
>+ dirtyLock.lock (Exclusive);
>
> /***
> if (bdb->flushIt)
>@@ -496,7 +653,6 @@ void Cache::markClean(Bdb *bdb)
> ***/
>
> bdb->flushIt = false;
>- --numberDirtyPages;
>
> if (bdb == lastDirty)
> lastDirty = bdb->priorDirty;
>@@ -600,8 +756,8 @@ void Cache::writePage(Bdb *bdb, int type
>
> if (dbb->shadows)
> {
>- Sync sync (&dbb->syncClone, "Cache::writePage(2)");
>- sync.lock (Shared);
>+ Sync cloneLock (&dbb->syncClone, "Cache::writePage(2)");
>+ cloneLock.lock (Shared);
>
> for (DatabaseCopy *shadow = dbb->shadows; shadow; shadow =
>shadow->next)
> shadow->rewritePage(bdb);
>@@ -610,14 +766,14 @@ void Cache::writePage(Bdb *bdb, int type
>
> void Cache::analyze(Stream *stream)
> {
>- Sync sync (&syncDirty, "Cache::analyze");
>- sync.lock (Shared);
>+ Sync dirtyLock (&syncDirty, "Cache::analyze");
> int inUse = 0;
> int dirty = 0;
> int dirtyList = 0;
> int total = 0;
> Bdb *bdb;
>
>+ // non-protected access to bdbs,endBdbs is DANGEROUS...
> for (bdb = bdbs; bdb < endBdbs; ++bdb)
> {
> ++total;
>@@ -629,8 +785,10 @@ void Cache::analyze(Stream *stream)
> ++inUse;
> }
>
>+ dirtyLock.lock (Shared);
> for (bdb = firstDirty; bdb; bdb = bdb->nextDirty)
> ++dirtyList;
>+ dirtyLock.unlock();
>
> stream->format ("Cache: %d pages, %d in use, %d dirty, %d in dirty
>chain\n",
> total, inUse, dirty, dirtyList);
>@@ -638,17 +796,18 @@ void Cache::analyze(Stream *stream)
>
> void Cache::validateUnique(Bdb *target)
> {
>- int slot = target->pageNumber % hashSize;
>+ int slot = PAGENUM_2_SLOT(target->pageNumber);
>
>+ // WARNING: unlocked walk of hash table.... DANGEROUS
> for (Bdb *bdb = hashTable [slot]; bdb; bdb = bdb->hash)
> ASSERT (bdb == target || !(bdb->pageNumber ==
target->pageNumber
>&& bdb->dbb == target->dbb));
> }
>
> void Cache::freePage(Dbb *dbb, int32 pageNumber)
> {
>- Sync sync (&syncObject, "Cache::freePage");
>- sync.lock (Shared);
>- int slot = pageNumber % hashSize;
>+ int slot = PAGENUM_2_SLOT(pageNumber);
>+ Sync lockHash (&syncHashTable[PAGENUM_2_LOCK_INDEX(pageNumber,
slot)],
>"Cache::freePage");
>+ lockHash.lock(Shared);
>
> // If page exists in cache (usual case), clean it up
>
>@@ -657,7 +816,7 @@ void Cache::freePage(Dbb *dbb, int32 pag
> {
> if (bdb->isDirty)
> {
>- sync.unlock();
>+ lockHash.unlock();
> markClean (bdb);
> }
>
>@@ -685,12 +844,16 @@ void Cache::flush(Dbb *dbb)
>
> bool Cache::hasDirtyPages(Dbb *dbb)
> {
>- Sync sync (&syncDirty, "Cache::hasDirtyPages");
>- sync.lock (Shared);
>+ Sync dirtyLock (&syncDirty, "Cache::hasDirtyPages");
>+ dirtyLock.lock (Shared);
>
> for (Bdb *bdb = firstDirty; bdb; bdb = bdb->nextDirty)
> if (bdb->dbb == dbb)
>+ {
>+ dirtyLock.unlock();
> return true;
>+ }
>+ dirtyLock.unlock();
>
> return false;
> }
>@@ -717,26 +880,16 @@ Bdb* Cache::trialFetch(Dbb* dbb, int32 p
> }
>
> ASSERT (pageNumber >= 0);
>- int slot = pageNumber % hashSize;
>- Sync sync (&syncObject, "Cache::trialFetch");
>- sync.lock (Shared);
>- int hit = 0;
>-
>- /* If we already have a buffer for this go, we're done */
>-
> Bdb *bdb;
>
>- for (bdb = hashTable [slot]; bdb; bdb = bdb->hash)
>- if (bdb->pageNumber == pageNumber && bdb->dbb == dbb)
>- {
>- //syncObject.validateShared("Cache::trialFetch");
>- bdb->incrementUseCount(ADD_HISTORY);
>- sync.unlock();
>- bdb->addRef(lockType COMMA_ADD_HISTORY);
>- bdb->decrementUseCount(REL_HISTORY);
>- hit = 1;
>- break;
>- }
>+ /* If we already have a buffer for this, we're done */
>+ bdb = lockFindBdbIncrementUseCount(dbb, pageNumber);
>+ if (bdb)
>+ {
>+ bdb->addRef(lockType COMMA_ADD_HISTORY);
>+ bdb->decrementUseCount(REL_HISTORY);
>+ moveToHead(bdb);
>+ }
>
> return bdb;
> }
>@@ -764,10 +917,9 @@ void Cache::ioThread(void* arg)
>
> void Cache::ioThread(void)
> {
>- Sync syncThread(&syncThreads, "Cache::ioThread(1)");
>+ Sync syncThread(&syncThreads, "Cache::ioThread");
> syncThread.lock(Shared);
>- Sync flushLock(&syncFlush, "Cache::ioThread(2)");
>- Sync sync(&syncObject, "Cache::ioThread(3)");
>+ Sync flushLock(&syncFlush, "Cache::ioThread");
> Priority priority(database->ioScheduler);
> Thread *thread = Thread::getThread("Cache::ioThread");
> UCHAR *rawBuffer = new UCHAR[ASYNC_BUFFER_SIZE];
>@@ -781,137 +933,135 @@ void Cache::ioThread(void)
> {
> int32 pageNumber = flushBitmap->nextSet(0);
> int count;
>- Dbb *dbb;
>
> if (pageNumber >= 0)
> {
>- int slot = pageNumber % hashSize;
>+ Bdb *bdb;
>+ Dbb *dbb;
>+ int slot = PAGENUM_2_SLOT(pageNumber);
> bool hit = false;
> Bdb *bdbList = NULL;
> UCHAR *p = buffer;
>- sync.lock(Shared);
>-
>- // Look for a page to flush. Then get all his
friends
>
>- for (Bdb *bdb = hashTable[slot]; bdb; bdb =
bdb->hash)
>- if (bdb->pageNumber == pageNumber &&
bdb->flushIt
>&& bdb->isDirty)
>+ // Look for the page to flush.
>+ bdb = lockFindBdbIncrementUseCount(pageNumber,
slot);
>+ if (bdb && bdb->flushIt && bdb->isDirty)
>+ {
>+ hit = true;
>+ count = 0;
>+ dbb = bdb->dbb;
>+
>+ flushBitmap->clear(pageNumber);
>+
>+ // get all his friends
>+ while (p < end)
> {
>- hit = true;
>- count = 0;
>- dbb = bdb->dbb;
>+ ++count;
>+ bdb->addRef(Shared
COMMA_ADD_HISTORY);
>
>- if (!bdb->hash)
>-
flushBitmap->clear(pageNumber);
>+ bdb->syncWrite.lock(NULL,
Exclusive);
>+ bdb->ioThreadNext = bdbList;
>+ bdbList = bdb;
>
>- while (p < end)
>- {
>- ++count;
>-
bdb->incrementUseCount(ADD_HISTORY);
>- sync.unlock();
>- bdb->addRef(Shared
>COMMA_ADD_HISTORY);
>- if (falcon_use_sectorcache)
>-
sectorCache->writePage(bdb);
>-
>- bdb->syncWrite.lock(NULL,
Exclusive);
>- bdb->ioThreadNext = bdbList;
>- bdbList = bdb;
>-
>- //ASSERT(!(bdb->flags &
>BDB_write_pending));
>- //bdb->flags |=
BDB_write_pending;
>- memcpy(p, bdb->buffer,
pageSize);
>- p += pageSize;
>- bdb->flushIt = false;
>- markClean(bdb);
>- bdb->isDirty = false;
>- bdb->release(REL_HISTORY);
>- sync.lock(Shared);
>-
>- if ( !(bdb = findBdb(dbb,
bdb-
>>pageNumber + 1)) )
>- break;
>-
>- if (!bdb->isDirty &&
>!continueWrite(bdb))
>- break;
>- }
>+ //ASSERT(!(bdb->flags &
BDB_write_pending));
>+ //bdb->flags |= BDB_write_pending;
>+ memcpy(p, bdb->buffer, pageSize);
>+ p += pageSize;
>+ bdb->flushIt = false;
>+ markClean(bdb);
>+ bdb->isDirty = false;
>+ bdb->release(REL_HISTORY);
>
>- if (sync.state != None)
>- sync.unlock();
>-
>- flushLock.unlock();
>- //Log::debug(" %d Writing %s %d
pages: %d -
>%d\n", thread->threadId, (const char*) dbb->fileName, count, pageNumber,
>pageNumber + count - 1);
>- int length = p - buffer;
>- priority.schedule(PRIORITY_LOW);
>+ bdb =
lockFindBdbIncrementUseCount(dbb, bdb-
>>pageNumber + 1);
>+ if (!bdb)
>+ break;
>
>- try
>+ if (!bdb->isDirty &&
!continueWrite(bdb))
> {
>-
priority.schedule(PRIORITY_LOW);
>- dbb->writePages(pageNumber,
length,
>buffer, WRITE_TYPE_FLUSH);
>+
bdb->decrementUseCount(REL_HISTORY);
>+ break;
> }
>- catch (SQLException& exception)
>+ }
>+
>+ flushLock.unlock();
>+ //Log::debug(" %d Writing %s %d pages: %d -
%d\n",
>thread->threadId, (const char*) dbb->fileName, count, pageNumber,
pageNumber
>+ count - 1);
>+ int length = p - buffer;
>+ priority.schedule(PRIORITY_LOW);
>+
>+ try
>+ {
>+ priority.schedule(PRIORITY_LOW);
>+ dbb->writePages(pageNumber, length,
buffer,
>WRITE_TYPE_FLUSH);
>+ }
>+ catch (SQLException& exception)
>+ {
>+ priority.finished();
>+
>+ if (exception.getSqlcode() !=
DEVICE_FULL)
>+ throw;
>+
>+ database->setIOError(&exception);
>+
>+ for (bool error = true; error;)
> {
>- priority.finished();
>-
>- if (exception.getSqlcode()
!=
>DEVICE_FULL)
>- throw;
>-
>-
database->setIOError(&exception);
>-
>- for (bool error = true;
error;)
>+ if
(thread->shutdownInProgress)
> {
>- if
(thread->shutdownInProgress)
>- {
>- Bdb *next;
>+ Bdb *next;
>
>- for (bdb =
bdbList; bdb;
>bdb = next)
>- {
>-
//bdb->flags &=
>~BDB_write_pending;
>- next
= bdb-
>>ioThreadNext;
>- bdb-
>>syncWrite.unlock();
>- bdb-
>>decrementUseCount(REL_HISTORY);
>- }
>-
>- return;
>- }
>-
>- thread->sleep(1000);
>-
>- try
>+ for (bdb = bdbList;
bdb; bdb =
>next)
> {
>-
> priority.schedule(PRIORITY_LOW);
>- dbb-
>>writePages(pageNumber, length, buffer, WRITE_TYPE_FLUSH);
>- error =
false;
>-
database->clearIOError();
>+ //bdb->flags
&=
>~BDB_write_pending;
>+ next =
bdb->ioThreadNext;
>+
bdb->syncWrite.unlock();
>+ bdb-
>>decrementUseCount(REL_HISTORY);
> }
>- catch (SQLException&
>exception2)
>- {
>-
priority.finished();
>
>- if
>(exception2.getSqlcode() != DEVICE_FULL)
>-
throw;
>- }
>+ return;
>+ }
>+
>+ thread->sleep(1000);
>+
>+ try
>+ {
>+
> priority.schedule(PRIORITY_LOW);
>+
dbb->writePages(pageNumber,
>length, buffer, WRITE_TYPE_FLUSH);
>+ error = false;
>+
database->clearIOError();
>+ }
>+ catch (SQLException&
exception2)
>+ {
>+ priority.finished();
>+
>+ if
(exception2.getSqlcode() !=
>DEVICE_FULL)
>+ throw;
> }
> }
>+ }
>
>- priority.finished();
>- Bdb *next;
>+ priority.finished();
>+ Bdb *next;
>
>- for (bdb = bdbList; bdb; bdb = next)
>- {
>- //ASSERT(bdb->flags &
>BDB_write_pending);
>- //bdb->flags &=
~BDB_write_pending;
>- next = bdb->ioThreadNext;
>- bdb->syncWrite.unlock();
>-
bdb->decrementUseCount(REL_HISTORY);
>- }
>-
>- flushLock.lock(Exclusive);
>- ++physicalWrites;
>-
>- break;
>+ for (bdb = bdbList; bdb; bdb = next)
>+ {
>+ //ASSERT(bdb->flags &
BDB_write_pending);
>+ //bdb->flags &= ~BDB_write_pending;
>+ next = bdb->ioThreadNext;
>+ bdb->syncWrite.unlock();
>+ bdb->decrementUseCount(REL_HISTORY);
> }
>+
>+ flushLock.lock(Exclusive);
>+ ++physicalWrites;
>+
>+ }
>+ else
>+ {
>+ if (bdb)
>+
bdb->decrementUseCount(REL_HISTORY);
>+ }
>
> if (!hit)
> {
>- sync.unlock();
> flushBitmap->clear(pageNumber);
> }
> }
>@@ -922,7 +1072,9 @@ void Cache::ioThread(void)
> int writes = physicalWrites;
> int pages = flushPages;
> int delta = (int) (database->timestamp -
>flushStart);
>+ int64 callbackArg = flushArg;
> flushing = false;
>+ flushArg = 0;
> flushLock.unlock();
> syncWait.unlock();
>
>@@ -930,7 +1082,8 @@ void Cache::ioThread(void)
> Log::log(LogInfo, "%d: Cache flush:
%d
>pages, %d writes in %d seconds (%d pps)\n",
>
database->deltaTime,
>pages, writes, delta, pages / MAX(delta, 1));
>
>- database->pageCacheFlushed(flushArg);
>+ if (callbackArg != 0)
>+
database->pageCacheFlushed(callbackArg);
> }
> else
> flushLock.unlock();
>@@ -940,8 +1093,8 @@ void Cache::ioThread(void)
>
> thread->sleep();
> flushLock.lock(Exclusive);
>- }
> }
>+ } // for ever
>
> delete [] rawBuffer;
> }
>@@ -954,11 +1107,12 @@ bool Cache::continueWrite(Bdb* startingB
>
> for (int32 pageNumber = startingBdb->pageNumber + 1, end =
pageNumber+
>5; pageNumber < end; ++pageNumber)
> {
>- Bdb *bdb = findBdb(dbb, pageNumber);
>+ Bdb *bdb;
>
> if (dirty > clean)
> return true;
>-
>+
>+ bdb = lockFindBdbIncrementUseCount(dbb, pageNumber);
> if (!bdb)
> return dirty >= clean;
>
>@@ -966,6 +1120,7 @@ bool Cache::continueWrite(Bdb* startingB
> ++dirty;
> else
> ++clean;
>+ bdb->decrementUseCount(REL_HISTORY);
> }
>
> return (dirty >= clean);
>@@ -995,15 +1150,18 @@ void Cache::shutdownThreads(void)
> ioThreads[n] = 0;
> }
>
>- Sync sync(&syncThreads, "Cache::shutdownThreads");
>- sync.lock(Exclusive);
>+ Sync lockThreads(&syncThreads, "Cache::shutdownThreads");
>+ lockThreads.lock(Exclusive);
> }
>
>+#ifdef CACHE_TRACE_FILE
> void Cache::analyzeFlush(void)
> {
> Dbb *dbb = NULL;
> Bdb *bdb;
>+ Sync dirtyLock (&syncDirty, "Cache::hasDirtyPages");
>
>+ dirtyLock.lock (Shared);
> for (bdb = firstDirty; bdb; bdb = bdb->nextDirty)
> if (bdb->dbb->tableSpaceId == 1)
> {
>@@ -1011,6 +1169,7 @@ void Cache::analyzeFlush(void)
>
> break;
> }
>+ dirtyLock.unlock();
>
> if (!dbb)
> return;
>@@ -1018,16 +1177,19 @@ void Cache::analyzeFlush(void)
> fprintf(traceFile, "-------- time %d -------\n",
database->deltaTime);
>
> for (int pageNumber = 0; (pageNumber = flushBitmap-
>>nextSet(pageNumber)) >= 0;)
>+ // non-protected access to hash table via findBdb()!
> if ( (bdb = findBdb(dbb, pageNumber)) )
> {
> int start = pageNumber;
> int type = bdb->buffer->pageType;
>
>+ // non-protected access to hash table via findBdb()!
> for (; (bdb = findBdb(dbb, ++pageNumber)) && bdb-
>>flushIt;)
> ;
>
> fprintf(traceFile, " %d flushed: %d to %d, first
type
>%d\n", pageNumber - start, start, pageNumber - 1, type);
>
>+ // non-protected access to hash table via findBdb()!
> for (int max = pageNumber + 5; pageNumber < max &&
(bdb =
>findBdb(dbb, pageNumber)) && !bdb->flushIt; ++pageNumber)
> {
> if (bdb->isDirty)
>@@ -1044,27 +1206,39 @@ void Cache::analyzeFlush(void)
>
> void Cache::openTraceFile(void)
> {
>-#ifdef TRACE_FILE
> if (traceFile)
> closeTraceFile();
>
>- traceFile = fopen(TRACE_FILE, "w");
>-#endif
>+ traceFile = fopen(TRACE_FILE, "a+");
>+ fprintf(traceFile, "Starting\n");
>+//KEL
>+// setvbuf(traceFile, (char *) NULL, _IOLBF, 0);
>+
> }
>
> void Cache::closeTraceFile(void)
> {
>-#ifdef TRACE_FILE
> if (traceFile)
> {
> fclose(traceFile);
> traceFile = NULL;
> }
>-#endif
> }
>+#else // CACHE_TRACE_FILE
>+void Cache::analyzeFlush(void)
>+{
>+}
>+void Cache::openTraceFile(void)
>+{
>+}
>+void Cache::closeTraceFile(void)
>+{
>+}
>+#endif // CACHE_TRACE_FILE
>
> void Cache::flushWait(void)
> {
>- Sync sync(&syncWait, "Cache::flushWait");
>- sync.lock(Shared);
>+ Sync waitLock(&syncWait, "Cache::flushWait");
>+ waitLock.lock(Exclusive);
> }
>+
>
>=== modified file 'storage/falcon/Cache.h'
>--- a/storage/falcon/Cache.h 2008-08-22 06:47:40 +0000
>+++ b/storage/falcon/Cache.h 2008-09-02 16:08:11 +0000
>@@ -28,6 +28,17 @@
> #include "SyncObject.h"
> #include "Queue.h"
>
>+// uncomment DEBUG_SYNC_HASH_TABLE_SIZE to cause more contention and test
>for race conditions
>+//#define DEBUG_SYNC_HASH_TABLE_SIZE (0x01 << 1)
>+#ifdef DEBUG_SYNC_HASH_TABLE_SIZE
>+# define DEBUG_SYNC_HASH_TABLE_MASK (DEBUG_SYNC_HASH_TABLE_SIZE - 1)
>+# define PAGENUM_2_LOCK_INDEX(_pgnum, _slot) ((_pgnum) &
>DEBUG_SYNC_HASH_TABLE_MASK)
>+#else /* DEBUG_SYNC_HASH_TABLE_SIZE */
>+# define PAGENUM_2_LOCK_INDEX(_pgnum, _slot) ((_slot))
>+#endif /* DEBUG_SYNC_HASH_TABLE_SIZE */
>+
>+#define PAGENUM_2_SLOT(_pgnum) ((_pgnum) & hashMask)
>+
> class Bdb;
> class Dbb;
> class PageWriter;
>@@ -54,7 +65,6 @@ public:
> void markClean (Bdb *bdb);
> void markDirty (Bdb *bdb);
> void validate();
>- void moveToHead (Bdb *bdb);
> void flush(int64 arg);
> void validateCache(void);
> void syncFile(Dbb *dbb, const char *text);
>@@ -83,14 +93,20 @@ public:
> bool flushing;
>
> protected:
>- Bdb* findBuffer (Dbb *dbb, int pageNumber, LockType
lockType);
>+ void moveToHead (Bdb *bdb);
>+ void moveToHeadAlreadyLocked (Bdb *bdb);
>+ Bdb* getFreeBuffer(void);
>+ Bdb* findBdb(Dbb* dbb, int32 pageNumber, int slot);
> Bdb* findBdb(Dbb* dbb, int32 pageNumber);
>+ Bdb* lockFindBdbIncrementUseCount(Dbb* dbb, int32
pageNumber);
>+ Bdb* lockFindBdbIncrementUseCount(int32 pageNumber, int
slot);
>
> int64 flushArg;
> Bdb *bdbs;
> Bdb *endBdbs;
> Queue<Bdb> bufferQueue;
> Bdb **hashTable;
>+ SyncObject *syncHashTable;
> Bdb *firstDirty;
> Bdb *lastDirty;
> Bitmap *flushBitmap;
>@@ -105,12 +121,13 @@ protected:
> int flushPages;
> int physicalWrites;
> int hashSize;
>+ unsigned int hashMask;
> int pageSize;
>- int upperFraction;
>+ unsigned int upperFraction;
> int numberHunks;
>- int numberDirtyPages;
> int numberIoThreads;
>- volatile int bufferAge;
>+ volatile uint64 bufferAge;
>+
> public:
> void flushWait(void);
> };
>
>
>--
>MySQL Code Commits Mailing List
>For list archives: http://lists.mysql.com/commits
>To unsubscribe: http://lists.mysql.com/commits?unsub=1