List:Commits« Previous MessageNext Message »
From:Kelly Long Date:August 18 2008 1:48pm
Subject:bzr commit into mysql-6.0-falcon branch (klong:2786)
View as plain text  
#At file:///FC/MYSQL/wa-2008-BZR/lcl/mysql-6.0-falcon-team-KL-Cache/

 2786 Kelly Long	2008-08-18
      Page cache lock per hash bucket. WL4480
      Page cache hash buckets are a power of two. WL4481
modified:
  storage/falcon/Cache.cpp
  storage/falcon/Cache.h

per-file messages:
  storage/falcon/Cache.h
    Page cache lock per hash bucket. WL4480
    Page cache hash buckets are a power of two. WL4481
=== modified file 'storage/falcon/Cache.cpp'
--- a/storage/falcon/Cache.cpp	2008-07-24 08:45:03 +0000
+++ b/storage/falcon/Cache.cpp	2008-08-18 13:47:57 +0000
@@ -72,7 +72,22 @@ Cache::Cache(Database *db, int pageSz, i
 	database = db;
 	panicShutdown = false;
 	pageSize = pageSz;
-	hashSize = hashSz;
+	{
+		unsigned int highBit;
+		for (highBit=0x01; highBit < hashSz; highBit= highBit << 1) {
+		}
+
+		// if there are more than 4096 buckets then lets round down
+		// else lets round up
+		if (highBit >= 0x00001000) {
+			// KEL use power of two rounded down
+			hashSize = highBit << 1;
+		} else {
+			// KEL use power of two rounded up
+			hashSize = highBit;
+		}
+	}
+	hashMask = hashSize - 1;
 	numberBuffers = numBuffers;
 	upperFraction = numberBuffers / 4;
 	bufferAge = 0;
@@ -80,8 +95,21 @@ Cache::Cache(Database *db, int pageSz, i
 	lastDirty = NULL;
 	numberDirtyPages = 0;
 	pageWriter = NULL;
-	hashTable = new Bdb* [hashSz];
+	hashTable = new Bdb* [hashSize];
 	memset (hashTable, 0, sizeof (Bdb*) * hashSize);
+#ifdef DEBUG_SYNC_HASH_TABLE_SIZE
+    syncHashTable = new SyncObject [DEBUG_SYNC_HASH_TABLE_SIZE];
+	for (int loop = 0; loop < DEBUG_SYNC_HASH_TABLE_SIZE; loop ++)
+		syncHashTable[loop].setName("Cache::syncHashTable");
+#else /* DEBUG_SYNC_HASH_TABLE_SIZE */
+    syncHashTable = new SyncObject [hashSize];
+	for (int loop = 0; loop < hashSize; loop ++)
+		{
+		char tmpName[128];
+		snprintf(tmpName,120,"Cache::syncHashTable[%d]",loop);
+		syncHashTable[loop].setName(tmpName);
+		}
+#endif /* DEBUG_SYNC_HASH_TABLE_SIZE */
 	sectorCache = new SectorCache(sectorCacheSize / SECTOR_BUFFER_SIZE, pageSize);
 
 	uint64 n = ((uint64) pageSize * numberBuffers + cacheHunkSize - 1) / cacheHunkSize;
@@ -103,6 +131,7 @@ Cache::Cache(Database *db, int pageSz, i
 	
 	try
 		{
+		// non-protected access to bdbs,endBdbs is OK during initialization
 		bdbs = new Bdb [numberBuffers];
 		endBdbs = bdbs + numberBuffers;
 		int remaining = 0;
@@ -121,6 +150,7 @@ Cache::Cache(Database *db, int pageSz, i
 				}
 
 			bdb->cache = this;
+			// non-protected access to bufferQueue is OK during initialization
 			bufferQueue.append(bdb);
 			bdb->buffer = (Page*) stuff;
 			stuff += pageSize;
@@ -150,6 +180,7 @@ Cache::~Cache()
 		closeTraceFile();
 
 	delete [] hashTable;
+	delete [] syncHashTable;
 	delete [] bdbs;
 	delete [] ioThreads;
 	delete flushBitmap;
@@ -167,14 +198,16 @@ Cache::~Cache()
 Bdb* Cache::probePage(Dbb *dbb, int32 pageNumber)
 {
 	ASSERT (pageNumber >= 0);
-	Sync sync (&syncObject, "Cache::probePage");
-	sync.lock (Shared);
-	Bdb *bdb = findBdb(dbb, pageNumber);
+	int slot = PAGENUM_2_SLOT(pageNumber);
+	Sync lockHash (&syncHashTable[PAGENUM_2_LOCK_INDEX(pageNumber, slot)], "Cache::probePage");
+	lockHash.lock (Shared);
+	Bdb *bdb;
 	
+	bdb = findBdb(dbb, pageNumber, slot);
 	if (bdb)
 		{
 		bdb->incrementUseCount(ADD_HISTORY);
-		sync.unlock();
+		lockHash.unlock();
 
 		if (bdb->buffer->pageType == PAGE_free)
 			{
@@ -189,15 +222,57 @@ Bdb* Cache::probePage(Dbb *dbb, int32 pa
 		return bdb;
 		}
 
+	lockHash.unlock();
 	return NULL;
 }
 
-Bdb* Cache::findBdb(Dbb* dbb, int32 pageNumber)
+Bdb* Cache::findBdb(Dbb* dbb, int32 pageNumber, int slot)
+{
+	for (Bdb *bdb = hashTable [slot]; bdb; bdb = bdb->hash)
 {
-	for (Bdb *bdb = hashTable [pageNumber % hashSize]; bdb; bdb = bdb->hash)
 		if (bdb->pageNumber == pageNumber && bdb->dbb == dbb)
+			{
+			return bdb;
+			}
+		}
+
+	return NULL;
+}
+
+Bdb* Cache::findBdb(Dbb* dbb, int32 pageNumber)
+{
+	return (findBdb(dbb, pageNumber, PAGENUM_2_SLOT(pageNumber)));
+}
+
+Bdb* Cache::lockFindBdbIncrementUseCount(Dbb* dbb, int32 pageNumber)
+{
+	int slot = PAGENUM_2_SLOT(pageNumber);
+	Sync lockHash (&syncHashTable[PAGENUM_2_LOCK_INDEX(pageNumber, slot)], "Cache::lockFindBdbIncrementUseCount");
+	lockHash.lock (Shared);
+	Bdb *bdb;
+
+	bdb = findBdb(dbb, pageNumber, slot);
+	if (bdb != NULL)
+		bdb->incrementUseCount(ADD_HISTORY);
+
+	lockHash.unlock();
+	return bdb;
+}
+
+Bdb* Cache::lockFindBdbIncrementUseCount(int32 pageNumber, int slot)
+{
+	Sync lockHash (&syncHashTable[PAGENUM_2_LOCK_INDEX(pageNumber, slot)], "Cache::lockFindBdbIncrementUseCount");
+	lockHash.lock (Shared);
+
+	for (Bdb *bdb = hashTable [slot]; bdb; bdb = bdb->hash)
+		if (bdb->pageNumber == pageNumber)
+		{
+			bdb->incrementUseCount(ADD_HISTORY);
+			lockHash.unlock();
 			return bdb;
+		}
 
+	lockHash.unlock();
 	return NULL;
 }
 
@@ -217,51 +292,46 @@ Bdb* Cache::fetchPage(Dbb *dbb, int32 pa
 #endif
 
 	ASSERT (pageNumber >= 0);
-	int slot = pageNumber % hashSize;
-	LockType actual = lockType;
-	Sync sync (&syncObject, "Cache::fetchPage");
-	sync.lock (Shared);
-	int hit = 0;
-
-	/* If we already have a buffer for this go, we're done */
+	int slot = PAGENUM_2_SLOT(pageNumber);
+	Sync lockHash (&syncHashTable[PAGENUM_2_LOCK_INDEX(pageNumber, slot)], "Cache::fetchPage");
 
+	lockHash.lock (Shared);
 	Bdb *bdb;
 
-	for (bdb = hashTable [slot]; bdb; bdb = bdb->hash)
-		if (bdb->pageNumber == pageNumber && bdb->dbb == dbb)
+	bdb = findBdb(dbb, pageNumber, slot);
+	if (!bdb)
 			{
-			//syncObject.validateShared("Cache::fetchPage");
-			bdb->incrementUseCount(ADD_HISTORY);
-			sync.unlock();
-			bdb->addRef(lockType  COMMA_ADD_HISTORY);
-			bdb->decrementUseCount(REL_HISTORY);
-			hit = 1;
-			break;
-			}
+		lockHash.unlock();
+		// get getFreeBuffer() locks a hash bucket to remove the candidate bdb
+		// if we locked out hash bucket before the call then we could have
+		// a deadlock
+		// thus we get the free buffer before we lock the hash bucket we will
+		// be inserting into.  This avoids a dead lock but generates a race
+		// we take care of the race by reversing the getFreeBuffer() work
+		// when we lose the race
+		Bdb *bdbAvailable;
+		bdbAvailable = getFreeBuffer();
+		lockHash.lock(Exclusive);
 
+		bdb = findBdb(dbb, pageNumber, slot);
 	if (!bdb)
 		{
-		sync.unlock();
-		actual = Exclusive;
-		sync.lock(Exclusive);
-
-		for (bdb = hashTable [slot]; bdb; bdb = bdb->hash)
-			if (bdb->pageNumber == pageNumber && bdb->dbb == dbb)
-				{
-				//syncObject.validateExclusive("Cache::fetchPage (retry)");
-				bdb->incrementUseCount(ADD_HISTORY);
-				sync.unlock();
-				bdb->addRef(lockType  COMMA_ADD_HISTORY);
-				bdb->decrementUseCount(REL_HISTORY);
-				hit = 2;
-				break;
-				}
+			// we won the race so lets use the free bdb
+			/* Set new page number and relink into hash table */
+			bdbAvailable->addRef (Exclusive  COMMA_ADD_HISTORY);
+			bdbAvailable->decrementUseCount(REL_HISTORY);
+
+			bdbAvailable->hash = hashTable [slot];
+			hashTable [slot] = bdbAvailable;
+			bdbAvailable->pageNumber = pageNumber;
+			bdbAvailable->dbb = dbb;
 
-		if (!bdb)
-			{
-			bdb = findBuffer(dbb, pageNumber, actual);
+#ifdef COLLECT_BDB_HISTORY
+			bdbAvailable->initHistory();
+#endif
+			bdb = bdbAvailable;
 			moveToHead(bdb);
-			sync.unlock();
+			lockHash.unlock();
 
 #ifdef STOP_PAGE			
 			if (bdb->pageNumber == STOP_PAGE)
@@ -278,9 +348,30 @@ Bdb* Cache::fetchPage(Dbb *dbb, int32 pa
 #ifdef HAVE_PAGE_NUMBER
 			ASSERT(bdb->buffer->pageNumber == pageNumber);
 #endif			
-			if (actual != lockType)
+			if (Exclusive != lockType)
 				bdb->downGrade(lockType);
 			}
+			else
+			{
+			// lost a race.  put our available back to useable
+			bdbAvailable->pageNumber = -1;
+			bdbAvailable->dbb = NULL;
+			bdbAvailable->decrementUseCount(REL_HISTORY);
+
+			//syncObject.validateExclusive("Cache::fetchPage (retry)");
+			bdb->incrementUseCount(ADD_HISTORY);
+			lockHash.unlock();
+			bdb->addRef(lockType  COMMA_ADD_HISTORY);
+			bdb->decrementUseCount(REL_HISTORY);
+			}
+		}
+		else
+		{
+		//syncObject.validateShared("Cache::fetchPage");
+		bdb->incrementUseCount(ADD_HISTORY);
+		lockHash.unlock();
+		bdb->addRef(lockType  COMMA_ADD_HISTORY);
+		bdb->decrementUseCount(REL_HISTORY);
 		}
 
 	Page *page = bdb->buffer;
@@ -304,9 +395,9 @@ Bdb* Cache::fetchPage(Dbb *dbb, int32 pa
 
 	// If buffer has moved out of the upper "fraction" of the LRU queue, move it back up
 	
+	// non-protected access to age is harmless since it is fuzzy anyway
 	if (bdb->age < bufferAge - (uint64) upperFraction)
 		{
-		sync.lock (Exclusive);
 		moveToHead (bdb);
 		}
 		
@@ -319,9 +410,10 @@ Bdb* Cache::fetchPage(Dbb *dbb, int32 pa
 
 Bdb* Cache::fakePage(Dbb *dbb, int32 pageNumber, PageType type, TransId transId)
 {
-	Sync sync(&syncObject, "Cache::fakePage");
-	sync.lock(Exclusive);
-	int	slot = pageNumber % hashSize;
+	int	slot = PAGENUM_2_SLOT(pageNumber);
+	Sync lockHash (&syncHashTable[PAGENUM_2_LOCK_INDEX(pageNumber, slot)], "Cache::fakePage");
+	lockHash.lock(Exclusive);
+	Bdb *bdb;
 
 #ifdef STOP_PAGE			
 	if (pageNumber == STOP_PAGE)
@@ -330,25 +422,64 @@ Bdb* Cache::fakePage(Dbb *dbb, int32 pag
 
 	/* If we already have a buffer for this, we're done */
 
-	Bdb *bdb;
-
-	for (bdb = hashTable [slot]; bdb; bdb = bdb->hash)
-		if (bdb->pageNumber == pageNumber && bdb->dbb == dbb)
+	bdb = findBdb(dbb, pageNumber, slot);
+	if (!bdb)
 			{
-			if (bdb->syncObject.isLocked())
+		lockHash.unlock();
+		// get getFreeBuffer() locks a hash bucket to remove the candidate bdb
+		// if we locked out hash bucket before the call then we could have
+		// a deadlock
+		// thus we get the free buffer before we lock the hash bucket we will
+		// be inserting into.  This avoids a dead lock but generates a race
+		// we take care of the race by reversing the getFreeBuffer() work
+		// when we lose the race
+		Bdb *bdbAvailable;
+		bdbAvailable = getFreeBuffer();
+		lockHash.lock(Exclusive);
+
+		bdb = findBdb(dbb, pageNumber, slot);
+		if (!bdb)
 				{
-				// The pageWriter may still be cleaning up this freed page with a shared lock
-				ASSERT(bdb->buffer->pageType == PAGE_free);
-				ASSERT(bdb->syncObject.getState() >= 0);
-				}
+			// we won the race so lets use the free bdb
+			/* Set new page number and relink into hash table */
+			bdbAvailable->addRef (Exclusive  COMMA_ADD_HISTORY);
+			bdbAvailable->decrementUseCount(REL_HISTORY);
 				
-			bdb->addRef(Exclusive  COMMA_ADD_HISTORY);
+			bdbAvailable->hash = hashTable [slot];
+			hashTable [slot] = bdbAvailable;
+			bdbAvailable->pageNumber = pageNumber;
+			bdbAvailable->dbb = dbb;
+
+#ifdef COLLECT_BDB_HISTORY
+			bdbAvailable->initHistory();
+#endif
+			bdb = bdbAvailable;
+			moveToHead(bdb);
+			lockHash.unlock();
 			
-			break;
 			}
+			else
+			{
+			// lost a race.  put our available back to useable
+			bdbAvailable->pageNumber = -1;
+			bdbAvailable->dbb = NULL;
+			bdbAvailable->decrementUseCount(REL_HISTORY);
 
-	if (!bdb)
-		bdb = findBuffer(dbb, pageNumber, Exclusive);
+			//syncObject.validateExclusive("Cache::fakePage (retry)");
+			bdb->incrementUseCount(ADD_HISTORY);
+			lockHash.unlock();
+			bdb->addRef(Exclusive  COMMA_ADD_HISTORY);
+			bdb->decrementUseCount(REL_HISTORY);
+			}
+		}
+		else
+		{
+		//syncObject.validateShared("Cache::fakePage");
+		bdb->incrementUseCount(ADD_HISTORY);
+		lockHash.unlock();
+		bdb->addRef(Exclusive  COMMA_ADD_HISTORY);
+		bdb->decrementUseCount(REL_HISTORY);
+		}
 
 	if (!dbb->isReadOnly)
 		bdb->mark(transId);
@@ -363,14 +494,14 @@ Bdb* Cache::fakePage(Dbb *dbb, int32 pag
 void Cache::flush(int64 arg)
 {
 	Sync flushLock(&syncFlush, "Cache::flush(1)");
-	Sync sync(&syncDirty, "Cache::flush(2)");
+	Sync dirtyLock(&syncDirty, "Cache::flush(2)");
 	flushLock.lock(Exclusive);
 	
 	if (flushing)
 		return;
 
 	syncWait.lock(NULL, Exclusive);
-	sync.lock(Shared);
+	dirtyLock.lock(Shared);
 	//Log::debug(%d: "Initiating flush\n", dbb->deltaTime);
 	flushArg = arg;
 	flushPages = 0;
@@ -388,7 +519,7 @@ void Cache::flush(int64 arg)
 
 	flushStart = database->timestamp;
 	flushing = true;
-	sync.unlock();
+	dirtyLock.unlock();
 	flushLock.unlock();
 	
 	for (int n = 0; n < numberIoThreads; ++n)
@@ -398,69 +529,92 @@ void Cache::flush(int64 arg)
 
 void Cache::moveToHead(Bdb * bdb)
 {
+	Sync bufferQueueLock (&bufferQueue.syncObject, "Cache::moveToHead");
+
+	bufferQueueLock.lock (Exclusive);
 	bdb->age = bufferAge++;
 	bufferQueue.remove(bdb);
 	bufferQueue.prepend(bdb);
 	//validateUnique (bdb);
 }
 
-Bdb* Cache::findBuffer(Dbb *dbb, int pageNumber, LockType lockType)
+Bdb* Cache::getFreeBuffer(void)
 {
-	//syncObject.validateExclusive("Cache::findBuffer");
-	int	slot = pageNumber % hashSize;
-	Sync sync(&syncDirty, "Cache::findBuffer");
-	
-	/* Find least recently used, not-in-use buffer */
-
+	Sync bufferQueueLock (&bufferQueue.syncObject, "Cache::getFreeBuffer");
+	unsigned int count;
 	Bdb *bdb;
 
 	// Find a candidate BDB.
-	
 	for (;;)
 		{
-		for (bdb = bufferQueue.last; bdb; bdb = bdb->prior)
+		bufferQueueLock.lock (Exclusive);
+		// find a candidate that is NOT in use and NOT dirty and in the tail fraction of the LRU
+		for (count = 0, bdb = bufferQueue.last; bdb && count < upperFraction; bdb = bdb->prior, count++)
 			if (bdb->useCount == 0)
-				break;
+				{
+				if (!bdb->isDirty)
+					{
+					bdb->incrementUseCount(REL_HISTORY);
+					break;
+					}
+				}
+				else
+				{
+					moveToHead(bdb);
+				}
+		if (!bdb)
+			// find a candidate that is NOT in use, could be dirty
+			for (bdb = bufferQueue.last; bdb; bdb = bdb->prior)
+				if (bdb->useCount == 0)
+					{
+					bdb->incrementUseCount(REL_HISTORY);
+					break;
+					}
+		bufferQueueLock.unlock();
 
 		if (!bdb)
 			throw SQLError(RUNTIME_ERROR, "buffer pool is exhausted\n");
 			
-		if (!bdb->isDirty)
-			break;
-			
-		writePage (bdb, WRITE_TYPE_REUSE);
-		}
-
-	/* Unlink its old incarnation from the page/hash table */
+		if (bdb->pageNumber >= 0)
+		{
+			int	slotRemove = PAGENUM_2_SLOT(bdb->pageNumber);
+			Sync lockHashRemove (&syncHashTable[PAGENUM_2_LOCK_INDEX(bdb->pageNumber, slotRemove)], "Cache::getFreeBuffer");
+			lockHashRemove.lock(Exclusive);
 
-	if (bdb->pageNumber >= 0)
-		for (Bdb **ptr = hashTable + bdb->pageNumber % hashSize;; ptr = &(*ptr)->hash)
-			if (*ptr == bdb)
+			if (bdb->useCount != 1)
 				{
-				*ptr = bdb->hash;
-				break;
+				// we lost a race try again
+				bdb->decrementUseCount(REL_HISTORY);
+				lockHashRemove.unlock();
+				continue;
 				}
-			else
-				ASSERT (*ptr);
-
-	bdb->addRef (lockType  COMMA_ADD_HISTORY);
 
-	/* Set new page number and relink into hash table */
+			if (bdb->isDirty)
+				writePage (bdb, WRITE_TYPE_REUSE);
 
-	bdb->hash = hashTable [slot];
-	hashTable [slot] = bdb;
-	bdb->pageNumber = pageNumber;
-	bdb->dbb = dbb;
+			/* Unlink its old incarnation from the page/hash table */
+			for (Bdb **ptr = hashTable + PAGENUM_2_SLOT(bdb->pageNumber) ;; ptr = &(*ptr)->hash)
+				if (*ptr == bdb)
+					{
+					*ptr = bdb->hash;
+					break;
+					}
+				else
+					ASSERT (*ptr);
+		}
 
-#ifdef COLLECT_BDB_HISTORY
-	bdb->initHistory();
-#endif
+		break;
+		}
 
 	return bdb;
 }
 
 void Cache::validate()
 {
+	//Sync bufferQueueLock (&bufferQueue.syncObject, "Cache::moveToHead");
+
+	//bufferQueueLock.lock (Shared);
+	// non-protected access to bufferQueue is DANGEROUS...
 	for (Bdb *bdb = bufferQueue.last; bdb; bdb = bdb->prior)
 		{
 		//IndexPage *page = (IndexPage*) bdb->buffer;
@@ -470,8 +624,8 @@ void Cache::validate()
 
 void Cache::markDirty(Bdb *bdb)
 {
-	Sync sync (&syncDirty, "Cache::markDirty");
-	sync.lock (Exclusive);
+	Sync dirtyLock (&syncDirty, "Cache::markDirty");
+	dirtyLock.lock (Exclusive);
 	bdb->nextDirty = NULL;
 	bdb->priorDirty = lastDirty;
 
@@ -487,8 +641,8 @@ void Cache::markDirty(Bdb *bdb)
 
 void Cache::markClean(Bdb *bdb)
 {
-	Sync sync (&syncDirty, "Cache::markClean");
-	sync.lock (Exclusive);
+	Sync dirtyLock (&syncDirty, "Cache::markClean");
+	dirtyLock.lock (Exclusive);
 
 	/***
 	if (bdb->flushIt)
@@ -600,8 +754,8 @@ void Cache::writePage(Bdb *bdb, int type
 
 	if (dbb->shadows)
 		{
-		Sync sync (&dbb->syncClone, "Cache::writePage(2)");
-		sync.lock (Shared);
+		Sync cloneLock (&dbb->syncClone, "Cache::writePage(2)");
+		cloneLock.lock (Shared);
 
 		for (DatabaseCopy *shadow = dbb->shadows; shadow; shadow = shadow->next)
 			shadow->rewritePage(bdb);
@@ -610,14 +764,15 @@ void Cache::writePage(Bdb *bdb, int type
 
 void Cache::analyze(Stream *stream)
 {
-	Sync sync (&syncDirty, "Cache::analyze");
-	sync.lock (Shared);
+	Sync dirtyLock (&syncDirty, "Cache::analyze");
+	dirtyLock.lock (Shared);
 	int inUse = 0;
 	int dirty = 0;
 	int dirtyList = 0;
 	int total = 0;
 	Bdb *bdb;
 
+	// non-protected access to bdbs,endBdbs is DANGEROUS...
 	for (bdb = bdbs; bdb < endBdbs; ++bdb)
 		{
 		++total;
@@ -638,17 +793,18 @@ void Cache::analyze(Stream *stream)
 
 void Cache::validateUnique(Bdb *target)
 {
-	int	slot = target->pageNumber % hashSize;
+	int	slot = PAGENUM_2_SLOT(target->pageNumber);
 
+	// WARNING: unlocked walk of hash table.... DANGEROUS
 	for (Bdb *bdb = hashTable [slot]; bdb; bdb = bdb->hash)
 		ASSERT (bdb == target || !(bdb->pageNumber == target->pageNumber && bdb->dbb == target->dbb));
 }
 
 void Cache::freePage(Dbb *dbb, int32 pageNumber)
 {
-	Sync sync (&syncObject, "Cache::freePage");
-	sync.lock (Shared);
-	int	slot = pageNumber % hashSize;
+	int slot = PAGENUM_2_SLOT(pageNumber);
+	Sync lockHash (&syncHashTable[PAGENUM_2_LOCK_INDEX(pageNumber, slot)], "Cache::freePage");
+	lockHash.lock(Shared);
 
 	// If page exists in cache (usual case), clean it up
 
@@ -657,7 +813,7 @@ void Cache::freePage(Dbb *dbb, int32 pag
 			{
 			if (bdb->isDirty)
 				{
-				sync.unlock();
+				lockHash.unlock();
 				markClean (bdb);
 				}
 				
@@ -670,8 +826,8 @@ void Cache::flush(Dbb *dbb)
 {
 	//Sync sync (&syncDirty, "Cache::flush(1)");
 	//sync.lock (Exclusive);
-	Sync sync (&syncObject, "Cache::flush(3)");
-	sync.lock (Shared);
+	Sync objectLock (&syncObject, "Cache::flush(3)");
+	objectLock.lock (Shared);
 
 	for (Bdb *bdb = bdbs; bdb < endBdbs; ++bdb)
 		if (bdb->dbb == dbb)
@@ -685,8 +841,8 @@ void Cache::flush(Dbb *dbb)
 
 bool Cache::hasDirtyPages(Dbb *dbb)
 {
-	Sync sync (&syncDirty, "Cache::hasDirtyPages");
-	sync.lock (Shared);
+	Sync dirtyLock (&syncDirty, "Cache::hasDirtyPages");
+	dirtyLock.lock (Shared);
 
 	for (Bdb *bdb = firstDirty; bdb; bdb = bdb->nextDirty)
 		if (bdb->dbb == dbb)
@@ -717,25 +873,21 @@ Bdb* Cache::trialFetch(Dbb* dbb, int32 p
 		}
 
 	ASSERT (pageNumber >= 0);
-	int	slot = pageNumber % hashSize;
-	Sync sync (&syncObject, "Cache::trialFetch");
-	sync.lock (Shared);
-	int hit = 0;
+	int	slot = PAGENUM_2_SLOT(pageNumber);
+	Sync lockHash (&syncHashTable[PAGENUM_2_LOCK_INDEX(pageNumber, slot)], "Cache::trialFetch");
+	lockHash.lock(Shared);
+	Bdb *bdb;
 
 	/* If we already have a buffer for this go, we're done */
 
-	Bdb *bdb;
-
-	for (bdb = hashTable [slot]; bdb; bdb = bdb->hash)
-		if (bdb->pageNumber == pageNumber && bdb->dbb == dbb)
+	bdb = findBdb(dbb, pageNumber, slot);
+	if (bdb)
 			{
 			//syncObject.validateShared("Cache::trialFetch");
 			bdb->incrementUseCount(ADD_HISTORY);
-			sync.unlock();
+		lockHash.unlock();
 			bdb->addRef(lockType  COMMA_ADD_HISTORY);
 			bdb->decrementUseCount(REL_HISTORY);
-			hit = 1;
-			break;
 			}
 
 	return bdb;
@@ -764,10 +916,9 @@ void Cache::ioThread(void* arg)
 
 void Cache::ioThread(void)
 {
-	Sync syncThread(&syncThreads, "Cache::ioThread(1)");
+	Sync syncThread(&syncThreads, "Cache::ioThread");
 	syncThread.lock(Shared);
-	Sync flushLock(&syncFlush, "Cache::ioThread(2)");
-	Sync sync(&syncObject, "Cache::ioThread(3)");
+	Sync flushLock(&syncFlush, "Cache::ioThread");
 	Priority priority(database->ioScheduler);
 	Thread *thread = Thread::getThread("Cache::ioThread");
 	UCHAR *rawBuffer = new UCHAR[ASYNC_BUFFER_SIZE];
@@ -781,137 +932,135 @@ void Cache::ioThread(void)
 		{
 		int32 pageNumber = flushBitmap->nextSet(0);
 		int count;
-		Dbb *dbb;
 		
 		if (pageNumber >= 0)
 			{
-			int	slot = pageNumber % hashSize;
+			Bdb *bdb;
+			Dbb *dbb;
+			int	slot = PAGENUM_2_SLOT(pageNumber);
 			bool hit = false;
 			Bdb *bdbList = NULL;
 			UCHAR *p = buffer;
-			sync.lock(Shared);
-			
-			// Look for a page to flush.  Then get all his friends
 			
-			for (Bdb *bdb = hashTable[slot]; bdb; bdb = bdb->hash)
-				if (bdb->pageNumber == pageNumber && bdb->flushIt && bdb->isDirty)
+			// Look for the page to flush.
+			bdb = lockFindBdbIncrementUseCount(pageNumber, slot);
+			if (bdb && bdb->flushIt && bdb->isDirty)
+				{
+				hit = true;
+				count = 0;
+				dbb = bdb->dbb;
+				
+				flushBitmap->clear(pageNumber);
+				
+				// get all his friends
+				while (p < end)
 					{
-					hit = true;
-					count = 0;
-					dbb = bdb->dbb;
+					++count;
+					bdb->addRef(Shared  COMMA_ADD_HISTORY);
 					
-					if (!bdb->hash)
-						flushBitmap->clear(pageNumber);
+					bdb->syncWrite.lock(NULL, Exclusive);
+					bdb->ioThreadNext = bdbList;
+					bdbList = bdb;
 					
-					while (p < end)
-						{
-						++count;
-						bdb->incrementUseCount(ADD_HISTORY);
-						sync.unlock();
-						bdb->addRef(Shared  COMMA_ADD_HISTORY);
-						if (falcon_use_sectorcache)
-							sectorCache->writePage(bdb);
-						
-						bdb->syncWrite.lock(NULL, Exclusive);
-						bdb->ioThreadNext = bdbList;
-						bdbList = bdb;
-						
-						//ASSERT(!(bdb->flags & BDB_write_pending));
-						//bdb->flags |= BDB_write_pending;
-						memcpy(p, bdb->buffer, pageSize);
-						p += pageSize;
-						bdb->flushIt = false;
-						markClean(bdb);
-						bdb->isDirty = false;
-						bdb->release(REL_HISTORY);
-						sync.lock(Shared);
-						
-						if ( !(bdb = findBdb(dbb, bdb->pageNumber + 1)) )
-							break;
-						
-						if (!bdb->isDirty && !continueWrite(bdb))
-							break;
-						}
+					//ASSERT(!(bdb->flags & BDB_write_pending));
+					//bdb->flags |= BDB_write_pending;
+					memcpy(p, bdb->buffer, pageSize);
+					p += pageSize;
+					bdb->flushIt = false;
+					markClean(bdb);
+					bdb->isDirty = false;
+					bdb->release(REL_HISTORY);
 					
-					if (sync.state != None)
-						sync.unlock();
-						
-					flushLock.unlock();
-					//Log::debug(" %d Writing %s %d pages: %d - %d\n", thread->threadId, (const char*) dbb->fileName, count, pageNumber, pageNumber + count - 1);
-					int length = p - buffer;
-					priority.schedule(PRIORITY_LOW);
+					bdb = lockFindBdbIncrementUseCount(dbb, bdb->pageNumber + 1);
+					if (!bdb)
+						break;
 					
-					try
+					if (!bdb->isDirty && !continueWrite(bdb))
 						{
-						priority.schedule(PRIORITY_LOW);
-						dbb->writePages(pageNumber, length, buffer, WRITE_TYPE_FLUSH);
+						bdb->decrementUseCount(REL_HISTORY);
+						break;
 						}
-					catch (SQLException& exception)
+					}
+				
+				flushLock.unlock();
+				//Log::debug(" %d Writing %s %d pages: %d - %d\n", thread->threadId, (const char*) dbb->fileName, count, pageNumber, pageNumber + count - 1);
+				int length = p - buffer;
+				priority.schedule(PRIORITY_LOW);
+				
+				try
+					{
+					priority.schedule(PRIORITY_LOW);
+					dbb->writePages(pageNumber, length, buffer, WRITE_TYPE_FLUSH);
+					}
+				catch (SQLException& exception)
+					{
+					priority.finished();
+					
+					if (exception.getSqlcode() != DEVICE_FULL)
+						throw;
+					
+					database->setIOError(&exception);
+					
+					for (bool error = true; error;)
 						{
-						priority.finished();
-						
-						if (exception.getSqlcode() != DEVICE_FULL)
-							throw;
-						
-						database->setIOError(&exception);
-						
-						for (bool error = true; error;)
+						if (thread->shutdownInProgress)
 							{
-							if (thread->shutdownInProgress)
-								{
-								Bdb *next;
+							Bdb *next;
 
-								for (bdb = bdbList; bdb; bdb = next)
-									{
-									//bdb->flags &= ~BDB_write_pending;
-									next = bdb->ioThreadNext;
-									bdb->syncWrite.unlock();
-									bdb->decrementUseCount(REL_HISTORY);
-									}
-									
-								return;
-								}
-							
-							thread->sleep(1000);
-							
-							try
+							for (bdb = bdbList; bdb; bdb = next)
 								{
-								priority.schedule(PRIORITY_LOW);
-								dbb->writePages(pageNumber, length, buffer, WRITE_TYPE_FLUSH);
-								error = false;
-								database->clearIOError();
+								//bdb->flags &= ~BDB_write_pending;
+								next = bdb->ioThreadNext;
+								bdb->syncWrite.unlock();
+								bdb->decrementUseCount(REL_HISTORY);
 								}
-							catch (SQLException& exception2)
-								{
-								priority.finished();
 								
-								if (exception2.getSqlcode() != DEVICE_FULL)
-									throw;
-								}
+							return;
+							}
+						
+						thread->sleep(1000);
+						
+						try
+							{
+							priority.schedule(PRIORITY_LOW);
+							dbb->writePages(pageNumber, length, buffer, WRITE_TYPE_FLUSH);
+							error = false;
+							database->clearIOError();
+							}
+						catch (SQLException& exception2)
+							{
+							priority.finished();
+							
+							if (exception2.getSqlcode() != DEVICE_FULL)
+								throw;
 							}
 						}
+					}
 
-					priority.finished();
-					Bdb *next;
+				priority.finished();
+				Bdb *next;
 
-					for (bdb = bdbList; bdb; bdb = next)
-						{
-						//ASSERT(bdb->flags & BDB_write_pending);
-						//bdb->flags &= ~BDB_write_pending;
-						next = bdb->ioThreadNext;
-						bdb->syncWrite.unlock();
-						bdb->decrementUseCount(REL_HISTORY);
-						}
-					
-					flushLock.lock(Exclusive);
-					++physicalWrites;
-					
-					break;
+				for (bdb = bdbList; bdb; bdb = next)
+					{
+					//ASSERT(bdb->flags & BDB_write_pending);
+					//bdb->flags &= ~BDB_write_pending;
+					next = bdb->ioThreadNext;
+					bdb->syncWrite.unlock();
+					bdb->decrementUseCount(REL_HISTORY);
 					}
+				
+				flushLock.lock(Exclusive);
+				++physicalWrites;
+				
+				}
+			else
+				{
+					if (bdb)
+						bdb->decrementUseCount(REL_HISTORY);
+				}
 			
 			if (!hit)
 				{
-				sync.unlock();
 				flushBitmap->clear(pageNumber);
 				}
 			}
@@ -940,8 +1089,8 @@ void Cache::ioThread(void)
 
 			thread->sleep();
 			flushLock.lock(Exclusive);
-			}
 		}
+		} // for ever
 	
 	delete [] rawBuffer;			
 }
@@ -974,8 +1123,8 @@ bool Cache::continueWrite(Bdb* startingB
 void Cache::shutdown(void)
 {
 	shutdownThreads();
-	Sync sync (&syncDirty, "Cache::shutdown");
-	sync.lock (Exclusive);
+	Sync dirtyLock (&syncDirty, "Cache::shutdown");
+	dirtyLock.lock (Exclusive);
 
 	for (Bdb *bdb = firstDirty; bdb; bdb = bdb->nextDirty)
 		bdb->dbb->writePage(bdb, WRITE_TYPE_SHUTDOWN);
@@ -995,8 +1144,8 @@ void Cache::shutdownThreads(void)
 		ioThreads[n] = 0;
 		}
 	
-	Sync sync(&syncThreads, "Cache::shutdownThreads");
-	sync.lock(Exclusive);
+	Sync lockThreads(&syncThreads, "Cache::shutdownThreads");
+	lockThreads.lock(Exclusive);
 }
 
 void Cache::analyzeFlush(void)
@@ -1048,7 +1197,8 @@ void Cache::openTraceFile(void)
 	if (traceFile)
 		closeTraceFile();
 		
-	traceFile = fopen(TRACE_FILE, "w");
+	traceFile = fopen(TRACE_FILE, "a+");
+	setlinebuf(traceFile);
 #endif
 }
 
@@ -1065,6 +1215,6 @@ void Cache::closeTraceFile(void)
 
 void Cache::flushWait(void)
 {
-	Sync sync(&syncWait, "Cache::flushWait");
-	sync.lock(Shared);
+	Sync waitLock(&syncWait, "Cache::flushWait");
+	waitLock.lock(Shared);
 }

=== modified file 'storage/falcon/Cache.h'
--- a/storage/falcon/Cache.h	2008-06-06 19:20:10 +0000
+++ b/storage/falcon/Cache.h	2008-08-18 13:47:57 +0000
@@ -28,6 +28,17 @@
 #include "SyncObject.h"
 #include "Queue.h"
 
+// define DEBUG_SYNC_HASH_TABLE_SIZE to cause more contention and test for race conditions
+//#define DEBUG_SYNC_HASH_TABLE_SIZE (0x01 << 1)
+#ifdef DEBUG_SYNC_HASH_TABLE_SIZE
+#  define DEBUG_SYNC_HASH_TABLE_MASK (DEBUG_SYNC_HASH_TABLE_SIZE - 1)
+#  define PAGENUM_2_LOCK_INDEX(_pgnum, _slot) ((_pgnum) & DEBUG_SYNC_HASH_TABLE_MASK)
+#else /* DEBUG_SYNC_HASH_TABLE_SIZE */
+#  define PAGENUM_2_LOCK_INDEX(_pgnum, _slot) ((_slot))
+#endif /* DEBUG_SYNC_HASH_TABLE_SIZE */
+
+#define PAGENUM_2_SLOT(_pgnum) ((_pgnum) & hashMask)
+
 class Bdb;
 class Dbb;
 class PageWriter;
@@ -83,14 +94,18 @@ public:
 	bool		flushing;
 
 protected:
-	Bdb*		findBuffer (Dbb *dbb, int pageNumber, LockType lockType);
+	Bdb*		getFreeBuffer(void);
+	Bdb*		findBdb(Dbb* dbb, int32 pageNumber, int slot);
 	Bdb*		findBdb(Dbb* dbb, int32 pageNumber);
+	Bdb*		lockFindBdbIncrementUseCount(Dbb* dbb, int32 pageNumber);
+	Bdb*		lockFindBdbIncrementUseCount(int32 pageNumber, int slot);
 
 	int64		flushArg;
 	Bdb			*bdbs;
 	Bdb			*endBdbs;
 	Queue<Bdb>	bufferQueue;
 	Bdb			**hashTable;
+	SyncObject  *syncHashTable;
 	Bdb			*firstDirty;
 	Bdb			*lastDirty;
 	Bitmap		*flushBitmap;
@@ -105,12 +120,13 @@ protected:
 	int			flushPages;
 	int			physicalWrites;
 	int			hashSize;
+	unsigned int	hashMask;
 	int			pageSize;
-	int			upperFraction;
+	unsigned int upperFraction;
 	int			numberHunks;
 	int			numberDirtyPages;
 	int			numberIoThreads;
-	volatile int bufferAge;
+	volatile uint64 bufferAge;
 public:
 	void flushWait(void);
 };

Thread
bzr commit into mysql-6.0-falcon branch (klong:2786) Kelly Long18 Aug
  • RE: bzr commit into mysql-6.0-falcon branch (klong:2786) Kevin Lewis21 Aug