List:Commits« Previous MessageNext Message »
From:JimStarkeyjstarkey Date:June 11 2008 4:23pm
Subject:commit into mysql-6.0-falcon:mysql-6.0-falcon branch
(JimStarkeyjstarkey:2695)
View as plain text  
#At bzr+ssh://bk-internal.mysql.com/bzrroot/server/mysql-6.0-falcon/

 2695 Jim Starkey	2008-06-11
      Trial of new SyncObject implementation (likely to be abandoned, actually).
modified:
  storage/falcon/Event.cpp
  storage/falcon/PriorityScheduler.cpp
  storage/falcon/SyncObject.cpp
  storage/falcon/SyncObject.h
  storage/falcon/SyncTest.cpp
  storage/falcon/Thread.h

=== modified file 'storage/falcon/Event.cpp'
--- a/storage/falcon/Event.cpp	2007-09-20 15:44:25 +0000
+++ b/storage/falcon/Event.cpp	2008-06-11 18:24:29 +0000
@@ -45,7 +45,7 @@
 	Thread *thread = Thread::getThread("Event::wait");
 	Sync sync(&mutex, "Event::wait");
 	sync.lock(Exclusive);
-	thread->que = (Thread*) waiters;
+	thread->queue = (Thread*) waiters;
 	waiters = thread;
 	sync.unlock();
 
@@ -70,7 +70,7 @@
 		{
 		Sync sync(&mutex, "Event::post");
 		sync.lock(Exclusive);
-		for (Thread *waiter = (Thread*) waiters; waiter; waiter = waiter->que)
+		for (Thread *waiter = (Thread*) waiters; waiter; waiter = waiter->queue)
 			waiter->wake();
 		}
 }
@@ -94,10 +94,10 @@
 	Sync sync(&mutex, "Event::removeWaiter");
 	sync.lock(Exclusive);
 
-	for (Thread **ptr = (Thread**) &waiters; *ptr; ptr = &(*ptr)->que)
+	for (Thread **ptr = (Thread**) &waiters; *ptr; ptr = &(*ptr)->queue)
 		if (*ptr == thread)
 			{
-			*ptr = thread->que;
+			*ptr = thread->queue;
 			return;
 			}
 }

=== modified file 'storage/falcon/PriorityScheduler.cpp'
--- a/storage/falcon/PriorityScheduler.cpp	2007-11-02 22:11:54 +0000
+++ b/storage/falcon/PriorityScheduler.cpp	2008-06-11 18:24:29 +0000
@@ -51,7 +51,7 @@
 		}
 	
 	Thread *thread = Thread::getThread("PriorityScheduler::schedule");
-	thread->que = waitingThreads[priority];
+	thread->queue = waitingThreads[priority];
 	waitingThreads[priority] = thread;
 	thread->wakeupType = None;
 	sync.unlock();
@@ -83,7 +83,7 @@
 			for (Thread *thread; (thread = waitingThreads[currentPriority]);)
 				{
 				++count;
-				waitingThreads[currentPriority] = thread->que;
+				waitingThreads[currentPriority] = thread->queue;
 				thread->wakeupType = Exclusive;
 				thread->wake();
 				}

=== modified file 'storage/falcon/SyncObject.cpp'
--- a/storage/falcon/SyncObject.cpp	2008-05-14 22:31:23 +0000
+++ b/storage/falcon/SyncObject.cpp	2008-06-11 18:24:29 +0000
@@ -102,9 +102,10 @@
 
 SyncObject::SyncObject()
 {
+	readers = 0;
 	waiters = 0;
 	lockState = 0;
-	que = NULL;
+	queue = NULL;
 	monitorCount = 0;
 	stalls = 0;
 	exclusiveThread = NULL;
@@ -137,6 +138,174 @@
 #endif
 }
 
+#ifdef FAST_SHARED
+void SyncObject::lock(Sync *sync, LockType type, int timeout)
+{
+	Thread *thread;
+
+#ifdef TRACE_SYNC_OBJECTS
+	if (sync)
+		where = sync->where;
+#endif
+	
+	// Shared case
+	
+	if (type == Shared)
+		{
+		thread = NULL;
+		//BUMP_INTERLOCKED(sharedCount);
+		INTERLOCKED_INCREMENT(readers);
+		
+		// If there aren't any writers, we've got the lock.  Ducky.
+		
+		if (lockState == 0)
+			{
+			DEBUG_FREEZE;
+			
+			return;
+			}
+
+		// See if we have already have the lock, in which case bump the monitor and get going
+		
+		if (!thread)
+			thread = Thread::getThread("SyncObject::lock");
+
+		if (thread == exclusiveThread)
+			{
+			INTERLOCKED_DECREMENT(readers);
+			++monitorCount;
+			DEBUG_FREEZE;
+			
+			return;
+			}
+
+		// We have contention.  Get the mutex and prepare the wait, but
+		// maybe we'll get lucky
+					
+		mutex.lock();
+
+		if (lockState == 0)
+			{
+			DEBUG_FREEZE;
+			
+			return;
+			}
+
+		// If there isn't an exclusive thread or the exclusive thread is stalled, we've got the
lock
+		
+		if (!exclusiveThread || exclusiveThread == queue)
+			{
+			DEBUG_FREEZE;
+			
+			return;
+			}
+		
+		// There is an outstanding exclusive lock; wait
+		
+		INTERLOCKED_DECREMENT(readers);
+		bumpWaiters(1);
+		wait(type, thread, sync, timeout);
+		DEBUG_FREEZE;
+		
+		return;
+		}
+	
+	// Exclusive case
+	
+	thread = Thread::getThread("SyncObject::lock");
+	thread->backoff = BACKOFF_INTERVAL;
+	ASSERT(thread);
+
+	// If we're already the exclusive thread, just bump the monitor count and we're done
+	
+	if (thread == exclusiveThread)
+		{
+		++monitorCount;
+		BUMP(exclusiveCount);
+		DEBUG_FREEZE;
+		
+		return;
+		}
+
+	// If nothing is pending, go for the lock.
+	
+	while (readers == 0 && waiters == 0)
+		{
+		INTERLOCK_TYPE oldState = lockState;
+		
+		if (oldState != 0)
+			break;
+			
+		if (COMPARE_EXCHANGE(&lockState, oldState, -1))
+			{
+			exclusiveThread = thread;
+
+			if (readers)
+				{
+				mutex.lock();
+				
+				if (readers)
+					{
+					if (queue && queue->lockType == Shared)
+					BUMP(exclusiveCount);
+					bumpWaiters(1);
+					wait(type, thread, sync, timeout);
+					}
+				
+				}
+				
+			BUMP(exclusiveCount);
+			DEBUG_FREEZE;
+			
+			return; 
+			}
+		
+		BACKOFF;
+		}
+		
+	mutex.lock();
+	bumpWaiters(1);
+	BUMP(exclusiveCount);
+	
+	while (readers == 0 && queue == NULL)
+		{
+		INTERLOCK_TYPE oldState = lockState;
+		
+		if (oldState != 0)
+			break;
+			
+		if (COMPARE_EXCHANGE(&lockState, oldState, -1))
+			{
+			exclusiveThread = thread;
+
+			if (readers)
+				{
+				wait(type, thread, sync, timeout);
+				DEBUG_FREEZE;
+				
+				return;
+				}
+
+			bumpWaiters(-1);
+			mutex.release();
+			DEBUG_FREEZE;
+			
+			return;
+			}
+		
+		BACKOFF;
+		}
+
+	// mutex is held going into wait() It is released before coming out.
+
+	wait(type, thread, sync, timeout);
+	DEBUG_FREEZE;
+}
+
+#else	// FAST_SHARED
+
+// Old (aka working) version
+
 void SyncObject::lock(Sync *sync, LockType type, int timeout)
 {
 	Thread *thread;
@@ -245,7 +414,7 @@
 		bumpWaiters(1);
 		BUMP(exclusiveCount);
 		
-		while (que == NULL)
+		while (queue == NULL)
 			{
 			INTERLOCK_TYPE oldState = lockState;
 			
@@ -271,7 +440,57 @@
 	wait(type, thread, sync, timeout);
 	DEBUG_FREEZE;
 }
-
+#endif // FAST_SHARED
+
+#ifdef FAST_SHARED
+void SyncObject::unlock(Sync *sync, LockType type)
+{
+	if (monitorCount)
+		{
+		//ASSERT (monitorCount > 0);
+		--monitorCount;
+		DEBUG_FREEZE;
+
+		return;
+		}
+	
+	if (type == Shared)
+		{
+		ASSERT(readers > 0);
+		
+		if (INTERLOCKED_DECREMENT(readers) == 0 && waiters)
+			grantLocks();
+			
+		return;
+		}
+	
+	ASSERT(lockState == -1 && exclusiveThread != queue);
+	Thread *thread = NULL;
+	
+	for (;;)
+		{
+		//ASSERT (type == Exclusive && lockState == -1);
+		long oldState = lockState;
+		long newState = (type == Shared) ? oldState - 1 : 0;
+		exclusiveThread = NULL;
+		
+		if (COMPARE_EXCHANGE(&lockState, oldState, newState))
+			{
+			DEBUG_FREEZE;
+
+			if (waiters)
+				grantLocks();
+				
+			return;
+			}
+			
+		BACKOFF;
+		}
+
+	DEBUG_FREEZE;
+}
+
+#else // FAST_SHARED
 void SyncObject::unlock(Sync *sync, LockType type)
 {
 	//ASSERT(lockState != 0);
@@ -309,6 +528,31 @@
 
 	DEBUG_FREEZE;
 }
+#endif
+
+#ifdef FAST_SHARED
+
+void SyncObject::downGrade(LockType type)
+{
+	ASSERT (monitorCount == 0);
+	ASSERT (type == Shared);
+	ASSERT (lockState == -1);
+	INTERLOCKED_INCREMENT(readers);
+	
+	for (;;)
+		if (COMPARE_EXCHANGE(&lockState, -1, 0))
+			{
+			exclusiveThread = NULL;
+			DEBUG_FREEZE;
+			
+			if (waiters)
+				grantLocks();
+
+			return;
+			}
+}
+
+#else // FAST_SHARED
 
 void SyncObject::downGrade(LockType type)
 {
@@ -328,6 +572,7 @@
 			return;
 			}
 }
+#endif //FAST_SHARED
 
 void SyncObject::wait(LockType type, Thread *thread, Sync *sync, int timeout)
 {
@@ -343,7 +588,7 @@
 
 	Thread *volatile *ptr;
 	
-	for (ptr = &que; *ptr; ptr = &(*ptr)->que)
+	for (ptr = &queue; *ptr; ptr = &(*ptr)->queue)
 		{
 		BUMP(queueLength);
 		
@@ -351,7 +596,7 @@
 			{
 			LOG_DEBUG ("Apparent single thread deadlock for thread %d (%x)\n",
thread->threadId, thread);
 			
-			for (Thread *thread = que; thread; thread = thread->que)
+			for (Thread *thread = queue; thread; thread = thread->queue)
 				thread->print();
 				
 			mutex.release();
@@ -359,7 +604,7 @@
 			}
 		}
 
-	thread->que = NULL;
+	thread->queue = NULL;
 	thread->lockType = type;
 	*ptr = thread;
 	thread->lockGranted = false;
@@ -378,10 +623,10 @@
 				return;
 				}
 			
-			for (ptr = &que; *ptr; ptr = &(*ptr)->que)
+			for (ptr = &queue; *ptr; ptr = &(*ptr)->queue)
 				if (*ptr == thread)
 					{
-					*ptr = thread->que;
+					*ptr = thread->queue;
 					--waiters;
 					break;
 					}
@@ -393,7 +638,6 @@
 				}
 			}
 		
-		
 	while (!thread->lockGranted)
 		{
 		wokeup = thread->sleep (10000, &mutex);
@@ -455,10 +699,10 @@
 #ifdef TRACE
 	if (syncObjects.appendUnique(this))
 		{
-		if (exclusiveThread)
+		if (exclusiveThread && exclusiveThread != queue)
 			exclusiveThread->findLocks(threads, syncObjects);
 
-		for (Thread *thread = que; thread; thread = thread->que)
+		for (Thread *thread = queue; thread; thread = thread->queue)
 			thread->findLocks(threads, syncObjects);
 		}
 #endif
@@ -467,13 +711,13 @@
 void SyncObject::print()
 {
 #ifdef TRACE
-	LOG_DEBUG ("  SyncObject %lx: state %d, monitor %d, waiters %d\n", 
-				this, lockState, monitorCount, waiters);
+	LOG_DEBUG ("  SyncObject %lx: state %d, readers %d, monitor %d, waiters %d\n", 
+				this, lockState, readers, monitorCount, waiters);
 
 	if (exclusiveThread)
 		exclusiveThread->print ("    Exclusive thread");
 
-	for (Thread *volatile thread = que; thread; thread = thread->que)
+	for (Thread *volatile thread = queue; thread; thread = thread->queue)
 		thread->print ("    Waiting thread");
 #endif
 }
@@ -501,18 +745,88 @@
 			}
 }
 
-void SyncObject::grantLocks(void)
-{
-	mutex.lock();
-	ASSERT ((waiters && que) || (!waiters && !que));
-	const char *description = NULL;
-	Thread *thread = NULL;
-	
-	for (Thread *waiter = que, *prior = NULL, *next; waiter; waiter = next)
-		{
-		description = waiter->description;
-		bool granted = false;
-		next = waiter->que;
+#ifdef FAST_SHARED
+void SyncObject::grantLocks(void)
+{
+	mutex.lock();
+	ASSERT ((waiters && queue) || (!waiters && !queue));
+	const char *description = NULL;
+	Thread *thread = NULL;
+	
+	for (Thread *waiter = queue, *prior = NULL, *next; waiter; waiter = next)
+		{
+		description = waiter->description;
+		bool granted = false;
+		next = waiter->queue;
+				
+		if (waiter->lockType == Shared)
+			{
+			INTERLOCKED_INCREMENT(readers);
+			granted = true;
+			}
+		else
+			{
+			ASSERT(waiter->lockType == Exclusive);
+			
+			if (exclusiveThread == waiter)
+				{
+				ASSERT(lockState == -1);
+				granted = true;
+				}
+			else
+				while (lockState == 0)
+					{
+					if (COMPARE_EXCHANGE(&lockState, 0, -1))
+						{
+						granted = true;
+						exclusiveThread = waiter;
+						break;
+						}
+					
+					BACKOFF;
+					}
+			}
+		
+		if (granted)
+			{
+			if (prior)
+				prior->queue = next;
+			else
+				queue = next;
+			
+			bool shutdownInProgress = waiter->shutdownInProgress;
+			
+			if (shutdownInProgress)
+				Thread::lockExitMutex();
+					
+			bumpWaiters(-1);
+			--waiter->activeLocks;
+			waiter->grantLock (this);
+			
+			if (shutdownInProgress)
+				Thread::unlockExitMutex();
+			}
+		else
+			prior = waiter;
+		}
+			
+	mutex.release();
+}
+
+#else // FAST_SHARED
+
+void SyncObject::grantLocks(void)
+{
+	mutex.lock();
+	ASSERT ((waiters && queue) || (!waiters && !queue));
+	const char *description = NULL;
+	Thread *thread = NULL;
+	
+	for (Thread *waiter = queue, *prior = NULL, *next; waiter; waiter = next)
+		{
+		description = waiter->description;
+		bool granted = false;
+		next = waiter->queue;
 				
 		if (waiter->lockType == Shared)
 			for (int oldState; (oldState = lockState) >= 0;)
@@ -548,9 +862,9 @@
 		if (granted)
 			{
 			if (prior)
-				prior->que = next;
+				prior->queue = next;
 			else
-				que = next;
+				queue = next;
 			
 			bool shutdownInProgress = waiter->shutdownInProgress;
 			
@@ -570,12 +884,38 @@
 			
 	mutex.release();
 }
+#endif // FAST_SHARED
 
 int SyncObject::getState(void)
 {
 	return lockState;
 }
 
+#ifdef FAST_SHARED
+
+void SyncObject::validate(LockType lockType)
+{
+	switch (lockType)
+		{
+		case None:
+			ASSERT (lockState == 0 && readers == 0);
+			break;
+		
+		case Shared:
+			ASSERT (readers > 0 && !(exclusiveThread && exclusiveThread !=
queue));
+			break;
+		
+		case Exclusive:
+			ASSERT (lockState == -1 && (readers == 0 || queue != NULL));
+			break;
+		
+		case Invalid:
+			break;
+		}
+}
+
+#else
+
 void SyncObject::validate(LockType lockType)
 {
 	switch (lockType)
@@ -596,6 +936,20 @@
 			break;
 		}
 }
+#endif // FAST_SHARED
+
+#ifdef FAST_SHARED
+void SyncObject::unlock(void)
+{
+	if (exclusiveThread && exclusiveThread != queue)
+		unlock(NULL, Exclusive);
+	else if (readers > 0)
+		unlock (NULL, Shared);
+	else
+		ASSERT(false);
+}
+
+#else //FAST_SHARED
 
 void SyncObject::unlock(void)
 {
@@ -606,6 +960,7 @@
 	else
 		ASSERT(false);
 }
+#endif //FAST_SHARED
 
 bool SyncObject::ourExclusiveLock(void)
 {
@@ -617,7 +972,7 @@
 
 void SyncObject::frequentStaller(Thread *thread, Sync *sync)
 {
-	Thread *threadQue = thread->que;
+	Thread *threadQue = thread->queue;
 	LockType lockType = thread->lockType;
 	bool lockGranted = thread->lockGranted;
 	Sync *lockPending = thread->lockPending;
@@ -627,7 +982,7 @@
 	else
 		LOG_DEBUG("Frequent stall from unknown\n");
 		
-	thread->que = threadQue;
+	thread->queue = threadQue;
 	thread->lockType = lockType;
 	thread->lockGranted = lockGranted;
 	thread->lockPending = lockPending;

=== modified file 'storage/falcon/SyncObject.h'
--- a/storage/falcon/SyncObject.h	2008-05-14 19:49:03 +0000
+++ b/storage/falcon/SyncObject.h	2008-06-11 18:24:29 +0000
@@ -41,6 +41,8 @@
 #include <thread.h>
 #endif
 
+//#define FAST_SHARED
+
 #include "Mutex.h"
 
 #define TRACE_SYNC_OBJECTS
@@ -86,19 +88,20 @@
 	static void		getSyncInfo(InfoTable* infoTable);
 	static void		dump(void);
 
-	inline Thread* getExclusiveThread()
+	inline Thread*	getExclusiveThread()
 		{ return exclusiveThread; };
 
 protected:
-	void wait(LockType type, Thread *thread, Sync *sync, int timeout);
+	void			wait(LockType type, Thread *thread, Sync *sync, int timeout);
 
-	int32				monitorCount;
-	Mutex				mutex;
-	Thread				*volatile que;
-	Thread				*volatile exclusiveThread;
-	volatile INTERLOCK_TYPE		waiters;
-	volatile INTERLOCK_TYPE		lockState;
-	int					stalls;
+	int32					monitorCount;
+	Mutex					mutex;
+	Thread					*volatile queue;
+	Thread					*volatile exclusiveThread;
+	volatile INTERLOCK_TYPE	readers;
+	volatile INTERLOCK_TYPE	waiters;
+	volatile INTERLOCK_TYPE	lockState;
+	int						stalls;
 
 #ifdef TRACE_SYNC_OBJECTS
 	int					objectId;

=== modified file 'storage/falcon/SyncTest.cpp'
--- a/storage/falcon/SyncTest.cpp	2008-05-14 22:31:23 +0000
+++ b/storage/falcon/SyncTest.cpp	2008-06-11 18:24:29 +0000
@@ -78,7 +78,7 @@
 			}
 			
 		sync.unlock();
-		Thread::sleep(5000);
+		Thread::sleep(1000);
 		stop = true;
 		threadBarn->waitForAll();
 		int total = 0;

=== modified file 'storage/falcon/Thread.h'
--- a/storage/falcon/Thread.h	2008-05-14 19:49:03 +0000
+++ b/storage/falcon/Thread.h	2008-06-11 18:24:29 +0000
@@ -92,7 +92,7 @@
 	Threads			*threadBarn;
 	Thread			*next;				// next thread in "thread barn"
 	Thread			*prior;				// next thread in "thread barn"
-	Thread			*que;				// next thread in wait que (see SyncObject)
+	Thread			*queue;				// next thread in wait que (see SyncObject)
 	Thread			*srlQueue;			// serial log queue
 	LockType		lockType;			// requested lock type (see SyncObject)
 	LockType		wakeupType;			// used by SerialLog::flush

Thread
commit into mysql-6.0-falcon:mysql-6.0-falcon branch(JimStarkeyjstarkey:2695) JimStarkeyjstarkey11 Jun