#At bzr+ssh://bk-internal.mysql.com/bzrroot/server/mysql-6.0-falcon/
2695 Jim Starkey 2008-06-11
Trial of new SyncObject implementation (likely to be abandoned, actually).
modified:
storage/falcon/Event.cpp
storage/falcon/PriorityScheduler.cpp
storage/falcon/SyncObject.cpp
storage/falcon/SyncObject.h
storage/falcon/SyncTest.cpp
storage/falcon/Thread.h
=== modified file 'storage/falcon/Event.cpp'
--- a/storage/falcon/Event.cpp 2007-09-20 15:44:25 +0000
+++ b/storage/falcon/Event.cpp 2008-06-11 18:24:29 +0000
@@ -45,7 +45,7 @@
Thread *thread = Thread::getThread("Event::wait");
Sync sync(&mutex, "Event::wait");
sync.lock(Exclusive);
- thread->que = (Thread*) waiters;
+ thread->queue = (Thread*) waiters;
waiters = thread;
sync.unlock();
@@ -70,7 +70,7 @@
{
Sync sync(&mutex, "Event::post");
sync.lock(Exclusive);
- for (Thread *waiter = (Thread*) waiters; waiter; waiter = waiter->que)
+ for (Thread *waiter = (Thread*) waiters; waiter; waiter = waiter->queue)
waiter->wake();
}
}
@@ -94,10 +94,10 @@
Sync sync(&mutex, "Event::removeWaiter");
sync.lock(Exclusive);
- for (Thread **ptr = (Thread**) &waiters; *ptr; ptr = &(*ptr)->que)
+ for (Thread **ptr = (Thread**) &waiters; *ptr; ptr = &(*ptr)->queue)
if (*ptr == thread)
{
- *ptr = thread->que;
+ *ptr = thread->queue;
return;
}
}
=== modified file 'storage/falcon/PriorityScheduler.cpp'
--- a/storage/falcon/PriorityScheduler.cpp 2007-11-02 22:11:54 +0000
+++ b/storage/falcon/PriorityScheduler.cpp 2008-06-11 18:24:29 +0000
@@ -51,7 +51,7 @@
}
Thread *thread = Thread::getThread("PriorityScheduler::schedule");
- thread->que = waitingThreads[priority];
+ thread->queue = waitingThreads[priority];
waitingThreads[priority] = thread;
thread->wakeupType = None;
sync.unlock();
@@ -83,7 +83,7 @@
for (Thread *thread; (thread = waitingThreads[currentPriority]);)
{
++count;
- waitingThreads[currentPriority] = thread->que;
+ waitingThreads[currentPriority] = thread->queue;
thread->wakeupType = Exclusive;
thread->wake();
}
=== modified file 'storage/falcon/SyncObject.cpp'
--- a/storage/falcon/SyncObject.cpp 2008-05-14 22:31:23 +0000
+++ b/storage/falcon/SyncObject.cpp 2008-06-11 18:24:29 +0000
@@ -102,9 +102,10 @@
SyncObject::SyncObject()
{
+ readers = 0;
waiters = 0;
lockState = 0;
- que = NULL;
+ queue = NULL;
monitorCount = 0;
stalls = 0;
exclusiveThread = NULL;
@@ -137,6 +138,174 @@
#endif
}
+#ifdef FAST_SHARED
+void SyncObject::lock(Sync *sync, LockType type, int timeout)
+{
+ Thread *thread;
+
+#ifdef TRACE_SYNC_OBJECTS
+ if (sync)
+ where = sync->where;
+#endif
+
+ // Shared case
+
+ if (type == Shared)
+ {
+ thread = NULL;
+ //BUMP_INTERLOCKED(sharedCount);
+ INTERLOCKED_INCREMENT(readers);
+
+ // If there aren't any writers, we've got the lock. Ducky.
+
+ if (lockState == 0)
+ {
+ DEBUG_FREEZE;
+
+ return;
+ }
+
+ // See if we have already have the lock, in which case bump the monitor and get going
+
+ if (!thread)
+ thread = Thread::getThread("SyncObject::lock");
+
+ if (thread == exclusiveThread)
+ {
+ INTERLOCKED_DECREMENT(readers);
+ ++monitorCount;
+ DEBUG_FREEZE;
+
+ return;
+ }
+
+ // We have contention. Get the mutex and prepare the wait, but
+ // maybe we'll get lucky
+
+ mutex.lock();
+
+ if (lockState == 0)
+ {
+ DEBUG_FREEZE;
+
+ return;
+ }
+
+ // If there isn't an exclusive thread or the exclusive thread is stalled, we've got the
lock
+
+ if (!exclusiveThread || exclusiveThread == queue)
+ {
+ DEBUG_FREEZE;
+
+ return;
+ }
+
+ // There is an outstanding exclusive lock; wait
+
+ INTERLOCKED_DECREMENT(readers);
+ bumpWaiters(1);
+ wait(type, thread, sync, timeout);
+ DEBUG_FREEZE;
+
+ return;
+ }
+
+ // Exclusive case
+
+ thread = Thread::getThread("SyncObject::lock");
+ thread->backoff = BACKOFF_INTERVAL;
+ ASSERT(thread);
+
+ // If we're already the exclusive thread, just bump the monitor count and we're done
+
+ if (thread == exclusiveThread)
+ {
+ ++monitorCount;
+ BUMP(exclusiveCount);
+ DEBUG_FREEZE;
+
+ return;
+ }
+
+ // If nothing is pending, go for the lock.
+
+ while (readers == 0 && waiters == 0)
+ {
+ INTERLOCK_TYPE oldState = lockState;
+
+ if (oldState != 0)
+ break;
+
+ if (COMPARE_EXCHANGE(&lockState, oldState, -1))
+ {
+ exclusiveThread = thread;
+
+ if (readers)
+ {
+ mutex.lock();
+
+ if (readers)
+ {
+ if (queue && queue->lockType == Shared)
+ BUMP(exclusiveCount);
+ bumpWaiters(1);
+ wait(type, thread, sync, timeout);
+ }
+
+ }
+
+ BUMP(exclusiveCount);
+ DEBUG_FREEZE;
+
+ return;
+ }
+
+ BACKOFF;
+ }
+
+ mutex.lock();
+ bumpWaiters(1);
+ BUMP(exclusiveCount);
+
+ while (readers == 0 && queue == NULL)
+ {
+ INTERLOCK_TYPE oldState = lockState;
+
+ if (oldState != 0)
+ break;
+
+ if (COMPARE_EXCHANGE(&lockState, oldState, -1))
+ {
+ exclusiveThread = thread;
+
+ if (readers)
+ {
+ wait(type, thread, sync, timeout);
+ DEBUG_FREEZE;
+
+ return;
+ }
+
+ bumpWaiters(-1);
+ mutex.release();
+ DEBUG_FREEZE;
+
+ return;
+ }
+
+ BACKOFF;
+ }
+
+ // mutex is held going into wait() It is released before coming out.
+
+ wait(type, thread, sync, timeout);
+ DEBUG_FREEZE;
+}
+
+#else // FAST_SHARED
+
+// Old (aka working) version
+
void SyncObject::lock(Sync *sync, LockType type, int timeout)
{
Thread *thread;
@@ -245,7 +414,7 @@
bumpWaiters(1);
BUMP(exclusiveCount);
- while (que == NULL)
+ while (queue == NULL)
{
INTERLOCK_TYPE oldState = lockState;
@@ -271,7 +440,57 @@
wait(type, thread, sync, timeout);
DEBUG_FREEZE;
}
-
+#endif // FAST_SHARED
+
+#ifdef FAST_SHARED
+void SyncObject::unlock(Sync *sync, LockType type)
+{
+ if (monitorCount)
+ {
+ //ASSERT (monitorCount > 0);
+ --monitorCount;
+ DEBUG_FREEZE;
+
+ return;
+ }
+
+ if (type == Shared)
+ {
+ ASSERT(readers > 0);
+
+ if (INTERLOCKED_DECREMENT(readers) == 0 && waiters)
+ grantLocks();
+
+ return;
+ }
+
+ ASSERT(lockState == -1 && exclusiveThread != queue);
+ Thread *thread = NULL;
+
+ for (;;)
+ {
+ //ASSERT (type == Exclusive && lockState == -1);
+ long oldState = lockState;
+ long newState = (type == Shared) ? oldState - 1 : 0;
+ exclusiveThread = NULL;
+
+ if (COMPARE_EXCHANGE(&lockState, oldState, newState))
+ {
+ DEBUG_FREEZE;
+
+ if (waiters)
+ grantLocks();
+
+ return;
+ }
+
+ BACKOFF;
+ }
+
+ DEBUG_FREEZE;
+}
+
+#else // FAST_SHARED
void SyncObject::unlock(Sync *sync, LockType type)
{
//ASSERT(lockState != 0);
@@ -309,6 +528,31 @@
DEBUG_FREEZE;
}
+#endif
+
+#ifdef FAST_SHARED
+
+void SyncObject::downGrade(LockType type)
+{
+ ASSERT (monitorCount == 0);
+ ASSERT (type == Shared);
+ ASSERT (lockState == -1);
+ INTERLOCKED_INCREMENT(readers);
+
+ for (;;)
+ if (COMPARE_EXCHANGE(&lockState, -1, 0))
+ {
+ exclusiveThread = NULL;
+ DEBUG_FREEZE;
+
+ if (waiters)
+ grantLocks();
+
+ return;
+ }
+}
+
+#else // FAST_SHARED
void SyncObject::downGrade(LockType type)
{
@@ -328,6 +572,7 @@
return;
}
}
+#endif //FAST_SHARED
void SyncObject::wait(LockType type, Thread *thread, Sync *sync, int timeout)
{
@@ -343,7 +588,7 @@
Thread *volatile *ptr;
- for (ptr = &que; *ptr; ptr = &(*ptr)->que)
+ for (ptr = &queue; *ptr; ptr = &(*ptr)->queue)
{
BUMP(queueLength);
@@ -351,7 +596,7 @@
{
LOG_DEBUG ("Apparent single thread deadlock for thread %d (%x)\n",
thread->threadId, thread);
- for (Thread *thread = que; thread; thread = thread->que)
+ for (Thread *thread = queue; thread; thread = thread->queue)
thread->print();
mutex.release();
@@ -359,7 +604,7 @@
}
}
- thread->que = NULL;
+ thread->queue = NULL;
thread->lockType = type;
*ptr = thread;
thread->lockGranted = false;
@@ -378,10 +623,10 @@
return;
}
- for (ptr = &que; *ptr; ptr = &(*ptr)->que)
+ for (ptr = &queue; *ptr; ptr = &(*ptr)->queue)
if (*ptr == thread)
{
- *ptr = thread->que;
+ *ptr = thread->queue;
--waiters;
break;
}
@@ -393,7 +638,6 @@
}
}
-
while (!thread->lockGranted)
{
wokeup = thread->sleep (10000, &mutex);
@@ -455,10 +699,10 @@
#ifdef TRACE
if (syncObjects.appendUnique(this))
{
- if (exclusiveThread)
+ if (exclusiveThread && exclusiveThread != queue)
exclusiveThread->findLocks(threads, syncObjects);
- for (Thread *thread = que; thread; thread = thread->que)
+ for (Thread *thread = queue; thread; thread = thread->queue)
thread->findLocks(threads, syncObjects);
}
#endif
@@ -467,13 +711,13 @@
void SyncObject::print()
{
#ifdef TRACE
- LOG_DEBUG (" SyncObject %lx: state %d, monitor %d, waiters %d\n",
- this, lockState, monitorCount, waiters);
+ LOG_DEBUG (" SyncObject %lx: state %d, readers %d, monitor %d, waiters %d\n",
+ this, lockState, readers, monitorCount, waiters);
if (exclusiveThread)
exclusiveThread->print (" Exclusive thread");
- for (Thread *volatile thread = que; thread; thread = thread->que)
+ for (Thread *volatile thread = queue; thread; thread = thread->queue)
thread->print (" Waiting thread");
#endif
}
@@ -501,18 +745,88 @@
}
}
-void SyncObject::grantLocks(void)
-{
- mutex.lock();
- ASSERT ((waiters && que) || (!waiters && !que));
- const char *description = NULL;
- Thread *thread = NULL;
-
- for (Thread *waiter = que, *prior = NULL, *next; waiter; waiter = next)
- {
- description = waiter->description;
- bool granted = false;
- next = waiter->que;
+#ifdef FAST_SHARED
+void SyncObject::grantLocks(void)
+{
+ mutex.lock();
+ ASSERT ((waiters && queue) || (!waiters && !queue));
+ const char *description = NULL;
+ Thread *thread = NULL;
+
+ for (Thread *waiter = queue, *prior = NULL, *next; waiter; waiter = next)
+ {
+ description = waiter->description;
+ bool granted = false;
+ next = waiter->queue;
+
+ if (waiter->lockType == Shared)
+ {
+ INTERLOCKED_INCREMENT(readers);
+ granted = true;
+ }
+ else
+ {
+ ASSERT(waiter->lockType == Exclusive);
+
+ if (exclusiveThread == waiter)
+ {
+ ASSERT(lockState == -1);
+ granted = true;
+ }
+ else
+ while (lockState == 0)
+ {
+ if (COMPARE_EXCHANGE(&lockState, 0, -1))
+ {
+ granted = true;
+ exclusiveThread = waiter;
+ break;
+ }
+
+ BACKOFF;
+ }
+ }
+
+ if (granted)
+ {
+ if (prior)
+ prior->queue = next;
+ else
+ queue = next;
+
+ bool shutdownInProgress = waiter->shutdownInProgress;
+
+ if (shutdownInProgress)
+ Thread::lockExitMutex();
+
+ bumpWaiters(-1);
+ --waiter->activeLocks;
+ waiter->grantLock (this);
+
+ if (shutdownInProgress)
+ Thread::unlockExitMutex();
+ }
+ else
+ prior = waiter;
+ }
+
+ mutex.release();
+}
+
+#else // FAST_SHARED
+
+void SyncObject::grantLocks(void)
+{
+ mutex.lock();
+ ASSERT ((waiters && queue) || (!waiters && !queue));
+ const char *description = NULL;
+ Thread *thread = NULL;
+
+ for (Thread *waiter = queue, *prior = NULL, *next; waiter; waiter = next)
+ {
+ description = waiter->description;
+ bool granted = false;
+ next = waiter->queue;
if (waiter->lockType == Shared)
for (int oldState; (oldState = lockState) >= 0;)
@@ -548,9 +862,9 @@
if (granted)
{
if (prior)
- prior->que = next;
+ prior->queue = next;
else
- que = next;
+ queue = next;
bool shutdownInProgress = waiter->shutdownInProgress;
@@ -570,12 +884,38 @@
mutex.release();
}
+#endif // FAST_SHARED
int SyncObject::getState(void)
{
return lockState;
}
+#ifdef FAST_SHARED
+
+void SyncObject::validate(LockType lockType)
+{
+ switch (lockType)
+ {
+ case None:
+ ASSERT (lockState == 0 && readers == 0);
+ break;
+
+ case Shared:
+ ASSERT (readers > 0 && !(exclusiveThread && exclusiveThread !=
queue));
+ break;
+
+ case Exclusive:
+ ASSERT (lockState == -1 && (readers == 0 || queue != NULL));
+ break;
+
+ case Invalid:
+ break;
+ }
+}
+
+#else
+
void SyncObject::validate(LockType lockType)
{
switch (lockType)
@@ -596,6 +936,20 @@
break;
}
}
+#endif // FAST_SHARED
+
+#ifdef FAST_SHARED
+void SyncObject::unlock(void)
+{
+ if (exclusiveThread && exclusiveThread != queue)
+ unlock(NULL, Exclusive);
+ else if (readers > 0)
+ unlock (NULL, Shared);
+ else
+ ASSERT(false);
+}
+
+#else //FAST_SHARED
void SyncObject::unlock(void)
{
@@ -606,6 +960,7 @@
else
ASSERT(false);
}
+#endif //FAST_SHARED
bool SyncObject::ourExclusiveLock(void)
{
@@ -617,7 +972,7 @@
void SyncObject::frequentStaller(Thread *thread, Sync *sync)
{
- Thread *threadQue = thread->que;
+ Thread *threadQue = thread->queue;
LockType lockType = thread->lockType;
bool lockGranted = thread->lockGranted;
Sync *lockPending = thread->lockPending;
@@ -627,7 +982,7 @@
else
LOG_DEBUG("Frequent stall from unknown\n");
- thread->que = threadQue;
+ thread->queue = threadQue;
thread->lockType = lockType;
thread->lockGranted = lockGranted;
thread->lockPending = lockPending;
=== modified file 'storage/falcon/SyncObject.h'
--- a/storage/falcon/SyncObject.h 2008-05-14 19:49:03 +0000
+++ b/storage/falcon/SyncObject.h 2008-06-11 18:24:29 +0000
@@ -41,6 +41,8 @@
#include <thread.h>
#endif
+//#define FAST_SHARED
+
#include "Mutex.h"
#define TRACE_SYNC_OBJECTS
@@ -86,19 +88,20 @@
static void getSyncInfo(InfoTable* infoTable);
static void dump(void);
- inline Thread* getExclusiveThread()
+ inline Thread* getExclusiveThread()
{ return exclusiveThread; };
protected:
- void wait(LockType type, Thread *thread, Sync *sync, int timeout);
+ void wait(LockType type, Thread *thread, Sync *sync, int timeout);
- int32 monitorCount;
- Mutex mutex;
- Thread *volatile que;
- Thread *volatile exclusiveThread;
- volatile INTERLOCK_TYPE waiters;
- volatile INTERLOCK_TYPE lockState;
- int stalls;
+ int32 monitorCount;
+ Mutex mutex;
+ Thread *volatile queue;
+ Thread *volatile exclusiveThread;
+ volatile INTERLOCK_TYPE readers;
+ volatile INTERLOCK_TYPE waiters;
+ volatile INTERLOCK_TYPE lockState;
+ int stalls;
#ifdef TRACE_SYNC_OBJECTS
int objectId;
=== modified file 'storage/falcon/SyncTest.cpp'
--- a/storage/falcon/SyncTest.cpp 2008-05-14 22:31:23 +0000
+++ b/storage/falcon/SyncTest.cpp 2008-06-11 18:24:29 +0000
@@ -78,7 +78,7 @@
}
sync.unlock();
- Thread::sleep(5000);
+ Thread::sleep(1000);
stop = true;
threadBarn->waitForAll();
int total = 0;
=== modified file 'storage/falcon/Thread.h'
--- a/storage/falcon/Thread.h 2008-05-14 19:49:03 +0000
+++ b/storage/falcon/Thread.h 2008-06-11 18:24:29 +0000
@@ -92,7 +92,7 @@
Threads *threadBarn;
Thread *next; // next thread in "thread barn"
Thread *prior; // next thread in "thread barn"
- Thread *que; // next thread in wait que (see SyncObject)
+ Thread *queue; // next thread in wait que (see SyncObject)
Thread *srlQueue; // serial log queue
LockType lockType; // requested lock type (see SyncObject)
LockType wakeupType; // used by SerialLog::flush
| Thread |
|---|
| • commit into mysql-6.0-falcon:mysql-6.0-falcon branch(JimStarkeyjstarkey:2695) | JimStarkeyjstarkey | 11 Jun |