List: Commits    « Previous Message | Next Message »
From: Kevin Lewis    Date: August 21 2008 4:10am
Subject:RE: bzr commit into mysql-6.0-falcon branch (klong:2786)
View as plain text  
OK to push.  Use mysql-6.0-falcon-kevin if you feel you would like to test
this on pushbuild before pushing it to mysql-6.0-falcon-team.


>-----Original Message-----
>From: Kelly Long [mailto:klong@stripped]
>Sent: Monday, August 18, 2008 8:48 AM
>To: commits@stripped
>Subject: bzr commit into mysql-6.0-falcon branch (klong:2786)
>
>#At file:///FC/MYSQL/wa-2008-BZR/lcl/mysql-6.0-falcon-team-KL-Cache/
>
> 2786 Kelly Long	2008-08-18
>      Page cache lock per hash bucket. WL4480
>      Page cache hash buckets are a power of two. WL4481
>modified:
>  storage/falcon/Cache.cpp
>  storage/falcon/Cache.h
>
>per-file messages:
>  storage/falcon/Cache.h
>    Page cache lock per hash bucket. WL4480
>    Page cache hash buckets are a power of two. WL4481
>=== modified file 'storage/falcon/Cache.cpp'
>--- a/storage/falcon/Cache.cpp	2008-07-24 08:45:03 +0000
>+++ b/storage/falcon/Cache.cpp	2008-08-18 13:47:57 +0000
>@@ -72,7 +72,22 @@ Cache::Cache(Database *db, int pageSz, i
> 	database = db;
> 	panicShutdown = false;
> 	pageSize = pageSz;
>-	hashSize = hashSz;
>+	{
>+		unsigned int highBit;
>+		for (highBit=0x01; highBit < hashSz; highBit= highBit << 1) {
>+		}
>+
>+		// if there are more than 4096 buckets then lets round down
>+		// else lets round up
>+		if (highBit >= 0x00001000) {
>+			// KEL use power of two rounded down
>+			hashSize = highBit << 1;
>+		} else {
>+			// KEL use power of two rounded up
>+			hashSize = highBit;
>+		}
>+	}
>+	hashMask = hashSize - 1;
> 	numberBuffers = numBuffers;
> 	upperFraction = numberBuffers / 4;
> 	bufferAge = 0;
>@@ -80,8 +95,21 @@ Cache::Cache(Database *db, int pageSz, i
> 	lastDirty = NULL;
> 	numberDirtyPages = 0;
> 	pageWriter = NULL;
>-	hashTable = new Bdb* [hashSz];
>+	hashTable = new Bdb* [hashSize];
> 	memset (hashTable, 0, sizeof (Bdb*) * hashSize);
>+#ifdef DEBUG_SYNC_HASH_TABLE_SIZE
>+    syncHashTable = new SyncObject [DEBUG_SYNC_HASH_TABLE_SIZE];
>+	for (int loop = 0; loop < DEBUG_SYNC_HASH_TABLE_SIZE; loop ++)
>+		syncHashTable[loop].setName("Cache::syncHashTable");
>+#else /* DEBUG_SYNC_HASH_TABLE_SIZE */
>+    syncHashTable = new SyncObject [hashSize];
>+	for (int loop = 0; loop < hashSize; loop ++)
>+		{
>+		char tmpName[128];
>+		snprintf(tmpName,120,"Cache::syncHashTable[%d]",loop);
>+		syncHashTable[loop].setName(tmpName);
>+		}
>+#endif /* DEBUG_SYNC_HASH_TABLE_SIZE */
> 	sectorCache = new SectorCache(sectorCacheSize / SECTOR_BUFFER_SIZE,
>pageSize);
>
> 	uint64 n = ((uint64) pageSize * numberBuffers + cacheHunkSize - 1) /
>cacheHunkSize;
>@@ -103,6 +131,7 @@ Cache::Cache(Database *db, int pageSz, i
>
> 	try
> 		{
>+		// non-protected access to bdbs,endBdbs is OK during initialization
> 		bdbs = new Bdb [numberBuffers];
> 		endBdbs = bdbs + numberBuffers;
> 		int remaining = 0;
>@@ -121,6 +150,7 @@ Cache::Cache(Database *db, int pageSz, i
> 				}
>
> 			bdb->cache = this;
>+			// non-protected access to bufferQueue is OK during initialization
> 			bufferQueue.append(bdb);
> 			bdb->buffer = (Page*) stuff;
> 			stuff += pageSize;
>@@ -150,6 +180,7 @@ Cache::~Cache()
> 		closeTraceFile();
>
> 	delete [] hashTable;
>+	delete [] syncHashTable;
> 	delete [] bdbs;
> 	delete [] ioThreads;
> 	delete flushBitmap;
>@@ -167,14 +198,16 @@ Cache::~Cache()
> Bdb* Cache::probePage(Dbb *dbb, int32 pageNumber)
> {
> 	ASSERT (pageNumber >= 0);
>-	Sync sync (&syncObject, "Cache::probePage");
>-	sync.lock (Shared);
>-	Bdb *bdb = findBdb(dbb, pageNumber);
>+	int slot = PAGENUM_2_SLOT(pageNumber);
>+	Sync lockHash (&syncHashTable[PAGENUM_2_LOCK_INDEX(pageNumber,
slot)],
>"Cache::probePage");
>+	lockHash.lock (Shared);
>+	Bdb *bdb;
>
>+	bdb = findBdb(dbb, pageNumber, slot);
> 	if (bdb)
> 		{
> 		bdb->incrementUseCount(ADD_HISTORY);
>-		sync.unlock();
>+		lockHash.unlock();
>
> 		if (bdb->buffer->pageType == PAGE_free)
> 			{
>@@ -189,15 +222,57 @@ Bdb* Cache::probePage(Dbb *dbb, int32 pa
> 		return bdb;
> 		}
>
>+	lockHash.unlock();
> 	return NULL;
> }
>
>-Bdb* Cache::findBdb(Dbb* dbb, int32 pageNumber)
>+Bdb* Cache::findBdb(Dbb* dbb, int32 pageNumber, int slot)
>+{
>+	for (Bdb *bdb = hashTable [slot]; bdb; bdb = bdb->hash)
> {
>-	for (Bdb *bdb = hashTable [pageNumber % hashSize]; bdb; bdb = bdb-
>>hash)
> 		if (bdb->pageNumber == pageNumber && bdb->dbb == dbb)
>+			{
>+			return bdb;
>+			}
>+		}
>+
>+	return NULL;
>+}
>+
>+Bdb* Cache::findBdb(Dbb* dbb, int32 pageNumber)
>+{
>+	return (findBdb(dbb, pageNumber, PAGENUM_2_SLOT(pageNumber)));
>+}
>+
>+Bdb* Cache::lockFindBdbIncrementUseCount(Dbb* dbb, int32 pageNumber)
>+{
>+	int slot = PAGENUM_2_SLOT(pageNumber);
>+	Sync lockHash (&syncHashTable[PAGENUM_2_LOCK_INDEX(pageNumber,
slot)],
>"Cache::lockFindBdbIncrementUseCount");
>+	lockHash.lock (Shared);
>+	Bdb *bdb;
>+
>+	bdb = findBdb(dbb, pageNumber, slot);
>+	if (bdb != NULL)
>+		bdb->incrementUseCount(ADD_HISTORY);
>+
>+	lockHash.unlock();
>+	return bdb;
>+}
>+
>+Bdb* Cache::lockFindBdbIncrementUseCount(int32 pageNumber, int slot)
>+{
>+	Sync lockHash (&syncHashTable[PAGENUM_2_LOCK_INDEX(pageNumber,
slot)],
>"Cache::lockFindBdbIncrementUseCount");
>+	lockHash.lock (Shared);
>+
>+	for (Bdb *bdb = hashTable [slot]; bdb; bdb = bdb->hash)
>+		if (bdb->pageNumber == pageNumber)
>+		{
>+			bdb->incrementUseCount(ADD_HISTORY);
>+			lockHash.unlock();
> 			return bdb;
>+		}
>
>+	lockHash.unlock();
> 	return NULL;
> }
>
>@@ -217,51 +292,46 @@ Bdb* Cache::fetchPage(Dbb *dbb, int32 pa
> #endif
>
> 	ASSERT (pageNumber >= 0);
>-	int slot = pageNumber % hashSize;
>-	LockType actual = lockType;
>-	Sync sync (&syncObject, "Cache::fetchPage");
>-	sync.lock (Shared);
>-	int hit = 0;
>-
>-	/* If we already have a buffer for this go, we're done */
>+	int slot = PAGENUM_2_SLOT(pageNumber);
>+	Sync lockHash (&syncHashTable[PAGENUM_2_LOCK_INDEX(pageNumber,
slot)],
>"Cache::fetchPage");
>
>+	lockHash.lock (Shared);
> 	Bdb *bdb;
>
>-	for (bdb = hashTable [slot]; bdb; bdb = bdb->hash)
>-		if (bdb->pageNumber == pageNumber && bdb->dbb == dbb)
>+	bdb = findBdb(dbb, pageNumber, slot);
>+	if (!bdb)
> 			{
>-			//syncObject.validateShared("Cache::fetchPage");
>-			bdb->incrementUseCount(ADD_HISTORY);
>-			sync.unlock();
>-			bdb->addRef(lockType  COMMA_ADD_HISTORY);
>-			bdb->decrementUseCount(REL_HISTORY);
>-			hit = 1;
>-			break;
>-			}
>+		lockHash.unlock();
>+		// get getFreeBuffer() locks a hash bucket to remove the candidate bdb
>+		// if we locked out hash bucket before the call then we could have
>+		// a deadlock
>+		// thus we get the free buffer before we lock the hash bucket we will
>+		// be inserting into.  This avoids a dead lock but generates a race
>+		// we take care of the race by reversing the getFreeBuffer() work
>+		// when we lose the race
>+		Bdb *bdbAvailable;
>+		bdbAvailable = getFreeBuffer();
>+		lockHash.lock(Exclusive);
>
>+		bdb = findBdb(dbb, pageNumber, slot);
> 	if (!bdb)
> 		{
>-		sync.unlock();
>-		actual = Exclusive;
>-		sync.lock(Exclusive);
>-
>-		for (bdb = hashTable [slot]; bdb; bdb = bdb->hash)
>-			if (bdb->pageNumber == pageNumber && bdb->dbb ==
dbb)
>-				{
>-
//syncObject.validateExclusive("Cache::fetchPage
>(retry)");
>-				bdb->incrementUseCount(ADD_HISTORY);
>-				sync.unlock();
>-				bdb->addRef(lockType  COMMA_ADD_HISTORY);
>-				bdb->decrementUseCount(REL_HISTORY);
>-				hit = 2;
>-				break;
>-				}
>+			// we won the race so lets use the free bdb
>+			/* Set new page number and relink into hash table */
>+			bdbAvailable->addRef (Exclusive  COMMA_ADD_HISTORY);
>+			bdbAvailable->decrementUseCount(REL_HISTORY);
>+
>+			bdbAvailable->hash = hashTable [slot];
>+			hashTable [slot] = bdbAvailable;
>+			bdbAvailable->pageNumber = pageNumber;
>+			bdbAvailable->dbb = dbb;
>
>-		if (!bdb)
>-			{
>-			bdb = findBuffer(dbb, pageNumber, actual);
>+#ifdef COLLECT_BDB_HISTORY
>+			bdbAvailable->initHistory();
>+#endif
>+			bdb = bdbAvailable;
> 			moveToHead(bdb);
>-			sync.unlock();
>+			lockHash.unlock();
>
> #ifdef STOP_PAGE
> 			if (bdb->pageNumber == STOP_PAGE)
>@@ -278,9 +348,30 @@ Bdb* Cache::fetchPage(Dbb *dbb, int32 pa
> #ifdef HAVE_PAGE_NUMBER
> 			ASSERT(bdb->buffer->pageNumber == pageNumber);
> #endif
>-			if (actual != lockType)
>+			if (Exclusive != lockType)
> 				bdb->downGrade(lockType);
> 			}
>+			else
>+			{
>+			// lost a race.  put our available back to useable
>+			bdbAvailable->pageNumber = -1;
>+			bdbAvailable->dbb = NULL;
>+			bdbAvailable->decrementUseCount(REL_HISTORY);
>+
>+			//syncObject.validateExclusive("Cache::fetchPage (retry)");
>+			bdb->incrementUseCount(ADD_HISTORY);
>+			lockHash.unlock();
>+			bdb->addRef(lockType  COMMA_ADD_HISTORY);
>+			bdb->decrementUseCount(REL_HISTORY);
>+			}
>+		}
>+		else
>+		{
>+		//syncObject.validateShared("Cache::fetchPage");
>+		bdb->incrementUseCount(ADD_HISTORY);
>+		lockHash.unlock();
>+		bdb->addRef(lockType  COMMA_ADD_HISTORY);
>+		bdb->decrementUseCount(REL_HISTORY);
> 		}
>
> 	Page *page = bdb->buffer;
>@@ -304,9 +395,9 @@ Bdb* Cache::fetchPage(Dbb *dbb, int32 pa
>
> 	// If buffer has moved out of the upper "fraction" of the LRU queue, move it back up
>
>+	// non-protected access to age is harmless since it is fuzzy anyway
> 	if (bdb->age < bufferAge - (uint64) upperFraction)
> 		{
>-		sync.lock (Exclusive);
> 		moveToHead (bdb);
> 		}
>
>@@ -319,9 +410,10 @@ Bdb* Cache::fetchPage(Dbb *dbb, int32 pa
>
> Bdb* Cache::fakePage(Dbb *dbb, int32 pageNumber, PageType type, TransId
>transId)
> {
>-	Sync sync(&syncObject, "Cache::fakePage");
>-	sync.lock(Exclusive);
>-	int	slot = pageNumber % hashSize;
>+	int	slot = PAGENUM_2_SLOT(pageNumber);
>+	Sync lockHash (&syncHashTable[PAGENUM_2_LOCK_INDEX(pageNumber,
slot)],
>"Cache::fakePage");
>+	lockHash.lock(Exclusive);
>+	Bdb *bdb;
>
> #ifdef STOP_PAGE
> 	if (pageNumber == STOP_PAGE)
>@@ -330,25 +422,64 @@ Bdb* Cache::fakePage(Dbb *dbb, int32 pag
>
> 	/* If we already have a buffer for this, we're done */
>
>-	Bdb *bdb;
>-
>-	for (bdb = hashTable [slot]; bdb; bdb = bdb->hash)
>-		if (bdb->pageNumber == pageNumber && bdb->dbb == dbb)
>+	bdb = findBdb(dbb, pageNumber, slot);
>+	if (!bdb)
> 			{
>-			if (bdb->syncObject.isLocked())
>+		lockHash.unlock();
>+		// get getFreeBuffer() locks a hash bucket to remove the candidate bdb
>+		// if we locked out hash bucket before the call then we could have
>+		// a deadlock
>+		// thus we get the free buffer before we lock the hash bucket we will
>+		// be inserting into.  This avoids a dead lock but generates a race
>+		// we take care of the race by reversing the getFreeBuffer() work
>+		// when we lose the race
>+		Bdb *bdbAvailable;
>+		bdbAvailable = getFreeBuffer();
>+		lockHash.lock(Exclusive);
>+
>+		bdb = findBdb(dbb, pageNumber, slot);
>+		if (!bdb)
> 				{
>-				// The pageWriter may still be cleaning up
this
>freed page with a shared lock
>-				ASSERT(bdb->buffer->pageType == PAGE_free);
>-				ASSERT(bdb->syncObject.getState() >= 0);
>-				}
>+			// we won the race so lets use the free bdb
>+			/* Set new page number and relink into hash table */
>+			bdbAvailable->addRef (Exclusive  COMMA_ADD_HISTORY);
>+			bdbAvailable->decrementUseCount(REL_HISTORY);
>
>-			bdb->addRef(Exclusive  COMMA_ADD_HISTORY);
>+			bdbAvailable->hash = hashTable [slot];
>+			hashTable [slot] = bdbAvailable;
>+			bdbAvailable->pageNumber = pageNumber;
>+			bdbAvailable->dbb = dbb;
>+
>+#ifdef COLLECT_BDB_HISTORY
>+			bdbAvailable->initHistory();
>+#endif
>+			bdb = bdbAvailable;
>+			moveToHead(bdb);
>+			lockHash.unlock();
>
>-			break;
> 			}
>+			else
>+			{
>+			// lost a race.  put our available back to useable
>+			bdbAvailable->pageNumber = -1;
>+			bdbAvailable->dbb = NULL;
>+			bdbAvailable->decrementUseCount(REL_HISTORY);
>
>-	if (!bdb)
>-		bdb = findBuffer(dbb, pageNumber, Exclusive);
>+			//syncObject.validateExclusive("Cache::fakePage (retry)");
>+			bdb->incrementUseCount(ADD_HISTORY);
>+			lockHash.unlock();
>+			bdb->addRef(Exclusive  COMMA_ADD_HISTORY);
>+			bdb->decrementUseCount(REL_HISTORY);
>+			}
>+		}
>+		else
>+		{
>+		//syncObject.validateShared("Cache::fakePage");
>+		bdb->incrementUseCount(ADD_HISTORY);
>+		lockHash.unlock();
>+		bdb->addRef(Exclusive  COMMA_ADD_HISTORY);
>+		bdb->decrementUseCount(REL_HISTORY);
>+		}
>
> 	if (!dbb->isReadOnly)
> 		bdb->mark(transId);
>@@ -363,14 +494,14 @@ Bdb* Cache::fakePage(Dbb *dbb, int32 pag
> void Cache::flush(int64 arg)
> {
> 	Sync flushLock(&syncFlush, "Cache::flush(1)");
>-	Sync sync(&syncDirty, "Cache::flush(2)");
>+	Sync dirtyLock(&syncDirty, "Cache::flush(2)");
> 	flushLock.lock(Exclusive);
>
> 	if (flushing)
> 		return;
>
> 	syncWait.lock(NULL, Exclusive);
>-	sync.lock(Shared);
>+	dirtyLock.lock(Shared);
> 	//Log::debug(%d: "Initiating flush\n", dbb->deltaTime);
> 	flushArg = arg;
> 	flushPages = 0;
>@@ -388,7 +519,7 @@ void Cache::flush(int64 arg)
>
> 	flushStart = database->timestamp;
> 	flushing = true;
>-	sync.unlock();
>+	dirtyLock.unlock();
> 	flushLock.unlock();
>
> 	for (int n = 0; n < numberIoThreads; ++n)
>@@ -398,69 +529,92 @@ void Cache::flush(int64 arg)
>
> void Cache::moveToHead(Bdb * bdb)
> {
>+	Sync bufferQueueLock (&bufferQueue.syncObject, "Cache::moveToHead");
>+
>+	bufferQueueLock.lock (Exclusive);
> 	bdb->age = bufferAge++;
> 	bufferQueue.remove(bdb);
> 	bufferQueue.prepend(bdb);
> 	//validateUnique (bdb);
> }
>
>-Bdb* Cache::findBuffer(Dbb *dbb, int pageNumber, LockType lockType)
>+Bdb* Cache::getFreeBuffer(void)
> {
>-	//syncObject.validateExclusive("Cache::findBuffer");
>-	int	slot = pageNumber % hashSize;
>-	Sync sync(&syncDirty, "Cache::findBuffer");
>-
>-	/* Find least recently used, not-in-use buffer */
>-
>+	Sync bufferQueueLock (&bufferQueue.syncObject,
>"Cache::getFreeBuffer");
>+	unsigned int count;
> 	Bdb *bdb;
>
> 	// Find a candidate BDB.
>-
> 	for (;;)
> 		{
>-		for (bdb = bufferQueue.last; bdb; bdb = bdb->prior)
>+		bufferQueueLock.lock (Exclusive);
>+		// find a candidate that is NOT in use and NOT dirty and in the tail fraction of the LRU
>+		for (count = 0, bdb = bufferQueue.last; bdb && count <
>upperFraction; bdb = bdb->prior, count++)
> 			if (bdb->useCount == 0)
>-				break;
>+				{
>+				if (!bdb->isDirty)
>+					{
>+					bdb->incrementUseCount(REL_HISTORY);
>+					break;
>+					}
>+				}
>+				else
>+				{
>+					moveToHead(bdb);
>+				}
>+		if (!bdb)
>+			// find a candidate that is NOT in use, could be dirty
>+			for (bdb = bufferQueue.last; bdb; bdb = bdb->prior)
>+				if (bdb->useCount == 0)
>+					{
>+					bdb->incrementUseCount(REL_HISTORY);
>+					break;
>+					}
>+		bufferQueueLock.unlock();
>
> 		if (!bdb)
> 			throw SQLError(RUNTIME_ERROR, "buffer pool is
>exhausted\n");
>
>-		if (!bdb->isDirty)
>-			break;
>-
>-		writePage (bdb, WRITE_TYPE_REUSE);
>-		}
>-
>-	/* Unlink its old incarnation from the page/hash table */
>+		if (bdb->pageNumber >= 0)
>+		{
>+			int	slotRemove =
PAGENUM_2_SLOT(bdb->pageNumber);
>+			Sync lockHashRemove
>(&syncHashTable[PAGENUM_2_LOCK_INDEX(bdb->pageNumber, slotRemove)],
>"Cache::getFreeBuffer");
>+			lockHashRemove.lock(Exclusive);
>
>-	if (bdb->pageNumber >= 0)
>-		for (Bdb **ptr = hashTable + bdb->pageNumber % hashSize;;
ptr =
>&(*ptr)->hash)
>-			if (*ptr == bdb)
>+			if (bdb->useCount != 1)
> 				{
>-				*ptr = bdb->hash;
>-				break;
>+				// we lost a race try again
>+				bdb->decrementUseCount(REL_HISTORY);
>+				lockHashRemove.unlock();
>+				continue;
> 				}
>-			else
>-				ASSERT (*ptr);
>-
>-	bdb->addRef (lockType  COMMA_ADD_HISTORY);
>
>-	/* Set new page number and relink into hash table */
>+			if (bdb->isDirty)
>+				writePage (bdb, WRITE_TYPE_REUSE);
>
>-	bdb->hash = hashTable [slot];
>-	hashTable [slot] = bdb;
>-	bdb->pageNumber = pageNumber;
>-	bdb->dbb = dbb;
>+			/* Unlink its old incarnation from the page/hash
table */
>+			for (Bdb **ptr = hashTable + PAGENUM_2_SLOT(bdb-
>>pageNumber) ;; ptr = &(*ptr)->hash)
>+				if (*ptr == bdb)
>+					{
>+					*ptr = bdb->hash;
>+					break;
>+					}
>+				else
>+					ASSERT (*ptr);
>+		}
>
>-#ifdef COLLECT_BDB_HISTORY
>-	bdb->initHistory();
>-#endif
>+		break;
>+		}
>
> 	return bdb;
> }
>
> void Cache::validate()
> {
>+	//Sync bufferQueueLock (&bufferQueue.syncObject, "Cache::moveToHead");
>+
>+	//bufferQueueLock.lock (Shared);
>+	// non-protected access to bufferQueue is DANGEROUS...
> 	for (Bdb *bdb = bufferQueue.last; bdb; bdb = bdb->prior)
> 		{
> 		//IndexPage *page = (IndexPage*) bdb->buffer;
>@@ -470,8 +624,8 @@ void Cache::validate()
>
> void Cache::markDirty(Bdb *bdb)
> {
>-	Sync sync (&syncDirty, "Cache::markDirty");
>-	sync.lock (Exclusive);
>+	Sync dirtyLock (&syncDirty, "Cache::markDirty");
>+	dirtyLock.lock (Exclusive);
> 	bdb->nextDirty = NULL;
> 	bdb->priorDirty = lastDirty;
>
>@@ -487,8 +641,8 @@ void Cache::markDirty(Bdb *bdb)
>
> void Cache::markClean(Bdb *bdb)
> {
>-	Sync sync (&syncDirty, "Cache::markClean");
>-	sync.lock (Exclusive);
>+	Sync dirtyLock (&syncDirty, "Cache::markClean");
>+	dirtyLock.lock (Exclusive);
>
> 	/***
> 	if (bdb->flushIt)
>@@ -600,8 +754,8 @@ void Cache::writePage(Bdb *bdb, int type
>
> 	if (dbb->shadows)
> 		{
>-		Sync sync (&dbb->syncClone, "Cache::writePage(2)");
>-		sync.lock (Shared);
>+		Sync cloneLock (&dbb->syncClone, "Cache::writePage(2)");
>+		cloneLock.lock (Shared);
>
> 		for (DatabaseCopy *shadow = dbb->shadows; shadow; shadow =
>shadow->next)
> 			shadow->rewritePage(bdb);
>@@ -610,14 +764,15 @@ void Cache::writePage(Bdb *bdb, int type
>
> void Cache::analyze(Stream *stream)
> {
>-	Sync sync (&syncDirty, "Cache::analyze");
>-	sync.lock (Shared);
>+	Sync dirtyLock (&syncDirty, "Cache::analyze");
>+	dirtyLock.lock (Shared);
> 	int inUse = 0;
> 	int dirty = 0;
> 	int dirtyList = 0;
> 	int total = 0;
> 	Bdb *bdb;
>
>+	// non-protected access to bdbs,endBdbs is DANGEROUS...
> 	for (bdb = bdbs; bdb < endBdbs; ++bdb)
> 		{
> 		++total;
>@@ -638,17 +793,18 @@ void Cache::analyze(Stream *stream)
>
> void Cache::validateUnique(Bdb *target)
> {
>-	int	slot = target->pageNumber % hashSize;
>+	int	slot = PAGENUM_2_SLOT(target->pageNumber);
>
>+	// WARNING: unlocked walk of hash table.... DANGEROUS
> 	for (Bdb *bdb = hashTable [slot]; bdb; bdb = bdb->hash)
> 		ASSERT (bdb == target || !(bdb->pageNumber ==
target->pageNumber
>&& bdb->dbb == target->dbb));
> }
>
> void Cache::freePage(Dbb *dbb, int32 pageNumber)
> {
>-	Sync sync (&syncObject, "Cache::freePage");
>-	sync.lock (Shared);
>-	int	slot = pageNumber % hashSize;
>+	int slot = PAGENUM_2_SLOT(pageNumber);
>+	Sync lockHash (&syncHashTable[PAGENUM_2_LOCK_INDEX(pageNumber,
slot)],
>"Cache::freePage");
>+	lockHash.lock(Shared);
>
> 	// If page exists in cache (usual case), clean it up
>
>@@ -657,7 +813,7 @@ void Cache::freePage(Dbb *dbb, int32 pag
> 			{
> 			if (bdb->isDirty)
> 				{
>-				sync.unlock();
>+				lockHash.unlock();
> 				markClean (bdb);
> 				}
>
>@@ -670,8 +826,8 @@ void Cache::flush(Dbb *dbb)
> {
> 	//Sync sync (&syncDirty, "Cache::flush(1)");
> 	//sync.lock (Exclusive);
>-	Sync sync (&syncObject, "Cache::flush(3)");
>-	sync.lock (Shared);
>+	Sync objectLock (&syncObject, "Cache::flush(3)");
>+	objectLock.lock (Shared);
>
> 	for (Bdb *bdb = bdbs; bdb < endBdbs; ++bdb)
> 		if (bdb->dbb == dbb)
>@@ -685,8 +841,8 @@ void Cache::flush(Dbb *dbb)
>
> bool Cache::hasDirtyPages(Dbb *dbb)
> {
>-	Sync sync (&syncDirty, "Cache::hasDirtyPages");
>-	sync.lock (Shared);
>+	Sync dirtyLock (&syncDirty, "Cache::hasDirtyPages");
>+	dirtyLock.lock (Shared);
>
> 	for (Bdb *bdb = firstDirty; bdb; bdb = bdb->nextDirty)
> 		if (bdb->dbb == dbb)
>@@ -717,25 +873,21 @@ Bdb* Cache::trialFetch(Dbb* dbb, int32 p
> 		}
>
> 	ASSERT (pageNumber >= 0);
>-	int	slot = pageNumber % hashSize;
>-	Sync sync (&syncObject, "Cache::trialFetch");
>-	sync.lock (Shared);
>-	int hit = 0;
>+	int	slot = PAGENUM_2_SLOT(pageNumber);
>+	Sync lockHash (&syncHashTable[PAGENUM_2_LOCK_INDEX(pageNumber,
slot)],
>"Cache::trialFetch");
>+	lockHash.lock(Shared);
>+	Bdb *bdb;
>
> 	/* If we already have a buffer for this go, we're done */
>
>-	Bdb *bdb;
>-
>-	for (bdb = hashTable [slot]; bdb; bdb = bdb->hash)
>-		if (bdb->pageNumber == pageNumber && bdb->dbb == dbb)
>+	bdb = findBdb(dbb, pageNumber, slot);
>+	if (bdb)
> 			{
> 			//syncObject.validateShared("Cache::trialFetch");
> 			bdb->incrementUseCount(ADD_HISTORY);
>-			sync.unlock();
>+		lockHash.unlock();
> 			bdb->addRef(lockType  COMMA_ADD_HISTORY);
> 			bdb->decrementUseCount(REL_HISTORY);
>-			hit = 1;
>-			break;
> 			}
>
> 	return bdb;
>@@ -764,10 +916,9 @@ void Cache::ioThread(void* arg)
>
> void Cache::ioThread(void)
> {
>-	Sync syncThread(&syncThreads, "Cache::ioThread(1)");
>+	Sync syncThread(&syncThreads, "Cache::ioThread");
> 	syncThread.lock(Shared);
>-	Sync flushLock(&syncFlush, "Cache::ioThread(2)");
>-	Sync sync(&syncObject, "Cache::ioThread(3)");
>+	Sync flushLock(&syncFlush, "Cache::ioThread");
> 	Priority priority(database->ioScheduler);
> 	Thread *thread = Thread::getThread("Cache::ioThread");
> 	UCHAR *rawBuffer = new UCHAR[ASYNC_BUFFER_SIZE];
>@@ -781,137 +932,135 @@ void Cache::ioThread(void)
> 		{
> 		int32 pageNumber = flushBitmap->nextSet(0);
> 		int count;
>-		Dbb *dbb;
>
> 		if (pageNumber >= 0)
> 			{
>-			int	slot = pageNumber % hashSize;
>+			Bdb *bdb;
>+			Dbb *dbb;
>+			int	slot = PAGENUM_2_SLOT(pageNumber);
> 			bool hit = false;
> 			Bdb *bdbList = NULL;
> 			UCHAR *p = buffer;
>-			sync.lock(Shared);
>-
>-			// Look for a page to flush.  Then get all his
friends
>
>-			for (Bdb *bdb = hashTable[slot]; bdb; bdb =
bdb->hash)
>-				if (bdb->pageNumber == pageNumber &&
bdb->flushIt
>&& bdb->isDirty)
>+			// Look for the page to flush.
>+			bdb = lockFindBdbIncrementUseCount(pageNumber,
slot);
>+			if (bdb && bdb->flushIt && bdb->isDirty)
>+				{
>+				hit = true;
>+				count = 0;
>+				dbb = bdb->dbb;
>+
>+				flushBitmap->clear(pageNumber);
>+
>+				// get all his friends
>+				while (p < end)
> 					{
>-					hit = true;
>-					count = 0;
>-					dbb = bdb->dbb;
>+					++count;
>+					bdb->addRef(Shared
COMMA_ADD_HISTORY);
>
>-					if (!bdb->hash)
>-
flushBitmap->clear(pageNumber);
>+					bdb->syncWrite.lock(NULL,
Exclusive);
>+					bdb->ioThreadNext = bdbList;
>+					bdbList = bdb;
>
>-					while (p < end)
>-						{
>-						++count;
>-
bdb->incrementUseCount(ADD_HISTORY);
>-						sync.unlock();
>-						bdb->addRef(Shared
>COMMA_ADD_HISTORY);
>-						if (falcon_use_sectorcache)
>-
sectorCache->writePage(bdb);
>-
>-						bdb->syncWrite.lock(NULL,
Exclusive);
>-						bdb->ioThreadNext = bdbList;
>-						bdbList = bdb;
>-
>-						//ASSERT(!(bdb->flags &
>BDB_write_pending));
>-						//bdb->flags |=
BDB_write_pending;
>-						memcpy(p, bdb->buffer,
pageSize);
>-						p += pageSize;
>-						bdb->flushIt = false;
>-						markClean(bdb);
>-						bdb->isDirty = false;
>-						bdb->release(REL_HISTORY);
>-						sync.lock(Shared);
>-
>-						if ( !(bdb = findBdb(dbb,
bdb-
>>pageNumber + 1)) )
>-							break;
>-
>-						if (!bdb->isDirty &&
>!continueWrite(bdb))
>-							break;
>-						}
>+					//ASSERT(!(bdb->flags &
BDB_write_pending));
>+					//bdb->flags |= BDB_write_pending;
>+					memcpy(p, bdb->buffer, pageSize);
>+					p += pageSize;
>+					bdb->flushIt = false;
>+					markClean(bdb);
>+					bdb->isDirty = false;
>+					bdb->release(REL_HISTORY);
>
>-					if (sync.state != None)
>-						sync.unlock();
>-
>-					flushLock.unlock();
>-					//Log::debug(" %d Writing %s %d
pages: %d -
>%d\n", thread->threadId, (const char*) dbb->fileName, count, pageNumber,
>pageNumber + count - 1);
>-					int length = p - buffer;
>-					priority.schedule(PRIORITY_LOW);
>+					bdb =
lockFindBdbIncrementUseCount(dbb, bdb-
>>pageNumber + 1);
>+					if (!bdb)
>+						break;
>
>-					try
>+					if (!bdb->isDirty &&
!continueWrite(bdb))
> 						{
>-
priority.schedule(PRIORITY_LOW);
>-						dbb->writePages(pageNumber,
length,
>buffer, WRITE_TYPE_FLUSH);
>+
bdb->decrementUseCount(REL_HISTORY);
>+						break;
> 						}
>-					catch (SQLException& exception)
>+					}
>+
>+				flushLock.unlock();
>+				//Log::debug(" %d Writing %s %d pages: %d -
%d\n",
>thread->threadId, (const char*) dbb->fileName, count, pageNumber,
pageNumber
>+ count - 1);
>+				int length = p - buffer;
>+				priority.schedule(PRIORITY_LOW);
>+
>+				try
>+					{
>+					priority.schedule(PRIORITY_LOW);
>+					dbb->writePages(pageNumber, length,
buffer,
>WRITE_TYPE_FLUSH);
>+					}
>+				catch (SQLException& exception)
>+					{
>+					priority.finished();
>+
>+					if (exception.getSqlcode() !=
DEVICE_FULL)
>+						throw;
>+
>+					database->setIOError(&exception);
>+
>+					for (bool error = true; error;)
> 						{
>-						priority.finished();
>-
>-						if (exception.getSqlcode()
!=
>DEVICE_FULL)
>-							throw;
>-
>-
database->setIOError(&exception);
>-
>-						for (bool error = true;
error;)
>+						if
(thread->shutdownInProgress)
> 							{
>-							if
(thread->shutdownInProgress)
>-								{
>-								Bdb *next;
>+							Bdb *next;
>
>-								for (bdb =
bdbList; bdb;
>bdb = next)
>-									{
>-
//bdb->flags &=
>~BDB_write_pending;
>-									next
= bdb-
>>ioThreadNext;
>-									bdb-
>>syncWrite.unlock();
>-									bdb-
>>decrementUseCount(REL_HISTORY);
>-									}
>-
>-								return;
>-								}
>-
>-							thread->sleep(1000);
>-
>-							try
>+							for (bdb = bdbList;
bdb; bdb =
>next)
> 								{
>-
>	priority.schedule(PRIORITY_LOW);
>-								dbb-
>>writePages(pageNumber, length, buffer, WRITE_TYPE_FLUSH);
>-								error =
false;
>-
database->clearIOError();
>+								//bdb->flags
&=
>~BDB_write_pending;
>+								next =
bdb->ioThreadNext;
>+
bdb->syncWrite.unlock();
>+								bdb-
>>decrementUseCount(REL_HISTORY);
> 								}
>-							catch (SQLException&
>exception2)
>-								{
>-
priority.finished();
>
>-								if
>(exception2.getSqlcode() != DEVICE_FULL)
>-
throw;
>-								}
>+							return;
>+							}
>+
>+						thread->sleep(1000);
>+
>+						try
>+							{
>+
>	priority.schedule(PRIORITY_LOW);
>+
dbb->writePages(pageNumber,
>length, buffer, WRITE_TYPE_FLUSH);
>+							error = false;
>+
database->clearIOError();
>+							}
>+						catch (SQLException&
exception2)
>+							{
>+							priority.finished();
>+
>+							if
(exception2.getSqlcode() !=
>DEVICE_FULL)
>+								throw;
> 							}
> 						}
>+					}
>
>-					priority.finished();
>-					Bdb *next;
>+				priority.finished();
>+				Bdb *next;
>
>-					for (bdb = bdbList; bdb; bdb = next)
>-						{
>-						//ASSERT(bdb->flags &
>BDB_write_pending);
>-						//bdb->flags &=
~BDB_write_pending;
>-						next = bdb->ioThreadNext;
>-						bdb->syncWrite.unlock();
>-
bdb->decrementUseCount(REL_HISTORY);
>-						}
>-
>-					flushLock.lock(Exclusive);
>-					++physicalWrites;
>-
>-					break;
>+				for (bdb = bdbList; bdb; bdb = next)
>+					{
>+					//ASSERT(bdb->flags &
BDB_write_pending);
>+					//bdb->flags &= ~BDB_write_pending;
>+					next = bdb->ioThreadNext;
>+					bdb->syncWrite.unlock();
>+					bdb->decrementUseCount(REL_HISTORY);
> 					}
>+
>+				flushLock.lock(Exclusive);
>+				++physicalWrites;
>+
>+				}
>+			else
>+				{
>+					if (bdb)
>+
bdb->decrementUseCount(REL_HISTORY);
>+				}
>
> 			if (!hit)
> 				{
>-				sync.unlock();
> 				flushBitmap->clear(pageNumber);
> 				}
> 			}
>@@ -940,8 +1089,8 @@ void Cache::ioThread(void)
>
> 			thread->sleep();
> 			flushLock.lock(Exclusive);
>-			}
> 		}
>+		} // for ever
>
> 	delete [] rawBuffer;
> }
>@@ -974,8 +1123,8 @@ bool Cache::continueWrite(Bdb* startingB
> void Cache::shutdown(void)
> {
> 	shutdownThreads();
>-	Sync sync (&syncDirty, "Cache::shutdown");
>-	sync.lock (Exclusive);
>+	Sync dirtyLock (&syncDirty, "Cache::shutdown");
>+	dirtyLock.lock (Exclusive);
>
> 	for (Bdb *bdb = firstDirty; bdb; bdb = bdb->nextDirty)
> 		bdb->dbb->writePage(bdb, WRITE_TYPE_SHUTDOWN);
>@@ -995,8 +1144,8 @@ void Cache::shutdownThreads(void)
> 		ioThreads[n] = 0;
> 		}
>
>-	Sync sync(&syncThreads, "Cache::shutdownThreads");
>-	sync.lock(Exclusive);
>+	Sync lockThreads(&syncThreads, "Cache::shutdownThreads");
>+	lockThreads.lock(Exclusive);
> }
>
> void Cache::analyzeFlush(void)
>@@ -1048,7 +1197,8 @@ void Cache::openTraceFile(void)
> 	if (traceFile)
> 		closeTraceFile();
>
>-	traceFile = fopen(TRACE_FILE, "w");
>+	traceFile = fopen(TRACE_FILE, "a+");
>+	setlinebuf(traceFile);
> #endif
> }
>
>@@ -1065,6 +1215,6 @@ void Cache::closeTraceFile(void)
>
> void Cache::flushWait(void)
> {
>-	Sync sync(&syncWait, "Cache::flushWait");
>-	sync.lock(Shared);
>+	Sync waitLock(&syncWait, "Cache::flushWait");
>+	waitLock.lock(Shared);
> }
>
>=== modified file 'storage/falcon/Cache.h'
>--- a/storage/falcon/Cache.h	2008-06-06 19:20:10 +0000
>+++ b/storage/falcon/Cache.h	2008-08-18 13:47:57 +0000
>@@ -28,6 +28,17 @@
> #include "SyncObject.h"
> #include "Queue.h"
>
>+// define DEBUG_SYNC_HASH_TABLE_SIZE to cause more contention and test for race conditions
>+//#define DEBUG_SYNC_HASH_TABLE_SIZE (0x01 << 1)
>+#ifdef DEBUG_SYNC_HASH_TABLE_SIZE
>+#  define DEBUG_SYNC_HASH_TABLE_MASK (DEBUG_SYNC_HASH_TABLE_SIZE - 1)
>+#  define PAGENUM_2_LOCK_INDEX(_pgnum, _slot) ((_pgnum) &
>DEBUG_SYNC_HASH_TABLE_MASK)
>+#else /* DEBUG_SYNC_HASH_TABLE_SIZE */
>+#  define PAGENUM_2_LOCK_INDEX(_pgnum, _slot) ((_slot))
>+#endif /* DEBUG_SYNC_HASH_TABLE_SIZE */
>+
>+#define PAGENUM_2_SLOT(_pgnum) ((_pgnum) & hashMask)
>+
> class Bdb;
> class Dbb;
> class PageWriter;
>@@ -83,14 +94,18 @@ public:
> 	bool		flushing;
>
> protected:
>-	Bdb*		findBuffer (Dbb *dbb, int pageNumber, LockType
lockType);
>+	Bdb*		getFreeBuffer(void);
>+	Bdb*		findBdb(Dbb* dbb, int32 pageNumber, int slot);
> 	Bdb*		findBdb(Dbb* dbb, int32 pageNumber);
>+	Bdb*		lockFindBdbIncrementUseCount(Dbb* dbb, int32
pageNumber);
>+	Bdb*		lockFindBdbIncrementUseCount(int32 pageNumber, int
slot);
>
> 	int64		flushArg;
> 	Bdb			*bdbs;
> 	Bdb			*endBdbs;
> 	Queue<Bdb>	bufferQueue;
> 	Bdb			**hashTable;
>+	SyncObject  *syncHashTable;
> 	Bdb			*firstDirty;
> 	Bdb			*lastDirty;
> 	Bitmap		*flushBitmap;
>@@ -105,12 +120,13 @@ protected:
> 	int			flushPages;
> 	int			physicalWrites;
> 	int			hashSize;
>+	unsigned int	hashMask;
> 	int			pageSize;
>-	int			upperFraction;
>+	unsigned int upperFraction;
> 	int			numberHunks;
> 	int			numberDirtyPages;
> 	int			numberIoThreads;
>-	volatile int bufferAge;
>+	volatile uint64 bufferAge;
> public:
> 	void flushWait(void);
> };
>
>
>--
>MySQL Code Commits Mailing List
>For list archives: http://lists.mysql.com/commits
>To unsubscribe:    http://lists.mysql.com/commits?unsub=1

Thread
bzr commit into mysql-6.0-falcon branch (klong:2786) Kelly Long 18 Aug
  • RE: bzr commit into mysql-6.0-falcon branch (klong:2786) Kevin Lewis 21 Aug