#At file:///C:/Work/bzr/Merge/mysql-6.0-falcon-team/ based on revid:kevin.lewis@stripped
2701 Kevin Lewis 2009-04-30
Bug#44375 copntinued; This is the second part of the optimization of the CycleManager
for high concurrency on multiple CPUs. This patch spreads out the purgatory lists into
arrays or list heads and makes sure that each list head is on a separate cache line.
In addition, it hashes the thread ID to find a consistent slot to use instead of calling rand()
to distribute access to the arrays evenly. Since the threadID is not as evenly distributed as
rand(), the array is made larger, at 256 elements instead of 64. But this allows the same
thread to enter Falcon many times and keep using the same purgatories lists and Cycle
syncObject.
Also, in order to make the CycleManager more responsive to the need to free memory,
itwill now skip the mandatory 1 second sleep between cycles if there is work to be done
after the previous cycle. Then after a no-sleep cycle, if there is no work to be done the
next time, it will sleep less time than a full second. Probably too much algorithm, but it is
adaptive.
Also, the destructor now clears any uncleared purgatories, in case that were possible.
second
modified:
storage/falcon/CycleLock.cpp
storage/falcon/CycleLock.h
storage/falcon/CycleManager.cpp
storage/falcon/CycleManager.h
=== modified file 'storage/falcon/CycleLock.cpp'
--- a/storage/falcon/CycleLock.cpp 2009-04-21 06:09:11 +0000
+++ b/storage/falcon/CycleLock.cpp 2009-04-30 15:25:01 +0000
@@ -6,9 +6,9 @@
CycleLock::CycleLock(Database *database)
{
- cycleManager = database->cycleManager;
- syncObject = cycleManager->getSyncObject();
thread = Thread::getThread("CycleLock::CycleLock");
+ cycleManager = database->cycleManager;
+ syncObject = cycleManager->getSyncObject(thread->threadId);
// If there already is a cycle manager, let him worry about all this
@@ -61,7 +61,7 @@ void CycleLock::lockCycle(void)
chain->lockCycle();
else
{
- syncObject = cycleManager->getSyncObject();
+ syncObject = cycleManager->getSyncObject(thread->threadId);
syncObject->lock(NULL, Shared);
locked = true;
}
=== modified file 'storage/falcon/CycleLock.h'
--- a/storage/falcon/CycleLock.h 2009-03-20 19:33:52 +0000
+++ b/storage/falcon/CycleLock.h 2009-04-30 15:25:01 +0000
@@ -11,7 +11,7 @@ class CycleLock
public:
CycleLock(Database *database);
~CycleLock(void);
-
+
static CycleLock* unlock(void);
static bool isLocked(void);
=== modified file 'storage/falcon/CycleManager.cpp'
--- a/storage/falcon/CycleManager.cpp 2009-04-21 06:09:11 +0000
+++ b/storage/falcon/CycleManager.cpp 2009-04-30 15:25:01 +0000
@@ -29,7 +29,9 @@
#include "Value.h"
#include "Interlock.h"
-static const int CYCLE_SLEEP = 1000;
+static const int MAX_CYCLE_SLEEP = 1000;
+static const int MIN_CYCLE_SLEEP = 200;
+static const int CYCLE_SLEEP_INCREMENT = 200;
#ifdef _DEBUG
#undef THIS_FILE
@@ -40,35 +42,91 @@ CycleManager::CycleManager(Database *db)
{
database = db;
thread = NULL;
- recordPurgatory = NULL;
- recordVersionPurgatory = NULL;
- valuePurgatory = NULL;
- bufferPurgatory = NULL;
- cycle1 = new SyncObject* [syncArraySize];
- cycle2 = new SyncObject* [syncArraySize];
+ cycle1 = new SyncObject* [cycleArraySize];
+ cycle2 = new SyncObject* [cycleArraySize];
+ recordPurgatory = new RecordListHead* [cycleArraySize];
+ recordVersionPurgatory = new RecordVersionListHead* [cycleArraySize];
+ valuePurgatory = new ValueListHead* [cycleArraySize];
+ bufferPurgatory = new BufferListHead* [cycleArraySize];
+
+ cycleSleepTime = MAX_CYCLE_SLEEP;
currentCycle = cycle1;
- for (int i = 0; i < syncArraySize; i++)
+ for (int i = 0; i < cycleArraySize; i++)
{
cycle1[i] = NULL;
cycle2[i] = NULL;
+ recordPurgatory[i] = NULL;
+ recordVersionPurgatory[i] = NULL;
+ valuePurgatory[i] = NULL;
+ bufferPurgatory[i] = NULL;
}
}
CycleManager::~CycleManager(void)
{
- for (int i = 0; i < syncArraySize; i++)
+ for (int i = 0; i < cycleArraySize; i++)
{
if (cycle1[i] != NULL)
delete cycle1[i];
if (cycle2[i] != NULL)
delete cycle2[i];
+
+ if (recordPurgatory[i])
+ {
+ for (RecordList *recordList; (recordList = recordPurgatory[i]->top);)
+ {
+ recordPurgatory[i]->top = recordList->next;
+ recordList->zombie->release(REC_HISTORY);
+ delete recordList;
+ }
+ delete recordPurgatory[i];
+ recordPurgatory[i] = NULL;
+ }
+
+ if (recordVersionPurgatory[i])
+ {
+ for (RecordVersion *recordVersion; (recordVersion = recordVersionPurgatory[i]->top);)
+ {
+ recordVersionPurgatory[i]->top = recordVersion->nextInTrans;
+ recordVersion->release(REC_HISTORY);
+ }
+ delete recordVersionPurgatory[i];
+ recordVersionPurgatory[i] = NULL;
+ }
+
+ if (valuePurgatory[i])
+ {
+ for (ValueList *valueList; (valueList = valuePurgatory[i]->top);)
+ {
+ valuePurgatory[i]->top = valueList->next;
+ delete [] (Value*) valueList->zombie;
+ delete valueList;
+ }
+ delete valuePurgatory[i];
+ valuePurgatory[i] = NULL;
+ }
+
+ if (bufferPurgatory[i])
+ {
+ for (BufferList *bufferList; (bufferList = bufferPurgatory[i]->top);)
+ {
+ bufferPurgatory[i]->top = bufferList->next;
+ DELETE_RECORD (bufferList->zombie);
+ delete bufferList;
+ }
+ delete bufferPurgatory[i];
+ bufferPurgatory[i] = NULL;
+ }
}
delete [] cycle1;
delete [] cycle2;
-
+ delete [] recordPurgatory;
+ delete [] recordVersionPurgatory;
+ delete [] valuePurgatory;
+ delete [] bufferPurgatory;
}
void CycleManager::start(void)
@@ -93,58 +151,86 @@ void CycleManager::cycleManager(void)
while (!thread->shutdownInProgress)
{
- thread->sleep(CYCLE_SLEEP);
- RecordVersion *doomedRecordVersions;
- RecordList *doomedRecords;
- ValueList *doomedValues;
- BufferList *doomedBuffers;
-
- // Pick up detrius registered for delete during cycle
-
- if (recordVersionPurgatory)
- for (;;)
- {
- doomedRecordVersions = recordVersionPurgatory;
-
- if (COMPARE_EXCHANGE_POINTER(&recordVersionPurgatory, doomedRecordVersions, NULL))
- break;
- }
- else
- doomedRecordVersions = NULL;
-
- if (recordPurgatory)
- for (;;)
- {
- doomedRecords = recordPurgatory;
-
- if (COMPARE_EXCHANGE_POINTER(&recordPurgatory, doomedRecords, NULL))
- break;
- }
- else
- doomedRecords = NULL;
+ // Only sleep if there is nothing to do. Search for zombies.
+ // If found, lower the sleep time for nexxt time.
- if (valuePurgatory)
- for (;;)
- {
- doomedValues = valuePurgatory;
-
- if (COMPARE_EXCHANGE_POINTER(&valuePurgatory, doomedValues, NULL))
- break;
- }
- else
- doomedValues = NULL;
-
- if (bufferPurgatory)
- for (;;)
- {
- doomedBuffers = bufferPurgatory;
-
- if (COMPARE_EXCHANGE_POINTER(&bufferPurgatory, doomedBuffers, NULL))
- break;
- }
+ bool zombiesFound = false;
+ for (int i = 0; !zombiesFound && i < cycleArraySize; i++)
+ {
+ zombiesFound = (recordVersionPurgatory[i] ? (recordVersionPurgatory[i]->top != NULL) : false);
+ if (!zombiesFound)
+ zombiesFound = (recordPurgatory[i] ? (recordPurgatory[i]->top != NULL) : false);
+ if (!zombiesFound)
+ zombiesFound = (valuePurgatory[i] ? (valuePurgatory[i]->top != NULL) : false);
+ if (!zombiesFound)
+ zombiesFound = (bufferPurgatory[i] ? (bufferPurgatory[i]->top != NULL) : false);
+ }
+ if (zombiesFound)
+ cycleSleepTime = MIN_CYCLE_SLEEP;
else
- doomedBuffers = NULL;
+ {
+ if (cycleSleepTime < MAX_CYCLE_SLEEP)
+ cycleSleepTime += CYCLE_SLEEP_INCREMENT;
+
+ thread->sleep(cycleSleepTime);
+ }
+
+ // There are zombies to kill!
+
+ RecordVersion *doomedRecordVersions[cycleArraySize];
+ RecordList *doomedRecords[cycleArraySize];
+ ValueList *doomedValues[cycleArraySize];
+ BufferList *doomedBuffers[cycleArraySize];
+ // Disconnect all zombies currently on all purgatories
+
+ for (int i = 0; i < cycleArraySize; i++)
+ {
+ if (recordVersionPurgatory[i])
+ for (;;)
+ {
+ doomedRecordVersions[i] = recordVersionPurgatory[i]->top;
+
+ if (COMPARE_EXCHANGE_POINTER(&recordVersionPurgatory[i]->top, doomedRecordVersions[i], NULL))
+ break;
+ }
+ else
+ doomedRecordVersions[i] = NULL;
+
+ if (recordPurgatory[i])
+ for (;;)
+ {
+ doomedRecords[i] = recordPurgatory[i]->top;
+
+ if (COMPARE_EXCHANGE_POINTER(&recordPurgatory[i]->top, doomedRecords[i], NULL))
+ break;
+ }
+ else
+ doomedRecords[i] = NULL;
+
+ if (valuePurgatory[i])
+ for (;;)
+ {
+ doomedValues[i] = valuePurgatory[i]->top;
+
+ if (COMPARE_EXCHANGE_POINTER(&valuePurgatory[i]->top, doomedValues[i], NULL))
+ break;
+ }
+ else
+ doomedValues[i] = NULL;
+
+ if (bufferPurgatory[i])
+ for (;;)
+ {
+ doomedBuffers[i] = bufferPurgatory[i]->top;
+
+ if (COMPARE_EXCHANGE_POINTER(&bufferPurgatory[i]->top, doomedBuffers[i], NULL))
+ break;
+ }
+ else
+ doomedBuffers[i] = NULL;
+ }
+
// Swap cycle clocks to start next cycle
SyncObject **priorCycle = currentCycle;
@@ -153,7 +239,7 @@ void CycleManager::cycleManager(void)
// Wait for the previous cycle to complete by getting an exclusive
// lock on each of the allocated syncObjects in that cycle.
- for (int i = 0; i < syncArraySize; i++)
+ for (int i = 0; i < cycleArraySize; i++)
{
if (priorCycle[i] != NULL)
{
@@ -162,39 +248,44 @@ void CycleManager::cycleManager(void)
sync.unlock();
}
}
-
- for (RecordVersion *recordVersion; (recordVersion = doomedRecordVersions);)
- {
- doomedRecordVersions = recordVersion->nextInTrans;
- recordVersion->release(REC_HISTORY);
- }
- for (RecordList *recordList; (recordList = doomedRecords);)
- {
- doomedRecords = recordList->next;
- recordList->zombie->release(REC_HISTORY);
- delete recordList;
- }
+ // Zombies can now be deleted safely!
- for (ValueList *valueList; (valueList = doomedValues);)
+ for (int i = 0; i < cycleArraySize; i++)
{
- doomedValues = valueList->next;
- delete [] (Value*) valueList->zombie;
- delete valueList;
- }
+ for (RecordVersion *recordVersion; (recordVersion = doomedRecordVersions[i]);)
+ {
+ doomedRecordVersions[i] = recordVersion->nextInTrans;
+ recordVersion->release(REC_HISTORY);
+ }
- for (BufferList *bufferList; (bufferList = doomedBuffers);)
- {
- doomedBuffers = bufferList->next;
- DELETE_RECORD (bufferList->zombie);
- delete bufferList;
+ for (RecordList *recordList; (recordList = doomedRecords[i]);)
+ {
+ doomedRecords[i] = recordList->next;
+ recordList->zombie->release(REC_HISTORY);
+ delete recordList;
+ }
+
+ for (ValueList *valueList; (valueList = doomedValues[i]);)
+ {
+ doomedValues[i] = valueList->next;
+ delete [] (Value*) valueList->zombie;
+ delete valueList;
+ }
+
+ for (BufferList *bufferList; (bufferList = doomedBuffers[i]);)
+ {
+ doomedBuffers[i] = bufferList->next;
+ DELETE_RECORD (bufferList->zombie);
+ delete bufferList;
+ }
}
}
}
-SyncObject *CycleManager::getSyncObject(void)
+SyncObject *CycleManager::getSyncObject(unsigned long threadId)
{
- int slot = rand() & syncArrayMask;
+ int slot = threadId & cycleArrayMask;
SyncObject* syncObject = currentCycle[slot];
if (syncObject == NULL)
{
@@ -213,56 +304,131 @@ SyncObject *CycleManager::getSyncObject(
void CycleManager::queueForDelete(Record* zombie)
{
+ Thread* thread = Thread::getThread("CycleManager::queueForDelete(Record**)");
+ int slot = thread->threadId & cycleArrayMask;
+
if (zombie->isVersion())
{
+ // Establish the RecordVersion list head
+
+ RecordVersionListHead* recordVersionListHead = recordVersionPurgatory[slot];
+ if (recordVersionListHead == NULL)
+ {
+ recordVersionListHead = new RecordVersionListHead;
+ if (COMPARE_EXCHANGE_POINTER(&recordVersionPurgatory[slot], NULL, recordVersionListHead))
+ recordVersionListHead->top = NULL;
+ else // another thread beat us to the slot.
+ {
+ delete recordVersionListHead;
+ recordVersionListHead = recordVersionPurgatory[slot];
+ }
+ }
+
+ // Add the zombie to the head of the list.
+
RecordVersion *recordVersion = (RecordVersion*) zombie;
-
for (;;)
{
- recordVersion->nextInTrans = recordVersionPurgatory;
+ recordVersion->nextInTrans = recordVersionListHead->top;
- if (COMPARE_EXCHANGE_POINTER(&recordVersionPurgatory, recordVersion->nextInTrans, recordVersion))
+ if (COMPARE_EXCHANGE_POINTER(&recordVersionListHead->top, recordVersion->nextInTrans, recordVersion))
break;
}
}
else
{
+ // Establish the Record list head
+
+ RecordListHead* recordListHead = recordPurgatory[slot];
+ if (recordListHead == NULL)
+ {
+ recordListHead = new RecordListHead;
+ if (COMPARE_EXCHANGE_POINTER(&recordPurgatory[slot], NULL, recordListHead))
+ recordListHead->top = NULL;
+ else // another thread beat us to the slot.
+ {
+ delete recordListHead;
+ recordListHead = recordPurgatory[slot];
+ }
+ }
+
+ // Add the zombie to the head of the list.
+
RecordList *recordList = new RecordList;
recordList->zombie = zombie;
for (;;)
{
- recordList->next = recordPurgatory;
+ recordList->next = recordListHead->top;
- if (COMPARE_EXCHANGE_POINTER(&recordPurgatory, recordList->next, recordList))
+ if (COMPARE_EXCHANGE_POINTER(&recordListHead->top, recordList->next, recordList))
break;
}
}
}
void CycleManager::queueForDelete(Value** zombie)
{
+ Thread* thread = Thread::getThread("CycleManager::queueForDelete(Value**)");
+ int slot = thread->threadId & cycleArrayMask;
+
+ // Establish the value list head
+
+ ValueListHead* valueListHead = valuePurgatory[slot];
+ if (valueListHead == NULL)
+ {
+ valueListHead = new ValueListHead;
+ if (COMPARE_EXCHANGE_POINTER(&valuePurgatory[slot], NULL, valueListHead))
+ valueListHead->top = NULL;
+ else // another thread beat us to the slot.
+ {
+ delete valueListHead;
+ valueListHead = valuePurgatory[slot];
+ }
+ }
+
+ // Add the zombie to the head of the list.
+
ValueList *valueList = new ValueList;
valueList->zombie = zombie;
for (;;)
{
- valueList->next = valuePurgatory;
+ valueList->next = valueListHead->top;
- if (COMPARE_EXCHANGE_POINTER(&valuePurgatory, valueList->next, valueList))
+ if (COMPARE_EXCHANGE_POINTER(&valueListHead->top, valueList->next, valueList))
break;
}
}
void CycleManager::queueForDelete(char* zombie)
{
+ Thread* thread = Thread::getThread("CycleManager::queueForDelete(Char**)");
+ int slot = thread->threadId & cycleArrayMask;
+
+ // Establish the buffer list head
+
+ BufferListHead* bufferListHead = bufferPurgatory[slot];
+ if (bufferListHead == NULL)
+ {
+ bufferListHead = new BufferListHead;
+ if (COMPARE_EXCHANGE_POINTER(&bufferPurgatory[slot], NULL, bufferListHead))
+ bufferListHead->top = NULL;
+ else // another thread beat us to the slot.
+ {
+ delete bufferListHead;
+ bufferListHead = bufferPurgatory[slot];
+ }
+ }
+
+ // Add the zombie to the head of the list.
+
BufferList *bufferlist = new BufferList;
bufferlist->zombie = zombie;
-
for (;;)
{
- bufferlist->next = bufferPurgatory;
+ bufferlist->next = bufferListHead->top;
- if (COMPARE_EXCHANGE_POINTER(&bufferPurgatory, bufferlist->next, bufferlist))
+ if (COMPARE_EXCHANGE_POINTER(&bufferListHead->top, bufferlist->next, bufferlist))
break;
}
}
=== modified file 'storage/falcon/CycleManager.h'
--- a/storage/falcon/CycleManager.h 2009-04-21 06:09:11 +0000
+++ b/storage/falcon/CycleManager.h 2009-04-30 15:25:01 +0000
@@ -9,8 +9,9 @@ class Record;
class RecordVersion;
class Value;
-static const int syncArraySize = 64;
-static const int syncArrayMask = 63;
+static const int cycleArraySize = 256;
+static const int cycleArrayMask = 255;
+static const int cacheLineSpaceSize = 128 - sizeof(void *);
class CycleManager
{
@@ -31,7 +32,32 @@ class CycleManager
char *zombie;
BufferList *next;
};
-
+
+ // These list heads must be on separate L1 cache lines.
+ struct RecordListHead
+ {
+ RecordList *top;
+ char restOfCacheLine[cacheLineSpaceSize];
+ };
+
+ struct RecordVersionListHead
+ {
+ RecordVersion *top;
+ char restOfCacheLine[cacheLineSpaceSize];
+ };
+
+ struct ValueListHead
+ {
+ ValueList *top;
+ char restOfCacheLine[cacheLineSpaceSize];
+ };
+
+ struct BufferListHead
+ {
+ BufferList *top;
+ char restOfCacheLine[cacheLineSpaceSize];
+ };
+
public:
CycleManager(Database *database);
~CycleManager(void);
@@ -39,7 +65,7 @@ public:
void start(void);
void shutdown(void);
void cycleManager(void);
- SyncObject *getSyncObject(void);
+ SyncObject *getSyncObject(unsigned long threadId);
void queueForDelete(Record* zombie);
void queueForDelete(Value** zombie);
void queueForDelete(char* zombie);
@@ -49,10 +75,11 @@ public:
SyncObject **cycle1;
SyncObject **cycle2;
SyncObject **currentCycle;
- RecordVersion *recordVersionPurgatory;
- RecordList *recordPurgatory;
- ValueList *valuePurgatory;
- BufferList *bufferPurgatory;
+ int cycleSleepTime;
+ RecordVersionListHead **recordVersionPurgatory;
+ RecordListHead **recordPurgatory;
+ ValueListHead **valuePurgatory;
+ BufferListHead **bufferPurgatory;
Thread *thread;
Database *database;
};
Attachment: [text/bzr-bundle] bzr/kevin.lewis@sun.com-20090430152501-1l8qjiuv32gdu3ao.bundle
| Thread |
|---|
| • bzr commit into mysql-6.0-falcon-team branch (kevin.lewis:2701)Bug#44375 | Kevin Lewis | 30 Apr |