List:Commits« Previous MessageNext Message »
From:Kevin Lewis Date:April 30 2009 3:25pm
Subject:bzr commit into mysql-6.0-falcon-team branch (kevin.lewis:2701)
Bug#44375
View as plain text  
#At file:///C:/Work/bzr/Merge/mysql-6.0-falcon-team/ based on revid:kevin.lewis@stripped

 2701 Kevin Lewis	2009-04-30
      Bug#44375 continued;  This is the second part of the optimization of the CycleManager 
      for high concurrency on multiple CPUs.  This patch spreads out the purgatory lists into 
      arrays of list heads and makes sure that each list head is on a separate cache line.
      
      In addition, it hashes the thread ID to find a consistent slot to use instead of calling rand() 
      to distribute access to the arrays evenly.  Since the threadID is not as evenly distributed as
      rand(), the array is made larger, at 256 elements instead of 64.  But this allows the same 
      thread to enter Falcon many times and keep using the same purgatory lists and Cycle 
      syncObject.
      
      Also, in order to make the CycleManager more responsive to the need to free memory, 
      it will now skip the mandatory 1 second sleep between cycles if there is work to be done 
      after the previous cycle.  Then after a no-sleep cycle, if there is no work to be done the 
      next time, it will sleep less time than a full second.   Probably too much algorithm, but it is 
      adaptive.
      
      Also, the destructor now clears any uncleared purgatories, in case that were possible.
      second

    modified:
      storage/falcon/CycleLock.cpp
      storage/falcon/CycleLock.h
      storage/falcon/CycleManager.cpp
      storage/falcon/CycleManager.h
=== modified file 'storage/falcon/CycleLock.cpp'
--- a/storage/falcon/CycleLock.cpp	2009-04-21 06:09:11 +0000
+++ b/storage/falcon/CycleLock.cpp	2009-04-30 15:25:01 +0000
@@ -6,9 +6,9 @@
 
 CycleLock::CycleLock(Database *database)
 {
-	cycleManager = database->cycleManager;
-	syncObject = cycleManager->getSyncObject();
 	thread = Thread::getThread("CycleLock::CycleLock");
+	cycleManager = database->cycleManager;
+	syncObject = cycleManager->getSyncObject(thread->threadId);
 
 	// If there already is a cycle manager, let him worry about all this
 
@@ -61,7 +61,7 @@ void CycleLock::lockCycle(void)
 		chain->lockCycle();
 	else
 		{
-		syncObject = cycleManager->getSyncObject();
+		syncObject = cycleManager->getSyncObject(thread->threadId);
 		syncObject->lock(NULL, Shared);
 		locked = true;
 		}

=== modified file 'storage/falcon/CycleLock.h'
--- a/storage/falcon/CycleLock.h	2009-03-20 19:33:52 +0000
+++ b/storage/falcon/CycleLock.h	2009-04-30 15:25:01 +0000
@@ -11,7 +11,7 @@ class CycleLock
 public:
 	CycleLock(Database *database);
 	~CycleLock(void);
-	
+
 	static CycleLock*	unlock(void);
 	static bool			isLocked(void);
 

=== modified file 'storage/falcon/CycleManager.cpp'
--- a/storage/falcon/CycleManager.cpp	2009-04-21 06:09:11 +0000
+++ b/storage/falcon/CycleManager.cpp	2009-04-30 15:25:01 +0000
@@ -29,7 +29,9 @@
 #include "Value.h"
 #include "Interlock.h"
 
-static const int CYCLE_SLEEP		= 1000;
+static const int MAX_CYCLE_SLEEP		= 1000;
+static const int MIN_CYCLE_SLEEP		= 200;
+static const int CYCLE_SLEEP_INCREMENT		= 200;
 
 #ifdef _DEBUG
 #undef THIS_FILE
@@ -40,35 +42,91 @@ CycleManager::CycleManager(Database *db)
 {
 	database = db;
 	thread = NULL;
-	recordPurgatory = NULL;
-	recordVersionPurgatory = NULL;
-	valuePurgatory = NULL;
-	bufferPurgatory = NULL;
 
-	cycle1 = new SyncObject* [syncArraySize];
-	cycle2 = new SyncObject* [syncArraySize];
+	cycle1 = new SyncObject* [cycleArraySize];
+	cycle2 = new SyncObject* [cycleArraySize];
+	recordPurgatory = new RecordListHead* [cycleArraySize];
+	recordVersionPurgatory = new RecordVersionListHead* [cycleArraySize];
+	valuePurgatory = new ValueListHead* [cycleArraySize];
+	bufferPurgatory = new BufferListHead* [cycleArraySize];
+
+	cycleSleepTime = MAX_CYCLE_SLEEP;
 	currentCycle = cycle1;
-	for (int i = 0; i < syncArraySize; i++)
+	for (int i = 0; i < cycleArraySize; i++)
 		{
 		cycle1[i] = NULL;
 		cycle2[i] = NULL;
+		recordPurgatory[i] = NULL;
+		recordVersionPurgatory[i] = NULL;
+		valuePurgatory[i] = NULL;
+		bufferPurgatory[i] = NULL;
 		}
 }
 
 CycleManager::~CycleManager(void)
 {
-	for (int i = 0; i < syncArraySize; i++)
+	for (int i = 0; i < cycleArraySize; i++)
 		{
 		if (cycle1[i] != NULL)
 			delete cycle1[i];
 
 		if (cycle2[i] != NULL)
 			delete cycle2[i];
+
+		if (recordPurgatory[i])
+			{
+			for (RecordList *recordList; (recordList = recordPurgatory[i]->top);)
+				{
+				recordPurgatory[i]->top = recordList->next;
+				recordList->zombie->release(REC_HISTORY);
+				delete recordList;
+				}
+			delete recordPurgatory[i];
+			recordPurgatory[i] = NULL;
+			}
+
+		if (recordVersionPurgatory[i])
+			{
+			for (RecordVersion *recordVersion; (recordVersion = recordVersionPurgatory[i]->top);)
+				{
+				recordVersionPurgatory[i]->top = recordVersion->nextInTrans;
+				recordVersion->release(REC_HISTORY);
+				}
+			delete recordVersionPurgatory[i];
+			recordVersionPurgatory[i] = NULL;
+			}
+
+		if (valuePurgatory[i])
+			{
+			for (ValueList *valueList; (valueList = valuePurgatory[i]->top);)
+				{
+				valuePurgatory[i]->top = valueList->next;
+				delete [] (Value*) valueList->zombie;
+				delete valueList;
+				}
+			delete valuePurgatory[i];
+			valuePurgatory[i] = NULL;
+			}
+
+		if (bufferPurgatory[i])
+			{
+			for (BufferList *bufferList; (bufferList = bufferPurgatory[i]->top);)
+				{
+				bufferPurgatory[i]->top = bufferList->next;
+				DELETE_RECORD (bufferList->zombie);
+				delete bufferList;
+				}
+			delete bufferPurgatory[i];
+			bufferPurgatory[i] = NULL;
+			}
 		}
 
 	delete [] cycle1;
 	delete [] cycle2;
-
+	delete [] recordPurgatory;
+	delete [] recordVersionPurgatory;
+	delete [] valuePurgatory;
+	delete [] bufferPurgatory;
 }
 
 void CycleManager::start(void)
@@ -93,58 +151,86 @@ void CycleManager::cycleManager(void)
 	
 	while (!thread->shutdownInProgress)
 		{
-		thread->sleep(CYCLE_SLEEP);
-		RecordVersion *doomedRecordVersions;
-		RecordList *doomedRecords;
-		ValueList *doomedValues;
-		BufferList *doomedBuffers;
-		
-		// Pick up detrius registered for delete during cycle
-		
-		if (recordVersionPurgatory)
-			for (;;)
-				{
-				doomedRecordVersions = recordVersionPurgatory;
-				
-				if (COMPARE_EXCHANGE_POINTER(&recordVersionPurgatory, doomedRecordVersions, NULL))
-					break;
-				}
-		else
-			doomedRecordVersions = NULL;
-		
-		if (recordPurgatory)
-			for (;;)
-				{
-				doomedRecords = recordPurgatory;
-				
-				if (COMPARE_EXCHANGE_POINTER(&recordPurgatory, doomedRecords, NULL))
-					break;
-				}
-		else
-			doomedRecords = NULL;
+		// Only sleep if there is nothing to do. Search for zombies.  
+		// If found, lower the sleep time for nexxt time.
 
-		if (valuePurgatory)
-			for (;;)
-				{
-				doomedValues = valuePurgatory;
-				
-				if (COMPARE_EXCHANGE_POINTER(&valuePurgatory, doomedValues, NULL))
-					break;
-				}
-		else
-			doomedValues = NULL;
-		
-		if (bufferPurgatory)
-			for (;;)
-				{
-				doomedBuffers = bufferPurgatory;
-				
-				if (COMPARE_EXCHANGE_POINTER(&bufferPurgatory, doomedBuffers, NULL))
-					break;
-				}
+		bool zombiesFound = false;
+		for (int i = 0; !zombiesFound && i < cycleArraySize; i++)
+			{
+			zombiesFound = (recordVersionPurgatory[i] ? (recordVersionPurgatory[i]->top != NULL) : false);
+			if (!zombiesFound)
+				zombiesFound = (recordPurgatory[i] ? (recordPurgatory[i]->top != NULL) : false);
+			if (!zombiesFound)
+				zombiesFound = (valuePurgatory[i] ? (valuePurgatory[i]->top != NULL) : false);
+			if (!zombiesFound)
+				zombiesFound = (bufferPurgatory[i] ? (bufferPurgatory[i]->top != NULL) : false);
+			}
+		if (zombiesFound)
+			cycleSleepTime = MIN_CYCLE_SLEEP;
 		else
-			doomedBuffers = NULL;
+			{
+			if (cycleSleepTime < MAX_CYCLE_SLEEP)
+				cycleSleepTime += CYCLE_SLEEP_INCREMENT;
+
+			thread->sleep(cycleSleepTime);
+			}
+
+		// There are zombies to kill!
+
+		RecordVersion *doomedRecordVersions[cycleArraySize];
+		RecordList *doomedRecords[cycleArraySize];
+		ValueList *doomedValues[cycleArraySize];
+		BufferList *doomedBuffers[cycleArraySize];
 		
+		// Disconnect all zombies currently on all purgatories
+
+		for (int i = 0; i < cycleArraySize; i++)
+			{
+			if (recordVersionPurgatory[i])
+				for (;;)
+					{
+					doomedRecordVersions[i] = recordVersionPurgatory[i]->top;
+					
+					if (COMPARE_EXCHANGE_POINTER(&recordVersionPurgatory[i]->top, doomedRecordVersions[i], NULL))
+						break;
+					}
+			else
+				doomedRecordVersions[i] = NULL;
+
+			if (recordPurgatory[i])
+				for (;;)
+					{
+					doomedRecords[i] = recordPurgatory[i]->top;
+					
+					if (COMPARE_EXCHANGE_POINTER(&recordPurgatory[i]->top, doomedRecords[i], NULL))
+						break;
+					}
+			else
+				doomedRecords[i] = NULL;
+
+			if (valuePurgatory[i])
+				for (;;)
+					{
+					doomedValues[i] = valuePurgatory[i]->top;
+					
+					if (COMPARE_EXCHANGE_POINTER(&valuePurgatory[i]->top, doomedValues[i], NULL))
+						break;
+					}
+			else
+				doomedValues[i] = NULL;
+
+			if (bufferPurgatory[i])
+				for (;;)
+					{
+					doomedBuffers[i] = bufferPurgatory[i]->top;
+					
+					if (COMPARE_EXCHANGE_POINTER(&bufferPurgatory[i]->top, doomedBuffers[i], NULL))
+						break;
+					}
+			else
+				doomedBuffers[i] = NULL;
+			}
+
 		// Swap cycle clocks to start next cycle
 		
 		SyncObject **priorCycle = currentCycle;
@@ -153,7 +239,7 @@ void CycleManager::cycleManager(void)
 		// Wait for the previous cycle to complete by getting an exclusive 
 		// lock on each of the allocated syncObjects in that cycle.
 		
-		for (int i = 0; i < syncArraySize; i++)
+		for (int i = 0; i < cycleArraySize; i++)
 			{
 			if (priorCycle[i] != NULL)
 				{
@@ -162,39 +248,44 @@ void CycleManager::cycleManager(void)
 				sync.unlock();
 				}
 			}
-		
-		for (RecordVersion *recordVersion; (recordVersion = doomedRecordVersions);)
-			{
-			doomedRecordVersions = recordVersion->nextInTrans;
-			recordVersion->release(REC_HISTORY);
-			}
 
-		for (RecordList *recordList; (recordList = doomedRecords);)
-			{
-			doomedRecords = recordList->next;
-			recordList->zombie->release(REC_HISTORY);
-			delete recordList;
-			}
+		// Zombies can now be deleted safely!
 
-		for (ValueList *valueList; (valueList = doomedValues);)
+		for (int i = 0; i < cycleArraySize; i++)
 			{
-			doomedValues = valueList->next;
-			delete [] (Value*) valueList->zombie;
-			delete valueList;
-			}
+			for (RecordVersion *recordVersion; (recordVersion = doomedRecordVersions[i]);)
+				{
+				doomedRecordVersions[i] = recordVersion->nextInTrans;
+				recordVersion->release(REC_HISTORY);
+				}
 
-		for (BufferList *bufferList; (bufferList = doomedBuffers);)
-			{
-			doomedBuffers = bufferList->next;
-			DELETE_RECORD (bufferList->zombie);
-			delete bufferList;
+			for (RecordList *recordList; (recordList = doomedRecords[i]);)
+				{
+				doomedRecords[i] = recordList->next;
+				recordList->zombie->release(REC_HISTORY);
+				delete recordList;
+				}
+
+			for (ValueList *valueList; (valueList = doomedValues[i]);)
+				{
+				doomedValues[i] = valueList->next;
+				delete [] (Value*) valueList->zombie;
+				delete valueList;
+				}
+
+			for (BufferList *bufferList; (bufferList = doomedBuffers[i]);)
+				{
+				doomedBuffers[i] = bufferList->next;
+				DELETE_RECORD (bufferList->zombie);
+				delete bufferList;
+				}
 			}
 		}
 }
 
-SyncObject *CycleManager::getSyncObject(void)
+SyncObject *CycleManager::getSyncObject(unsigned long threadId)
 {
-	int slot = rand() & syncArrayMask;
+	int slot = threadId & cycleArrayMask;
 	SyncObject* syncObject = currentCycle[slot];
 	if (syncObject == NULL)
 		{
@@ -213,56 +304,131 @@ SyncObject *CycleManager::getSyncObject(
 
 void CycleManager::queueForDelete(Record* zombie)
 {
+	Thread* thread = Thread::getThread("CycleManager::queueForDelete(Record**)");
+	int slot = thread->threadId & cycleArrayMask;
+
 	if (zombie->isVersion())
 		{
+		// Establish the RecordVersion list head
+
+		RecordVersionListHead* recordVersionListHead = recordVersionPurgatory[slot];
+		if (recordVersionListHead == NULL)
+			{
+			recordVersionListHead = new RecordVersionListHead;
+			if (COMPARE_EXCHANGE_POINTER(&recordVersionPurgatory[slot], NULL, recordVersionListHead))
+				recordVersionListHead->top = NULL;
+			else // another thread beat us to the slot.
+				{
+				delete recordVersionListHead;
+				recordVersionListHead = recordVersionPurgatory[slot];
+				}
+			}
+
+		// Add the zombie to the head of the list.
+
 		RecordVersion *recordVersion = (RecordVersion*) zombie;
-		
 		for (;;)
 			{
-			recordVersion->nextInTrans = recordVersionPurgatory;
+			recordVersion->nextInTrans = recordVersionListHead->top;
 			
-			if (COMPARE_EXCHANGE_POINTER(&recordVersionPurgatory, recordVersion->nextInTrans, recordVersion))
+			if (COMPARE_EXCHANGE_POINTER(&recordVersionListHead->top, recordVersion->nextInTrans, recordVersion))
 				break;
 			}
 		}
 	else
 		{
+		// Establish the Record list head
+
+		RecordListHead* recordListHead = recordPurgatory[slot];
+		if (recordListHead == NULL)
+			{
+			recordListHead = new RecordListHead;
+			if (COMPARE_EXCHANGE_POINTER(&recordPurgatory[slot], NULL, recordListHead))
+				recordListHead->top = NULL;
+			else // another thread beat us to the slot.
+				{
+				delete recordListHead;
+				recordListHead = recordPurgatory[slot];
+				}
+			}
+
+		// Add the zombie to the head of the list.
+
 		RecordList *recordList = new RecordList;
 		recordList->zombie = zombie;
 		
 		for (;;)
 			{
-			recordList->next = recordPurgatory;
+			recordList->next = recordListHead->top;
 			
-			if (COMPARE_EXCHANGE_POINTER(&recordPurgatory, recordList->next, recordList))
+			if (COMPARE_EXCHANGE_POINTER(&recordListHead->top, recordList->next, recordList))
 				break;
 			}
 		}
 }
 void CycleManager::queueForDelete(Value** zombie)
 {
+	Thread* thread = Thread::getThread("CycleManager::queueForDelete(Value**)");
+	int slot = thread->threadId & cycleArrayMask;
+
+	// Establish the value list head
+
+	ValueListHead* valueListHead = valuePurgatory[slot];
+	if (valueListHead == NULL)
+		{
+		valueListHead = new ValueListHead;
+		if (COMPARE_EXCHANGE_POINTER(&valuePurgatory[slot], NULL, valueListHead))
+			valueListHead->top = NULL;
+		else // another thread beat us to the slot.
+			{
+			delete valueListHead;
+			valueListHead = valuePurgatory[slot];
+			}
+		}
+
+	// Add the zombie to the head of the list.
+
 	ValueList *valueList = new ValueList;
 	valueList->zombie = zombie;
 
 	for (;;)
 		{
-		valueList->next = valuePurgatory;
+		valueList->next = valueListHead->top;
 		
-		if (COMPARE_EXCHANGE_POINTER(&valuePurgatory, valueList->next, valueList))
+		if (COMPARE_EXCHANGE_POINTER(&valueListHead->top, valueList->next, valueList))
 			break;
 		}
 }
 
 void CycleManager::queueForDelete(char* zombie)
 {
+	Thread* thread = Thread::getThread("CycleManager::queueForDelete(Char**)");
+	int slot = thread->threadId & cycleArrayMask;
+
+	// Establish the buffer list head
+
+	BufferListHead* bufferListHead = bufferPurgatory[slot];
+	if (bufferListHead == NULL)
+		{
+		bufferListHead = new BufferListHead;
+		if (COMPARE_EXCHANGE_POINTER(&bufferPurgatory[slot], NULL, bufferListHead))
+			bufferListHead->top = NULL;
+		else // another thread beat us to the slot.
+			{
+			delete bufferListHead;
+			bufferListHead = bufferPurgatory[slot];
+			}
+		}
+
+	// Add the zombie to the head of the list.
+
 	BufferList *bufferlist = new BufferList;
 	bufferlist->zombie = zombie;
-
 	for (;;)
 		{
-		bufferlist->next = bufferPurgatory;
+		bufferlist->next = bufferListHead->top;
 		
-		if (COMPARE_EXCHANGE_POINTER(&bufferPurgatory, bufferlist->next, bufferlist))
+		if (COMPARE_EXCHANGE_POINTER(&bufferListHead->top, bufferlist->next, bufferlist))
 			break;
 		}
 }

=== modified file 'storage/falcon/CycleManager.h'
--- a/storage/falcon/CycleManager.h	2009-04-21 06:09:11 +0000
+++ b/storage/falcon/CycleManager.h	2009-04-30 15:25:01 +0000
@@ -9,8 +9,9 @@ class Record;
 class RecordVersion;
 class Value;
 
-static const int syncArraySize = 64;
-static const int syncArrayMask = 63;
+static const int cycleArraySize = 256;
+static const int cycleArrayMask = 255;
+static const int cacheLineSpaceSize = 128 - sizeof(void *);
 
 class CycleManager
 {
@@ -31,7 +32,32 @@ class CycleManager
 		char		*zombie;
 		BufferList	*next;
 		};
-		
+
+	// These list heads must be on separate L1 cache lines.
+	struct RecordListHead
+		{
+		RecordList	*top;
+		char		restOfCacheLine[cacheLineSpaceSize];
+		};
+
+	struct RecordVersionListHead
+		{
+		RecordVersion	*top;
+		char		restOfCacheLine[cacheLineSpaceSize];
+		};
+
+	struct ValueListHead
+		{
+		ValueList	*top;
+		char		restOfCacheLine[cacheLineSpaceSize];
+		};
+
+	struct BufferListHead
+		{
+		BufferList	*top;
+		char		restOfCacheLine[cacheLineSpaceSize];
+		};
+
 public:
 	CycleManager(Database *database);
 	~CycleManager(void);
@@ -39,7 +65,7 @@ public:
 	void		start(void);
 	void		shutdown(void);
 	void		cycleManager(void);
-	SyncObject *getSyncObject(void);
+	SyncObject *getSyncObject(unsigned long threadId);
 	void		queueForDelete(Record* zombie);
 	void		queueForDelete(Value** zombie);
 	void		queueForDelete(char* zombie);
@@ -49,10 +75,11 @@ public:
 	SyncObject		**cycle1;
 	SyncObject		**cycle2;
 	SyncObject		**currentCycle;
-	RecordVersion	*recordVersionPurgatory;
-	RecordList		*recordPurgatory;
-	ValueList		*valuePurgatory;
-	BufferList		*bufferPurgatory;
+	int				cycleSleepTime;
+	RecordVersionListHead	**recordVersionPurgatory;
+	RecordListHead	**recordPurgatory;
+	ValueListHead	**valuePurgatory;
+	BufferListHead	**bufferPurgatory;
 	Thread			*thread;
 	Database		*database;
 };


Attachment: [text/bzr-bundle] bzr/kevin.lewis@sun.com-20090430152501-1l8qjiuv32gdu3ao.bundle
Thread
bzr commit into mysql-6.0-falcon-team branch (kevin.lewis:2701)Bug#44375Kevin Lewis30 Apr