MySQL Lists are EOL. Please join:

List: Commits « Previous Message | Next Message »
From: Christopher Powers  Date: August 22 2008 6:56am
Subject:bzr push into mysql-6.0-falcon branch (cpowers:2793 to 2794) Bug#38041
View as plain text  
 2794 Christopher Powers	2008-08-22
      Bug#38041 "Bizarre errors when ALTER ADD/DROP KEY on Falcon tables"
      
      Resolved memory corruption stemming from incompatible allocation between Falcon and the StorageInterface.
      
      - Converted JString fields in StorageIndexDesc to char[]
      - Changed StorageTableShare::indexes[] from DenseArray to linked list
      - Reverted DenseArray to original
      - Reverted page cache changes, commit 2793
modified:
  storage/falcon/Cache.cpp
  storage/falcon/Cache.h
  storage/falcon/DenseArray.h
  storage/falcon/StorageTable.cpp
  storage/falcon/StorageTable.h
  storage/falcon/StorageTableShare.cpp
  storage/falcon/StorageTableShare.h
  storage/falcon/ha_falcon.cpp
  storage/falcon/ha_falcon.h

 2793 Vladislav Vaintroub	2008-08-21 [merge]
      Correct compile error on Windows introduced by previous change
      (setlinebuf). Also, fix  compile warnings by cast
modified:
  storage/falcon/Cache.cpp
  storage/falcon/Cache.h
  storage/falcon/Interlock.h

=== modified file 'storage/falcon/Cache.cpp'
--- a/storage/falcon/Cache.cpp	2008-08-21 20:13:14 +0000
+++ b/storage/falcon/Cache.cpp	2008-08-22 06:47:40 +0000
@@ -72,21 +72,7 @@ Cache::Cache(Database *db, int pageSz, i
 	database = db;
 	panicShutdown = false;
 	pageSize = pageSz;
-
-	unsigned int highBit;
-	for (highBit=0x01; highBit < (uint32)hashSz; highBit= highBit << 1) { }
-
-	// if there are more than 4096 buckets then lets round down
-	// else lets round up
-	if (highBit >= 0x00001000) {
-		// KEL use power of two rounded down
-		hashSize = highBit << 1;
-	} else {
-		// KEL use power of two rounded up
-		hashSize = highBit;
-	}
-
-	hashMask = hashSize - 1;
+	hashSize = hashSz;
 	numberBuffers = numBuffers;
 	upperFraction = numberBuffers / 4;
 	bufferAge = 0;
@@ -94,21 +80,8 @@ Cache::Cache(Database *db, int pageSz, i
 	lastDirty = NULL;
 	numberDirtyPages = 0;
 	pageWriter = NULL;
-	hashTable = new Bdb* [hashSize];
+	hashTable = new Bdb* [hashSz];
 	memset (hashTable, 0, sizeof (Bdb*) * hashSize);
-#ifdef DEBUG_SYNC_HASH_TABLE_SIZE
-    syncHashTable = new SyncObject [DEBUG_SYNC_HASH_TABLE_SIZE];
-	for (int loop = 0; loop < DEBUG_SYNC_HASH_TABLE_SIZE; loop ++)
-		syncHashTable[loop].setName("Cache::syncHashTable");
-#else /* DEBUG_SYNC_HASH_TABLE_SIZE */
-    syncHashTable = new SyncObject [hashSize];
-	for (int loop = 0; loop < hashSize; loop ++)
-		{
-		char tmpName[128];
-		snprintf(tmpName,120,"Cache::syncHashTable[%d]",loop);
-		syncHashTable[loop].setName(tmpName);
-		}
-#endif /* DEBUG_SYNC_HASH_TABLE_SIZE */
 	sectorCache = new SectorCache(sectorCacheSize / SECTOR_BUFFER_SIZE, pageSize);
 
 	uint64 n = ((uint64) pageSize * numberBuffers + cacheHunkSize - 1) / cacheHunkSize;
@@ -130,7 +103,6 @@ Cache::Cache(Database *db, int pageSz, i
 	
 	try
 		{
-		// non-protected access to bdbs,endBdbs is OK during initialization
 		bdbs = new Bdb [numberBuffers];
 		endBdbs = bdbs + numberBuffers;
 		int remaining = 0;
@@ -149,7 +121,6 @@ Cache::Cache(Database *db, int pageSz, i
 				}
 
 			bdb->cache = this;
-			// non-protected access to bufferQueue is OK during initialization
 			bufferQueue.append(bdb);
 			bdb->buffer = (Page*) stuff;
 			stuff += pageSize;
@@ -179,7 +150,6 @@ Cache::~Cache()
 		closeTraceFile();
 
 	delete [] hashTable;
-	delete [] syncHashTable;
 	delete [] bdbs;
 	delete [] ioThreads;
 	delete flushBitmap;
@@ -197,16 +167,14 @@ Cache::~Cache()
 Bdb* Cache::probePage(Dbb *dbb, int32 pageNumber)
 {
 	ASSERT (pageNumber >= 0);
-	int slot = PAGENUM_2_SLOT(pageNumber);
-	Sync lockHash (&syncHashTable[PAGENUM_2_LOCK_INDEX(pageNumber, slot)], "Cache::probePage");
-	lockHash.lock (Shared);
-	Bdb *bdb;
+	Sync sync (&syncObject, "Cache::probePage");
+	sync.lock (Shared);
+	Bdb *bdb = findBdb(dbb, pageNumber);
 	
-	bdb = findBdb(dbb, pageNumber, slot);
 	if (bdb)
 		{
 		bdb->incrementUseCount(ADD_HISTORY);
-		lockHash.unlock();
+		sync.unlock();
 
 		if (bdb->buffer->pageType == PAGE_free)
 			{
@@ -221,57 +189,15 @@ Bdb* Cache::probePage(Dbb *dbb, int32 pa
 		return bdb;
 		}
 
-	lockHash.unlock();
-	return NULL;
-}
-
-Bdb* Cache::findBdb(Dbb* dbb, int32 pageNumber, int slot)
-{
-	for (Bdb *bdb = hashTable [slot]; bdb; bdb = bdb->hash)
-{
-		if (bdb->pageNumber == pageNumber && bdb->dbb == dbb)
-			{
-			return bdb;
-			}
-		}
-
 	return NULL;
 }
 
 Bdb* Cache::findBdb(Dbb* dbb, int32 pageNumber)
 {
-	return (findBdb(dbb, pageNumber, PAGENUM_2_SLOT(pageNumber)));
-}
-
-Bdb* Cache::lockFindBdbIncrementUseCount(Dbb* dbb, int32 pageNumber)
-{
-	int slot = PAGENUM_2_SLOT(pageNumber);
-	Sync lockHash (&syncHashTable[PAGENUM_2_LOCK_INDEX(pageNumber, slot)], "Cache::lockFindBdbIncrementUseCount");
-	lockHash.lock (Shared);
-	Bdb *bdb;
-
-	bdb = findBdb(dbb, pageNumber, slot);
-	if (bdb != NULL)
-		bdb->incrementUseCount(ADD_HISTORY);
-
-	lockHash.unlock();
-	return bdb;
-}
-
-Bdb* Cache::lockFindBdbIncrementUseCount(int32 pageNumber, int slot)
-{
-	Sync lockHash (&syncHashTable[PAGENUM_2_LOCK_INDEX(pageNumber, slot)], "Cache::lockFindBdbIncrementUseCount");
-	lockHash.lock (Shared);
-
-	for (Bdb *bdb = hashTable [slot]; bdb; bdb = bdb->hash)
-		if (bdb->pageNumber == pageNumber)
-		{
-			bdb->incrementUseCount(ADD_HISTORY);
-			lockHash.unlock();
+	for (Bdb *bdb = hashTable [pageNumber % hashSize]; bdb; bdb = bdb->hash)
+		if (bdb->pageNumber == pageNumber && bdb->dbb == dbb)
 			return bdb;
-		}
 
-	lockHash.unlock();
 	return NULL;
 }
 
@@ -291,46 +217,51 @@ Bdb* Cache::fetchPage(Dbb *dbb, int32 pa
 #endif
 
 	ASSERT (pageNumber >= 0);
-	int slot = PAGENUM_2_SLOT(pageNumber);
-	Sync lockHash (&syncHashTable[PAGENUM_2_LOCK_INDEX(pageNumber, slot)], "Cache::fetchPage");
+	int slot = pageNumber % hashSize;
+	LockType actual = lockType;
+	Sync sync (&syncObject, "Cache::fetchPage");
+	sync.lock (Shared);
+	int hit = 0;
+
+	/* If we already have a buffer for this go, we're done */
 
-	lockHash.lock (Shared);
 	Bdb *bdb;
 
-	bdb = findBdb(dbb, pageNumber, slot);
-	if (!bdb)
+	for (bdb = hashTable [slot]; bdb; bdb = bdb->hash)
+		if (bdb->pageNumber == pageNumber && bdb->dbb == dbb)
 			{
-		lockHash.unlock();
-		// get getFreeBuffer() locks a hash bucket to remove the candidate bdb
-		// if we locked out hash bucket before the call then we could have
-		// a deadlock
-		// thus we get the free buffer before we lock the hash bucket we will
-		// be inserting into.  This avoids a dead lock but generates a race
-		// we take care of the race by reversing the getFreeBuffer() work
-		// when we lose the race
-		Bdb *bdbAvailable;
-		bdbAvailable = getFreeBuffer();
-		lockHash.lock(Exclusive);
+			//syncObject.validateShared("Cache::fetchPage");
+			bdb->incrementUseCount(ADD_HISTORY);
+			sync.unlock();
+			bdb->addRef(lockType  COMMA_ADD_HISTORY);
+			bdb->decrementUseCount(REL_HISTORY);
+			hit = 1;
+			break;
+			}
 
-		bdb = findBdb(dbb, pageNumber, slot);
 	if (!bdb)
 		{
-			// we won the race so lets use the free bdb
-			/* Set new page number and relink into hash table */
-			bdbAvailable->addRef (Exclusive  COMMA_ADD_HISTORY);
-			bdbAvailable->decrementUseCount(REL_HISTORY);
-
-			bdbAvailable->hash = hashTable [slot];
-			hashTable [slot] = bdbAvailable;
-			bdbAvailable->pageNumber = pageNumber;
-			bdbAvailable->dbb = dbb;
+		sync.unlock();
+		actual = Exclusive;
+		sync.lock(Exclusive);
+
+		for (bdb = hashTable [slot]; bdb; bdb = bdb->hash)
+			if (bdb->pageNumber == pageNumber && bdb->dbb == dbb)
+				{
+				//syncObject.validateExclusive("Cache::fetchPage (retry)");
+				bdb->incrementUseCount(ADD_HISTORY);
+				sync.unlock();
+				bdb->addRef(lockType  COMMA_ADD_HISTORY);
+				bdb->decrementUseCount(REL_HISTORY);
+				hit = 2;
+				break;
+				}
 
-#ifdef COLLECT_BDB_HISTORY
-			bdbAvailable->initHistory();
-#endif
-			bdb = bdbAvailable;
+		if (!bdb)
+			{
+			bdb = findBuffer(dbb, pageNumber, actual);
 			moveToHead(bdb);
-			lockHash.unlock();
+			sync.unlock();
 
 #ifdef STOP_PAGE			
 			if (bdb->pageNumber == STOP_PAGE)
@@ -347,30 +278,9 @@ Bdb* Cache::fetchPage(Dbb *dbb, int32 pa
 #ifdef HAVE_PAGE_NUMBER
 			ASSERT(bdb->buffer->pageNumber == pageNumber);
 #endif			
-			if (Exclusive != lockType)
+			if (actual != lockType)
 				bdb->downGrade(lockType);
 			}
-			else
-			{
-			// lost a race.  put our available back to useable
-			bdbAvailable->pageNumber = -1;
-			bdbAvailable->dbb = NULL;
-			bdbAvailable->decrementUseCount(REL_HISTORY);
-
-			//syncObject.validateExclusive("Cache::fetchPage (retry)");
-			bdb->incrementUseCount(ADD_HISTORY);
-			lockHash.unlock();
-			bdb->addRef(lockType  COMMA_ADD_HISTORY);
-			bdb->decrementUseCount(REL_HISTORY);
-			}
-		}
-		else
-		{
-		//syncObject.validateShared("Cache::fetchPage");
-		bdb->incrementUseCount(ADD_HISTORY);
-		lockHash.unlock();
-		bdb->addRef(lockType  COMMA_ADD_HISTORY);
-		bdb->decrementUseCount(REL_HISTORY);
 		}
 
 	Page *page = bdb->buffer;
@@ -394,9 +304,9 @@ Bdb* Cache::fetchPage(Dbb *dbb, int32 pa
 
 	// If buffer has moved out of the upper "fraction" of the LRU queue, move it back up
 	
-	// non-protected access to age is harmless since it is fuzzy anyway
 	if (bdb->age < bufferAge - (uint64) upperFraction)
 		{
+		sync.lock (Exclusive);
 		moveToHead (bdb);
 		}
 		
@@ -409,10 +319,9 @@ Bdb* Cache::fetchPage(Dbb *dbb, int32 pa
 
 Bdb* Cache::fakePage(Dbb *dbb, int32 pageNumber, PageType type, TransId transId)
 {
-	int	slot = PAGENUM_2_SLOT(pageNumber);
-	Sync lockHash (&syncHashTable[PAGENUM_2_LOCK_INDEX(pageNumber, slot)], "Cache::fakePage");
-	lockHash.lock(Exclusive);
-	Bdb *bdb;
+	Sync sync(&syncObject, "Cache::fakePage");
+	sync.lock(Exclusive);
+	int	slot = pageNumber % hashSize;
 
 #ifdef STOP_PAGE			
 	if (pageNumber == STOP_PAGE)
@@ -421,64 +330,25 @@ Bdb* Cache::fakePage(Dbb *dbb, int32 pag
 
 	/* If we already have a buffer for this, we're done */
 
-	bdb = findBdb(dbb, pageNumber, slot);
-	if (!bdb)
-			{
-		lockHash.unlock();
-		// get getFreeBuffer() locks a hash bucket to remove the candidate bdb
-		// if we locked out hash bucket before the call then we could have
-		// a deadlock
-		// thus we get the free buffer before we lock the hash bucket we will
-		// be inserting into.  This avoids a dead lock but generates a race
-		// we take care of the race by reversing the getFreeBuffer() work
-		// when we lose the race
-		Bdb *bdbAvailable;
-		bdbAvailable = getFreeBuffer();
-		lockHash.lock(Exclusive);
+	Bdb *bdb;
 
-		bdb = findBdb(dbb, pageNumber, slot);
-		if (!bdb)
+	for (bdb = hashTable [slot]; bdb; bdb = bdb->hash)
+		if (bdb->pageNumber == pageNumber && bdb->dbb == dbb)
+			{
+			if (bdb->syncObject.isLocked())
 				{
-			// we won the race so lets use the free bdb
-			/* Set new page number and relink into hash table */
-			bdbAvailable->addRef (Exclusive  COMMA_ADD_HISTORY);
-			bdbAvailable->decrementUseCount(REL_HISTORY);
+				// The pageWriter may still be cleaning up this freed page with a shared lock
+				ASSERT(bdb->buffer->pageType == PAGE_free);
+				ASSERT(bdb->syncObject.getState() >= 0);
+				}
 				
-			bdbAvailable->hash = hashTable [slot];
-			hashTable [slot] = bdbAvailable;
-			bdbAvailable->pageNumber = pageNumber;
-			bdbAvailable->dbb = dbb;
-
-#ifdef COLLECT_BDB_HISTORY
-			bdbAvailable->initHistory();
-#endif
-			bdb = bdbAvailable;
-			moveToHead(bdb);
-			lockHash.unlock();
+			bdb->addRef(Exclusive  COMMA_ADD_HISTORY);
 			
+			break;
 			}
-			else
-			{
-			// lost a race.  put our available back to useable
-			bdbAvailable->pageNumber = -1;
-			bdbAvailable->dbb = NULL;
-			bdbAvailable->decrementUseCount(REL_HISTORY);
 
-			//syncObject.validateExclusive("Cache::fakePage (retry)");
-			bdb->incrementUseCount(ADD_HISTORY);
-			lockHash.unlock();
-			bdb->addRef(Exclusive  COMMA_ADD_HISTORY);
-			bdb->decrementUseCount(REL_HISTORY);
-			}
-		}
-		else
-		{
-		//syncObject.validateShared("Cache::fakePage");
-		bdb->incrementUseCount(ADD_HISTORY);
-		lockHash.unlock();
-		bdb->addRef(Exclusive  COMMA_ADD_HISTORY);
-		bdb->decrementUseCount(REL_HISTORY);
-		}
+	if (!bdb)
+		bdb = findBuffer(dbb, pageNumber, Exclusive);
 
 	if (!dbb->isReadOnly)
 		bdb->mark(transId);
@@ -493,14 +363,14 @@ Bdb* Cache::fakePage(Dbb *dbb, int32 pag
 void Cache::flush(int64 arg)
 {
 	Sync flushLock(&syncFlush, "Cache::flush(1)");
-	Sync dirtyLock(&syncDirty, "Cache::flush(2)");
+	Sync sync(&syncDirty, "Cache::flush(2)");
 	flushLock.lock(Exclusive);
 	
 	if (flushing)
 		return;
 
 	syncWait.lock(NULL, Exclusive);
-	dirtyLock.lock(Shared);
+	sync.lock(Shared);
 	//Log::debug(%d: "Initiating flush\n", dbb->deltaTime);
 	flushArg = arg;
 	flushPages = 0;
@@ -518,7 +388,7 @@ void Cache::flush(int64 arg)
 
 	flushStart = database->timestamp;
 	flushing = true;
-	dirtyLock.unlock();
+	sync.unlock();
 	flushLock.unlock();
 	
 	for (int n = 0; n < numberIoThreads; ++n)
@@ -528,92 +398,69 @@ void Cache::flush(int64 arg)
 
 void Cache::moveToHead(Bdb * bdb)
 {
-	Sync bufferQueueLock (&bufferQueue.syncObject, "Cache::moveToHead");
-
-	bufferQueueLock.lock (Exclusive);
 	bdb->age = bufferAge++;
 	bufferQueue.remove(bdb);
 	bufferQueue.prepend(bdb);
 	//validateUnique (bdb);
 }
 
-Bdb* Cache::getFreeBuffer(void)
+Bdb* Cache::findBuffer(Dbb *dbb, int pageNumber, LockType lockType)
 {
-	Sync bufferQueueLock (&bufferQueue.syncObject, "Cache::getFreeBuffer");
-	unsigned int count;
+	//syncObject.validateExclusive("Cache::findBuffer");
+	int	slot = pageNumber % hashSize;
+	Sync sync(&syncDirty, "Cache::findBuffer");
+	
+	/* Find least recently used, not-in-use buffer */
+
 	Bdb *bdb;
 
 	// Find a candidate BDB.
+	
 	for (;;)
 		{
-		bufferQueueLock.lock (Exclusive);
-		// find a candidate that is NOT in use and NOT dirty and in the tail fraction of the LRU
-		for (count = 0, bdb = bufferQueue.last; bdb && count < upperFraction; bdb = bdb->prior, count++)
+		for (bdb = bufferQueue.last; bdb; bdb = bdb->prior)
 			if (bdb->useCount == 0)
-				{
-				if (!bdb->isDirty)
-					{
-					bdb->incrementUseCount(REL_HISTORY);
-					break;
-					}
-				}
-				else
-				{
-					moveToHead(bdb);
-				}
-		if (!bdb)
-			// find a candidate that is NOT in use, could be dirty
-			for (bdb = bufferQueue.last; bdb; bdb = bdb->prior)
-				if (bdb->useCount == 0)
-					{
-					bdb->incrementUseCount(REL_HISTORY);
-					break;
-					}
-		bufferQueueLock.unlock();
+				break;
 
 		if (!bdb)
 			throw SQLError(RUNTIME_ERROR, "buffer pool is exhausted\n");
 			
-		if (bdb->pageNumber >= 0)
-		{
-			int	slotRemove = PAGENUM_2_SLOT(bdb->pageNumber);
-			Sync lockHashRemove (&syncHashTable[PAGENUM_2_LOCK_INDEX(bdb->pageNumber, slotRemove)], "Cache::getFreeBuffer");
-			lockHashRemove.lock(Exclusive);
+		if (!bdb->isDirty)
+			break;
+			
+		writePage (bdb, WRITE_TYPE_REUSE);
+		}
+
+	/* Unlink its old incarnation from the page/hash table */
 
-			if (bdb->useCount != 1)
+	if (bdb->pageNumber >= 0)
+		for (Bdb **ptr = hashTable + bdb->pageNumber % hashSize;; ptr = &(*ptr)->hash)
+			if (*ptr == bdb)
 				{
-				// we lost a race try again
-				bdb->decrementUseCount(REL_HISTORY);
-				lockHashRemove.unlock();
-				continue;
+				*ptr = bdb->hash;
+				break;
 				}
+			else
+				ASSERT (*ptr);
 
-			if (bdb->isDirty)
-				writePage (bdb, WRITE_TYPE_REUSE);
+	bdb->addRef (lockType  COMMA_ADD_HISTORY);
 
-			/* Unlink its old incarnation from the page/hash table */
-			for (Bdb **ptr = hashTable + PAGENUM_2_SLOT(bdb->pageNumber) ;; ptr = &(*ptr)->hash)
-				if (*ptr == bdb)
-					{
-					*ptr = bdb->hash;
-					break;
-					}
-				else
-					ASSERT (*ptr);
-		}
+	/* Set new page number and relink into hash table */
 
-		break;
-		}
+	bdb->hash = hashTable [slot];
+	hashTable [slot] = bdb;
+	bdb->pageNumber = pageNumber;
+	bdb->dbb = dbb;
+
+#ifdef COLLECT_BDB_HISTORY
+	bdb->initHistory();
+#endif
 
 	return bdb;
 }
 
 void Cache::validate()
 {
-	//Sync bufferQueueLock (&bufferQueue.syncObject, "Cache::moveToHead");
-
-	//bufferQueueLock.lock (Shared);
-	// non-protected access to bufferQueue is DANGEROUS...
 	for (Bdb *bdb = bufferQueue.last; bdb; bdb = bdb->prior)
 		{
 		//IndexPage *page = (IndexPage*) bdb->buffer;
@@ -623,8 +470,8 @@ void Cache::validate()
 
 void Cache::markDirty(Bdb *bdb)
 {
-	Sync dirtyLock (&syncDirty, "Cache::markDirty");
-	dirtyLock.lock (Exclusive);
+	Sync sync (&syncDirty, "Cache::markDirty");
+	sync.lock (Exclusive);
 	bdb->nextDirty = NULL;
 	bdb->priorDirty = lastDirty;
 
@@ -640,8 +487,8 @@ void Cache::markDirty(Bdb *bdb)
 
 void Cache::markClean(Bdb *bdb)
 {
-	Sync dirtyLock (&syncDirty, "Cache::markClean");
-	dirtyLock.lock (Exclusive);
+	Sync sync (&syncDirty, "Cache::markClean");
+	sync.lock (Exclusive);
 
 	/***
 	if (bdb->flushIt)
@@ -753,8 +600,8 @@ void Cache::writePage(Bdb *bdb, int type
 
 	if (dbb->shadows)
 		{
-		Sync cloneLock (&dbb->syncClone, "Cache::writePage(2)");
-		cloneLock.lock (Shared);
+		Sync sync (&dbb->syncClone, "Cache::writePage(2)");
+		sync.lock (Shared);
 
 		for (DatabaseCopy *shadow = dbb->shadows; shadow; shadow = shadow->next)
 			shadow->rewritePage(bdb);
@@ -763,15 +610,14 @@ void Cache::writePage(Bdb *bdb, int type
 
 void Cache::analyze(Stream *stream)
 {
-	Sync dirtyLock (&syncDirty, "Cache::analyze");
-	dirtyLock.lock (Shared);
+	Sync sync (&syncDirty, "Cache::analyze");
+	sync.lock (Shared);
 	int inUse = 0;
 	int dirty = 0;
 	int dirtyList = 0;
 	int total = 0;
 	Bdb *bdb;
 
-	// non-protected access to bdbs,endBdbs is DANGEROUS...
 	for (bdb = bdbs; bdb < endBdbs; ++bdb)
 		{
 		++total;
@@ -792,18 +638,17 @@ void Cache::analyze(Stream *stream)
 
 void Cache::validateUnique(Bdb *target)
 {
-	int	slot = PAGENUM_2_SLOT(target->pageNumber);
+	int	slot = target->pageNumber % hashSize;
 
-	// WARNING: unlocked walk of hash table.... DANGEROUS
 	for (Bdb *bdb = hashTable [slot]; bdb; bdb = bdb->hash)
 		ASSERT (bdb == target || !(bdb->pageNumber == target->pageNumber && bdb->dbb == target->dbb));
 }
 
 void Cache::freePage(Dbb *dbb, int32 pageNumber)
 {
-	int slot = PAGENUM_2_SLOT(pageNumber);
-	Sync lockHash (&syncHashTable[PAGENUM_2_LOCK_INDEX(pageNumber, slot)], "Cache::freePage");
-	lockHash.lock(Shared);
+	Sync sync (&syncObject, "Cache::freePage");
+	sync.lock (Shared);
+	int	slot = pageNumber % hashSize;
 
 	// If page exists in cache (usual case), clean it up
 
@@ -812,7 +657,7 @@ void Cache::freePage(Dbb *dbb, int32 pag
 			{
 			if (bdb->isDirty)
 				{
-				lockHash.unlock();
+				sync.unlock();
 				markClean (bdb);
 				}
 				
@@ -825,8 +670,8 @@ void Cache::flush(Dbb *dbb)
 {
 	//Sync sync (&syncDirty, "Cache::flush(1)");
 	//sync.lock (Exclusive);
-	Sync objectLock (&syncObject, "Cache::flush(3)");
-	objectLock.lock (Shared);
+	Sync sync (&syncObject, "Cache::flush(3)");
+	sync.lock (Shared);
 
 	for (Bdb *bdb = bdbs; bdb < endBdbs; ++bdb)
 		if (bdb->dbb == dbb)
@@ -840,8 +685,8 @@ void Cache::flush(Dbb *dbb)
 
 bool Cache::hasDirtyPages(Dbb *dbb)
 {
-	Sync dirtyLock (&syncDirty, "Cache::hasDirtyPages");
-	dirtyLock.lock (Shared);
+	Sync sync (&syncDirty, "Cache::hasDirtyPages");
+	sync.lock (Shared);
 
 	for (Bdb *bdb = firstDirty; bdb; bdb = bdb->nextDirty)
 		if (bdb->dbb == dbb)
@@ -872,21 +717,25 @@ Bdb* Cache::trialFetch(Dbb* dbb, int32 p
 		}
 
 	ASSERT (pageNumber >= 0);
-	int	slot = PAGENUM_2_SLOT(pageNumber);
-	Sync lockHash (&syncHashTable[PAGENUM_2_LOCK_INDEX(pageNumber, slot)], "Cache::trialFetch");
-	lockHash.lock(Shared);
-	Bdb *bdb;
+	int	slot = pageNumber % hashSize;
+	Sync sync (&syncObject, "Cache::trialFetch");
+	sync.lock (Shared);
+	int hit = 0;
 
 	/* If we already have a buffer for this go, we're done */
 
-	bdb = findBdb(dbb, pageNumber, slot);
-	if (bdb)
+	Bdb *bdb;
+
+	for (bdb = hashTable [slot]; bdb; bdb = bdb->hash)
+		if (bdb->pageNumber == pageNumber && bdb->dbb == dbb)
 			{
 			//syncObject.validateShared("Cache::trialFetch");
 			bdb->incrementUseCount(ADD_HISTORY);
-		lockHash.unlock();
+			sync.unlock();
 			bdb->addRef(lockType  COMMA_ADD_HISTORY);
 			bdb->decrementUseCount(REL_HISTORY);
+			hit = 1;
+			break;
 			}
 
 	return bdb;
@@ -915,9 +764,10 @@ void Cache::ioThread(void* arg)
 
 void Cache::ioThread(void)
 {
-	Sync syncThread(&syncThreads, "Cache::ioThread");
+	Sync syncThread(&syncThreads, "Cache::ioThread(1)");
 	syncThread.lock(Shared);
-	Sync flushLock(&syncFlush, "Cache::ioThread");
+	Sync flushLock(&syncFlush, "Cache::ioThread(2)");
+	Sync sync(&syncObject, "Cache::ioThread(3)");
 	Priority priority(database->ioScheduler);
 	Thread *thread = Thread::getThread("Cache::ioThread");
 	UCHAR *rawBuffer = new UCHAR[ASYNC_BUFFER_SIZE];
@@ -931,135 +781,137 @@ void Cache::ioThread(void)
 		{
 		int32 pageNumber = flushBitmap->nextSet(0);
 		int count;
+		Dbb *dbb;
 		
 		if (pageNumber >= 0)
 			{
-			Bdb *bdb;
-			Dbb *dbb;
-			int	slot = PAGENUM_2_SLOT(pageNumber);
+			int	slot = pageNumber % hashSize;
 			bool hit = false;
 			Bdb *bdbList = NULL;
 			UCHAR *p = buffer;
+			sync.lock(Shared);
 			
-			// Look for the page to flush.
-			bdb = lockFindBdbIncrementUseCount(pageNumber, slot);
-			if (bdb && bdb->flushIt && bdb->isDirty)
-				{
-				hit = true;
-				count = 0;
-				dbb = bdb->dbb;
-				
-				flushBitmap->clear(pageNumber);
-				
-				// get all his friends
-				while (p < end)
+			// Look for a page to flush.  Then get all his friends
+			
+			for (Bdb *bdb = hashTable[slot]; bdb; bdb = bdb->hash)
+				if (bdb->pageNumber == pageNumber && bdb->flushIt && bdb->isDirty)
 					{
-					++count;
-					bdb->addRef(Shared  COMMA_ADD_HISTORY);
-					
-					bdb->syncWrite.lock(NULL, Exclusive);
-					bdb->ioThreadNext = bdbList;
-					bdbList = bdb;
-					
-					//ASSERT(!(bdb->flags & BDB_write_pending));
-					//bdb->flags |= BDB_write_pending;
-					memcpy(p, bdb->buffer, pageSize);
-					p += pageSize;
-					bdb->flushIt = false;
-					markClean(bdb);
-					bdb->isDirty = false;
-					bdb->release(REL_HISTORY);
+					hit = true;
+					count = 0;
+					dbb = bdb->dbb;
 					
-					bdb = lockFindBdbIncrementUseCount(dbb, bdb->pageNumber + 1);
-					if (!bdb)
-						break;
+					if (!bdb->hash)
+						flushBitmap->clear(pageNumber);
 					
-					if (!bdb->isDirty && !continueWrite(bdb))
+					while (p < end)
 						{
-						bdb->decrementUseCount(REL_HISTORY);
-						break;
+						++count;
+						bdb->incrementUseCount(ADD_HISTORY);
+						sync.unlock();
+						bdb->addRef(Shared  COMMA_ADD_HISTORY);
+						if (falcon_use_sectorcache)
+							sectorCache->writePage(bdb);
+						
+						bdb->syncWrite.lock(NULL, Exclusive);
+						bdb->ioThreadNext = bdbList;
+						bdbList = bdb;
+						
+						//ASSERT(!(bdb->flags & BDB_write_pending));
+						//bdb->flags |= BDB_write_pending;
+						memcpy(p, bdb->buffer, pageSize);
+						p += pageSize;
+						bdb->flushIt = false;
+						markClean(bdb);
+						bdb->isDirty = false;
+						bdb->release(REL_HISTORY);
+						sync.lock(Shared);
+						
+						if ( !(bdb = findBdb(dbb, bdb->pageNumber + 1)) )
+							break;
+						
+						if (!bdb->isDirty && !continueWrite(bdb))
+							break;
 						}
-					}
-				
-				flushLock.unlock();
-				//Log::debug(" %d Writing %s %d pages: %d - %d\n", thread->threadId, (const char*) dbb->fileName, count, pageNumber, pageNumber + count - 1);
-				int length = (int)(p - buffer);
-				priority.schedule(PRIORITY_LOW);
-				
-				try
-					{
-					priority.schedule(PRIORITY_LOW);
-					dbb->writePages(pageNumber, length, buffer, WRITE_TYPE_FLUSH);
-					}
-				catch (SQLException& exception)
-					{
-					priority.finished();
-					
-					if (exception.getSqlcode() != DEVICE_FULL)
-						throw;
 					
-					database->setIOError(&exception);
+					if (sync.state != None)
+						sync.unlock();
+						
+					flushLock.unlock();
+					//Log::debug(" %d Writing %s %d pages: %d - %d\n", thread->threadId, (const char*) dbb->fileName, count, pageNumber, pageNumber + count - 1);
+					int length = p - buffer;
+					priority.schedule(PRIORITY_LOW);
 					
-					for (bool error = true; error;)
+					try
+						{
+						priority.schedule(PRIORITY_LOW);
+						dbb->writePages(pageNumber, length, buffer, WRITE_TYPE_FLUSH);
+						}
+					catch (SQLException& exception)
 						{
-						if (thread->shutdownInProgress)
+						priority.finished();
+						
+						if (exception.getSqlcode() != DEVICE_FULL)
+							throw;
+						
+						database->setIOError(&exception);
+						
+						for (bool error = true; error;)
 							{
-							Bdb *next;
+							if (thread->shutdownInProgress)
+								{
+								Bdb *next;
 
-							for (bdb = bdbList; bdb; bdb = next)
+								for (bdb = bdbList; bdb; bdb = next)
+									{
+									//bdb->flags &= ~BDB_write_pending;
+									next = bdb->ioThreadNext;
+									bdb->syncWrite.unlock();
+									bdb->decrementUseCount(REL_HISTORY);
+									}
+									
+								return;
+								}
+							
+							thread->sleep(1000);
+							
+							try
 								{
-								//bdb->flags &= ~BDB_write_pending;
-								next = bdb->ioThreadNext;
-								bdb->syncWrite.unlock();
-								bdb->decrementUseCount(REL_HISTORY);
+								priority.schedule(PRIORITY_LOW);
+								dbb->writePages(pageNumber, length, buffer, WRITE_TYPE_FLUSH);
+								error = false;
+								database->clearIOError();
 								}
+							catch (SQLException& exception2)
+								{
+								priority.finished();
 								
-							return;
-							}
-						
-						thread->sleep(1000);
-						
-						try
-							{
-							priority.schedule(PRIORITY_LOW);
-							dbb->writePages(pageNumber, length, buffer, WRITE_TYPE_FLUSH);
-							error = false;
-							database->clearIOError();
-							}
-						catch (SQLException& exception2)
-							{
-							priority.finished();
-							
-							if (exception2.getSqlcode() != DEVICE_FULL)
-								throw;
+								if (exception2.getSqlcode() != DEVICE_FULL)
+									throw;
+								}
 							}
 						}
-					}
 
-				priority.finished();
-				Bdb *next;
+					priority.finished();
+					Bdb *next;
 
-				for (bdb = bdbList; bdb; bdb = next)
-					{
-					//ASSERT(bdb->flags & BDB_write_pending);
-					//bdb->flags &= ~BDB_write_pending;
-					next = bdb->ioThreadNext;
-					bdb->syncWrite.unlock();
-					bdb->decrementUseCount(REL_HISTORY);
-					}
-				
-				flushLock.lock(Exclusive);
-				++physicalWrites;
-				
-				}
-			else
-				{
-					if (bdb)
+					for (bdb = bdbList; bdb; bdb = next)
+						{
+						//ASSERT(bdb->flags & BDB_write_pending);
+						//bdb->flags &= ~BDB_write_pending;
+						next = bdb->ioThreadNext;
+						bdb->syncWrite.unlock();
 						bdb->decrementUseCount(REL_HISTORY);
-				}
+						}
+					
+					flushLock.lock(Exclusive);
+					++physicalWrites;
+					
+					break;
+					}
 			
 			if (!hit)
 				{
+				sync.unlock();
 				flushBitmap->clear(pageNumber);
 				}
 			}
@@ -1088,8 +940,8 @@ void Cache::ioThread(void)
 
 			thread->sleep();
 			flushLock.lock(Exclusive);
+			}
 		}
-		} // for ever
 	
 	delete [] rawBuffer;			
 }
@@ -1122,8 +974,8 @@ bool Cache::continueWrite(Bdb* startingB
 void Cache::shutdown(void)
 {
 	shutdownThreads();
-	Sync dirtyLock (&syncDirty, "Cache::shutdown");
-	dirtyLock.lock (Exclusive);
+	Sync sync (&syncDirty, "Cache::shutdown");
+	sync.lock (Exclusive);
 
 	for (Bdb *bdb = firstDirty; bdb; bdb = bdb->nextDirty)
 		bdb->dbb->writePage(bdb, WRITE_TYPE_SHUTDOWN);
@@ -1143,8 +995,8 @@ void Cache::shutdownThreads(void)
 		ioThreads[n] = 0;
 		}
 	
-	Sync lockThreads(&syncThreads, "Cache::shutdownThreads");
-	lockThreads.lock(Exclusive);
+	Sync sync(&syncThreads, "Cache::shutdownThreads");
+	sync.lock(Exclusive);
 }
 
 void Cache::analyzeFlush(void)
@@ -1196,7 +1048,7 @@ void Cache::openTraceFile(void)
 	if (traceFile)
 		closeTraceFile();
 		
-	traceFile = fopen(TRACE_FILE, "a+");
+	traceFile = fopen(TRACE_FILE, "w");
 #endif
 }
 
@@ -1213,6 +1065,6 @@ void Cache::closeTraceFile(void)
 
 void Cache::flushWait(void)
 {
-	Sync waitLock(&syncWait, "Cache::flushWait");
-	waitLock.lock(Shared);
+	Sync sync(&syncWait, "Cache::flushWait");
+	sync.lock(Shared);
 }

=== modified file 'storage/falcon/Cache.h'
--- a/storage/falcon/Cache.h	2008-08-21 14:45:38 +0000
+++ b/storage/falcon/Cache.h	2008-08-22 06:47:40 +0000
@@ -28,17 +28,6 @@
 #include "SyncObject.h"
 #include "Queue.h"
 
-// uncomment DEBUG_SYNC_HASH_TABLE_SIZE to cause more contention and test for race conditions
-//#define DEBUG_SYNC_HASH_TABLE_SIZE (0x01 << 1)
-#ifdef DEBUG_SYNC_HASH_TABLE_SIZE
-#  define DEBUG_SYNC_HASH_TABLE_MASK (DEBUG_SYNC_HASH_TABLE_SIZE - 1)
-#  define PAGENUM_2_LOCK_INDEX(_pgnum, _slot) ((_pgnum) & DEBUG_SYNC_HASH_TABLE_MASK)
-#else /* DEBUG_SYNC_HASH_TABLE_SIZE */
-#  define PAGENUM_2_LOCK_INDEX(_pgnum, _slot) ((_slot))
-#endif /* DEBUG_SYNC_HASH_TABLE_SIZE */
-
-#define PAGENUM_2_SLOT(_pgnum) ((_pgnum) & hashMask)
-
 class Bdb;
 class Dbb;
 class PageWriter;
@@ -94,18 +83,14 @@ public:
 	bool		flushing;
 
 protected:
-	Bdb*		getFreeBuffer(void);
-	Bdb*		findBdb(Dbb* dbb, int32 pageNumber, int slot);
+	Bdb*		findBuffer (Dbb *dbb, int pageNumber, LockType lockType);
 	Bdb*		findBdb(Dbb* dbb, int32 pageNumber);
-	Bdb*		lockFindBdbIncrementUseCount(Dbb* dbb, int32 pageNumber);
-	Bdb*		lockFindBdbIncrementUseCount(int32 pageNumber, int slot);
 
 	int64		flushArg;
 	Bdb			*bdbs;
 	Bdb			*endBdbs;
 	Queue<Bdb>	bufferQueue;
 	Bdb			**hashTable;
-	SyncObject  *syncHashTable;
 	Bdb			*firstDirty;
 	Bdb			*lastDirty;
 	Bitmap		*flushBitmap;
@@ -120,13 +105,12 @@ protected:
 	int			flushPages;
 	int			physicalWrites;
 	int			hashSize;
-	unsigned int	hashMask;
 	int			pageSize;
-	unsigned int upperFraction;
+	int			upperFraction;
 	int			numberHunks;
 	int			numberDirtyPages;
 	int			numberIoThreads;
-	volatile uint64 bufferAge;
+	volatile int bufferAge;
 public:
 	void flushWait(void);
 };

=== modified file 'storage/falcon/DenseArray.h'
--- a/storage/falcon/DenseArray.h	2008-08-18 05:45:29 +0000
+++ b/storage/falcon/DenseArray.h	2008-08-22 06:47:40 +0000
@@ -47,35 +47,17 @@ public:
 		if (newLength < length)
 			return;
 			
-		T *newVector = new T[newLength];
-		T *oldVector = vector;
-		memcpy((void*) newVector, (void*) vector, length * sizeof(T));
-		memset((void*) (newVector + length), 0, (newLength - length) * sizeof(T));
-		vector = newVector;
-		int oldLength = length;
-		length = newLength;
-		memset((void*) oldVector, 0xbc, oldLength * sizeof(T));
-		delete [] oldVector;
-		
-		/**		
 		T *oldVector = vector;
 		vector = new T[newLength];
 		memcpy((void*) vector, (void*) oldVector, length * sizeof(T));
 		memset((void*) (vector + length), 0, (newLength - length) * sizeof(T));
 		length = newLength;
-		**/
 		};
 	
 	void zap ()
 		{
 		memset((void*) vector, 0, length * sizeof(T));
 		};
-		
-	void zap (uint n)
-		{
-		if (n < length)
-			memset(vector + n, 0, sizeof(T));
-		}
 	
 	T get (uint index)
 		{

=== modified file 'storage/falcon/StorageTable.cpp'
--- a/storage/falcon/StorageTable.cpp	2008-08-19 03:33:01 +0000
+++ b/storage/falcon/StorageTable.cpp	2008-08-22 06:47:40 +0000
@@ -96,6 +96,8 @@ int StorageTable::deleteTable(void)
 int StorageTable::truncateTable(void)
 {
 	clearRecord();
+	clearCurrentIndex();
+	
 	int ret = share->truncateTable(storageConnection);
 	return ret;
 }
@@ -139,9 +141,9 @@ int StorageTable::updateRow(int recordNu
 	return 0;
 }
 
-int StorageTable::createIndex(StorageIndexDesc *indexDesc, int indexCount, const char *sql)
+int StorageTable::createIndex(StorageIndexDesc *indexDesc, const char *sql)
 {
-	return share->createIndex(storageConnection, indexDesc, indexCount, sql);
+	return share->createIndex(storageConnection, indexDesc, sql);
 }
 
 int StorageTable::dropIndex(StorageIndexDesc *indexDesc, const char *sql)
@@ -217,9 +219,9 @@ int StorageTable::clearCurrentIndex()
 	return 0;
 }
 
-int StorageTable::setIndex(int indexCount, StorageIndexDesc* indexDesc)
+int StorageTable::setIndex(StorageIndexDesc* indexDesc)
 {
-	return share->setIndex(indexCount, indexDesc);
+	return share->setIndex(indexDesc);
 }
 
 int StorageTable::indexScan(int indexOrder)

=== modified file 'storage/falcon/StorageTable.h'
--- a/storage/falcon/StorageTable.h	2008-08-18 05:45:29 +0000
+++ b/storage/falcon/StorageTable.h	2008-08-22 06:47:40 +0000
@@ -78,7 +78,7 @@ public:
 	virtual int		indexScan(int indexOrder);
 	virtual int		setCurrentIndex(int indexId);
 	virtual int		clearCurrentIndex();
-	virtual int		setIndex(int indexCount, StorageIndexDesc* indexDesc);
+	virtual int		setIndex(StorageIndexDesc* indexDesc);
 	virtual void	indexEnd(void);
 	virtual int		setIndexBound(const unsigned char* key, int keyLength, int which);
 	virtual int		storeBlob(StorageBlob* blob);
@@ -94,7 +94,7 @@ public:
 	virtual int		fetch(int recordNumber, bool lockForUpdate);
 	
 	virtual int		updateRow(int recordNumber);
-	virtual int		createIndex(StorageIndexDesc *indexDesc, int indexCount, const char *sql);
+	virtual int		createIndex(StorageIndexDesc *indexDesc, const char *sql);
 	virtual int		dropIndex(StorageIndexDesc *indexDesc, const char *sql);
 	virtual const unsigned char* getEncoding(int fieldIndex);
 	virtual const char*			 getName(void);

=== modified file 'storage/falcon/StorageTableShare.cpp'
--- a/storage/falcon/StorageTableShare.cpp	2008-08-19 03:33:01 +0000
+++ b/storage/falcon/StorageTableShare.cpp	2008-08-22 06:47:40 +0000
@@ -49,6 +49,43 @@ static const char *DB_ROOT				= ".fts";
 static const char THIS_FILE[]=__FILE__;
 #endif
 
+StorageIndexDesc::StorageIndexDesc()
+{
+	id = 0;
+	unique = 0;
+	primaryKey = 0;
+	numberSegments = 0;
+	index = NULL;
+	segmentRecordCounts = NULL;
+	next = NULL;
+	name[0] = '\0';
+	rawName[0] = '\0';
+};
+
+StorageIndexDesc::StorageIndexDesc(const StorageIndexDesc *indexInfo)
+{
+	if (indexInfo)
+		*this = *indexInfo;
+	else
+		{
+		id = 0;
+		unique = 0;
+		primaryKey = 0;
+		numberSegments = 0;
+		segmentRecordCounts = NULL;
+		name[0] = '\0';
+		rawName[0] = '\0';
+		}
+		
+	index = NULL;
+	next = NULL;
+	prev = NULL;
+};
+
+StorageIndexDesc::~StorageIndexDesc(void)
+{
+}
+
 //////////////////////////////////////////////////////////////////////
 // Construction/Destruction
 //////////////////////////////////////////////////////////////////////
@@ -68,7 +105,7 @@ StorageTableShare::StorageTableShare(Sto
 	sequence = NULL;
 	tempTable = tempTbl;
 	setPath(path);
-	numberIndexes = 0;
+	indexes = NULL;
 
 	if (tempTable)
 		tableSpace = TEMPORARY_TABLESPACE;
@@ -87,21 +124,21 @@ StorageTableShare::~StorageTableShare(vo
 	if (storageDatabase)
 		storageDatabase->release();
 		
-	for (uint n = 0; n < indexes.length; n++)
-		if (indexes.vector[n])
-			delete indexes.get(n);
+	for (StorageIndexDesc *indexDesc; (indexDesc = indexes);)
+		{
+		indexes = indexDesc->next;
+		delete indexDesc;
+		}
 }
 
 void StorageTableShare::lock(bool exclusiveLock)
 {
-	//syncObject->lock(NULL, (exclusiveLock) ? Exclusive : Shared);
-	syncIndexes->lock(NULL, (exclusiveLock) ? Exclusive : Shared);
+	syncObject->lock(NULL, (exclusiveLock) ? Exclusive : Shared);
 }
 
 void StorageTableShare::unlock(void)
 {
-	//syncObject->unlock();
-	syncIndexes->unlock();
+	syncObject->unlock();
 }
 
 void StorageTableShare::lockIndexes(bool exclusiveLock)
@@ -256,12 +293,12 @@ char* StorageTableShare::createIndexName
 	return indexName;
 }
 
-int StorageTableShare::createIndex(StorageConnection *storageConnection, StorageIndexDesc *indexDesc, int indexCount, const char *sql)
+int StorageTableShare::createIndex(StorageConnection *storageConnection, StorageIndexDesc *indexDesc, const char *sql)
 {
 	if (!table)
 		open();
 
-	// Always get syncIndexes before syncObject
+	// Lock out other clients before locking the table
 	
 	Sync syncIndex(syncIndexes, "StorageTableShare::createIndex(1)");
 	syncIndex.lock(Exclusive);
@@ -272,17 +309,50 @@ int StorageTableShare::createIndex(Stora
 	int ret = storageDatabase->createIndex(storageConnection, table, sql);
 	
 	if (!ret)
-		ret = setIndex(indexCount, indexDesc);
+		ret = setIndex(indexDesc);
 		
 	return ret;
 }
 
+void StorageTableShare::addIndex(StorageIndexDesc *indexDesc)
+{
+	if (!getIndex(indexDesc->id))
+		{
+		if (indexes)
+			{
+			indexDesc->next = indexes;
+			indexDesc->prev = NULL;
+			indexes->prev = indexDesc;
+			}
+		
+		indexes = indexDesc;
+		}
+}
+
+void StorageTableShare::deleteIndex(int indexId)
+{
+	for (StorageIndexDesc *indexDesc = indexes; indexDesc; indexDesc = indexDesc->next)
+		if (indexDesc->id == indexId)
+			{
+			if (indexDesc->prev)
+				indexDesc->prev->next = indexDesc->next;
+			else
+				indexes = indexDesc->next;
+				
+			if (indexDesc->next)
+				indexDesc->next->prev = indexDesc->prev;
+				
+			delete indexDesc;	
+			break;
+			}
+}
+
 int StorageTableShare::dropIndex(StorageConnection *storageConnection, StorageIndexDesc *indexDesc, const char *sql)
 {
 	if (!table)
 		open();
 
-	// Always get syncIndexes before syncObject
+	// Lock out other clients before locking the table
 
 	Sync syncIndex(syncIndexes, "StorageTableShare::dropIndex(1)");
 	syncIndex.lock(Exclusive);
@@ -293,11 +363,20 @@ int StorageTableShare::dropIndex(Storage
 	int ret = storageDatabase->dropIndex(storageConnection, table, sql);
 	
 	if (!ret)
-		clearIndex(indexDesc);
+		deleteIndex(indexDesc->id);
 				
 	return ret;
 }
 
+void StorageTableShare::deleteIndexes()
+{
+	for (StorageIndexDesc *indexDesc; (indexDesc = indexes);)
+		{
+		indexes = indexDesc->next;
+		delete indexDesc;
+		}
+}
+
 int StorageTableShare::renameTable(StorageConnection *storageConnection, const char* newName)
 {
 	char tableName[256];
@@ -317,34 +396,14 @@ int StorageTableShare::renameTable(Stora
 	return ret;
 }
 
-void StorageTableShare::resizeIndexes(int indexCount)
+int StorageTableShare::setIndex(const StorageIndexDesc *indexInfo)
 {
-	if (indexCount <= 0)
-		return;
-	
-	if ((uint)indexCount > indexes.length)
-		indexes.extend(indexCount + 5);
-
-	numberIndexes = indexCount;
-}
+	int ret = 0;
 
-int StorageTableShare::setIndex(int indexCount, const StorageIndexDesc *indexInfo)
+	if (!getIndex(indexInfo->id))
 		{
-	int indexId = indexInfo->id;
-		
-	if ((uint)indexId >= indexes.length || numberIndexes < indexCount)
-		resizeIndexes(indexCount);
-		
-	// Allocate a new index if necessary
-	
-	StorageIndexDesc *indexDesc = indexes.get(indexId);
-	
-	if (!indexDesc)
-		indexes.vector[indexId] = indexDesc = new StorageIndexDesc(indexId);
-	
-	// Copy index description info
-	
-	*indexDesc = *indexInfo;
+		StorageIndexDesc *indexDesc = new StorageIndexDesc(indexInfo);
+		addIndex(indexDesc);
 
 	// Find the corresponding Falcon index
 	
@@ -353,94 +412,93 @@ int StorageTableShare::setIndex(int inde
 	else
 		{
 		char indexName[indexNameSize];
-		sprintf(indexName, "%s$%s", name.getString(), indexDesc->name.getString());
+			sprintf(indexName, "%s$%s", name.getString(), indexDesc->name);
 		indexDesc->index = table->findIndex(indexName);
 		}
 
-	int ret = 0;
-	
 	if (indexDesc->index)
 		indexDesc->segmentRecordCounts = indexDesc->index->recordsPerSegment;
 	else
 		ret = StorageErrorNoIndex;
-	
-	ASSERT((!ret ? validateIndexes() : true));
+		}
 		
 	return ret;
 }
 
-void StorageTableShare::clearIndex(StorageIndexDesc *indexDesc)
+StorageIndexDesc* StorageTableShare::getIndex(int indexId)
 {
-	if (numberIndexes > 0)
-		{
-		for (int n = indexDesc->id; n < numberIndexes-1; n++)
-			{
-			indexes.vector[n] = indexes.vector[n+1];
-			indexes.vector[n]->id = n; // assume that index id will match server
-			}
+	if (!indexes)
+		return NULL;
 			
-		indexes.zap(numberIndexes-1);
-		numberIndexes--;
-		}
+	for (StorageIndexDesc *indexDesc = indexes; indexDesc; indexDesc = indexDesc->next)
+		if (indexDesc->id == indexId)
+			return indexDesc;
 		
-	ASSERT(validateIndexes());
+	return NULL;
 }
 
-bool StorageTableShare::validateIndexes()
+StorageIndexDesc* StorageTableShare::getIndex(int indexId, StorageIndexDesc *indexDesc)
 {
-	for (int n = 0; n < numberIndexes; n++)
-		{
-		StorageIndexDesc *indexDesc = indexes.get(n);
-		if (indexDesc && indexDesc->id != n)
-			return false;
-		}
+	if (!indexes)
+		return NULL;
+	
+	Sync sync(syncIndexes, "StorageTableShare::getIndex");
+	sync.lock(Shared);
+	
+	StorageIndexDesc *index = getIndex(indexId);
 			
-	return true;
+	if (index)
+		*indexDesc = *index;
+		
+	return index;
 }
 
-// Assumes syncIndexes is locked
-
-StorageIndexDesc* StorageTableShare::getIndex(int indexId)
+StorageIndexDesc* StorageTableShare::getIndex(const char *name)
 {
-	if (!indexes.length || indexId >= numberIndexes)
+	if (!indexes)
 		return NULL;
 	
-	return indexes.get(indexId);
+	Sync sync(syncIndexes, "StorageTableShare::getIndex(name)");
+	sync.lock(Shared);
+	
+	for (StorageIndexDesc *indexDesc = indexes; indexDesc; indexDesc = indexDesc->next)
+		if (indexDesc->name == name)
+			return indexDesc;
+			
+	return NULL;
 }
 
-StorageIndexDesc* StorageTableShare::getIndex(int indexId, StorageIndexDesc *indexDesc)
+int StorageTableShare::getIndexId(const char* schemaName, const char* indexName)
 {
-	StorageIndexDesc *index;
+	if (!indexes)
+		return -1;
 	
-	if (!indexes.length || indexId >= numberIndexes)
-		index = NULL;
-	else
+	for (StorageIndexDesc *indexDesc = indexes; indexDesc; indexDesc = indexDesc->next)
 		{
-		Sync sync(syncIndexes, "StorageTableShare::getIndex");
-		sync.lock(Shared);
-	
-		index = indexes.get(indexId);
+		Index *index = indexDesc->index;
 	
 		if (index)
-			*indexDesc = *index;
+			if (strcmp(index->getIndexName(), indexName) == 0 &&
+				strcmp(index->getSchemaName(), schemaName) == 0)
+				return indexDesc->id;
 		}
 		
-	return index;
+	return -1;
 }
 
-StorageIndexDesc* StorageTableShare::getIndex(const char *name)
+int StorageTableShare::haveIndexes(int indexCount)
 {
-	Sync sync(syncIndexes, "StorageTableShare::getIndex(name)");
-	sync.lock(Shared);
+	if (!indexes)
+		return false;
 	
-	for (int i = 0; i < numberIndexes; i++)
+	int n = 0;
+	for (StorageIndexDesc *indexDesc = indexes; indexDesc; indexDesc = indexDesc->next, n++)
 		{
-		StorageIndexDesc *indexDesc = indexes.get(i);
-		if (indexDesc && indexDesc->name == name)
-			return indexDesc;
+		if (!indexDesc->index)
+			return false;
 		}
 
-	return NULL;
+	return (n == indexCount);
 }
 
 INT64 StorageTableShare::getSequenceValue(int delta)
@@ -466,45 +524,6 @@ int StorageTableShare::setSequenceValue(
 	return 0;
 }
 
-// Get index id using the internal (Falcon) index name
-
-int StorageTableShare::getIndexId(const char* schemaName, const char* indexName)
-{
-	if (indexes.length > 0)
-		for (int n = 0; n < numberIndexes; ++n)
-			{
-			Index *index = indexes.get(n)->index;
-			
-			if (strcmp(index->getIndexName(), indexName) == 0 &&
-				strcmp(index->getSchemaName(), schemaName) == 0)
-				return n;
-			}
-		
-	return -1;
-}
-
-int StorageTableShare::haveIndexes(int indexCount)
-{
-	if (indexes.length == 0)
-		return false;
-		
-	if (indexCount > numberIndexes)
-		return false;
-	
-	for (int n = 0; n < numberIndexes; ++n)
-		{
-		StorageIndexDesc* index = indexes.get(n);
-		
-		if (!index)
-			return false;
-			
-		if (index && !index->index)
-			return false;
-		}
-	
-	return true;
-}
-
 void StorageTableShare::setTablePath(const char* path, bool tmp)
 {
 	if (pathName.IsEmpty())

=== modified file 'storage/falcon/StorageTableShare.h'
--- a/storage/falcon/StorageTableShare.h	2008-08-18 05:45:29 +0000
+++ b/storage/falcon/StorageTableShare.h	2008-08-22 06:47:40 +0000
@@ -18,7 +18,6 @@
 
 #include "JString.h"
 #include "SyncObject.h"
-#include "DenseArray.h"
 
 #ifndef _WIN32
 #define __int64			long long
@@ -49,20 +48,25 @@ struct StorageSegment {
 	void			*mysql_charset;
 	};
 
+// StorageIndexDesc maps a server-side index to a Falcon index
 class StorageIndexDesc
 {
 public:
-	StorageIndexDesc(int indexId=0) : id (indexId), unique(0), primaryKey(0), numberSegments(0), /*name(NULL),*/ index(NULL), segmentRecordCounts(NULL){};
+	StorageIndexDesc();
+	StorageIndexDesc(const StorageIndexDesc *indexInfo);
+	virtual ~StorageIndexDesc(void);
 	
-	int			id;//cwp
+	int			id;
 	int			unique;
 	int			primaryKey;
 	int			numberSegments;
-	JString		name;			// clean name
-	JString		rawName;		// original name
+	char		name[indexNameSize];		// clean name
+	char		rawName[indexNameSize];		// original name
 	Index		*index;
 	uint64		*segmentRecordCounts;
 	StorageSegment segments[MaxIndexSegments];
+	StorageIndexDesc *next;
+	StorageIndexDesc *prev;
 	};
 
 
@@ -107,8 +111,9 @@ public:
 	virtual void		unlock(void);
 	virtual void		lockIndexes(bool exclusiveLock=false);
 	virtual void		unlockIndexes(void);
-	virtual int			createIndex(StorageConnection *storageConnection, StorageIndexDesc *indexDesc, int indexCount, const char *sql);
+	virtual int			createIndex(StorageConnection *storageConnection, StorageIndexDesc *indexDesc, const char *sql);
 	virtual int			dropIndex(StorageConnection *storageConnection, StorageIndexDesc *indexDesc, const char *sql);
+	virtual void		deleteIndexes();
 	virtual int			renameTable(StorageConnection *storageConnection, const char* newName);
 	virtual INT64		getSequenceValue(int delta);
 	virtual int			setSequenceValue(INT64 value);
@@ -118,10 +123,10 @@ public:
 	virtual void		registerCollation(const char* collationName, void* arg);
 
 	int					open(void);
-	void				resizeIndexes(int indexCount);
-	int					setIndex(int indexCount, const StorageIndexDesc* indexInfo);
+	void				addIndex(StorageIndexDesc *indexDesc);
+	void				deleteIndex(int indexId);
+	int					setIndex(const StorageIndexDesc* indexInfo);
 	void				clearIndex(StorageIndexDesc *indexDesc);
-	bool				validateIndexes();
 	StorageIndexDesc*	getIndex(int indexId);
 	StorageIndexDesc*	getIndex(int indexId, StorageIndexDesc *indexDesc);
 	StorageIndexDesc*	getIndex(const char *name);
@@ -159,10 +164,9 @@ public:
 	StorageDatabase		*storageDatabase;
 	StorageHandler		*storageHandler;
 	Table				*table;
-	DenseArray<StorageIndexDesc *,10> indexes;
+	StorageIndexDesc	*indexes;
 	Sequence			*sequence;
 	Format				*format;						// format for insertion
-	int					numberIndexes;
 	bool				tempTable;
 	int getFieldId(const char* fieldName);
 };

=== modified file 'storage/falcon/ha_falcon.cpp'
--- a/storage/falcon/ha_falcon.cpp	2008-08-19 14:27:42 +0000
+++ b/storage/falcon/ha_falcon.cpp	2008-08-22 06:47:40 +0000
@@ -869,11 +869,11 @@ int StorageInterface::add_index(TABLE* t
 int StorageInterface::createIndex(const char *schemaName, const char *tableName, TABLE *table, int indexId)
 {
 	KEY *key = table->key_info + indexId;
-	StorageIndexDesc indexDesc(indexId);
+	StorageIndexDesc indexDesc;
 	getKeyDesc(table, indexId, &indexDesc);
 	
 	char indexName[indexNameSize];
-	storageShare->createIndexName(indexDesc.name.getString(), indexName);
+	storageShare->createIndexName(indexDesc.name, indexName);
 
 	CmdGen gen;
 	const char *unique = (key->flags & HA_NOSAME) ? "unique " : "";
@@ -881,16 +881,16 @@ int StorageInterface::createIndex(const 
 	genKeyFields(key, &gen);
 	const char *sql = gen.getString();
 
-	return storageTable->createIndex(&indexDesc, table->s->keys, sql);
+	return storageTable->createIndex(&indexDesc, sql);
 }
 
 int StorageInterface::dropIndex(const char *schemaName, const char *tableName, TABLE *table, int indexId)
 {
-	StorageIndexDesc indexDesc(indexId);
+	StorageIndexDesc indexDesc;
 	getKeyDesc(table, indexId, &indexDesc);
 	
 	char indexName[indexNameSize];
-	storageShare->createIndexName(indexDesc.name.getString(), indexName);
+	storageShare->createIndexName(indexDesc.name, indexName);
 
 	CmdGen gen;
 	gen.gen("drop index %s.\"%s\"", schemaName, indexName);
@@ -940,6 +940,8 @@ THR_LOCK_DATA **StorageInterface::store_
 		    && !(thd_tablespace_op(thd))
 		    &&  (sql_command != SQLCOM_ALTER_TABLE)
 		    &&  (sql_command != SQLCOM_DROP_TABLE)
+		    &&  (sql_command != SQLCOM_CREATE_INDEX)
+		    &&  (sql_command != SQLCOM_DROP_INDEX)
 		    &&  (sql_command != SQLCOM_TRUNCATE)
 		    &&  (sql_command != SQLCOM_OPTIMIZE)
 		    &&  (sql_command != SQLCOM_CREATE_TABLE)
@@ -987,7 +989,10 @@ int StorageInterface::delete_table(const
 
 	if (storageShare)
 		{
-//		storageShare->lockIndexes(true);
+		
+		// Lock out other clients before locking the table
+		
+		storageShare->lockIndexes(true);
 		storageShare->lock(true);
 
 		if (storageShare->initialized)
@@ -998,7 +1003,7 @@ int StorageInterface::delete_table(const
 			}
 
 		storageShare->unlock();
-//		storageShare->unlockIndexes();
+		storageShare->unlockIndexes();
 		}
 
 	int res = storageTable->deleteTable();
@@ -1395,6 +1400,12 @@ int StorageInterface::index_init(uint id
 	int ret = storageTable->setCurrentIndex(idx);
 
 	if (ret)
+		{
+		setIndex(table, idx);
+		ret = storageTable->setCurrentIndex(idx);
+		}
+		
+	if (ret)
 		DBUG_RETURN(error(ret));
 
 	DBUG_RETURN(ret);
@@ -1458,9 +1469,16 @@ void StorageInterface::getKeyDesc(TABLE 
 	int numberKeys = keyInfo->key_parts;
 	char nameBuffer[indexNameSize];
 	
-	indexDesc->rawName		  = keyInfo->name;
+	// Clean up the index name for internal use
+	
+	strncpy(indexDesc->rawName, (const char*)keyInfo->name, MIN(indexNameSize, (int)strlen(keyInfo->name)+1));
 	storageShare->cleanupFieldName(indexDesc->rawName, nameBuffer, sizeof(nameBuffer));
-	indexDesc->name			  = nameBuffer;
+	indexDesc->rawName[indexNameSize-1] = '\0';
+	
+	strncpy(indexDesc->name, (const char*)nameBuffer, MIN(indexNameSize, (int)strlen(nameBuffer)+1));
+	indexDesc->name[indexNameSize-1] = '\0';
+
+	indexDesc->id			  = indexId;
 	indexDesc->numberSegments = numberKeys;
 	indexDesc->unique		  = (keyInfo->flags & HA_NOSAME);
 	indexDesc->primaryKey	  = (table->s->primary_key == (uint)indexId);
@@ -2232,6 +2250,11 @@ int StorageInterface::addIndex(THD* thd,
 	const char *tableName = storageTable->getName();
 	const char *schemaName = storageTable->getSchemaName();
 
+	// Lock out other clients before locking the table
+	
+	storageShare->lockIndexes(true);
+	storageShare->lock(true);
+
 	// Find indexes to be added by comparing table and alteredTable
 
 	for (unsigned int n = 0; n < alteredTable->s->keys; n++)
@@ -2248,11 +2271,19 @@ int StorageInterface::addIndex(THD* thd,
 					
 			if (tableKey >= tableEnd)
 				if ((ret = createIndex(schemaName, tableName, alteredTable, n)))
-					return (error(ret));
+					break;
 			}
 		}
 		
-	return 0;
+	// The server indexes may have been reordered, so remap to the Falcon indexes
+	
+	if (!ret)
+		remapIndexes(alteredTable);
+	
+	storageShare->unlock();
+	storageShare->unlockIndexes();
+	
+	return error(ret);
 }
 
 int StorageInterface::dropIndex(THD* thd, TABLE* alteredTable, HA_CREATE_INFO* createInfo, HA_ALTER_INFO* alterInfo, HA_ALTER_FLAGS* alterFlags)
@@ -2261,6 +2292,11 @@ int StorageInterface::dropIndex(THD* thd
 	const char *tableName = storageTable->getName();
 	const char *schemaName = storageTable->getSchemaName();
 	
+	// Lock out other clients before locking the table
+	
+	storageShare->lockIndexes(true);
+	storageShare->lock(true);
+	
 	// Find indexes to be dropped by comparing table and alteredTable
 	
 	for (unsigned int n = 0; n < table->s->keys; n++)
@@ -2277,11 +2313,19 @@ int StorageInterface::dropIndex(THD* thd
 
 			if (alterKey >= alterEnd)
 				if ((ret = dropIndex(schemaName, tableName, table, n)))
-				return (error(ret));
+					break;
 				}
 		}
 	
-	return 0;
+	// The server indexes have been reordered, so remap to the Falcon indexes
+	
+	if (!ret)
+		remapIndexes(alteredTable);
+	
+	storageShare->unlock();
+	storageShare->unlockIndexes();
+	
+	return error(ret);
 }
 
 uint StorageInterface::max_supported_key_length(void) const
@@ -2319,10 +2363,10 @@ void StorageInterface::logger(int mask, 
 
 int StorageInterface::setIndex(TABLE *table, int indexId)
 {
-	StorageIndexDesc indexDesc(indexId);
+	StorageIndexDesc indexDesc;
 	getKeyDesc(table, indexId, &indexDesc);
 
-	return storageTable->setIndex(table->s->keys, &indexDesc);
+	return storageTable->setIndex(&indexDesc);
 }
 
 int StorageInterface::setIndexes(void)
@@ -2344,6 +2388,22 @@ int StorageInterface::setIndexes(void)
 	return ret;
 }
 
+int StorageInterface::remapIndexes(TABLE *table)
+{
+	int ret = 0;
+	
+	if (!table)
+		return ret;
+		
+	storageShare->deleteIndexes();
+
+	for (uint n = 0; n < table->s->keys; ++n)
+		if ((ret = setIndex(table, n)))
+			break;
+
+	return ret;
+}
+
 int StorageInterface::genTable(TABLE* table, CmdGen* gen)
 {
 	const char *tableName = storageTable->getName();

=== modified file 'storage/falcon/ha_falcon.h'
--- a/storage/falcon/ha_falcon.h	2008-08-18 05:45:29 +0000
+++ b/storage/falcon/ha_falcon.h	2008-08-22 06:47:40 +0000
@@ -124,6 +124,7 @@ public:
 	void			freeActiveBlobs(void);
 	int				setIndex(TABLE *table, int indexId);
 	int				setIndexes(void);
+	int				remapIndexes(TABLE *table);
 	int				genTable(TABLE* table, CmdGen* gen);
 	int				genType(Field *field, CmdGen *gen);
 	void			genKeyFields(KEY *key, CmdGen *gen);

Thread
bzr push into mysql-6.0-falcon branch (cpowers:2793 to 2794) Bug#38041 — Christopher Powers, 22 Aug