List:Commits« Previous MessageNext Message »
From:Kevin Lewis Date:September 3 2008 7:17pm
Subject:RE: bzr commit into mysql-6.0-falcon branch (klong:2808)
View as plain text  
Was the temporary code expensive?  All we need is a one byte int in Bdb.
Maybe it could not assert, but do a Debug::log message.  Where would it
assert on shutdown if Bdbs remain with useCounts?

Please also add the results of your investigations into skipped Dbds to the
worklog.  With the skipCounter in place, I think this is OK to push.

>-----Original Message-----
>From: K.Long@stripped [mailto:K.Long@stripped]
>Sent: Wednesday, September 03, 2008 1:21 PM
>To: Kevin Lewis
>Subject: Re: bzr commit into mysql-6.0-falcon branch (klong:2808)
>
>Hi Kevin.
>
>Thanks for looking.  Can I assume that you've reviewed the patch and I can
>push it?
>
>Kevin Lewis wrote:
>> Kelly,
>>
>> Please add to WL#4479 the reasoning behind moving a Bdb to the head age
>> group instead of stopping to do a writePage. (almost a no-brainer, but we
>> need to document)
>Will do.
>
>> Please summarize again the conditions in which a page in
>> the oldest age group can still have a useCount.  We do not want to cover
>> over a useCount leak, which will eventually fill up cache like a memory
>> leak.  It is very important to catch Bdbs that never get released instead
>of
>> just recycling them.
>I have not seen us forget to release a bdb.  This is not covering over a
>leak.
>We would assert on shutdown if this was the case.  These bdbs are in use
for
>a
>variety of page types.  From what I've seen, there are transactions using
>them
>probably because the page cache is too small for the workload.
>
>I had some temporary code that did a "skip count" and would assert on skip
>count
>of 3.  The assert never trigged so we were not holding indefinitely.
>
>>
>> If you suspect that findBuffer does a moveToHead when the page cache is
>> large, then we need to know why that Bdb still has a useCount.  You can
>turn
>> on BDB_HISTORY to collect an array of code locations in a BDB's recent
>> history.  I implemented and used this tool to find conditions where a
>> useCount was incremented but not decremented.  This adds a little memory
>to
>> each Bdb but not much code path, so it should not affect performance too
>> much.
>See above.  If I find this to be true later, then I'll turn on History and
>see
>if we can find an interesting issue.  I suspect this is chasing our tail as
>what
>I've seen is the page cache is too small for workload.
>
>>
>> Another question;  Why is Dbb sent into FindBdb?  I am missing its use.
>Are we looking at the same findBdb()?
>
>Bdb* Cache::findBdb(Dbb* dbb, int32 pageNumber, int slot)
>{
>     for (Bdb *bdb = hashTable [slot]; bdb; bdb = bdb->hash)
>{
>         if (bdb->pageNumber == pageNumber && bdb->dbb == dbb)
>USAGE OF DBB---------------------------------^^^^^^^^^^^^^^^
>             {
>             return bdb;
>             }
>         }
>
>     return NULL;
>}
>
>>
>> Kevin
>>
>>> -----Original Message-----
>>> From: Kelly Long [mailto:klong@stripped]
>>> Sent: Tuesday, September 02, 2008 11:09 AM
>>> To: commits@stripped
>>> Subject: bzr commit into mysql-6.0-falcon branch (klong:2808)
>>>
>>> #At file:///FC/MYSQL/wa-2008-BZR/mysql-6.0-falcon-team/
>>>
>>> 2808 Kelly Long	2008-09-02
>>>      Version 2 for:
>>>           Page Cache high concurency Get Free Buffer. WL4479
>>>           Page cache lock per hash bucket. WL4480
>>>           Page cache hash buckets are a power of two. WL4481
>>>      traceFile is a compile time decision, thus remove the run time
>>> decisions for it.
>>> modified:
>>>  storage/falcon/Cache.cpp
>>>  storage/falcon/Cache.h
>>>
>>> === modified file 'storage/falcon/Cache.cpp'
>>> --- a/storage/falcon/Cache.cpp	2008-08-22 06:47:40 +0000
>>> +++ b/storage/falcon/Cache.cpp	2008-09-02 16:08:11 +0000
>>> @@ -50,9 +50,11 @@
>>> extern uint falcon_io_threads;
>>>
>>> //#define STOP_PAGE		55
>>> -#define TRACE_FILE	"cache.trace"
>>> +//#define CACHE_TRACE_FILE	"cache.trace"
>>>
>>> +#ifdef CACHE_TRACE_FILE
>>> static FILE			*traceFile;
>>> +#endif // CACHE_TRACE_FILE
>>>
>>> static const uint64 cacheHunkSize		= 1024 * 1024 * 128;
>>> static const int	ASYNC_BUFFER_SIZE	= 1024000;
>>> @@ -68,20 +70,46 @@ static const char THIS_FILE[]=__FILE__;
>>>
>>> Cache::Cache(Database *db, int pageSz, int hashSz, int numBuffers)
>>> {
>>> -	//openTraceFile();
>>> +	openTraceFile();
>>> 	database = db;
>>> 	panicShutdown = false;
>>> 	pageSize = pageSz;
>>> -	hashSize = hashSz;
>>> +
>>> +	unsigned int highBit;
>>> +	for (highBit=0x01; highBit < hashSz; highBit= highBit << 1) { }
>>> +
>>> +	// if there are more than 4096 buckets then lets round down
>>> +	// else lets round up
>>> +	if (highBit >= 0x00001000) {
>>> +		// use power of two rounded down
>>> +		hashSize = highBit << 1;
>>> +	} else {
>>> +		// use power of two rounded up
>>> +		hashSize = highBit;
>>> +	}
>>> +
>>> +	hashMask = hashSize - 1;
>>> 	numberBuffers = numBuffers;
>>> 	upperFraction = numberBuffers / 4;
>>> 	bufferAge = 0;
>>> 	firstDirty = NULL;
>>> 	lastDirty = NULL;
>>> -	numberDirtyPages = 0;
>>> 	pageWriter = NULL;
>>> -	hashTable = new Bdb* [hashSz];
>>> +	hashTable = new Bdb* [hashSize];
>>> 	memset (hashTable, 0, sizeof (Bdb*) * hashSize);
>>> +#ifdef DEBUG_SYNC_HASH_TABLE_SIZE
>>> +    syncHashTable = new SyncObject [DEBUG_SYNC_HASH_TABLE_SIZE];
>>> +	for (int loop = 0; loop < DEBUG_SYNC_HASH_TABLE_SIZE; loop ++)
>>> +		syncHashTable[loop].setName("Cache::syncHashTable");
>>> +#else /* DEBUG_SYNC_HASH_TABLE_SIZE */
>>> +    syncHashTable = new SyncObject [hashSize];
>>> +	for (int loop = 0; loop < hashSize; loop ++)
>>> +		{
>>> +		char tmpName[128];
>>> +		snprintf(tmpName,120,"Cache::syncHashTable[%d]",loop);
>>> +		syncHashTable[loop].setName(tmpName);
>>> +		}
>>> +#endif /* DEBUG_SYNC_HASH_TABLE_SIZE */
>>> 	sectorCache = new SectorCache(sectorCacheSize / SECTOR_BUFFER_SIZE,
>>> pageSize);
>>>
>>> 	uint64 n = ((uint64) pageSize * numberBuffers + cacheHunkSize - 1) /
>>> cacheHunkSize;
>>> @@ -103,6 +131,7 @@ Cache::Cache(Database *db, int pageSz, i
>>>
>>> 	try
>>> 		{
>>> +		// non-protected access to bdbs,endBdbs is OK during
>>> initialization
>>> 		bdbs = new Bdb [numberBuffers];
>>> 		endBdbs = bdbs + numberBuffers;
>>> 		int remaining = 0;
>>> @@ -121,6 +150,7 @@ Cache::Cache(Database *db, int pageSz, i
>>> 				}
>>>
>>> 			bdb->cache = this;
>>> +			// non-protected access to bufferQueue is OK during
>>> initialization
>>> 			bufferQueue.append(bdb);
>>> 			bdb->buffer = (Page*) stuff;
>>> 			stuff += pageSize;
>>> @@ -146,10 +176,11 @@ Cache::Cache(Database *db, int pageSz, i
>>>
>>> Cache::~Cache()
>>> {
>>> -	if (traceFile)
>>> -		closeTraceFile();
>>> +
>>> +	closeTraceFile();
>>>
>>> 	delete [] hashTable;
>>> +	delete [] syncHashTable;
>>> 	delete [] bdbs;
>>> 	delete [] ioThreads;
>>> 	delete flushBitmap;
>>> @@ -167,15 +198,12 @@ Cache::~Cache()
>>> Bdb* Cache::probePage(Dbb *dbb, int32 pageNumber)
>>> {
>>> 	ASSERT (pageNumber >= 0);
>>> -	Sync sync (&syncObject, "Cache::probePage");
>>> -	sync.lock (Shared);
>>> -	Bdb *bdb = findBdb(dbb, pageNumber);
>>> +	Bdb *bdb;
>>>
>>> +	/* If we already have a buffer for this, we're done */
>>> +	bdb = lockFindBdbIncrementUseCount(dbb, pageNumber);
>>> 	if (bdb)
>>> 		{
>>> -		bdb->incrementUseCount(ADD_HISTORY);
>>> -		sync.unlock();
>>> -
>>> 		if (bdb->buffer->pageType == PAGE_free)
>>> 			{
>>> 			bdb->decrementUseCount(REL_HISTORY);
>>> @@ -192,15 +220,56 @@ Bdb* Cache::probePage(Dbb *dbb, int32 pa
>>> 	return NULL;
>>> }
>>>
>>> -Bdb* Cache::findBdb(Dbb* dbb, int32 pageNumber)
>>> +Bdb* Cache::findBdb(Dbb* dbb, int32 pageNumber, int slot)
>>> +{
>>> +	for (Bdb *bdb = hashTable [slot]; bdb; bdb = bdb->hash)
>>> {
>>> -	for (Bdb *bdb = hashTable [pageNumber % hashSize]; bdb; bdb = bdb-
>>>> hash)
>>> 		if (bdb->pageNumber == pageNumber && bdb->dbb == dbb)
>>> +			{
>>> 			return bdb;
>>> +			}
>>> +		}
>>>
>>> 	return NULL;
>>> }
>>>
>>> +Bdb* Cache::findBdb(Dbb* dbb, int32 pageNumber)
>>> +{
>>> +	return (findBdb(dbb, pageNumber, PAGENUM_2_SLOT(pageNumber)));
>>> +}
>>> +
>>> +Bdb* Cache::lockFindBdbIncrementUseCount(Dbb* dbb, int32 pageNumber)
>>> +{
>>> +	int slot = PAGENUM_2_SLOT(pageNumber);
>>> +	Sync lockHash (&syncHashTable[PAGENUM_2_LOCK_INDEX(pageNumber,
>> slot)],
>>> "Cache::lockFindBdbIncrementUseCount");
>>> +	lockHash.lock (Shared);
>>> +	Bdb *bdb;
>>> +
>>> +	bdb = findBdb(dbb, pageNumber, slot);
>>> +	if (bdb != NULL)
>>> +		bdb->incrementUseCount(ADD_HISTORY);
>>> +
>>> +	lockHash.unlock();
>>> +	return bdb;
>>> +}
>>> +
>>> +Bdb* Cache::lockFindBdbIncrementUseCount(int32 pageNumber, int slot)
>>> +{
>>> +	Sync lockHash (&syncHashTable[PAGENUM_2_LOCK_INDEX(pageNumber,
>> slot)],
>>> "Cache::lockFindBdbIncrementUseCount");
>>> +	lockHash.lock (Shared);
>>> +
>>> +	for (Bdb *bdb = hashTable [slot]; bdb; bdb = bdb->hash)
>>> +		if (bdb->pageNumber == pageNumber)
>>> +		{
>>> +			bdb->incrementUseCount(ADD_HISTORY);
>>> +			lockHash.unlock();
>>> +			return bdb;
>>> +		}
>>> +
>>> +	lockHash.unlock();
>>> +	return NULL;
>>> +}
>>> +
>>> Bdb* Cache::fetchPage(Dbb *dbb, int32 pageNumber, PageType pageType,
>>> LockType lockType)
>>> {
>>> 	if (panicShutdown)
>>> @@ -217,52 +286,45 @@ Bdb* Cache::fetchPage(Dbb *dbb, int32 pa
>>> #endif
>>>
>>> 	ASSERT (pageNumber >= 0);
>>> -	int slot = pageNumber % hashSize;
>>> -	LockType actual = lockType;
>>> -	Sync sync (&syncObject, "Cache::fetchPage");
>>> -	sync.lock (Shared);
>>> -	int hit = 0;
>>> -
>>> -	/* If we already have a buffer for this go, we're done */
>>> -
>>> 	Bdb *bdb;
>>>
>>> -	for (bdb = hashTable [slot]; bdb; bdb = bdb->hash)
>>> -		if (bdb->pageNumber == pageNumber && bdb->dbb == dbb)
>>> -			{
>>> -			//syncObject.validateShared("Cache::fetchPage");
>>> -			bdb->incrementUseCount(ADD_HISTORY);
>>> -			sync.unlock();
>>> -			bdb->addRef(lockType  COMMA_ADD_HISTORY);
>>> -			bdb->decrementUseCount(REL_HISTORY);
>>> -			hit = 1;
>>> -			break;
>>> -			}
>>> -
>>> +	/* If we already have a buffer for this, we're done */
>>> +	bdb = lockFindBdbIncrementUseCount(dbb, pageNumber);
>>> 	if (!bdb)
>>> 		{
>>> -		sync.unlock();
>>> -		actual = Exclusive;
>>> -		sync.lock(Exclusive);
>>> -
>>> -		for (bdb = hashTable [slot]; bdb; bdb = bdb->hash)
>>> -			if (bdb->pageNumber == pageNumber && bdb->dbb ==
>> dbb)
>>> -				{
>>> -
>> //syncObject.validateExclusive("Cache::fetchPage
>>> (retry)");
>>> -				bdb->incrementUseCount(ADD_HISTORY);
>>> -				sync.unlock();
>>> -				bdb->addRef(lockType  COMMA_ADD_HISTORY);
>>> -				bdb->decrementUseCount(REL_HISTORY);
>>> -				hit = 2;
>>> -				break;
>>> -				}
>>> +		// get getFreeBuffer() locks a hash bucket to remove the
>>> candidate bdb
>>> +		// if we locked out hash bucket before the call then we
>> could
>>> have
>>> +		// a deadlock
>>> +		// thus we get the free buffer before we lock the hash
>> bucket we
>>> will
>>> +		// be inserting into.  This avoids a dead lock but generates
>> a
>>> race
>>> +		// we take care of the race by reversing the getFreeBuffer()
>>> work
>>> +		// when we lose the race
>>> +		Bdb *bdbAvailable;
>>> +		int slot = PAGENUM_2_SLOT(pageNumber);
>>> +		Sync lockHash
>> (&syncHashTable[PAGENUM_2_LOCK_INDEX(pageNumber,
>>> slot)], "Cache::fetchPage");
>>> +
>>> +		bdbAvailable = getFreeBuffer();
>>> +		/* assume we'll be inserting this new BDB.  Set new page
>> number.
>>> */
>>> +		bdbAvailable->addRef (Exclusive  COMMA_ADD_HISTORY);
>>> +		bdbAvailable->decrementUseCount(REL_HISTORY);
>>> +
>>> +		bdbAvailable->hash = hashTable [slot];
>>> +		bdbAvailable->pageNumber = pageNumber;
>>> +		bdbAvailable->dbb = dbb;
>>> +#ifdef COLLECT_BDB_HISTORY
>>> +		bdbAvailable->initHistory();
>>> +#endif
>>>
>>> +		lockHash.lock(Exclusive);
>>> +		bdb = findBdb(dbb, pageNumber, slot);
>>> 		if (!bdb)
>>> 			{
>>> -			bdb = findBuffer(dbb, pageNumber, actual);
>>> -			moveToHead(bdb);
>>> -			sync.unlock();
>>> +			// we won the race so lets use the free bdb
>>> +			// relink into hash table
>>> +			hashTable [slot] = bdbAvailable;
>>> +			lockHash.unlock();
>>>
>>> +			bdb = bdbAvailable;
>>> #ifdef STOP_PAGE
>>> 			if (bdb->pageNumber == STOP_PAGE)
>>> 				Log::debug("reading page %d/%d\n",
>> bdb->pageNumber,
>>> dbb->tableSpaceId);
>>> @@ -278,9 +340,31 @@ Bdb* Cache::fetchPage(Dbb *dbb, int32 pa
>>> #ifdef HAVE_PAGE_NUMBER
>>> 			ASSERT(bdb->buffer->pageNumber == pageNumber);
>>> #endif
>>> -			if (actual != lockType)
>>> +			if (Exclusive != lockType)
>>> 				bdb->downGrade(lockType);
>>> 			}
>>> +			else
>>> +			{
>>> +			// lost a race.  put our available back to useable
>>> +			// side effect, bdbAvailable will have to age again
>>> before we re-use it.
>>> +			bdbAvailable->hash = NULL;
>>> +			bdbAvailable->pageNumber = -1;
>>> +			bdbAvailable->dbb = NULL;
>>> +			bdbAvailable->release();
>>> +
>>> +			//syncObject.validateExclusive("Cache::fetchPage
>>> (retry)");
>>> +			bdb->incrementUseCount(ADD_HISTORY);
>>> +			lockHash.unlock();
>>> +			bdb->addRef(lockType  COMMA_ADD_HISTORY);
>>> +			bdb->decrementUseCount(REL_HISTORY);
>>> +			moveToHead(bdb);
>>> +			}
>>> +		}
>>> +		else
>>> +		{
>>> +		bdb->addRef(lockType  COMMA_ADD_HISTORY);
>>> +		bdb->decrementUseCount(REL_HISTORY);
>>> +		moveToHead(bdb);
>>> 		}
>>>
>>> 	Page *page = bdb->buffer;
>>> @@ -302,14 +386,6 @@ Bdb* Cache::fetchPage(Dbb *dbb, int32 pa
>>> 				 bdb->pageNumber, dbb->tableSpaceId,
>> pageType,
>>> page->pageType);
>>> 		}
>>>
>>> -	// If buffer has moved out of the upper "fraction" of the LRU queue,
>>> move it back up
>>> -
>>> -	if (bdb->age < bufferAge - (uint64) upperFraction)
>>> -		{
>>> -		sync.lock (Exclusive);
>>> -		moveToHead (bdb);
>>> -		}
>>> -
>>> 	ASSERT (bdb->pageNumber == pageNumber);
>>> 	ASSERT (bdb->dbb == dbb);
>>> 	ASSERT (bdb->useCount > 0);
>>> @@ -319,9 +395,7 @@ Bdb* Cache::fetchPage(Dbb *dbb, int32 pa
>>>
>>> Bdb* Cache::fakePage(Dbb *dbb, int32 pageNumber, PageType type, TransId
>>> transId)
>>> {
>>> -	Sync sync(&syncObject, "Cache::fakePage");
>>> -	sync.lock(Exclusive);
>>> -	int	slot = pageNumber % hashSize;
>>> +	Bdb *bdb;
>>>
>>> #ifdef STOP_PAGE
>>> 	if (pageNumber == STOP_PAGE)
>>> @@ -329,33 +403,72 @@ Bdb* Cache::fakePage(Dbb *dbb, int32 pag
>>> #endif
>>>
>>> 	/* If we already have a buffer for this, we're done */
>>> +	bdb = lockFindBdbIncrementUseCount(dbb, pageNumber);
>>> +	if (!bdb)
>>> +		{
>>> +		// get getFreeBuffer() locks a hash bucket to remove the
>>> candidate bdb
>>> +		// if we locked out hash bucket before the call then we
>> could
>>> have
>>> +		// a deadlock
>>> +		// thus we get the free buffer before we lock the hash
>> bucket we
>>> will
>>> +		// be inserting into.  This avoids a dead lock but generates
>> a
>>> race
>>> +		// we take care of the race by reversing the getFreeBuffer()
>>> work
>>> +		// when we lose the race
>>> +		Bdb *bdbAvailable;
>>> +		int slot = PAGENUM_2_SLOT(pageNumber);
>>> +		Sync lockHash
>> (&syncHashTable[PAGENUM_2_LOCK_INDEX(pageNumber,
>>> slot)], "Cache::fetchPage");
>>> +
>>> +		bdbAvailable = getFreeBuffer();
>>> +		/* assume we'll be inserting this new BDB.  Set new page
>> number.
>>> */
>>> +		bdbAvailable->addRef (Exclusive  COMMA_ADD_HISTORY);
>>> +		bdbAvailable->decrementUseCount(REL_HISTORY);
>>> +
>>> +		bdbAvailable->hash = hashTable [slot];
>>> +		bdbAvailable->pageNumber = pageNumber;
>>> +		bdbAvailable->dbb = dbb;
>>> +#ifdef COLLECT_BDB_HISTORY
>>> +		bdbAvailable->initHistory();
>>> +#endif
>>>
>>> -	Bdb *bdb;
>>> +		lockHash.lock(Exclusive);
>>> +		bdb = findBdb(dbb, pageNumber, slot);
>>> +		if (!bdb)
>>> +			{
>>> +			// we won the race so lets use the free bdb
>>> +			// relink into hash table
>>> +			hashTable [slot] = bdbAvailable;
>>> +			lockHash.unlock();
>>>
>>> -	for (bdb = hashTable [slot]; bdb; bdb = bdb->hash)
>>> -		if (bdb->pageNumber == pageNumber && bdb->dbb == dbb)
>>> +			bdb = bdbAvailable;
>>> +			}
>>> +			else
>>> 			{
>>> -			if (bdb->syncObject.isLocked())
>>> -				{
>>> -				// The pageWriter may still be cleaning up
>> this
>>> freed page with a shared lock
>>> -				ASSERT(bdb->buffer->pageType == PAGE_free);
>>> -				ASSERT(bdb->syncObject.getState() >= 0);
>>> -				}
>>> -
>>> +			// lost a race.  put our available back to useable
>>> +			// side effect, bdbAvailable will have to age again
>>> before we re-use it.
>>> +			bdbAvailable->hash = NULL;
>>> +			bdbAvailable->pageNumber = -1;
>>> +			bdbAvailable->dbb = NULL;
>>> +			bdbAvailable->release();
>>> +
>>> +			//syncObject.validateExclusive("Cache::fetchPage
>>> (retry)");
>>> +			bdb->incrementUseCount(ADD_HISTORY);
>>> +			lockHash.unlock();
>>> 			bdb->addRef(Exclusive  COMMA_ADD_HISTORY);
>>> -
>>> -			break;
>>> +			bdb->decrementUseCount(REL_HISTORY);
>>> +			moveToHead(bdb);
>>> 			}
>>> -
>>> -	if (!bdb)
>>> -		bdb = findBuffer(dbb, pageNumber, Exclusive);
>>> +		}
>>> +		else
>>> +		{
>>> +		bdb->addRef(Exclusive  COMMA_ADD_HISTORY);
>>> +		bdb->decrementUseCount(REL_HISTORY);
>>> +		moveToHead(bdb);
>>> +		}
>>>
>>> 	if (!dbb->isReadOnly)
>>> 		bdb->mark(transId);
>>>
>>> 	memset(bdb->buffer, 0, pageSize);
>>> 	bdb->setPageHeader(type);
>>> -	moveToHead(bdb);
>>>
>>> 	return bdb;
>>> }
>>> @@ -363,32 +476,31 @@ Bdb* Cache::fakePage(Dbb *dbb, int32 pag
>>> void Cache::flush(int64 arg)
>>> {
>>> 	Sync flushLock(&syncFlush, "Cache::flush(1)");
>>> -	Sync sync(&syncDirty, "Cache::flush(2)");
>>> +	Sync dirtyLock(&syncDirty, "Cache::flush(2)");
>>> 	flushLock.lock(Exclusive);
>>>
>>> 	if (flushing)
>>> 		return;
>>>
>>> 	syncWait.lock(NULL, Exclusive);
>>> -	sync.lock(Shared);
>>> 	//Log::debug(%d: "Initiating flush\n", dbb->deltaTime);
>>> 	flushArg = arg;
>>> 	flushPages = 0;
>>> 	physicalWrites = 0;
>>>
>>> +	dirtyLock.lock(Shared);
>>> 	for (Bdb *bdb = firstDirty; bdb; bdb = bdb->nextDirty)
>>> 		{
>>> 		bdb->flushIt = true;
>>> 		flushBitmap->set(bdb->pageNumber);
>>> 		++flushPages;
>>> 		}
>>> +	dirtyLock.unlock();
>>>
>>> -	if (traceFile)
>>> -		analyzeFlush();
>>> +	analyzeFlush();
>>>
>>> 	flushStart = database->timestamp;
>>> 	flushing = true;
>>> -	sync.unlock();
>>> 	flushLock.unlock();
>>>
>>> 	for (int n = 0; n < numberIoThreads; ++n)
>>> @@ -398,69 +510,115 @@ void Cache::flush(int64 arg)
>>>
>>> void Cache::moveToHead(Bdb * bdb)
>>> {
>>> +	// If buffer has moved out of the upper "fraction" of the LRU queue,
>>> move it back up
>>> +	// non-protected access to age is harmless since it is fuzzy anyway
>>> +	if (bdb->age < bufferAge - (uint64) upperFraction)
>>> +		{
>>> +		Sync bufferQueueLock (&bufferQueue.syncObject,
>>> "Cache::moveToHead");
>>> +
>>> +		bufferQueueLock.lock (Exclusive);
>>> +		bdb->age = bufferAge++;
>>> +		bufferQueue.remove(bdb);
>>> +		bufferQueue.prepend(bdb);
>>> +		//validateUnique (bdb);
>>> +		}
>>> +}
>>> +
>>> +void Cache::moveToHeadAlreadyLocked(Bdb * bdb)
>>> +{
>>> 	bdb->age = bufferAge++;
>>> 	bufferQueue.remove(bdb);
>>> 	bufferQueue.prepend(bdb);
>>> 	//validateUnique (bdb);
>>> }
>>>
>>> -Bdb* Cache::findBuffer(Dbb *dbb, int pageNumber, LockType lockType)
>>> +Bdb* Cache::getFreeBuffer(void)
>>> {
>>> -	//syncObject.validateExclusive("Cache::findBuffer");
>>> -	int	slot = pageNumber % hashSize;
>>> -	Sync sync(&syncDirty, "Cache::findBuffer");
>>> -
>>> -	/* Find least recently used, not-in-use buffer */
>>> -
>>> +	Sync bufferQueueLock (&bufferQueue.syncObject,
>>> "Cache::getFreeBuffer");
>>> +	unsigned int count;
>>> 	Bdb *bdb;
>>>
>>> 	// Find a candidate BDB.
>>> -
>>> 	for (;;)
>>> 		{
>>> -		for (bdb = bufferQueue.last; bdb; bdb = bdb->prior)
>>> -			if (bdb->useCount == 0)
>>> +		bufferQueueLock.lock (Exclusive);
>>> +		// find a candidate that is NOT in use and NOT dirty and in
>> the
>>> tail fraction of the LRU
>>> +		for (count = 0, bdb = bufferQueue.last; bdb ; bdb =
>> bdb->prior,
>>> count++)
>>> +			{
>>> +			if (count >= upperFraction)
>>> +				{
>>> +				bdb = NULL;
>>> 				break;
>>> +				}
>>> +			if (bdb->useCount == 0)
>>> +				{
>>> +				if (!bdb->isDirty)
>>> +					{
>>> +					bdb->incrementUseCount(REL_HISTORY);
>>> +					moveToHeadAlreadyLocked(bdb);
>>> +					break;
>>> +					}
>>> +				}
>>> +				else
>>> +				{
>>> +					// get this one out of the way so we
>> don't
>>> search it every time
>>> +					moveToHeadAlreadyLocked(bdb,count);
>>> +				}
>>> +			}
>>> +		if (!bdb)
>>> +			// find a candidate that is NOT in use, could be
>> dirty
>>> +			for (bdb = bufferQueue.last; bdb; bdb = bdb->prior)
>>> +				if (bdb->useCount == 0)
>>> +					{
>>> +					bdb->incrementUseCount(REL_HISTORY);
>>> +					moveToHeadAlreadyLocked(bdb);
>>> +					break;
>>> +					}
>>> +		bufferQueueLock.unlock();
>>>
>>> 		if (!bdb)
>>> 			throw SQLError(RUNTIME_ERROR, "buffer pool is
>>> exhausted\n");
>>>
>>> -		if (!bdb->isDirty)
>>> -			break;
>>> -
>>> -		writePage (bdb, WRITE_TYPE_REUSE);
>>> -		}
>>> -
>>> -	/* Unlink its old incarnation from the page/hash table */
>>> +		if (bdb->pageNumber >= 0)
>>> +		{
>>> +			int	slotRemove =
>> PAGENUM_2_SLOT(bdb->pageNumber);
>>> +			Sync lockHashRemove
>>> (&syncHashTable[PAGENUM_2_LOCK_INDEX(bdb->pageNumber, slotRemove)],
>>> "Cache::getFreeBuffer");
>>> +			lockHashRemove.lock(Exclusive);
>>>
>>> -	if (bdb->pageNumber >= 0)
>>> -		for (Bdb **ptr = hashTable + bdb->pageNumber % hashSize;;
>> ptr =
>>> &(*ptr)->hash)
>>> -			if (*ptr == bdb)
>>> +			if (bdb->useCount != 1)
>>> 				{
>>> -				*ptr = bdb->hash;
>>> -				break;
>>> +				// we lost a race try again
>>> +				bdb->decrementUseCount(REL_HISTORY);
>>> +				lockHashRemove.unlock();
>>> +				continue;
>>> 				}
>>> -			else
>>> -				ASSERT (*ptr);
>>> -
>>> -	bdb->addRef (lockType  COMMA_ADD_HISTORY);
>>>
>>> -	/* Set new page number and relink into hash table */
>>> +			if (bdb->isDirty)
>>> +				writePage (bdb, WRITE_TYPE_REUSE);
>>>
>>> -	bdb->hash = hashTable [slot];
>>> -	hashTable [slot] = bdb;
>>> -	bdb->pageNumber = pageNumber;
>>> -	bdb->dbb = dbb;
>>> +			/* Unlink its old incarnation from the page/hash
>> table */
>>> +			for (Bdb **ptr = hashTable + PAGENUM_2_SLOT(bdb-
>>>> pageNumber) ;; ptr = &(*ptr)->hash)
>>> +				if (*ptr == bdb)
>>> +					{
>>> +					*ptr = bdb->hash;
>>> +					break;
>>> +					}
>>> +				else
>>> +					ASSERT (*ptr);
>>> +		}
>>>
>>> -#ifdef COLLECT_BDB_HISTORY
>>> -	bdb->initHistory();
>>> -#endif
>>> +		break;
>>> +		}
>>>
>>> 	return bdb;
>>> }
>>>
>>> void Cache::validate()
>>> {
>>> +	//Sync bufferQueueLock (&bufferQueue.syncObject, "Cache::validate");
>>> +
>>> +	//bufferQueueLock.lock (Shared);
>>> +	// non-protected access to bufferQueue is DANGEROUS...
>>> 	for (Bdb *bdb = bufferQueue.last; bdb; bdb = bdb->prior)
>>> 		{
>>> 		//IndexPage *page = (IndexPage*) bdb->buffer;
>>> @@ -470,8 +628,8 @@ void Cache::validate()
>>>
>>> void Cache::markDirty(Bdb *bdb)
>>> {
>>> -	Sync sync (&syncDirty, "Cache::markDirty");
>>> -	sync.lock (Exclusive);
>>> +	Sync dirtyLock (&syncDirty, "Cache::markDirty");
>>> +	dirtyLock.lock (Exclusive);
>>> 	bdb->nextDirty = NULL;
>>> 	bdb->priorDirty = lastDirty;
>>>
>>> @@ -481,14 +639,13 @@ void Cache::markDirty(Bdb *bdb)
>>> 		firstDirty = bdb;
>>>
>>> 	lastDirty = bdb;
>>> -	++numberDirtyPages;
>>> 	//validateUnique (bdb);
>>> }
>>>
>>> void Cache::markClean(Bdb *bdb)
>>> {
>>> -	Sync sync (&syncDirty, "Cache::markClean");
>>> -	sync.lock (Exclusive);
>>> +	Sync dirtyLock (&syncDirty, "Cache::markClean");
>>> +	dirtyLock.lock (Exclusive);
>>>
>>> 	/***
>>> 	if (bdb->flushIt)
>>> @@ -496,7 +653,6 @@ void Cache::markClean(Bdb *bdb)
>>> 	***/
>>>
>>> 	bdb->flushIt = false;
>>> -	--numberDirtyPages;
>>>
>>> 	if (bdb == lastDirty)
>>> 		lastDirty = bdb->priorDirty;
>>> @@ -600,8 +756,8 @@ void Cache::writePage(Bdb *bdb, int type
>>>
>>> 	if (dbb->shadows)
>>> 		{
>>> -		Sync sync (&dbb->syncClone, "Cache::writePage(2)");
>>> -		sync.lock (Shared);
>>> +		Sync cloneLock (&dbb->syncClone, "Cache::writePage(2)");
>>> +		cloneLock.lock (Shared);
>>>
>>> 		for (DatabaseCopy *shadow = dbb->shadows; shadow; shadow =
>>> shadow->next)
>>> 			shadow->rewritePage(bdb);
>>> @@ -610,14 +766,14 @@ void Cache::writePage(Bdb *bdb, int type
>>>
>>> void Cache::analyze(Stream *stream)
>>> {
>>> -	Sync sync (&syncDirty, "Cache::analyze");
>>> -	sync.lock (Shared);
>>> +	Sync dirtyLock (&syncDirty, "Cache::analyze");
>>> 	int inUse = 0;
>>> 	int dirty = 0;
>>> 	int dirtyList = 0;
>>> 	int total = 0;
>>> 	Bdb *bdb;
>>>
>>> +	// non-protected access to bdbs,endBdbs is DANGEROUS...
>>> 	for (bdb = bdbs; bdb < endBdbs; ++bdb)
>>> 		{
>>> 		++total;
>>> @@ -629,8 +785,10 @@ void Cache::analyze(Stream *stream)
>>> 			++inUse;
>>> 		}
>>>
>>> +	dirtyLock.lock (Shared);
>>> 	for (bdb = firstDirty; bdb; bdb = bdb->nextDirty)
>>> 		++dirtyList;
>>> +	dirtyLock.unlock();
>>>
>>> 	stream->format ("Cache: %d pages, %d in use, %d dirty, %d in dirty
>>> chain\n",
>>> 					total, inUse, dirty, dirtyList);
>>> @@ -638,17 +796,18 @@ void Cache::analyze(Stream *stream)
>>>
>>> void Cache::validateUnique(Bdb *target)
>>> {
>>> -	int	slot = target->pageNumber % hashSize;
>>> +	int	slot = PAGENUM_2_SLOT(target->pageNumber);
>>>
>>> +	// WARNING: unlocked walk of hash table.... DANGEROUS
>>> 	for (Bdb *bdb = hashTable [slot]; bdb; bdb = bdb->hash)
>>> 		ASSERT (bdb == target || !(bdb->pageNumber ==
>> target->pageNumber
>>> && bdb->dbb == target->dbb));
>>> }
>>>
>>> void Cache::freePage(Dbb *dbb, int32 pageNumber)
>>> {
>>> -	Sync sync (&syncObject, "Cache::freePage");
>>> -	sync.lock (Shared);
>>> -	int	slot = pageNumber % hashSize;
>>> +	int slot = PAGENUM_2_SLOT(pageNumber);
>>> +	Sync lockHash (&syncHashTable[PAGENUM_2_LOCK_INDEX(pageNumber,
>> slot)],
>>> "Cache::freePage");
>>> +	lockHash.lock(Shared);
>>>
>>> 	// If page exists in cache (usual case), clean it up
>>>
>>> @@ -657,7 +816,7 @@ void Cache::freePage(Dbb *dbb, int32 pag
>>> 			{
>>> 			if (bdb->isDirty)
>>> 				{
>>> -				sync.unlock();
>>> +				lockHash.unlock();
>>> 				markClean (bdb);
>>> 				}
>>>
>>> @@ -685,12 +844,16 @@ void Cache::flush(Dbb *dbb)
>>>
>>> bool Cache::hasDirtyPages(Dbb *dbb)
>>> {
>>> -	Sync sync (&syncDirty, "Cache::hasDirtyPages");
>>> -	sync.lock (Shared);
>>> +	Sync dirtyLock (&syncDirty, "Cache::hasDirtyPages");
>>> +	dirtyLock.lock (Shared);
>>>
>>> 	for (Bdb *bdb = firstDirty; bdb; bdb = bdb->nextDirty)
>>> 		if (bdb->dbb == dbb)
>>> +			{
>>> +			dirtyLock.unlock();
>>> 			return true;
>>> +			}
>>> +	dirtyLock.unlock();
>>>
>>> 	return false;
>>> }
>>> @@ -717,26 +880,16 @@ Bdb* Cache::trialFetch(Dbb* dbb, int32 p
>>> 		}
>>>
>>> 	ASSERT (pageNumber >= 0);
>>> -	int	slot = pageNumber % hashSize;
>>> -	Sync sync (&syncObject, "Cache::trialFetch");
>>> -	sync.lock (Shared);
>>> -	int hit = 0;
>>> -
>>> -	/* If we already have a buffer for this go, we're done */
>>> -
>>> 	Bdb *bdb;
>>>
>>> -	for (bdb = hashTable [slot]; bdb; bdb = bdb->hash)
>>> -		if (bdb->pageNumber == pageNumber && bdb->dbb == dbb)
>>> -			{
>>> -			//syncObject.validateShared("Cache::trialFetch");
>>> -			bdb->incrementUseCount(ADD_HISTORY);
>>> -			sync.unlock();
>>> -			bdb->addRef(lockType  COMMA_ADD_HISTORY);
>>> -			bdb->decrementUseCount(REL_HISTORY);
>>> -			hit = 1;
>>> -			break;
>>> -			}
>>> +	/* If we already have a buffer for this, we're done */
>>> +	bdb = lockFindBdbIncrementUseCount(dbb, pageNumber);
>>> +	if (bdb)
>>> +		{
>>> +		bdb->addRef(lockType  COMMA_ADD_HISTORY);
>>> +		bdb->decrementUseCount(REL_HISTORY);
>>> +		moveToHead(bdb);
>>> +	}
>>>
>>> 	return bdb;
>>> }
>>> @@ -764,10 +917,9 @@ void Cache::ioThread(void* arg)
>>>
>>> void Cache::ioThread(void)
>>> {
>>> -	Sync syncThread(&syncThreads, "Cache::ioThread(1)");
>>> +	Sync syncThread(&syncThreads, "Cache::ioThread");
>>> 	syncThread.lock(Shared);
>>> -	Sync flushLock(&syncFlush, "Cache::ioThread(2)");
>>> -	Sync sync(&syncObject, "Cache::ioThread(3)");
>>> +	Sync flushLock(&syncFlush, "Cache::ioThread");
>>> 	Priority priority(database->ioScheduler);
>>> 	Thread *thread = Thread::getThread("Cache::ioThread");
>>> 	UCHAR *rawBuffer = new UCHAR[ASYNC_BUFFER_SIZE];
>>> @@ -781,137 +933,135 @@ void Cache::ioThread(void)
>>> 		{
>>> 		int32 pageNumber = flushBitmap->nextSet(0);
>>> 		int count;
>>> -		Dbb *dbb;
>>>
>>> 		if (pageNumber >= 0)
>>> 			{
>>> -			int	slot = pageNumber % hashSize;
>>> +			Bdb *bdb;
>>> +			Dbb *dbb;
>>> +			int	slot = PAGENUM_2_SLOT(pageNumber);
>>> 			bool hit = false;
>>> 			Bdb *bdbList = NULL;
>>> 			UCHAR *p = buffer;
>>> -			sync.lock(Shared);
>>> -
>>> -			// Look for a page to flush.  Then get all his
>> friends
>>> -			for (Bdb *bdb = hashTable[slot]; bdb; bdb =
>> bdb->hash)
>>> -				if (bdb->pageNumber == pageNumber &&
>> bdb->flushIt
>>> && bdb->isDirty)
>>> +			// Look for the page to flush.
>>> +			bdb = lockFindBdbIncrementUseCount(pageNumber,
>> slot);
>>> +			if (bdb && bdb->flushIt && bdb->isDirty)
>>> +				{
>>> +				hit = true;
>>> +				count = 0;
>>> +				dbb = bdb->dbb;
>>> +
>>> +				flushBitmap->clear(pageNumber);
>>> +
>>> +				// get all his friends
>>> +				while (p < end)
>>> 					{
>>> -					hit = true;
>>> -					count = 0;
>>> -					dbb = bdb->dbb;
>>> +					++count;
>>> +					bdb->addRef(Shared
>> COMMA_ADD_HISTORY);
>>> -					if (!bdb->hash)
>>> -
>> flushBitmap->clear(pageNumber);
>>> +					bdb->syncWrite.lock(NULL,
>> Exclusive);
>>> +					bdb->ioThreadNext = bdbList;
>>> +					bdbList = bdb;
>>>
>>> -					while (p < end)
>>> -						{
>>> -						++count;
>>> -
>> bdb->incrementUseCount(ADD_HISTORY);
>>> -						sync.unlock();
>>> -						bdb->addRef(Shared
>>> COMMA_ADD_HISTORY);
>>> -						if (falcon_use_sectorcache)
>>> -
>> sectorCache->writePage(bdb);
>>> -
>>> -						bdb->syncWrite.lock(NULL,
>> Exclusive);
>>> -						bdb->ioThreadNext = bdbList;
>>> -						bdbList = bdb;
>>> -
>>> -						//ASSERT(!(bdb->flags &
>>> BDB_write_pending));
>>> -						//bdb->flags |=
>> BDB_write_pending;
>>> -						memcpy(p, bdb->buffer,
>> pageSize);
>>> -						p += pageSize;
>>> -						bdb->flushIt = false;
>>> -						markClean(bdb);
>>> -						bdb->isDirty = false;
>>> -						bdb->release(REL_HISTORY);
>>> -						sync.lock(Shared);
>>> -
>>> -						if ( !(bdb = findBdb(dbb,
>> bdb-
>>>> pageNumber + 1)) )
>>> -							break;
>>> -
>>> -						if (!bdb->isDirty &&
>>> !continueWrite(bdb))
>>> -							break;
>>> -						}
>>> +					//ASSERT(!(bdb->flags &
>> BDB_write_pending));
>>> +					//bdb->flags |= BDB_write_pending;
>>> +					memcpy(p, bdb->buffer, pageSize);
>>> +					p += pageSize;
>>> +					bdb->flushIt = false;
>>> +					markClean(bdb);
>>> +					bdb->isDirty = false;
>>> +					bdb->release(REL_HISTORY);
>>>
>>> -					if (sync.state != None)
>>> -						sync.unlock();
>>> -
>>> -					flushLock.unlock();
>>> -					//Log::debug(" %d Writing %s %d
>> pages: %d -
>>> %d\n", thread->threadId, (const char*) dbb->fileName, count,
> pageNumber,
>>> pageNumber + count - 1);
>>> -					int length = p - buffer;
>>> -					priority.schedule(PRIORITY_LOW);
>>> +					bdb =
>> lockFindBdbIncrementUseCount(dbb, bdb-
>>>> pageNumber + 1);
>>> +					if (!bdb)
>>> +						break;
>>>
>>> -					try
>>> +					if (!bdb->isDirty &&
>> !continueWrite(bdb))
>>> 						{
>>> -
>> priority.schedule(PRIORITY_LOW);
>>> -						dbb->writePages(pageNumber,
>> length,
>>> buffer, WRITE_TYPE_FLUSH);
>>> +
>> bdb->decrementUseCount(REL_HISTORY);
>>> +						break;
>>> 						}
>>> -					catch (SQLException& exception)
>>> +					}
>>> +
>>> +				flushLock.unlock();
>>> +				//Log::debug(" %d Writing %s %d pages: %d -
>> %d\n",
>>> thread->threadId, (const char*) dbb->fileName, count, pageNumber,
>> pageNumber
>>> + count - 1);
>>> +				int length = p - buffer;
>>> +				priority.schedule(PRIORITY_LOW);
>>> +
>>> +				try
>>> +					{
>>> +					priority.schedule(PRIORITY_LOW);
>>> +					dbb->writePages(pageNumber, length,
>> buffer,
>>> WRITE_TYPE_FLUSH);
>>> +					}
>>> +				catch (SQLException& exception)
>>> +					{
>>> +					priority.finished();
>>> +
>>> +					if (exception.getSqlcode() !=
>> DEVICE_FULL)
>>> +						throw;
>>> +
>>> +					database->setIOError(&exception);
>>> +
>>> +					for (bool error = true; error;)
>>> 						{
>>> -						priority.finished();
>>> -
>>> -						if (exception.getSqlcode()
>> !=
>>> DEVICE_FULL)
>>> -							throw;
>>> -
>>> -
>> database->setIOError(&exception);
>>> -
>>> -						for (bool error = true;
>> error;)
>>> +						if
>> (thread->shutdownInProgress)
>>> 							{
>>> -							if
>> (thread->shutdownInProgress)
>>> -								{
>>> -								Bdb *next;
>>> +							Bdb *next;
>>>
>>> -								for (bdb =
>> bdbList; bdb;
>>> bdb = next)
>>> -									{
>>> -
>> //bdb->flags &=
>>> ~BDB_write_pending;
>>> -									next
>> = bdb-
>>>> ioThreadNext;
>>> -									bdb-
>>>> syncWrite.unlock();
>>> -									bdb-
>>>> decrementUseCount(REL_HISTORY);
>>> -									}
>>> -
>>> -								return;
>>> -								}
>>> -
>>> -							thread->sleep(1000);
>>> -
>>> -							try
>>> +							for (bdb = bdbList;
>> bdb; bdb =
>>> next)
>>> 								{
>>> -
>>> 	priority.schedule(PRIORITY_LOW);
>>> -								dbb-
>>>> writePages(pageNumber, length, buffer, WRITE_TYPE_FLUSH);
>>> -								error =
>> false;
>>> -
>> database->clearIOError();
>>> +								//bdb->flags
>> &=
>>> ~BDB_write_pending;
>>> +								next =
>> bdb->ioThreadNext;
>>> +
>> bdb->syncWrite.unlock();
>>> +								bdb-
>>>> decrementUseCount(REL_HISTORY);
>>> 								}
>>> -							catch (SQLException&
>>> exception2)
>>> -								{
>>> -
>> priority.finished();
>>> -								if
>>> (exception2.getSqlcode() != DEVICE_FULL)
>>> -
>> throw;
>>> -								}
>>> +							return;
>>> +							}
>>> +
>>> +						thread->sleep(1000);
>>> +
>>> +						try
>>> +							{
>>> +
>>> 	priority.schedule(PRIORITY_LOW);
>>> +
>> dbb->writePages(pageNumber,
>>> length, buffer, WRITE_TYPE_FLUSH);
>>> +							error = false;
>>> +
>> database->clearIOError();
>>> +							}
>>> +						catch (SQLException&
>> exception2)
>>> +							{
>>> +							priority.finished();
>>> +
>>> +							if
>> (exception2.getSqlcode() !=
>>> DEVICE_FULL)
>>> +								throw;
>>> 							}
>>> 						}
>>> +					}
>>>
>>> -					priority.finished();
>>> -					Bdb *next;
>>> +				priority.finished();
>>> +				Bdb *next;
>>>
>>> -					for (bdb = bdbList; bdb; bdb = next)
>>> -						{
>>> -						//ASSERT(bdb->flags &
>>> BDB_write_pending);
>>> -						//bdb->flags &=
>> ~BDB_write_pending;
>>> -						next = bdb->ioThreadNext;
>>> -						bdb->syncWrite.unlock();
>>> -
>> bdb->decrementUseCount(REL_HISTORY);
>>> -						}
>>> -
>>> -					flushLock.lock(Exclusive);
>>> -					++physicalWrites;
>>> -
>>> -					break;
>>> +				for (bdb = bdbList; bdb; bdb = next)
>>> +					{
>>> +					//ASSERT(bdb->flags &
>> BDB_write_pending);
>>> +					//bdb->flags &= ~BDB_write_pending;
>>> +					next = bdb->ioThreadNext;
>>> +					bdb->syncWrite.unlock();
>>> +					bdb->decrementUseCount(REL_HISTORY);
>>> 					}
>>> +
>>> +				flushLock.lock(Exclusive);
>>> +				++physicalWrites;
>>> +
>>> +				}
>>> +			else
>>> +				{
>>> +					if (bdb)
>>> +
>> bdb->decrementUseCount(REL_HISTORY);
>>> +				}
>>>
>>> 			if (!hit)
>>> 				{
>>> -				sync.unlock();
>>> 				flushBitmap->clear(pageNumber);
>>> 				}
>>> 			}
>>> @@ -922,7 +1072,9 @@ void Cache::ioThread(void)
>>> 				int writes = physicalWrites;
>>> 				int pages = flushPages;
>>> 				int delta = (int) (database->timestamp -
>>> flushStart);
>>> +				int64 callbackArg = flushArg;
>>> 				flushing = false;
>>> +				flushArg = 0;
>>> 				flushLock.unlock();
>>> 				syncWait.unlock();
>>>
>>> @@ -930,7 +1082,8 @@ void Cache::ioThread(void)
>>> 					Log::log(LogInfo, "%d: Cache flush:
>> %d
>>> pages, %d writes in %d seconds (%d pps)\n",
>>>
>> database->deltaTime,
>>> pages, writes, delta, pages / MAX(delta, 1));
>>>
>>> -				database->pageCacheFlushed(flushArg);
>>> +				if (callbackArg != 0)
>>> +
>> database->pageCacheFlushed(callbackArg);
>>> 				}
>>> 			else
>>> 				flushLock.unlock();
>>> @@ -940,8 +1093,8 @@ void Cache::ioThread(void)
>>>
>>> 			thread->sleep();
>>> 			flushLock.lock(Exclusive);
>>> -			}
>>> 		}
>>> +		} // for ever
>>>
>>> 	delete [] rawBuffer;
>>> }
>>> @@ -954,11 +1107,12 @@ bool Cache::continueWrite(Bdb* startingB
>>>
>>> 	for (int32 pageNumber = startingBdb->pageNumber + 1, end =
>> pageNumber+
>>> 5; pageNumber < end; ++pageNumber)
>>> 		{
>>> -		Bdb *bdb = findBdb(dbb, pageNumber);
>>> +		Bdb *bdb;
>>>
>>> 		if (dirty > clean)
>>> 			return true;
>>> -
>>> +
>>> +		bdb = lockFindBdbIncrementUseCount(dbb, pageNumber);
>>> 		if (!bdb)
>>> 			return dirty >= clean;
>>>
>>> @@ -966,6 +1120,7 @@ bool Cache::continueWrite(Bdb* startingB
>>> 			++dirty;
>>> 		else
>>> 			++clean;
>>> +		bdb->decrementUseCount(REL_HISTORY);
>>> 		}
>>>
>>> 	return (dirty >= clean);
>>> @@ -995,15 +1150,18 @@ void Cache::shutdownThreads(void)
>>> 		ioThreads[n] = 0;
>>> 		}
>>>
>>> -	Sync sync(&syncThreads, "Cache::shutdownThreads");
>>> -	sync.lock(Exclusive);
>>> +	Sync lockThreads(&syncThreads, "Cache::shutdownThreads");
>>> +	lockThreads.lock(Exclusive);
>>> }
>>>
>>> +#ifdef CACHE_TRACE_FILE
>>> void Cache::analyzeFlush(void)
>>> {
>>> 	Dbb *dbb = NULL;
>>> 	Bdb *bdb;
>>> +	Sync dirtyLock (&syncDirty, "Cache::hasDirtyPages");
>>>
>>> +	dirtyLock.lock (Shared);
>>> 	for (bdb = firstDirty; bdb; bdb = bdb->nextDirty)
>>> 		if (bdb->dbb->tableSpaceId == 1)
>>> 			{
>>> @@ -1011,6 +1169,7 @@ void Cache::analyzeFlush(void)
>>>
>>> 			break;
>>> 			}
>>> +	dirtyLock.unlock();
>>>
>>> 	if (!dbb)
>>> 		return;
>>> @@ -1018,16 +1177,19 @@ void Cache::analyzeFlush(void)
>>> 	fprintf(traceFile, "-------- time %d -------\n",
>> database->deltaTime);
>>> 	for (int pageNumber = 0; (pageNumber = flushBitmap-
>>>> nextSet(pageNumber)) >= 0;)
>>> +		// non-protected access to hash table via findBdb()!
>>> 		if ( (bdb = findBdb(dbb, pageNumber)) )
>>> 			{
>>> 			int start = pageNumber;
>>> 			int type = bdb->buffer->pageType;
>>>
>>> +			// non-protected access to hash table via findBdb()!
>>> 			for (; (bdb = findBdb(dbb, ++pageNumber)) && bdb-
>>>> flushIt;)
>>> 				;
>>>
>>> 			fprintf(traceFile, " %d flushed: %d to %d, first
>> type
>>> %d\n", pageNumber - start, start, pageNumber - 1, type);
>>>
>>> +			// non-protected access to hash table via findBdb()!
>>> 			for (int max = pageNumber + 5; pageNumber < max &&
>> (bdb =
>>> findBdb(dbb, pageNumber)) && !bdb->flushIt; ++pageNumber)
>>> 				{
>>> 				if (bdb->isDirty)
>>> @@ -1044,27 +1206,39 @@ void Cache::analyzeFlush(void)
>>>
>>> void Cache::openTraceFile(void)
>>> {
>>> -#ifdef TRACE_FILE
>>> 	if (traceFile)
>>> 		closeTraceFile();
>>>
>>> -	traceFile = fopen(TRACE_FILE, "w");
>>> -#endif
>>> +	traceFile = fopen(TRACE_FILE, "a+");
>>> +	fprintf(traceFile, "Starting\n");
>>> +//KEL
>>> +//	setvbuf(traceFile, (char *) NULL, _IOLBF, 0);
>>> +
>>> }
>>>
>>> void Cache::closeTraceFile(void)
>>> {
>>> -#ifdef TRACE_FILE
>>> 	if (traceFile)
>>> 		{
>>> 		fclose(traceFile);
>>> 		traceFile = NULL;
>>> 		}
>>> -#endif
>>> }
>>> +#else // CACHE_TRACE_FILE
>>> +void Cache::analyzeFlush(void)
>>> +{
>>> +}
>>> +void Cache::openTraceFile(void)
>>> +{
>>> +}
>>> +void Cache::closeTraceFile(void)
>>> +{
>>> +}
>>> +#endif // CACHE_TRACE_FILE
>>>
>>> void Cache::flushWait(void)
>>> {
>>> -	Sync sync(&syncWait, "Cache::flushWait");
>>> -	sync.lock(Shared);
>>> +	Sync waitLock(&syncWait, "Cache::flushWait");
>>> +	waitLock.lock(Exclusive);
>>> }
>>> +
>>>
>>> === modified file 'storage/falcon/Cache.h'
>>> --- a/storage/falcon/Cache.h	2008-08-22 06:47:40 +0000
>>> +++ b/storage/falcon/Cache.h	2008-09-02 16:08:11 +0000
>>> @@ -28,6 +28,17 @@
>>> #include "SyncObject.h"
>>> #include "Queue.h"
>>>
>>> +// uncomment DEBUG_SYNC_HASH_TABLE_SIZE to cause more contention and
>test
>>> for race conditions
>>> +//#define DEBUG_SYNC_HASH_TABLE_SIZE (0x01 << 1)
>>> +#ifdef DEBUG_SYNC_HASH_TABLE_SIZE
>>> +#  define DEBUG_SYNC_HASH_TABLE_MASK (DEBUG_SYNC_HASH_TABLE_SIZE - 1)
>>> +#  define PAGENUM_2_LOCK_INDEX(_pgnum, _slot) ((_pgnum) &
>>> DEBUG_SYNC_HASH_TABLE_MASK)
>>> +#else /* DEBUG_SYNC_HASH_TABLE_SIZE */
>>> +#  define PAGENUM_2_LOCK_INDEX(_pgnum, _slot) ((_slot))
>>> +#endif /* DEBUG_SYNC_HASH_TABLE_SIZE */
>>> +
>>> +#define PAGENUM_2_SLOT(_pgnum) ((_pgnum) & hashMask)
>>> +
>>> class Bdb;
>>> class Dbb;
>>> class PageWriter;
>>> @@ -54,7 +65,6 @@ public:
>>> 	void	markClean (Bdb *bdb);
>>> 	void	markDirty (Bdb *bdb);
>>> 	void	validate();
>>> -	void	moveToHead (Bdb *bdb);
>>> 	void	flush(int64 arg);
>>> 	void	validateCache(void);
>>> 	void	syncFile(Dbb *dbb, const char *text);
>>> @@ -83,14 +93,20 @@ public:
>>> 	bool		flushing;
>>>
>>> protected:
>>> -	Bdb*		findBuffer (Dbb *dbb, int pageNumber, LockType
>> lockType);
>>> +	void	moveToHead (Bdb *bdb);
>>> +	void	moveToHeadAlreadyLocked (Bdb *bdb);
>>> +	Bdb*		getFreeBuffer(void);
>>> +	Bdb*		findBdb(Dbb* dbb, int32 pageNumber, int slot);
>>> 	Bdb*		findBdb(Dbb* dbb, int32 pageNumber);
>>> +	Bdb*		lockFindBdbIncrementUseCount(Dbb* dbb, int32
>> pageNumber);
>>> +	Bdb*		lockFindBdbIncrementUseCount(int32 pageNumber, int
>> slot);
>>> 	int64		flushArg;
>>> 	Bdb			*bdbs;
>>> 	Bdb			*endBdbs;
>>> 	Queue<Bdb>	bufferQueue;
>>> 	Bdb			**hashTable;
>>> +	SyncObject  *syncHashTable;
>>> 	Bdb			*firstDirty;
>>> 	Bdb			*lastDirty;
>>> 	Bitmap		*flushBitmap;
>>> @@ -105,12 +121,13 @@ protected:
>>> 	int			flushPages;
>>> 	int			physicalWrites;
>>> 	int			hashSize;
>>> +	unsigned int	hashMask;
>>> 	int			pageSize;
>>> -	int			upperFraction;
>>> +	unsigned int upperFraction;
>>> 	int			numberHunks;
>>> -	int			numberDirtyPages;
>>> 	int			numberIoThreads;
>>> -	volatile int bufferAge;
>>> +	volatile uint64 bufferAge;
>>> +
>>> public:
>>> 	void flushWait(void);
>>> };
>>>
>>>
>>> --
>>> MySQL Code Commits Mailing List
>>> For list archives: http://lists.mysql.com/commits
>>> To unsubscribe:    http://lists.mysql.com/commits?unsub=1
>>
>>
>
>--
>
>Kelly Long, Senior Software Engineer/Performance Architect
>Sun Microsystems AKA MySQL
>Office: Denver CO USA
>
>Are you MySQL certified?  www.mysql.com/certification
>
>This message including any attachments is confidential information of Sun
>Microsystems, Inc. Disclosure, copying or distribution is prohibited
without
>permission of Sun. If you are not the intended recipient, please reply to
>the
>sender and then delete this message.

Thread
bzr commit into mysql-6.0-falcon branch (klong:2808) Kelly Long2 Sep
  • RE: bzr commit into mysql-6.0-falcon branch (klong:2808) Kevin Lewis3 Sep
RE: bzr commit into mysql-6.0-falcon branch (klong:2808)Kevin Lewis3 Sep