OK to push. Use mysql-6.0-falcon-kevin if you feel you would like to test
this on pushbuild before pushing it to mysql-6.0-falcon-team.
>-----Original Message-----
>From: Kelly Long [mailto:klong@stripped]
>Sent: Monday, August 18, 2008 8:48 AM
>To: commits@stripped
>Subject: bzr commit into mysql-6.0-falcon branch (klong:2786)
>
>#At file:///FC/MYSQL/wa-2008-BZR/lcl/mysql-6.0-falcon-team-KL-Cache/
>
> 2786 Kelly Long 2008-08-18
> Page cache lock per hash bucket. WL4480
> Page cache hash buckets are a power of two. WL4481
>modified:
> storage/falcon/Cache.cpp
> storage/falcon/Cache.h
>
>per-file messages:
> storage/falcon/Cache.h
> Page cache lock per hash bucket. WL4480
> Page cache hash buckets are a power of two. WL4481
>=== modified file 'storage/falcon/Cache.cpp'
>--- a/storage/falcon/Cache.cpp 2008-07-24 08:45:03 +0000
>+++ b/storage/falcon/Cache.cpp 2008-08-18 13:47:57 +0000
>@@ -72,7 +72,22 @@ Cache::Cache(Database *db, int pageSz, i
> database = db;
> panicShutdown = false;
> pageSize = pageSz;
>- hashSize = hashSz;
>+ {
>+ unsigned int highBit;
>+ for (highBit=0x01; highBit < hashSz; highBit= highBit << 1)
{
>+ }
>+
>+ // if there are more than 4096 buckets then lets round down
>+ // else lets round up
>+ if (highBit >= 0x00001000) {
>+ // KEL use power of two rounded down
>+ hashSize = highBit << 1;
>+ } else {
>+ // KEL use power of two rounded up
>+ hashSize = highBit;
>+ }
>+ }
>+ hashMask = hashSize - 1;
> numberBuffers = numBuffers;
> upperFraction = numberBuffers / 4;
> bufferAge = 0;
>@@ -80,8 +95,21 @@ Cache::Cache(Database *db, int pageSz, i
> lastDirty = NULL;
> numberDirtyPages = 0;
> pageWriter = NULL;
>- hashTable = new Bdb* [hashSz];
>+ hashTable = new Bdb* [hashSize];
> memset (hashTable, 0, sizeof (Bdb*) * hashSize);
>+#ifdef DEBUG_SYNC_HASH_TABLE_SIZE
>+ syncHashTable = new SyncObject [DEBUG_SYNC_HASH_TABLE_SIZE];
>+ for (int loop = 0; loop < DEBUG_SYNC_HASH_TABLE_SIZE; loop ++)
>+ syncHashTable[loop].setName("Cache::syncHashTable");
>+#else /* DEBUG_SYNC_HASH_TABLE_SIZE */
>+ syncHashTable = new SyncObject [hashSize];
>+ for (int loop = 0; loop < hashSize; loop ++)
>+ {
>+ char tmpName[128];
>+ snprintf(tmpName,120,"Cache::syncHashTable[%d]",loop);
>+ syncHashTable[loop].setName(tmpName);
>+ }
>+#endif /* DEBUG_SYNC_HASH_TABLE_SIZE */
> sectorCache = new SectorCache(sectorCacheSize / SECTOR_BUFFER_SIZE,
>pageSize);
>
> uint64 n = ((uint64) pageSize * numberBuffers + cacheHunkSize - 1) /
>cacheHunkSize;
>@@ -103,6 +131,7 @@ Cache::Cache(Database *db, int pageSz, i
>
> try
> {
>+ // non-protected access to bdbs,endBdbs is OK during
>initialization
> bdbs = new Bdb [numberBuffers];
> endBdbs = bdbs + numberBuffers;
> int remaining = 0;
>@@ -121,6 +150,7 @@ Cache::Cache(Database *db, int pageSz, i
> }
>
> bdb->cache = this;
>+ // non-protected access to bufferQueue is OK during
>initialization
> bufferQueue.append(bdb);
> bdb->buffer = (Page*) stuff;
> stuff += pageSize;
>@@ -150,6 +180,7 @@ Cache::~Cache()
> closeTraceFile();
>
> delete [] hashTable;
>+ delete [] syncHashTable;
> delete [] bdbs;
> delete [] ioThreads;
> delete flushBitmap;
>@@ -167,14 +198,16 @@ Cache::~Cache()
> Bdb* Cache::probePage(Dbb *dbb, int32 pageNumber)
> {
> ASSERT (pageNumber >= 0);
>- Sync sync (&syncObject, "Cache::probePage");
>- sync.lock (Shared);
>- Bdb *bdb = findBdb(dbb, pageNumber);
>+ int slot = PAGENUM_2_SLOT(pageNumber);
>+ Sync lockHash (&syncHashTable[PAGENUM_2_LOCK_INDEX(pageNumber,
slot)],
>"Cache::probePage");
>+ lockHash.lock (Shared);
>+ Bdb *bdb;
>
>+ bdb = findBdb(dbb, pageNumber, slot);
> if (bdb)
> {
> bdb->incrementUseCount(ADD_HISTORY);
>- sync.unlock();
>+ lockHash.unlock();
>
> if (bdb->buffer->pageType == PAGE_free)
> {
>@@ -189,15 +222,57 @@ Bdb* Cache::probePage(Dbb *dbb, int32 pa
> return bdb;
> }
>
>+ lockHash.unlock();
> return NULL;
> }
>
>-Bdb* Cache::findBdb(Dbb* dbb, int32 pageNumber)
>+Bdb* Cache::findBdb(Dbb* dbb, int32 pageNumber, int slot)
>+{
>+ for (Bdb *bdb = hashTable [slot]; bdb; bdb = bdb->hash)
> {
>- for (Bdb *bdb = hashTable [pageNumber % hashSize]; bdb; bdb = bdb-
>>hash)
> if (bdb->pageNumber == pageNumber && bdb->dbb == dbb)
>+ {
>+ return bdb;
>+ }
>+ }
>+
>+ return NULL;
>+}
>+
>+Bdb* Cache::findBdb(Dbb* dbb, int32 pageNumber)
>+{
>+ return (findBdb(dbb, pageNumber, PAGENUM_2_SLOT(pageNumber)));
>+}
>+
>+Bdb* Cache::lockFindBdbIncrementUseCount(Dbb* dbb, int32 pageNumber)
>+{
>+ int slot = PAGENUM_2_SLOT(pageNumber);
>+ Sync lockHash (&syncHashTable[PAGENUM_2_LOCK_INDEX(pageNumber,
slot)],
>"Cache::lockFindBdbIncrementUseCount");
>+ lockHash.lock (Shared);
>+ Bdb *bdb;
>+
>+ bdb = findBdb(dbb, pageNumber, slot);
>+ if (bdb != NULL)
>+ bdb->incrementUseCount(ADD_HISTORY);
>+
>+ lockHash.unlock();
>+ return bdb;
>+}
>+
>+Bdb* Cache::lockFindBdbIncrementUseCount(int32 pageNumber, int slot)
>+{
>+ Sync lockHash (&syncHashTable[PAGENUM_2_LOCK_INDEX(pageNumber,
slot)],
>"Cache::lockFindBdbIncrementUseCount");
>+ lockHash.lock (Shared);
>+
>+ for (Bdb *bdb = hashTable [slot]; bdb; bdb = bdb->hash)
>+ if (bdb->pageNumber == pageNumber)
>+ {
>+ bdb->incrementUseCount(ADD_HISTORY);
>+ lockHash.unlock();
> return bdb;
>+ }
>
>+ lockHash.unlock();
> return NULL;
> }
>
>@@ -217,51 +292,46 @@ Bdb* Cache::fetchPage(Dbb *dbb, int32 pa
> #endif
>
> ASSERT (pageNumber >= 0);
>- int slot = pageNumber % hashSize;
>- LockType actual = lockType;
>- Sync sync (&syncObject, "Cache::fetchPage");
>- sync.lock (Shared);
>- int hit = 0;
>-
>- /* If we already have a buffer for this go, we're done */
>+ int slot = PAGENUM_2_SLOT(pageNumber);
>+ Sync lockHash (&syncHashTable[PAGENUM_2_LOCK_INDEX(pageNumber,
slot)],
>"Cache::fetchPage");
>
>+ lockHash.lock (Shared);
> Bdb *bdb;
>
>- for (bdb = hashTable [slot]; bdb; bdb = bdb->hash)
>- if (bdb->pageNumber == pageNumber && bdb->dbb == dbb)
>+ bdb = findBdb(dbb, pageNumber, slot);
>+ if (!bdb)
> {
>- //syncObject.validateShared("Cache::fetchPage");
>- bdb->incrementUseCount(ADD_HISTORY);
>- sync.unlock();
>- bdb->addRef(lockType COMMA_ADD_HISTORY);
>- bdb->decrementUseCount(REL_HISTORY);
>- hit = 1;
>- break;
>- }
>+ lockHash.unlock();
>+ // get getFreeBuffer() locks a hash bucket to remove the
>candidate bdb
>+ // if we locked out hash bucket before the call then we
could
>have
>+ // a deadlock
>+ // thus we get the free buffer before we lock the hash
bucket we
>will
>+ // be inserting into. This avoids a dead lock but generates
a
>race
>+ // we take care of the race by reversing the getFreeBuffer()
>work
>+ // when we lose the race
>+ Bdb *bdbAvailable;
>+ bdbAvailable = getFreeBuffer();
>+ lockHash.lock(Exclusive);
>
>+ bdb = findBdb(dbb, pageNumber, slot);
> if (!bdb)
> {
>- sync.unlock();
>- actual = Exclusive;
>- sync.lock(Exclusive);
>-
>- for (bdb = hashTable [slot]; bdb; bdb = bdb->hash)
>- if (bdb->pageNumber == pageNumber && bdb->dbb ==
dbb)
>- {
>-
//syncObject.validateExclusive("Cache::fetchPage
>(retry)");
>- bdb->incrementUseCount(ADD_HISTORY);
>- sync.unlock();
>- bdb->addRef(lockType COMMA_ADD_HISTORY);
>- bdb->decrementUseCount(REL_HISTORY);
>- hit = 2;
>- break;
>- }
>+ // we won the race so lets use the free bdb
>+ /* Set new page number and relink into hash table */
>+ bdbAvailable->addRef (Exclusive COMMA_ADD_HISTORY);
>+ bdbAvailable->decrementUseCount(REL_HISTORY);
>+
>+ bdbAvailable->hash = hashTable [slot];
>+ hashTable [slot] = bdbAvailable;
>+ bdbAvailable->pageNumber = pageNumber;
>+ bdbAvailable->dbb = dbb;
>
>- if (!bdb)
>- {
>- bdb = findBuffer(dbb, pageNumber, actual);
>+#ifdef COLLECT_BDB_HISTORY
>+ bdbAvailable->initHistory();
>+#endif
>+ bdb = bdbAvailable;
> moveToHead(bdb);
>- sync.unlock();
>+ lockHash.unlock();
>
> #ifdef STOP_PAGE
> if (bdb->pageNumber == STOP_PAGE)
>@@ -278,9 +348,30 @@ Bdb* Cache::fetchPage(Dbb *dbb, int32 pa
> #ifdef HAVE_PAGE_NUMBER
> ASSERT(bdb->buffer->pageNumber == pageNumber);
> #endif
>- if (actual != lockType)
>+ if (Exclusive != lockType)
> bdb->downGrade(lockType);
> }
>+ else
>+ {
>+ // lost a race. put our available back to useable
>+ bdbAvailable->pageNumber = -1;
>+ bdbAvailable->dbb = NULL;
>+ bdbAvailable->decrementUseCount(REL_HISTORY);
>+
>+ //syncObject.validateExclusive("Cache::fetchPage
>(retry)");
>+ bdb->incrementUseCount(ADD_HISTORY);
>+ lockHash.unlock();
>+ bdb->addRef(lockType COMMA_ADD_HISTORY);
>+ bdb->decrementUseCount(REL_HISTORY);
>+ }
>+ }
>+ else
>+ {
>+ //syncObject.validateShared("Cache::fetchPage");
>+ bdb->incrementUseCount(ADD_HISTORY);
>+ lockHash.unlock();
>+ bdb->addRef(lockType COMMA_ADD_HISTORY);
>+ bdb->decrementUseCount(REL_HISTORY);
> }
>
> Page *page = bdb->buffer;
>@@ -304,9 +395,9 @@ Bdb* Cache::fetchPage(Dbb *dbb, int32 pa
>
> // If buffer has moved out of the upper "fraction" of the LRU queue,
>move it back up
>
>+ // non-protected access to age is harmless since it is fuzzy anyway
> if (bdb->age < bufferAge - (uint64) upperFraction)
> {
>- sync.lock (Exclusive);
> moveToHead (bdb);
> }
>
>@@ -319,9 +410,10 @@ Bdb* Cache::fetchPage(Dbb *dbb, int32 pa
>
> Bdb* Cache::fakePage(Dbb *dbb, int32 pageNumber, PageType type, TransId
>transId)
> {
>- Sync sync(&syncObject, "Cache::fakePage");
>- sync.lock(Exclusive);
>- int slot = pageNumber % hashSize;
>+ int slot = PAGENUM_2_SLOT(pageNumber);
>+ Sync lockHash (&syncHashTable[PAGENUM_2_LOCK_INDEX(pageNumber,
slot)],
>"Cache::fakePage");
>+ lockHash.lock(Exclusive);
>+ Bdb *bdb;
>
> #ifdef STOP_PAGE
> if (pageNumber == STOP_PAGE)
>@@ -330,25 +422,64 @@ Bdb* Cache::fakePage(Dbb *dbb, int32 pag
>
> /* If we already have a buffer for this, we're done */
>
>- Bdb *bdb;
>-
>- for (bdb = hashTable [slot]; bdb; bdb = bdb->hash)
>- if (bdb->pageNumber == pageNumber && bdb->dbb == dbb)
>+ bdb = findBdb(dbb, pageNumber, slot);
>+ if (!bdb)
> {
>- if (bdb->syncObject.isLocked())
>+ lockHash.unlock();
>+ // get getFreeBuffer() locks a hash bucket to remove the
>candidate bdb
>+ // if we locked out hash bucket before the call then we
could
>have
>+ // a deadlock
>+ // thus we get the free buffer before we lock the hash
bucket we
>will
>+ // be inserting into. This avoids a dead lock but generates
a
>race
>+ // we take care of the race by reversing the getFreeBuffer()
>work
>+ // when we lose the race
>+ Bdb *bdbAvailable;
>+ bdbAvailable = getFreeBuffer();
>+ lockHash.lock(Exclusive);
>+
>+ bdb = findBdb(dbb, pageNumber, slot);
>+ if (!bdb)
> {
>- // The pageWriter may still be cleaning up
this
>freed page with a shared lock
>- ASSERT(bdb->buffer->pageType == PAGE_free);
>- ASSERT(bdb->syncObject.getState() >= 0);
>- }
>+ // we won the race so lets use the free bdb
>+ /* Set new page number and relink into hash table */
>+ bdbAvailable->addRef (Exclusive COMMA_ADD_HISTORY);
>+ bdbAvailable->decrementUseCount(REL_HISTORY);
>
>- bdb->addRef(Exclusive COMMA_ADD_HISTORY);
>+ bdbAvailable->hash = hashTable [slot];
>+ hashTable [slot] = bdbAvailable;
>+ bdbAvailable->pageNumber = pageNumber;
>+ bdbAvailable->dbb = dbb;
>+
>+#ifdef COLLECT_BDB_HISTORY
>+ bdbAvailable->initHistory();
>+#endif
>+ bdb = bdbAvailable;
>+ moveToHead(bdb);
>+ lockHash.unlock();
>
>- break;
> }
>+ else
>+ {
>+ // lost a race. put our available back to useable
>+ bdbAvailable->pageNumber = -1;
>+ bdbAvailable->dbb = NULL;
>+ bdbAvailable->decrementUseCount(REL_HISTORY);
>
>- if (!bdb)
>- bdb = findBuffer(dbb, pageNumber, Exclusive);
>+ //syncObject.validateExclusive("Cache::fakePage
>(retry)");
>+ bdb->incrementUseCount(ADD_HISTORY);
>+ lockHash.unlock();
>+ bdb->addRef(Exclusive COMMA_ADD_HISTORY);
>+ bdb->decrementUseCount(REL_HISTORY);
>+ }
>+ }
>+ else
>+ {
>+ //syncObject.validateShared("Cache::fakePage");
>+ bdb->incrementUseCount(ADD_HISTORY);
>+ lockHash.unlock();
>+ bdb->addRef(Exclusive COMMA_ADD_HISTORY);
>+ bdb->decrementUseCount(REL_HISTORY);
>+ }
>
> if (!dbb->isReadOnly)
> bdb->mark(transId);
>@@ -363,14 +494,14 @@ Bdb* Cache::fakePage(Dbb *dbb, int32 pag
> void Cache::flush(int64 arg)
> {
> Sync flushLock(&syncFlush, "Cache::flush(1)");
>- Sync sync(&syncDirty, "Cache::flush(2)");
>+ Sync dirtyLock(&syncDirty, "Cache::flush(2)");
> flushLock.lock(Exclusive);
>
> if (flushing)
> return;
>
> syncWait.lock(NULL, Exclusive);
>- sync.lock(Shared);
>+ dirtyLock.lock(Shared);
> //Log::debug(%d: "Initiating flush\n", dbb->deltaTime);
> flushArg = arg;
> flushPages = 0;
>@@ -388,7 +519,7 @@ void Cache::flush(int64 arg)
>
> flushStart = database->timestamp;
> flushing = true;
>- sync.unlock();
>+ dirtyLock.unlock();
> flushLock.unlock();
>
> for (int n = 0; n < numberIoThreads; ++n)
>@@ -398,69 +529,92 @@ void Cache::flush(int64 arg)
>
> void Cache::moveToHead(Bdb * bdb)
> {
>+ Sync bufferQueueLock (&bufferQueue.syncObject, "Cache::moveToHead");
>+
>+ bufferQueueLock.lock (Exclusive);
> bdb->age = bufferAge++;
> bufferQueue.remove(bdb);
> bufferQueue.prepend(bdb);
> //validateUnique (bdb);
> }
>
>-Bdb* Cache::findBuffer(Dbb *dbb, int pageNumber, LockType lockType)
>+Bdb* Cache::getFreeBuffer(void)
> {
>- //syncObject.validateExclusive("Cache::findBuffer");
>- int slot = pageNumber % hashSize;
>- Sync sync(&syncDirty, "Cache::findBuffer");
>-
>- /* Find least recently used, not-in-use buffer */
>-
>+ Sync bufferQueueLock (&bufferQueue.syncObject,
>"Cache::getFreeBuffer");
>+ unsigned int count;
> Bdb *bdb;
>
> // Find a candidate BDB.
>-
> for (;;)
> {
>- for (bdb = bufferQueue.last; bdb; bdb = bdb->prior)
>+ bufferQueueLock.lock (Exclusive);
>+ // find a candidate that is NOT in use and NOT dirty and in
the
>tail fraction of the LRU
>+ for (count = 0, bdb = bufferQueue.last; bdb && count <
>upperFraction; bdb = bdb->prior, count++)
> if (bdb->useCount == 0)
>- break;
>+ {
>+ if (!bdb->isDirty)
>+ {
>+ bdb->incrementUseCount(REL_HISTORY);
>+ break;
>+ }
>+ }
>+ else
>+ {
>+ moveToHead(bdb);
>+ }
>+ if (!bdb)
>+ // find a candidate that is NOT in use, could be
dirty
>+ for (bdb = bufferQueue.last; bdb; bdb = bdb->prior)
>+ if (bdb->useCount == 0)
>+ {
>+ bdb->incrementUseCount(REL_HISTORY);
>+ break;
>+ }
>+ bufferQueueLock.unlock();
>
> if (!bdb)
> throw SQLError(RUNTIME_ERROR, "buffer pool is
>exhausted\n");
>
>- if (!bdb->isDirty)
>- break;
>-
>- writePage (bdb, WRITE_TYPE_REUSE);
>- }
>-
>- /* Unlink its old incarnation from the page/hash table */
>+ if (bdb->pageNumber >= 0)
>+ {
>+ int slotRemove =
PAGENUM_2_SLOT(bdb->pageNumber);
>+ Sync lockHashRemove
>(&syncHashTable[PAGENUM_2_LOCK_INDEX(bdb->pageNumber, slotRemove)],
>"Cache::getFreeBuffer");
>+ lockHashRemove.lock(Exclusive);
>
>- if (bdb->pageNumber >= 0)
>- for (Bdb **ptr = hashTable + bdb->pageNumber % hashSize;;
ptr =
>&(*ptr)->hash)
>- if (*ptr == bdb)
>+ if (bdb->useCount != 1)
> {
>- *ptr = bdb->hash;
>- break;
>+ // we lost a race try again
>+ bdb->decrementUseCount(REL_HISTORY);
>+ lockHashRemove.unlock();
>+ continue;
> }
>- else
>- ASSERT (*ptr);
>-
>- bdb->addRef (lockType COMMA_ADD_HISTORY);
>
>- /* Set new page number and relink into hash table */
>+ if (bdb->isDirty)
>+ writePage (bdb, WRITE_TYPE_REUSE);
>
>- bdb->hash = hashTable [slot];
>- hashTable [slot] = bdb;
>- bdb->pageNumber = pageNumber;
>- bdb->dbb = dbb;
>+ /* Unlink its old incarnation from the page/hash
table */
>+ for (Bdb **ptr = hashTable + PAGENUM_2_SLOT(bdb-
>>pageNumber) ;; ptr = &(*ptr)->hash)
>+ if (*ptr == bdb)
>+ {
>+ *ptr = bdb->hash;
>+ break;
>+ }
>+ else
>+ ASSERT (*ptr);
>+ }
>
>-#ifdef COLLECT_BDB_HISTORY
>- bdb->initHistory();
>-#endif
>+ break;
>+ }
>
> return bdb;
> }
>
> void Cache::validate()
> {
>+ //Sync bufferQueueLock (&bufferQueue.syncObject,
"Cache::moveToHead");
>+
>+ //bufferQueueLock.lock (Shared);
>+ // non-protected access to bufferQueue is DANGEROUS...
> for (Bdb *bdb = bufferQueue.last; bdb; bdb = bdb->prior)
> {
> //IndexPage *page = (IndexPage*) bdb->buffer;
>@@ -470,8 +624,8 @@ void Cache::validate()
>
> void Cache::markDirty(Bdb *bdb)
> {
>- Sync sync (&syncDirty, "Cache::markDirty");
>- sync.lock (Exclusive);
>+ Sync dirtyLock (&syncDirty, "Cache::markDirty");
>+ dirtyLock.lock (Exclusive);
> bdb->nextDirty = NULL;
> bdb->priorDirty = lastDirty;
>
>@@ -487,8 +641,8 @@ void Cache::markDirty(Bdb *bdb)
>
> void Cache::markClean(Bdb *bdb)
> {
>- Sync sync (&syncDirty, "Cache::markClean");
>- sync.lock (Exclusive);
>+ Sync dirtyLock (&syncDirty, "Cache::markClean");
>+ dirtyLock.lock (Exclusive);
>
> /***
> if (bdb->flushIt)
>@@ -600,8 +754,8 @@ void Cache::writePage(Bdb *bdb, int type
>
> if (dbb->shadows)
> {
>- Sync sync (&dbb->syncClone, "Cache::writePage(2)");
>- sync.lock (Shared);
>+ Sync cloneLock (&dbb->syncClone, "Cache::writePage(2)");
>+ cloneLock.lock (Shared);
>
> for (DatabaseCopy *shadow = dbb->shadows; shadow; shadow =
>shadow->next)
> shadow->rewritePage(bdb);
>@@ -610,14 +764,15 @@ void Cache::writePage(Bdb *bdb, int type
>
> void Cache::analyze(Stream *stream)
> {
>- Sync sync (&syncDirty, "Cache::analyze");
>- sync.lock (Shared);
>+ Sync dirtyLock (&syncDirty, "Cache::analyze");
>+ dirtyLock.lock (Shared);
> int inUse = 0;
> int dirty = 0;
> int dirtyList = 0;
> int total = 0;
> Bdb *bdb;
>
>+ // non-protected access to bdbs,endBdbs is DANGEROUS...
> for (bdb = bdbs; bdb < endBdbs; ++bdb)
> {
> ++total;
>@@ -638,17 +793,18 @@ void Cache::analyze(Stream *stream)
>
> void Cache::validateUnique(Bdb *target)
> {
>- int slot = target->pageNumber % hashSize;
>+ int slot = PAGENUM_2_SLOT(target->pageNumber);
>
>+ // WARNING: unlocked walk of hash table.... DANGEROUS
> for (Bdb *bdb = hashTable [slot]; bdb; bdb = bdb->hash)
> ASSERT (bdb == target || !(bdb->pageNumber ==
target->pageNumber
>&& bdb->dbb == target->dbb));
> }
>
> void Cache::freePage(Dbb *dbb, int32 pageNumber)
> {
>- Sync sync (&syncObject, "Cache::freePage");
>- sync.lock (Shared);
>- int slot = pageNumber % hashSize;
>+ int slot = PAGENUM_2_SLOT(pageNumber);
>+ Sync lockHash (&syncHashTable[PAGENUM_2_LOCK_INDEX(pageNumber,
slot)],
>"Cache::freePage");
>+ lockHash.lock(Shared);
>
> // If page exists in cache (usual case), clean it up
>
>@@ -657,7 +813,7 @@ void Cache::freePage(Dbb *dbb, int32 pag
> {
> if (bdb->isDirty)
> {
>- sync.unlock();
>+ lockHash.unlock();
> markClean (bdb);
> }
>
>@@ -670,8 +826,8 @@ void Cache::flush(Dbb *dbb)
> {
> //Sync sync (&syncDirty, "Cache::flush(1)");
> //sync.lock (Exclusive);
>- Sync sync (&syncObject, "Cache::flush(3)");
>- sync.lock (Shared);
>+ Sync objectLock (&syncObject, "Cache::flush(3)");
>+ objectLock.lock (Shared);
>
> for (Bdb *bdb = bdbs; bdb < endBdbs; ++bdb)
> if (bdb->dbb == dbb)
>@@ -685,8 +841,8 @@ void Cache::flush(Dbb *dbb)
>
> bool Cache::hasDirtyPages(Dbb *dbb)
> {
>- Sync sync (&syncDirty, "Cache::hasDirtyPages");
>- sync.lock (Shared);
>+ Sync dirtyLock (&syncDirty, "Cache::hasDirtyPages");
>+ dirtyLock.lock (Shared);
>
> for (Bdb *bdb = firstDirty; bdb; bdb = bdb->nextDirty)
> if (bdb->dbb == dbb)
>@@ -717,25 +873,21 @@ Bdb* Cache::trialFetch(Dbb* dbb, int32 p
> }
>
> ASSERT (pageNumber >= 0);
>- int slot = pageNumber % hashSize;
>- Sync sync (&syncObject, "Cache::trialFetch");
>- sync.lock (Shared);
>- int hit = 0;
>+ int slot = PAGENUM_2_SLOT(pageNumber);
>+ Sync lockHash (&syncHashTable[PAGENUM_2_LOCK_INDEX(pageNumber,
slot)],
>"Cache::trialFetch");
>+ lockHash.lock(Shared);
>+ Bdb *bdb;
>
> /* If we already have a buffer for this go, we're done */
>
>- Bdb *bdb;
>-
>- for (bdb = hashTable [slot]; bdb; bdb = bdb->hash)
>- if (bdb->pageNumber == pageNumber && bdb->dbb == dbb)
>+ bdb = findBdb(dbb, pageNumber, slot);
>+ if (bdb)
> {
> //syncObject.validateShared("Cache::trialFetch");
> bdb->incrementUseCount(ADD_HISTORY);
>- sync.unlock();
>+ lockHash.unlock();
> bdb->addRef(lockType COMMA_ADD_HISTORY);
> bdb->decrementUseCount(REL_HISTORY);
>- hit = 1;
>- break;
> }
>
> return bdb;
>@@ -764,10 +916,9 @@ void Cache::ioThread(void* arg)
>
> void Cache::ioThread(void)
> {
>- Sync syncThread(&syncThreads, "Cache::ioThread(1)");
>+ Sync syncThread(&syncThreads, "Cache::ioThread");
> syncThread.lock(Shared);
>- Sync flushLock(&syncFlush, "Cache::ioThread(2)");
>- Sync sync(&syncObject, "Cache::ioThread(3)");
>+ Sync flushLock(&syncFlush, "Cache::ioThread");
> Priority priority(database->ioScheduler);
> Thread *thread = Thread::getThread("Cache::ioThread");
> UCHAR *rawBuffer = new UCHAR[ASYNC_BUFFER_SIZE];
>@@ -781,137 +932,135 @@ void Cache::ioThread(void)
> {
> int32 pageNumber = flushBitmap->nextSet(0);
> int count;
>- Dbb *dbb;
>
> if (pageNumber >= 0)
> {
>- int slot = pageNumber % hashSize;
>+ Bdb *bdb;
>+ Dbb *dbb;
>+ int slot = PAGENUM_2_SLOT(pageNumber);
> bool hit = false;
> Bdb *bdbList = NULL;
> UCHAR *p = buffer;
>- sync.lock(Shared);
>-
>- // Look for a page to flush. Then get all his
friends
>
>- for (Bdb *bdb = hashTable[slot]; bdb; bdb =
bdb->hash)
>- if (bdb->pageNumber == pageNumber &&
bdb->flushIt
>&& bdb->isDirty)
>+ // Look for the page to flush.
>+ bdb = lockFindBdbIncrementUseCount(pageNumber,
slot);
>+ if (bdb && bdb->flushIt && bdb->isDirty)
>+ {
>+ hit = true;
>+ count = 0;
>+ dbb = bdb->dbb;
>+
>+ flushBitmap->clear(pageNumber);
>+
>+ // get all his friends
>+ while (p < end)
> {
>- hit = true;
>- count = 0;
>- dbb = bdb->dbb;
>+ ++count;
>+ bdb->addRef(Shared
COMMA_ADD_HISTORY);
>
>- if (!bdb->hash)
>-
flushBitmap->clear(pageNumber);
>+ bdb->syncWrite.lock(NULL,
Exclusive);
>+ bdb->ioThreadNext = bdbList;
>+ bdbList = bdb;
>
>- while (p < end)
>- {
>- ++count;
>-
bdb->incrementUseCount(ADD_HISTORY);
>- sync.unlock();
>- bdb->addRef(Shared
>COMMA_ADD_HISTORY);
>- if (falcon_use_sectorcache)
>-
sectorCache->writePage(bdb);
>-
>- bdb->syncWrite.lock(NULL,
Exclusive);
>- bdb->ioThreadNext = bdbList;
>- bdbList = bdb;
>-
>- //ASSERT(!(bdb->flags &
>BDB_write_pending));
>- //bdb->flags |=
BDB_write_pending;
>- memcpy(p, bdb->buffer,
pageSize);
>- p += pageSize;
>- bdb->flushIt = false;
>- markClean(bdb);
>- bdb->isDirty = false;
>- bdb->release(REL_HISTORY);
>- sync.lock(Shared);
>-
>- if ( !(bdb = findBdb(dbb,
bdb-
>>pageNumber + 1)) )
>- break;
>-
>- if (!bdb->isDirty &&
>!continueWrite(bdb))
>- break;
>- }
>+ //ASSERT(!(bdb->flags &
BDB_write_pending));
>+ //bdb->flags |= BDB_write_pending;
>+ memcpy(p, bdb->buffer, pageSize);
>+ p += pageSize;
>+ bdb->flushIt = false;
>+ markClean(bdb);
>+ bdb->isDirty = false;
>+ bdb->release(REL_HISTORY);
>
>- if (sync.state != None)
>- sync.unlock();
>-
>- flushLock.unlock();
>- //Log::debug(" %d Writing %s %d
pages: %d -
>%d\n", thread->threadId, (const char*) dbb->fileName, count, pageNumber,
>pageNumber + count - 1);
>- int length = p - buffer;
>- priority.schedule(PRIORITY_LOW);
>+ bdb =
lockFindBdbIncrementUseCount(dbb, bdb-
>>pageNumber + 1);
>+ if (!bdb)
>+ break;
>
>- try
>+ if (!bdb->isDirty &&
!continueWrite(bdb))
> {
>-
priority.schedule(PRIORITY_LOW);
>- dbb->writePages(pageNumber,
length,
>buffer, WRITE_TYPE_FLUSH);
>+
bdb->decrementUseCount(REL_HISTORY);
>+ break;
> }
>- catch (SQLException& exception)
>+ }
>+
>+ flushLock.unlock();
>+ //Log::debug(" %d Writing %s %d pages: %d -
%d\n",
>thread->threadId, (const char*) dbb->fileName, count, pageNumber,
pageNumber
>+ count - 1);
>+ int length = p - buffer;
>+ priority.schedule(PRIORITY_LOW);
>+
>+ try
>+ {
>+ priority.schedule(PRIORITY_LOW);
>+ dbb->writePages(pageNumber, length,
buffer,
>WRITE_TYPE_FLUSH);
>+ }
>+ catch (SQLException& exception)
>+ {
>+ priority.finished();
>+
>+ if (exception.getSqlcode() !=
DEVICE_FULL)
>+ throw;
>+
>+ database->setIOError(&exception);
>+
>+ for (bool error = true; error;)
> {
>- priority.finished();
>-
>- if (exception.getSqlcode()
!=
>DEVICE_FULL)
>- throw;
>-
>-
database->setIOError(&exception);
>-
>- for (bool error = true;
error;)
>+ if
(thread->shutdownInProgress)
> {
>- if
(thread->shutdownInProgress)
>- {
>- Bdb *next;
>+ Bdb *next;
>
>- for (bdb =
bdbList; bdb;
>bdb = next)
>- {
>-
//bdb->flags &=
>~BDB_write_pending;
>- next
= bdb-
>>ioThreadNext;
>- bdb-
>>syncWrite.unlock();
>- bdb-
>>decrementUseCount(REL_HISTORY);
>- }
>-
>- return;
>- }
>-
>- thread->sleep(1000);
>-
>- try
>+ for (bdb = bdbList;
bdb; bdb =
>next)
> {
>-
> priority.schedule(PRIORITY_LOW);
>- dbb-
>>writePages(pageNumber, length, buffer, WRITE_TYPE_FLUSH);
>- error =
false;
>-
database->clearIOError();
>+ //bdb->flags
&=
>~BDB_write_pending;
>+ next =
bdb->ioThreadNext;
>+
bdb->syncWrite.unlock();
>+ bdb-
>>decrementUseCount(REL_HISTORY);
> }
>- catch (SQLException&
>exception2)
>- {
>-
priority.finished();
>
>- if
>(exception2.getSqlcode() != DEVICE_FULL)
>-
throw;
>- }
>+ return;
>+ }
>+
>+ thread->sleep(1000);
>+
>+ try
>+ {
>+
> priority.schedule(PRIORITY_LOW);
>+
dbb->writePages(pageNumber,
>length, buffer, WRITE_TYPE_FLUSH);
>+ error = false;
>+
database->clearIOError();
>+ }
>+ catch (SQLException&
exception2)
>+ {
>+ priority.finished();
>+
>+ if
(exception2.getSqlcode() !=
>DEVICE_FULL)
>+ throw;
> }
> }
>+ }
>
>- priority.finished();
>- Bdb *next;
>+ priority.finished();
>+ Bdb *next;
>
>- for (bdb = bdbList; bdb; bdb = next)
>- {
>- //ASSERT(bdb->flags &
>BDB_write_pending);
>- //bdb->flags &=
~BDB_write_pending;
>- next = bdb->ioThreadNext;
>- bdb->syncWrite.unlock();
>-
bdb->decrementUseCount(REL_HISTORY);
>- }
>-
>- flushLock.lock(Exclusive);
>- ++physicalWrites;
>-
>- break;
>+ for (bdb = bdbList; bdb; bdb = next)
>+ {
>+ //ASSERT(bdb->flags &
BDB_write_pending);
>+ //bdb->flags &= ~BDB_write_pending;
>+ next = bdb->ioThreadNext;
>+ bdb->syncWrite.unlock();
>+ bdb->decrementUseCount(REL_HISTORY);
> }
>+
>+ flushLock.lock(Exclusive);
>+ ++physicalWrites;
>+
>+ }
>+ else
>+ {
>+ if (bdb)
>+
bdb->decrementUseCount(REL_HISTORY);
>+ }
>
> if (!hit)
> {
>- sync.unlock();
> flushBitmap->clear(pageNumber);
> }
> }
>@@ -940,8 +1089,8 @@ void Cache::ioThread(void)
>
> thread->sleep();
> flushLock.lock(Exclusive);
>- }
> }
>+ } // for ever
>
> delete [] rawBuffer;
> }
>@@ -974,8 +1123,8 @@ bool Cache::continueWrite(Bdb* startingB
> void Cache::shutdown(void)
> {
> shutdownThreads();
>- Sync sync (&syncDirty, "Cache::shutdown");
>- sync.lock (Exclusive);
>+ Sync dirtyLock (&syncDirty, "Cache::shutdown");
>+ dirtyLock.lock (Exclusive);
>
> for (Bdb *bdb = firstDirty; bdb; bdb = bdb->nextDirty)
> bdb->dbb->writePage(bdb, WRITE_TYPE_SHUTDOWN);
>@@ -995,8 +1144,8 @@ void Cache::shutdownThreads(void)
> ioThreads[n] = 0;
> }
>
>- Sync sync(&syncThreads, "Cache::shutdownThreads");
>- sync.lock(Exclusive);
>+ Sync lockThreads(&syncThreads, "Cache::shutdownThreads");
>+ lockThreads.lock(Exclusive);
> }
>
> void Cache::analyzeFlush(void)
>@@ -1048,7 +1197,8 @@ void Cache::openTraceFile(void)
> if (traceFile)
> closeTraceFile();
>
>- traceFile = fopen(TRACE_FILE, "w");
>+ traceFile = fopen(TRACE_FILE, "a+");
>+ setlinebuf(traceFile);
> #endif
> }
>
>@@ -1065,6 +1215,6 @@ void Cache::closeTraceFile(void)
>
> void Cache::flushWait(void)
> {
>- Sync sync(&syncWait, "Cache::flushWait");
>- sync.lock(Shared);
>+ Sync waitLock(&syncWait, "Cache::flushWait");
>+ waitLock.lock(Shared);
> }
>
>=== modified file 'storage/falcon/Cache.h'
>--- a/storage/falcon/Cache.h 2008-06-06 19:20:10 +0000
>+++ b/storage/falcon/Cache.h 2008-08-18 13:47:57 +0000
>@@ -28,6 +28,17 @@
> #include "SyncObject.h"
> #include "Queue.h"
>
>+// define DEBUG_SYNC_HASH_TABLE_SIZE to cause more contention and test for
>race conditions
>+//#define DEBUG_SYNC_HASH_TABLE_SIZE (0x01 << 1)
>+#ifdef DEBUG_SYNC_HASH_TABLE_SIZE
>+# define DEBUG_SYNC_HASH_TABLE_MASK (DEBUG_SYNC_HASH_TABLE_SIZE - 1)
>+# define PAGENUM_2_LOCK_INDEX(_pgnum, _slot) ((_pgnum) &
>DEBUG_SYNC_HASH_TABLE_MASK)
>+#else /* DEBUG_SYNC_HASH_TABLE_SIZE */
>+# define PAGENUM_2_LOCK_INDEX(_pgnum, _slot) ((_slot))
>+#endif /* DEBUG_SYNC_HASH_TABLE_SIZE */
>+
>+#define PAGENUM_2_SLOT(_pgnum) ((_pgnum) & hashMask)
>+
> class Bdb;
> class Dbb;
> class PageWriter;
>@@ -83,14 +94,18 @@ public:
> bool flushing;
>
> protected:
>- Bdb* findBuffer (Dbb *dbb, int pageNumber, LockType
lockType);
>+ Bdb* getFreeBuffer(void);
>+ Bdb* findBdb(Dbb* dbb, int32 pageNumber, int slot);
> Bdb* findBdb(Dbb* dbb, int32 pageNumber);
>+ Bdb* lockFindBdbIncrementUseCount(Dbb* dbb, int32
pageNumber);
>+ Bdb* lockFindBdbIncrementUseCount(int32 pageNumber, int
slot);
>
> int64 flushArg;
> Bdb *bdbs;
> Bdb *endBdbs;
> Queue<Bdb> bufferQueue;
> Bdb **hashTable;
>+ SyncObject *syncHashTable;
> Bdb *firstDirty;
> Bdb *lastDirty;
> Bitmap *flushBitmap;
>@@ -105,12 +120,13 @@ protected:
> int flushPages;
> int physicalWrites;
> int hashSize;
>+ unsigned int hashMask;
> int pageSize;
>- int upperFraction;
>+ unsigned int upperFraction;
> int numberHunks;
> int numberDirtyPages;
> int numberIoThreads;
>- volatile int bufferAge;
>+ volatile uint64 bufferAge;
> public:
> void flushWait(void);
> };
>
>
>--
>MySQL Code Commits Mailing List
>For list archives: http://lists.mysql.com/commits
>To unsubscribe: http://lists.mysql.com/commits?unsub=1