2849 Christopher Powers 2008-10-02
Bug#39706, "Falcon recovery assertion in Cache::fetchPage for simple scenarios"
Reapply fix from Bug#39692, "Falcon allocates SectorCache even if falcon_use_sectorcache is OFF"
modified:
storage/falcon/Cache.cpp
2848 Christopher Powers 2008-10-02
Bug#39706 "Falcon recovery assertion in Cache::fetchPage for simple scenarios"
Reverted Cache.* to the revision preceding the 2008-09-02 updates for WL#4479, 4480 and 4481
modified:
storage/falcon/Cache.cpp
storage/falcon/Cache.h
2847 Sergey Vojtovich 2008-10-02 [merge]
Merge.
renamed:
mysql-test/suite/falcon_team/r/falcon_bug_23945.result => mysql-test/suite/falcon/r/falcon_bug_23945.result
mysql-test/suite/falcon_team/r/falcon_bug_34892.result => mysql-test/suite/falcon/r/falcon_bug_34892.result
mysql-test/suite/falcon_team/t/falcon_bug_23945.test => mysql-test/suite/falcon/t/falcon_bug_23945.test
mysql-test/suite/falcon_team/t/falcon_bug_34892.test => mysql-test/suite/falcon/t/falcon_bug_34892.test
modified:
mysql-test/suite/falcon_team/t/disabled.def
mysql-test/suite/falcon_team/t/test2bug.def
sql/sql_insert.cc
mysql-test/suite/falcon/r/falcon_bug_23945.result
mysql-test/suite/falcon/t/falcon_bug_23945.test
=== modified file 'storage/falcon/Cache.cpp'
--- a/storage/falcon/Cache.cpp 2008-09-27 18:04:19 +0000
+++ b/storage/falcon/Cache.cpp 2008-10-02 23:06:04 +0000
@@ -50,11 +50,9 @@
extern uint falcon_io_threads;
//#define STOP_PAGE 55
-//#define CACHE_TRACE_FILE "cache.trace"
+#define TRACE_FILE "cache.trace"
-#ifdef CACHE_TRACE_FILE
static FILE *traceFile;
-#endif // CACHE_TRACE_FILE
static const uint64 cacheHunkSize = 1024 * 1024 * 128;
static const int ASYNC_BUFFER_SIZE = 1024000;
@@ -70,50 +68,21 @@ static const char THIS_FILE[]=__FILE__;
Cache::Cache(Database *db, int pageSz, int hashSz, int numBuffers)
{
- openTraceFile();
+ //openTraceFile();
database = db;
panicShutdown = false;
pageSize = pageSz;
-
- unsigned int highBit;
- for (highBit=0x01; highBit < (unsigned int)hashSz; highBit= highBit << 1) { }
-
- // if there are more than 4096 buckets then lets round down
- // else lets round up
- if (highBit >= 0x00001000)
- {
- // use power of two rounded down
- hashSize = highBit << 1;
- }
- else
- {
- // use power of two rounded up
- hashSize = highBit;
- }
-
- hashMask = hashSize - 1;
+ hashSize = hashSz;
numberBuffers = numBuffers;
upperFraction = numberBuffers / 4;
bufferAge = 0;
firstDirty = NULL;
lastDirty = NULL;
+ numberDirtyPages = 0;
pageWriter = NULL;
- hashTable = new Bdb* [hashSize];
+ hashTable = new Bdb* [hashSz];
memset (hashTable, 0, sizeof (Bdb*) * hashSize);
-#ifdef DEBUG_SYNC_HASH_TABLE_SIZE
- syncHashTable = new SyncObject [DEBUG_SYNC_HASH_TABLE_SIZE];
- for (int loop = 0; loop < DEBUG_SYNC_HASH_TABLE_SIZE; loop ++)
- syncHashTable[loop].setName("Cache::syncHashTable");
-
-#else /* DEBUG_SYNC_HASH_TABLE_SIZE */
- syncHashTable = new SyncObject [hashSize];
- for (int loop = 0; loop < hashSize; loop ++)
- {
- char tmpName[128];
- snprintf(tmpName,120,"Cache::syncHashTable[%d]",loop);
- syncHashTable[loop].setName(tmpName);
- }
-#endif /* DEBUG_SYNC_HASH_TABLE_SIZE */
+
if (falcon_use_sectorcache)
sectorCache = new SectorCache(sectorCacheSize / SECTOR_BUFFER_SIZE, pageSize);
@@ -133,10 +102,9 @@ Cache::Cache(Database *db, int pageSz, i
ioThreads = new Thread*[numberIoThreads];
memset(ioThreads, 0, numberIoThreads * sizeof(ioThreads[0]));
flushing = false;
-
+
try
{
- // non-protected access to bdbs,endBdbs is OK during initialization
bdbs = new Bdb [numberBuffers];
endBdbs = bdbs + numberBuffers;
int remaining = 0;
@@ -155,7 +123,6 @@ Cache::Cache(Database *db, int pageSz, i
}
bdb->cache = this;
- // non-protected access to bufferQueue is OK during initialization
bufferQueue.append(bdb);
bdb->buffer = (Page*) stuff;
stuff += pageSize;
@@ -181,17 +148,17 @@ Cache::Cache(Database *db, int pageSz, i
Cache::~Cache()
{
-
- closeTraceFile();
+ if (traceFile)
+ closeTraceFile();
delete [] hashTable;
- delete [] syncHashTable;
delete [] bdbs;
delete [] ioThreads;
delete flushBitmap;
+
if (falcon_use_sectorcache)
delete sectorCache;
-
+
if (bufferHunks)
{
for (int n = 0; n < numberHunks; ++n)
@@ -204,16 +171,19 @@ Cache::~Cache()
Bdb* Cache::probePage(Dbb *dbb, int32 pageNumber)
{
ASSERT (pageNumber >= 0);
- Bdb *bdb;
-
- /* If we already have a buffer for this, we're done */
- bdb = lockFindBdbIncrementUseCount(dbb, pageNumber);
+ Sync sync (&syncObject, "Cache::probePage");
+ sync.lock (Shared);
+ Bdb *bdb = findBdb(dbb, pageNumber);
+
if (bdb)
{
+ bdb->incrementUseCount(ADD_HISTORY);
+ sync.unlock();
+
if (bdb->buffer->pageType == PAGE_free)
{
bdb->decrementUseCount(REL_HISTORY);
-
+
return NULL;
}
@@ -226,49 +196,11 @@ Bdb* Cache::probePage(Dbb *dbb, int32 pa
return NULL;
}
-Bdb* Cache::findBdb(Dbb* dbb, int32 pageNumber, int slot)
-{
- for (Bdb *bdb = hashTable [slot]; bdb; bdb = bdb->hash)
- {
- if (bdb->pageNumber == pageNumber && bdb->dbb == dbb)
- {
- return bdb;
- }
- }
-
- return NULL;
-}
-
Bdb* Cache::findBdb(Dbb* dbb, int32 pageNumber)
{
- return (findBdb(dbb, pageNumber, PAGENUM_2_SLOT(pageNumber)));
-}
-
-Bdb* Cache::lockFindBdbIncrementUseCount(Dbb* dbb, int32 pageNumber)
-{
- int slot = PAGENUM_2_SLOT(pageNumber);
- Sync lockHash (&syncHashTable[PAGENUM_2_LOCK_INDEX(pageNumber, slot)], "Cache::lockFindBdbIncrementUseCount");
- lockHash.lock (Shared);
- Bdb *bdb;
-
- bdb = findBdb(dbb, pageNumber, slot);
- if (bdb != NULL)
- bdb->incrementUseCount(ADD_HISTORY);
-
- return bdb;
-}
-
-Bdb* Cache::lockFindBdbIncrementUseCount(int32 pageNumber, int slot)
-{
- Sync lockHash (&syncHashTable[PAGENUM_2_LOCK_INDEX(pageNumber, slot)], "Cache::lockFindBdbIncrementUseCount");
- lockHash.lock (Shared);
-
- for (Bdb *bdb = hashTable [slot]; bdb; bdb = bdb->hash)
- if (bdb->pageNumber == pageNumber)
- {
- bdb->incrementUseCount(ADD_HISTORY);
+ for (Bdb *bdb = hashTable [pageNumber % hashSize]; bdb; bdb = bdb->hash)
+ if (bdb->pageNumber == pageNumber && bdb->dbb == dbb)
return bdb;
- }
return NULL;
}
@@ -278,106 +210,95 @@ Bdb* Cache::fetchPage(Dbb *dbb, int32 pa
if (panicShutdown)
{
Thread *thread = Thread::getThread("Cache::fetchPage");
-
+
if (thread->pageMarks == 0)
throw SQLError(RUNTIME_ERROR, "Emergency shut is underway");
}
-#ifdef STOP_PAGE
+#ifdef STOP_PAGE
if (pageNumber == STOP_PAGE)
Log::debug("fetching page %d/%d\n", pageNumber, dbb->tableSpaceId);
#endif
ASSERT (pageNumber >= 0);
+ int slot = pageNumber % hashSize;
+ LockType actual = lockType;
+ Sync sync (&syncObject, "Cache::fetchPage");
+ sync.lock (Shared);
+ int hit = 0;
+
+ /* If we already have a buffer for this go, we're done */
+
Bdb *bdb;
- /* If we already have a buffer for this, we're done */
- bdb = lockFindBdbIncrementUseCount(dbb, pageNumber);
+ for (bdb = hashTable [slot]; bdb; bdb = bdb->hash)
+ if (bdb->pageNumber == pageNumber && bdb->dbb == dbb)
+ {
+ //syncObject.validateShared("Cache::fetchPage");
+ bdb->incrementUseCount(ADD_HISTORY);
+ sync.unlock();
+ bdb->addRef(lockType COMMA_ADD_HISTORY);
+ bdb->decrementUseCount(REL_HISTORY);
+ hit = 1;
+ break;
+ }
+
if (!bdb)
{
- // getFreeBuffer() locks a hash bucket to remove the candidate bdb
- // if we locked our hash bucket before the call then we could have
- // a deadlock
- // thus we get the free buffer before we lock the hash bucket we will
- // be inserting into. This avoids a dead lock but generates a race
- // we take care of the race by reversing the getFreeBuffer() work
- // when we lose the race
- Bdb *bdbAvailable;
- int slot = PAGENUM_2_SLOT(pageNumber);
- Sync lockHash (&syncHashTable[PAGENUM_2_LOCK_INDEX(pageNumber, slot)], "Cache::fetchPage");
-
- bdbAvailable = getFreeBuffer();
- /* assume we'll be inserting this new BDB. Set new page number. */
- bdbAvailable->pageNumber = pageNumber;
- bdbAvailable->dbb = dbb;
+ sync.unlock();
+ actual = Exclusive;
+ sync.lock(Exclusive);
+
+ for (bdb = hashTable [slot]; bdb; bdb = bdb->hash)
+ if (bdb->pageNumber == pageNumber && bdb->dbb == dbb)
+ {
+ //syncObject.validateExclusive("Cache::fetchPage (retry)");
+ bdb->incrementUseCount(ADD_HISTORY);
+ sync.unlock();
+ bdb->addRef(lockType COMMA_ADD_HISTORY);
+ bdb->decrementUseCount(REL_HISTORY);
+ hit = 2;
+ break;
+ }
- lockHash.lock(Exclusive);
- bdb = findBdb(dbb, pageNumber, slot);
if (!bdb)
{
- // we won the race so lets use the free bdb
- // relink into hash table
- bdbAvailable->hash = hashTable [slot];
- hashTable [slot] = bdbAvailable;
- lockHash.unlock();
+ bdb = findBuffer(dbb, pageNumber, actual);
+ moveToHead(bdb);
+ sync.unlock();
- bdb = bdbAvailable;
-#ifdef STOP_PAGE
+#ifdef STOP_PAGE
if (bdb->pageNumber == STOP_PAGE)
Log::debug("reading page %d/%d\n", bdb->pageNumber, dbb->tableSpaceId);
#endif
-
+
Priority priority(database->ioScheduler);
- priority.schedule(PRIORITY_MEDIUM);
+ priority.schedule(PRIORITY_MEDIUM);
if (falcon_use_sectorcache)
sectorCache->readPage(bdb);
else
dbb->readPage(bdb);
-
priority.finished();
#ifdef HAVE_PAGE_NUMBER
ASSERT(bdb->buffer->pageNumber == pageNumber);
-#endif
- if (Exclusive != lockType)
+#endif
+ if (actual != lockType)
bdb->downGrade(lockType);
-
- }
- else
- {
- //syncObject.validateExclusive("Cache::fetchPage (retry)");
- bdb->incrementUseCount(ADD_HISTORY);
- lockHash.unlock();
- bdb->addRef(lockType COMMA_ADD_HISTORY);
- bdb->decrementUseCount(REL_HISTORY);
- moveToHead(bdb);
-
- // lost a race. put our available back to useable
- // side effect, bdbAvailable will have to age again before we re-use it.
- bdbAvailable->hash = NULL;
- bdbAvailable->pageNumber = -1;
- bdbAvailable->dbb = NULL;
- bdbAvailable->release(REL_HISTORY);
}
}
- else
- {
- bdb->addRef(lockType COMMA_ADD_HISTORY);
- bdb->decrementUseCount(REL_HISTORY);
- moveToHead(bdb);
- }
Page *page = bdb->buffer;
-
+
/***
if (page->checksum != (short) pageNumber)
FATAL ("page %d wrong page number, got %d\n",
bdb->pageNumber, page->checksum);
***/
-
+
if (pageType && page->pageType != pageType)
{
/*** future code
- bdb->release(REL_HISTORY);
+ bdb->release();
throw SQLError (DATABASE_CORRUPTION, "page %d wrong page type, expected %d got %d\n",
pageNumber, pageType, page->pageType);
***/
@@ -385,6 +306,14 @@ Bdb* Cache::fetchPage(Dbb *dbb, int32 pa
bdb->pageNumber, dbb->tableSpaceId, pageType, page->pageType);
}
+ // If buffer has moved out of the upper "fraction" of the LRU queue, move it back up
+
+ if (bdb->age < bufferAge - (uint64) upperFraction)
+ {
+ sync.lock (Exclusive);
+ moveToHead (bdb);
+ }
+
ASSERT (bdb->pageNumber == pageNumber);
ASSERT (bdb->dbb == dbb);
ASSERT (bdb->useCount > 0);
@@ -394,74 +323,43 @@ Bdb* Cache::fetchPage(Dbb *dbb, int32 pa
Bdb* Cache::fakePage(Dbb *dbb, int32 pageNumber, PageType type, TransId transId)
{
- Bdb *bdb;
+ Sync sync(&syncObject, "Cache::fakePage");
+ sync.lock(Exclusive);
+ int slot = pageNumber % hashSize;
-#ifdef STOP_PAGE
+#ifdef STOP_PAGE
if (pageNumber == STOP_PAGE)
Log::debug("faking page %d/%d\n",pageNumber, dbb->tableSpaceId);
#endif
/* If we already have a buffer for this, we're done */
- bdb = lockFindBdbIncrementUseCount(dbb, pageNumber);
- if (!bdb)
- {
- // getFreeBuffer() locks a hash bucket to remove the candidate bdb
- // if we locked our hash bucket before the call then we could have
- // a deadlock
- // thus we get the free buffer before we lock the hash bucket we will
- // be inserting into. This avoids a dead lock but generates a race
- // we take care of the race by reversing the getFreeBuffer() work
- // when we lose the race
- Bdb *bdbAvailable;
- int slot = PAGENUM_2_SLOT(pageNumber);
- Sync lockHash (&syncHashTable[PAGENUM_2_LOCK_INDEX(pageNumber, slot)], "Cache::fetchPage");
-
- bdbAvailable = getFreeBuffer();
- /* assume we'll be inserting this new BDB. Set new page number. */
- bdbAvailable->pageNumber = pageNumber;
- bdbAvailable->dbb = dbb;
- lockHash.lock(Exclusive);
- bdb = findBdb(dbb, pageNumber, slot);
- if (!bdb)
- {
- // we won the race so lets use the free bdb
- // relink into hash table
- bdbAvailable->hash = hashTable [slot];
- hashTable [slot] = bdbAvailable;
- lockHash.unlock();
+ Bdb *bdb;
- bdb = bdbAvailable;
- }
- else
+ for (bdb = hashTable [slot]; bdb; bdb = bdb->hash)
+ if (bdb->pageNumber == pageNumber && bdb->dbb == dbb)
{
- //syncObject.validateExclusive("Cache::fetchPage (retry)");
- bdb->incrementUseCount(ADD_HISTORY);
- lockHash.unlock();
+ if (bdb->syncObject.isLocked())
+ {
+ // The pageWriter may still be cleaning up this freed page with a shared lock
+ ASSERT(bdb->buffer->pageType == PAGE_free);
+ ASSERT(bdb->syncObject.getState() >= 0);
+ }
+
bdb->addRef(Exclusive COMMA_ADD_HISTORY);
- bdb->decrementUseCount(REL_HISTORY);
- moveToHead(bdb);
-
- // lost a race. put our available back to useable
- // side effect, bdbAvailable will have to age again before we re-use it.
- bdbAvailable->hash = NULL;
- bdbAvailable->pageNumber = -1;
- bdbAvailable->dbb = NULL;
- bdbAvailable->release(REL_HISTORY);
+
+ break;
}
- }
- else
- {
- bdb->addRef(Exclusive COMMA_ADD_HISTORY);
- bdb->decrementUseCount(REL_HISTORY);
- moveToHead(bdb);
- }
+
+ if (!bdb)
+ bdb = findBuffer(dbb, pageNumber, Exclusive);
if (!dbb->isReadOnly)
bdb->mark(transId);
-
+
memset(bdb->buffer, 0, pageSize);
bdb->setPageHeader(type);
+ moveToHead(bdb);
return bdb;
}
@@ -469,19 +367,19 @@ Bdb* Cache::fakePage(Dbb *dbb, int32 pag
void Cache::flush(int64 arg)
{
Sync flushLock(&syncFlush, "Cache::flush(1)");
- Sync dirtyLock(&syncDirty, "Cache::flush(2)");
+ Sync sync(&syncDirty, "Cache::flush(2)");
flushLock.lock(Exclusive);
-
+
if (flushing)
return;
syncWait.lock(NULL, Exclusive);
+ sync.lock(Shared);
//Log::debug(%d: "Initiating flush\n", dbb->deltaTime);
flushArg = arg;
flushPages = 0;
physicalWrites = 0;
-
- dirtyLock.lock(Shared);
+
for (Bdb *bdb = firstDirty; bdb; bdb = bdb->nextDirty)
{
bdb->flushIt = true;
@@ -489,14 +387,14 @@ void Cache::flush(int64 arg)
++flushPages;
}
- dirtyLock.unlock();
-
- analyzeFlush();
+ if (traceFile)
+ analyzeFlush();
flushStart = database->timestamp;
flushing = true;
+ sync.unlock();
flushLock.unlock();
-
+
for (int n = 0; n < numberIoThreads; ++n)
if (ioThreads[n])
ioThreads[n]->wake();
@@ -504,135 +402,69 @@ void Cache::flush(int64 arg)
void Cache::moveToHead(Bdb * bdb)
{
- // If buffer has moved out of the upper "fraction" of the LRU queue, move it back up
- // non-protected access to age is harmless since it is fuzzy anyway
- if (bdb->age < bufferAge - (uint64) upperFraction)
- {
- Sync bufferQueueLock (&bufferQueue.syncObject, "Cache::moveToHead");
-
- bufferQueueLock.lock (Exclusive);
- bdb->age = bufferAge++;
- bufferQueue.remove(bdb);
- bufferQueue.prepend(bdb);
- //validateUnique (bdb);
- }
-}
-
-void Cache::moveToHeadAlreadyLocked(Bdb * bdb)
-{
bdb->age = bufferAge++;
bufferQueue.remove(bdb);
bufferQueue.prepend(bdb);
//validateUnique (bdb);
}
-Bdb* Cache::getFreeBuffer(void)
+Bdb* Cache::findBuffer(Dbb *dbb, int pageNumber, LockType lockType)
{
- Sync bufferQueueLock (&bufferQueue.syncObject, "Cache::getFreeBuffer");
- unsigned int count;
+ //syncObject.validateExclusive("Cache::findBuffer");
+ int slot = pageNumber % hashSize;
+ Sync sync(&syncDirty, "Cache::findBuffer");
+
+ /* Find least recently used, not-in-use buffer */
+
Bdb *bdb;
// Find a candidate BDB.
+
for (;;)
{
- bufferQueueLock.lock (Exclusive);
- // find a candidate that is NOT in use and NOT dirty and in the tail fraction of the LRU
- for (count = 0, bdb = bufferQueue.last; bdb ; bdb = bdb->prior, count++)
- {
- if (count >= upperFraction)
- {
- bdb = NULL;
- break;
- }
-
+ for (bdb = bufferQueue.last; bdb; bdb = bdb->prior)
if (bdb->useCount == 0)
- {
- if (!bdb->isDirty)
- {
- bdb->incrementUseCount(REL_HISTORY);
- moveToHeadAlreadyLocked(bdb);
- break;
- }
- }
- else
- {
- // get this one out of the way so we don't search it every time
- moveToHeadAlreadyLocked(bdb);
-#ifdef CHECK_STALLED_BDB
- bdb->stallCount++;
- if ((bdb->stallCount & 0x03) == 0x03)
- {
- Log::debug("Page %d is in use and aged %d times\n",
- bdb->pageNumber, bdb->stallCount);
- }
-#endif // CHECK_STALLED_BDB
- }
- }
- if (!bdb)
- // find a candidate that is NOT in use, could be dirty
- for (bdb = bufferQueue.last; bdb; bdb = bdb->prior)
- if (bdb->useCount == 0)
- {
- bdb->incrementUseCount(REL_HISTORY);
- moveToHeadAlreadyLocked(bdb);
- break;
- }
-
- bufferQueueLock.unlock();
+ break;
if (!bdb)
throw SQLError(RUNTIME_ERROR, "buffer pool is exhausted\n");
+
+ if (!bdb->isDirty)
+ break;
+
+ writePage (bdb, WRITE_TYPE_REUSE);
+ }
- if (bdb->pageNumber >= 0)
- {
- int slotRemove = PAGENUM_2_SLOT(bdb->pageNumber);
- Sync lockHashRemove (&syncHashTable[PAGENUM_2_LOCK_INDEX(bdb->pageNumber, slotRemove)], "Cache::getFreeBuffer");
- lockHashRemove.lock(Exclusive);
+ /* Unlink its old incarnation from the page/hash table */
- if (bdb->useCount != 1)
+ if (bdb->pageNumber >= 0)
+ for (Bdb **ptr = hashTable + bdb->pageNumber % hashSize;; ptr = &(*ptr)->hash)
+ if (*ptr == bdb)
{
- // we lost a race try again
- bdb->decrementUseCount(REL_HISTORY);
- lockHashRemove.unlock();
- continue;
+ *ptr = bdb->hash;
+ break;
}
+ else
+ ASSERT (*ptr);
- if (bdb->isDirty)
- writePage (bdb, WRITE_TYPE_REUSE);
+ bdb->addRef (lockType COMMA_ADD_HISTORY);
- /* Unlink its old incarnation from the page/hash table */
- for (Bdb **ptr = hashTable + PAGENUM_2_SLOT(bdb->pageNumber) ;; ptr = &(*ptr)->hash)
- if (*ptr == bdb)
- {
- *ptr = bdb->hash;
- break;
- }
- else
- ASSERT (*ptr);
+ /* Set new page number and relink into hash table */
- }
-
- break;
- }
-#ifdef CHECK_STALLED_BDB
- bdb->stallCount = 0;
-#endif // CHECK_STALLED_BDB
+ bdb->hash = hashTable [slot];
+ hashTable [slot] = bdb;
+ bdb->pageNumber = pageNumber;
+ bdb->dbb = dbb;
#ifdef COLLECT_BDB_HISTORY
bdb->initHistory();
#endif
- bdb->addRef (Exclusive COMMA_ADD_HISTORY);
- bdb->decrementUseCount(REL_HISTORY);
return bdb;
}
void Cache::validate()
{
- //Sync bufferQueueLock (&bufferQueue.syncObject, "Cache::validate");
-
- //bufferQueueLock.lock (Shared);
- // non-protected access to bufferQueue is DANGEROUS...
for (Bdb *bdb = bufferQueue.last; bdb; bdb = bdb->prior)
{
//IndexPage *page = (IndexPage*) bdb->buffer;
@@ -642,8 +474,8 @@ void Cache::validate()
void Cache::markDirty(Bdb *bdb)
{
- Sync dirtyLock (&syncDirty, "Cache::markDirty");
- dirtyLock.lock (Exclusive);
+ Sync sync (&syncDirty, "Cache::markDirty");
+ sync.lock (Exclusive);
bdb->nextDirty = NULL;
bdb->priorDirty = lastDirty;
@@ -653,21 +485,23 @@ void Cache::markDirty(Bdb *bdb)
firstDirty = bdb;
lastDirty = bdb;
+ ++numberDirtyPages;
//validateUnique (bdb);
}
void Cache::markClean(Bdb *bdb)
{
- Sync dirtyLock (&syncDirty, "Cache::markClean");
- dirtyLock.lock (Exclusive);
+ Sync sync (&syncDirty, "Cache::markClean");
+ sync.lock (Exclusive);
/***
if (bdb->flushIt)
Log::debug(" Cleaning page %d in %s marked for flush\n", bdb->pageNumber, (const char*) bdb->dbb->fileName);
***/
-
+
bdb->flushIt = false;
-
+ --numberDirtyPages;
+
if (bdb == lastDirty)
lastDirty = bdb->priorDirty;
@@ -709,7 +543,6 @@ void Cache::writePage(Bdb *bdb, int type
{
if (falcon_use_sectorcache)
sectorCache->writePage(bdb);
-
dbb->writePage(bdb, type);
}
catch (SQLException& exception)
@@ -762,7 +595,7 @@ void Cache::writePage(Bdb *bdb, int type
#endif
bdb->isDirty = false;
-
+
if (pageWriter && bdb->isRegistered)
{
bdb->isRegistered = false;
@@ -771,8 +604,8 @@ void Cache::writePage(Bdb *bdb, int type
if (dbb->shadows)
{
- Sync cloneLock (&dbb->syncClone, "Cache::writePage(2)");
- cloneLock.lock (Shared);
+ Sync sync (&dbb->syncClone, "Cache::writePage(2)");
+ sync.lock (Shared);
for (DatabaseCopy *shadow = dbb->shadows; shadow; shadow = shadow->next)
shadow->rewritePage(bdb);
@@ -781,49 +614,45 @@ void Cache::writePage(Bdb *bdb, int type
void Cache::analyze(Stream *stream)
{
- Sync dirtyLock (&syncDirty, "Cache::analyze");
+ Sync sync (&syncDirty, "Cache::analyze");
+ sync.lock (Shared);
int inUse = 0;
int dirty = 0;
int dirtyList = 0;
int total = 0;
Bdb *bdb;
- // non-protected access to bdbs,endBdbs is DANGEROUS...
for (bdb = bdbs; bdb < endBdbs; ++bdb)
{
++total;
-
+
if (bdb->isDirty)
++dirty;
-
+
if (bdb->useCount)
++inUse;
}
- dirtyLock.lock (Shared);
for (bdb = firstDirty; bdb; bdb = bdb->nextDirty)
++dirtyList;
- dirtyLock.unlock();
-
stream->format ("Cache: %d pages, %d in use, %d dirty, %d in dirty chain\n",
total, inUse, dirty, dirtyList);
}
void Cache::validateUnique(Bdb *target)
{
- int slot = PAGENUM_2_SLOT(target->pageNumber);
+ int slot = target->pageNumber % hashSize;
- // WARNING: unlocked walk of hash table.... DANGEROUS
for (Bdb *bdb = hashTable [slot]; bdb; bdb = bdb->hash)
ASSERT (bdb == target || !(bdb->pageNumber == target->pageNumber && bdb->dbb == target->dbb));
}
void Cache::freePage(Dbb *dbb, int32 pageNumber)
{
- int slot = PAGENUM_2_SLOT(pageNumber);
- Sync lockHash (&syncHashTable[PAGENUM_2_LOCK_INDEX(pageNumber, slot)], "Cache::freePage");
- lockHash.lock(Shared);
+ Sync sync (&syncObject, "Cache::freePage");
+ sync.lock (Shared);
+ int slot = pageNumber % hashSize;
// If page exists in cache (usual case), clean it up
@@ -832,9 +661,10 @@ void Cache::freePage(Dbb *dbb, int32 pag
{
if (bdb->isDirty)
{
+ sync.unlock();
markClean (bdb);
}
-
+
bdb->isDirty = false;
break;
}
@@ -859,14 +689,12 @@ void Cache::flush(Dbb *dbb)
bool Cache::hasDirtyPages(Dbb *dbb)
{
- Sync dirtyLock (&syncDirty, "Cache::hasDirtyPages");
- dirtyLock.lock (Shared);
+ Sync sync (&syncDirty, "Cache::hasDirtyPages");
+ sync.lock (Shared);
for (Bdb *bdb = firstDirty; bdb; bdb = bdb->nextDirty)
if (bdb->dbb == dbb)
- {
return true;
- }
return false;
}
@@ -887,22 +715,32 @@ Bdb* Cache::trialFetch(Dbb* dbb, int32 p
if (panicShutdown)
{
Thread *thread = Thread::getThread("Cache::trialFetch");
-
+
if (thread->pageMarks == 0)
throw SQLError(RUNTIME_ERROR, "Emergency shut is underway");
}
ASSERT (pageNumber >= 0);
+ int slot = pageNumber % hashSize;
+ Sync sync (&syncObject, "Cache::trialFetch");
+ sync.lock (Shared);
+ int hit = 0;
+
+ /* If we already have a buffer for this go, we're done */
+
Bdb *bdb;
- /* If we already have a buffer for this, we're done */
- bdb = lockFindBdbIncrementUseCount(dbb, pageNumber);
- if (bdb)
- {
- bdb->addRef(lockType COMMA_ADD_HISTORY);
- bdb->decrementUseCount(REL_HISTORY);
- moveToHead(bdb);
- }
+ for (bdb = hashTable [slot]; bdb; bdb = bdb->hash)
+ if (bdb->pageNumber == pageNumber && bdb->dbb == dbb)
+ {
+ //syncObject.validateShared("Cache::trialFetch");
+ bdb->incrementUseCount(ADD_HISTORY);
+ sync.unlock();
+ bdb->addRef(lockType COMMA_ADD_HISTORY);
+ bdb->decrementUseCount(REL_HISTORY);
+ hit = 1;
+ break;
+ }
return bdb;
}
@@ -913,14 +751,13 @@ void Cache::syncFile(Dbb *dbb, const cha
int writes = dbb->writesSinceSync;
time_t start = database->timestamp;
dbb->sync();
-
+
if (Log::isActive(LogInfo))
{
time_t delta = database->timestamp - start;
-
+
if (delta > 1)
- Log::log(LogInfo, "%d: %s %s sync: %d pages in %d seconds\n",
- database->deltaTime, fileName, text, writes, delta);
+ Log::log(LogInfo, "%d: %s %s sync: %d pages in %d seconds\n", database->deltaTime, fileName, text, writes, delta);
}
}
@@ -931,186 +768,186 @@ void Cache::ioThread(void* arg)
void Cache::ioThread(void)
{
- Sync syncThread(&syncThreads, "Cache::ioThread");
+ Sync syncThread(&syncThreads, "Cache::ioThread(1)");
syncThread.lock(Shared);
- Sync flushLock(&syncFlush, "Cache::ioThread");
+ Sync flushLock(&syncFlush, "Cache::ioThread(2)");
+ Sync sync(&syncObject, "Cache::ioThread(3)");
Priority priority(database->ioScheduler);
Thread *thread = Thread::getThread("Cache::ioThread");
UCHAR *rawBuffer = new UCHAR[ASYNC_BUFFER_SIZE];
UCHAR *buffer = (UCHAR*) (((UIPTR) rawBuffer + pageSize - 1) / pageSize * pageSize);
UCHAR *end = (UCHAR*) ((UIPTR) (rawBuffer + ASYNC_BUFFER_SIZE) / pageSize * pageSize);
flushLock.lock(Exclusive);
-
+
// This is the main loop. Write blocks until there's nothing to do, then sleep
-
+
for (;;)
{
int32 pageNumber = flushBitmap->nextSet(0);
int count;
-
+ Dbb *dbb;
+
if (pageNumber >= 0)
{
- Bdb *bdb;
- Dbb *dbb;
- int slot = PAGENUM_2_SLOT(pageNumber);
+ int slot = pageNumber % hashSize;
bool hit = false;
Bdb *bdbList = NULL;
UCHAR *p = buffer;
-
- // Look for the page to flush.
- bdb = lockFindBdbIncrementUseCount(pageNumber, slot);
- if (bdb && bdb->flushIt && bdb->isDirty)
- {
- hit = true;
- count = 0;
- dbb = bdb->dbb;
-
- flushBitmap->clear(pageNumber);
-
- // get all his friends
- while (p < end)
+ sync.lock(Shared);
+
+ // Look for a page to flush. Then get all his friends
+
+ for (Bdb *bdb = hashTable[slot]; bdb; bdb = bdb->hash)
+ if (bdb->pageNumber == pageNumber && bdb->flushIt && bdb->isDirty)
{
- ++count;
- bdb->addRef(Shared COMMA_ADD_HISTORY);
-
- bdb->syncWrite.lock(NULL, Exclusive);
- bdb->ioThreadNext = bdbList;
- bdbList = bdb;
-
- //ASSERT(!(bdb->flags & BDB_write_pending));
- //bdb->flags |= BDB_write_pending;
- memcpy(p, bdb->buffer, pageSize);
- p += pageSize;
- bdb->flushIt = false;
- markClean(bdb);
- bdb->isDirty = false;
- bdb->release(REL_HISTORY);
-
- bdb = lockFindBdbIncrementUseCount(dbb, bdb->pageNumber + 1);
- if (!bdb)
- break;
-
- if (!bdb->isDirty && !continueWrite(bdb))
+ hit = true;
+ count = 0;
+ dbb = bdb->dbb;
+
+ if (!bdb->hash)
+ flushBitmap->clear(pageNumber);
+
+ while (p < end)
{
- bdb->decrementUseCount(REL_HISTORY);
- break;
+ ++count;
+ bdb->incrementUseCount(ADD_HISTORY);
+ sync.unlock();
+ bdb->addRef(Shared COMMA_ADD_HISTORY);
+ if (falcon_use_sectorcache)
+ sectorCache->writePage(bdb);
+
+ bdb->syncWrite.lock(NULL, Exclusive);
+ bdb->ioThreadNext = bdbList;
+ bdbList = bdb;
+
+ //ASSERT(!(bdb->flags & BDB_write_pending));
+ //bdb->flags |= BDB_write_pending;
+ memcpy(p, bdb->buffer, pageSize);
+ p += pageSize;
+ bdb->flushIt = false;
+ markClean(bdb);
+ bdb->isDirty = false;
+ bdb->release(REL_HISTORY);
+ sync.lock(Shared);
+
+ if ( !(bdb = findBdb(dbb, bdb->pageNumber + 1)) )
+ break;
+
+ if (!bdb->isDirty && !continueWrite(bdb))
+ break;
}
- }
-
- flushLock.unlock();
- //Log::debug(" %d Writing %s %d pages: %d - %d\n", thread->threadId, (const char*) dbb->fileName, count, pageNumber, pageNumber + count - 1);
- int length = p - buffer;
- priority.schedule(PRIORITY_LOW);
-
- try
- {
+
+ if (sync.state != None)
+ sync.unlock();
+
+ flushLock.unlock();
+ //Log::debug(" %d Writing %s %d pages: %d - %d\n", thread->threadId, (const char*) dbb->fileName, count, pageNumber, pageNumber + count - 1);
+ int length = p - buffer;
priority.schedule(PRIORITY_LOW);
- dbb->writePages(pageNumber, length, buffer, WRITE_TYPE_FLUSH);
- }
- catch (SQLException& exception)
- {
- priority.finished();
-
- if (exception.getSqlcode() != DEVICE_FULL)
- throw;
-
- database->setIOError(&exception);
-
- for (bool error = true; error;)
+
+ try
+ {
+ priority.schedule(PRIORITY_LOW);
+ dbb->writePages(pageNumber, length, buffer, WRITE_TYPE_FLUSH);
+ }
+ catch (SQLException& exception)
{
- if (thread->shutdownInProgress)
+ priority.finished();
+
+ if (exception.getSqlcode() != DEVICE_FULL)
+ throw;
+
+ database->setIOError(&exception);
+
+ for (bool error = true; error;)
{
- Bdb *next;
+ if (thread->shutdownInProgress)
+ {
+ Bdb *next;
- for (bdb = bdbList; bdb; bdb = next)
+ for (bdb = bdbList; bdb; bdb = next)
+ {
+ //bdb->flags &= ~BDB_write_pending;
+ next = bdb->ioThreadNext;
+ bdb->syncWrite.unlock();
+ bdb->decrementUseCount(REL_HISTORY);
+ }
+
+ return;
+ }
+
+ thread->sleep(1000);
+
+ try
{
- //bdb->flags &= ~BDB_write_pending;
- next = bdb->ioThreadNext;
- bdb->syncWrite.unlock();
- bdb->decrementUseCount(REL_HISTORY);
+ priority.schedule(PRIORITY_LOW);
+ dbb->writePages(pageNumber, length, buffer, WRITE_TYPE_FLUSH);
+ error = false;
+ database->clearIOError();
+ }
+ catch (SQLException& exception2)
+ {
+ priority.finished();
+
+ if (exception2.getSqlcode() != DEVICE_FULL)
+ throw;
}
-
- return;
- }
-
- thread->sleep(1000);
-
- try
- {
- priority.schedule(PRIORITY_LOW);
- dbb->writePages(pageNumber, length, buffer, WRITE_TYPE_FLUSH);
- error = false;
- database->clearIOError();
- }
- catch (SQLException& exception2)
- {
- priority.finished();
-
- if (exception2.getSqlcode() != DEVICE_FULL)
- throw;
}
}
- }
- priority.finished();
- Bdb *next;
+ priority.finished();
+ Bdb *next;
- for (bdb = bdbList; bdb; bdb = next)
- {
- //ASSERT(bdb->flags & BDB_write_pending);
- //bdb->flags &= ~BDB_write_pending;
- next = bdb->ioThreadNext;
- bdb->syncWrite.unlock();
- bdb->decrementUseCount(REL_HISTORY);
+ for (bdb = bdbList; bdb; bdb = next)
+ {
+ //ASSERT(bdb->flags & BDB_write_pending);
+ //bdb->flags &= ~BDB_write_pending;
+ next = bdb->ioThreadNext;
+ bdb->syncWrite.unlock();
+ bdb->decrementUseCount(REL_HISTORY);
+ }
+
+ flushLock.lock(Exclusive);
+ ++physicalWrites;
+
+ break;
}
-
- flushLock.lock(Exclusive);
- ++physicalWrites;
-
- }
- else
- {
- if (bdb)
- bdb->decrementUseCount(REL_HISTORY);
- }
-
- if (!hit)
+
+ if (!hit)
{
+ sync.unlock();
flushBitmap->clear(pageNumber);
}
}
- else
+ else
{
if (flushing)
{
int writes = physicalWrites;
int pages = flushPages;
int delta = (int) (database->timestamp - flushStart);
- int64 callbackArg = flushArg;
flushing = false;
- flushArg = 0;
flushLock.unlock();
syncWait.unlock();
-
+
if (writes > 0 && Log::isActive(LogInfo))
Log::log(LogInfo, "%d: Cache flush: %d pages, %d writes in %d seconds (%d pps)\n",
- database->deltaTime, pages, writes, delta, pages / MAX(delta, 1));
+ database->deltaTime, pages, writes, delta, pages / MAX(delta, 1));
- if (callbackArg != 0)
- database->pageCacheFlushed(callbackArg);
+ database->pageCacheFlushed(flushArg);
}
else
flushLock.unlock();
-
+
if (thread->shutdownInProgress)
break;
thread->sleep();
flushLock.lock(Exclusive);
}
- } // for ever
-
- delete [] rawBuffer;
+ }
+
+ delete [] rawBuffer;
}
bool Cache::continueWrite(Bdb* startingBdb)
@@ -1118,26 +955,23 @@ bool Cache::continueWrite(Bdb* startingB
Dbb *dbb = startingBdb->dbb;
int clean = 1;
int dirty = 0;
-
+
for (int32 pageNumber = startingBdb->pageNumber + 1, end = pageNumber+ 5; pageNumber < end; ++pageNumber)
{
- Bdb *bdb;
-
+ Bdb *bdb = findBdb(dbb, pageNumber);
+
if (dirty > clean)
return true;
-
- bdb = lockFindBdbIncrementUseCount(dbb, pageNumber);
+
if (!bdb)
return dirty >= clean;
-
+
if (bdb->isDirty)
++dirty;
else
++clean;
-
- bdb->decrementUseCount(REL_HISTORY);
}
-
+
return (dirty >= clean);
}
@@ -1164,97 +998,77 @@ void Cache::shutdownThreads(void)
ioThreads[n]->shutdown();
ioThreads[n] = 0;
}
-
- Sync lockThreads(&syncThreads, "Cache::shutdownThreads");
- lockThreads.lock(Exclusive);
+
+ Sync sync(&syncThreads, "Cache::shutdownThreads");
+ sync.lock(Exclusive);
}
-#ifdef CACHE_TRACE_FILE
void Cache::analyzeFlush(void)
{
Dbb *dbb = NULL;
Bdb *bdb;
- Sync dirtyLock (&syncDirty, "Cache::hasDirtyPages");
-
- dirtyLock.lock (Shared);
+
for (bdb = firstDirty; bdb; bdb = bdb->nextDirty)
if (bdb->dbb->tableSpaceId == 1)
{
dbb = bdb->dbb;
-
+
break;
}
- dirtyLock.unlock();
-
+
if (!dbb)
return;
-
+
fprintf(traceFile, "-------- time %d -------\n", database->deltaTime);
for (int pageNumber = 0; (pageNumber = flushBitmap->nextSet(pageNumber)) >= 0;)
- // non-protected access to hash table via findBdb()!
if ( (bdb = findBdb(dbb, pageNumber)) )
{
int start = pageNumber;
int type = bdb->buffer->pageType;
-
- // non-protected access to hash table via findBdb()!
+
for (; (bdb = findBdb(dbb, ++pageNumber)) && bdb->flushIt;)
;
-
+
fprintf(traceFile, " %d flushed: %d to %d, first type %d\n", pageNumber - start, start, pageNumber - 1, type);
-
- // non-protected access to hash table via findBdb()!
+
for (int max = pageNumber + 5; pageNumber < max && (bdb = findBdb(dbb, pageNumber)) && !bdb->flushIt; ++pageNumber)
{
if (bdb->isDirty)
fprintf(traceFile, " %d dirty not flushed, type %d \n", pageNumber, bdb->buffer->pageType);
else
- fprintf(traceFile, " %d not dirty, type %d\n", pageNumber, bdb->buffer->pageType);
+ fprintf(traceFile," %d not dirty, type %d\n", pageNumber, bdb->buffer->pageType);
}
}
else
++pageNumber;
-
- fflush(traceFile);
+
+ fflush(traceFile);
}
void Cache::openTraceFile(void)
{
+#ifdef TRACE_FILE
if (traceFile)
closeTraceFile();
-
- traceFile = fopen(TRACE_FILE, "a+");
- fprintf(traceFile, "Starting\n");
-//KEL
-// setvbuf(traceFile, (char *) NULL, _IOLBF, 0);
-
+
+ traceFile = fopen(TRACE_FILE, "w");
+#endif
}
void Cache::closeTraceFile(void)
{
+#ifdef TRACE_FILE
if (traceFile)
{
fclose(traceFile);
traceFile = NULL;
}
+#endif
}
-#else // CACHE_TRACE_FILE
-void Cache::analyzeFlush(void)
-{
-}
-
-void Cache::openTraceFile(void)
-{
-}
-
-void Cache::closeTraceFile(void)
-{
-}
-#endif // CACHE_TRACE_FILE
void Cache::flushWait(void)
{
- Sync waitLock(&syncWait, "Cache::flushWait");
- waitLock.lock(Exclusive);
+ Sync sync(&syncWait, "Cache::flushWait");
+ sync.lock(Shared);
}
=== modified file 'storage/falcon/Cache.h'
--- a/storage/falcon/Cache.h 2008-09-02 16:08:11 +0000
+++ b/storage/falcon/Cache.h 2008-10-02 22:15:11 +0000
@@ -28,17 +28,6 @@
#include "SyncObject.h"
#include "Queue.h"
-// uncomment DEBUG_SYNC_HASH_TABLE_SIZE to cause more contention and test for race conditions
-//#define DEBUG_SYNC_HASH_TABLE_SIZE (0x01 << 1)
-#ifdef DEBUG_SYNC_HASH_TABLE_SIZE
-# define DEBUG_SYNC_HASH_TABLE_MASK (DEBUG_SYNC_HASH_TABLE_SIZE - 1)
-# define PAGENUM_2_LOCK_INDEX(_pgnum, _slot) ((_pgnum) & DEBUG_SYNC_HASH_TABLE_MASK)
-#else /* DEBUG_SYNC_HASH_TABLE_SIZE */
-# define PAGENUM_2_LOCK_INDEX(_pgnum, _slot) ((_slot))
-#endif /* DEBUG_SYNC_HASH_TABLE_SIZE */
-
-#define PAGENUM_2_SLOT(_pgnum) ((_pgnum) & hashMask)
-
class Bdb;
class Dbb;
class PageWriter;
@@ -65,6 +54,7 @@ public:
void markClean (Bdb *bdb);
void markDirty (Bdb *bdb);
void validate();
+ void moveToHead (Bdb *bdb);
void flush(int64 arg);
void validateCache(void);
void syncFile(Dbb *dbb, const char *text);
@@ -93,20 +83,14 @@ public:
bool flushing;
protected:
- void moveToHead (Bdb *bdb);
- void moveToHeadAlreadyLocked (Bdb *bdb);
- Bdb* getFreeBuffer(void);
- Bdb* findBdb(Dbb* dbb, int32 pageNumber, int slot);
+ Bdb* findBuffer (Dbb *dbb, int pageNumber, LockType lockType);
Bdb* findBdb(Dbb* dbb, int32 pageNumber);
- Bdb* lockFindBdbIncrementUseCount(Dbb* dbb, int32 pageNumber);
- Bdb* lockFindBdbIncrementUseCount(int32 pageNumber, int slot);
int64 flushArg;
Bdb *bdbs;
Bdb *endBdbs;
Queue<Bdb> bufferQueue;
Bdb **hashTable;
- SyncObject *syncHashTable;
Bdb *firstDirty;
Bdb *lastDirty;
Bitmap *flushBitmap;
@@ -121,13 +105,12 @@ protected:
int flushPages;
int physicalWrites;
int hashSize;
- unsigned int hashMask;
int pageSize;
- unsigned int upperFraction;
+ int upperFraction;
int numberHunks;
+ int numberDirtyPages;
int numberIoThreads;
- volatile uint64 bufferAge;
-
+ volatile int bufferAge;
public:
void flushWait(void);
};
| Thread |
|---|
| • bzr push into mysql-6.0-falcon-team branch (cpowers:2847 to 2849) Bug#39706 | Christopher Powers | 3 Oct |