List:Commits« Previous MessageNext Message »
From:kpettersson Date:February 15 2007 1:08pm
Subject:bk commit into 5.0 tree (Kristofer.Pettersson:1.2412) BUG#25042
View as plain text  
Below is the list of changes that have just been committed into a local
5.0 repository of Kristofer Pettersson. When Kristofer Pettersson does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html

ChangeSet@stripped, 2007-02-15 14:08:21+01:00, Kristofer.Pettersson@naruto. +2 -0
  Bug#25042 OPTIMIZE TABLE cause race condition in IO CACHE SHARE
  
  - The condition variable implementation "lost" a signal to
    WaitOnSingleObject when a semaphore was released.
  - The signal could be consumed by a new call to pthread_cond_wait
    before all waiting threads had awoken.
  - The new implementation of pthread_cond_* uses events
    instead of semaphores. It also uses an extra lock to protect entry
    into new cond wait before the broadcast has finished.

  include/my_pthread.h@stripped, 2007-02-15 14:08:17+01:00, Kristofer.Pettersson@naruto. +11 -5
    - New implementatin of pthread_cond_init. This version uses events
      instead of semaphores

  mysys/my_wincond.c@stripped, 2007-02-15 14:08:17+01:00, Kristofer.Pettersson@naruto. +111 -38
    - New implementatin of pthread_cond_init. This version uses events
      instead of semaphores

# This is a BitKeeper patch.  What follows are the unified diffs for the
# set of deltas contained in the patch.  The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User:	Kristofer.Pettersson
# Host:	naruto.
# Root:	C:/cpp/bug25042/my50-bug25042

--- 1.98/include/my_pthread.h	2007-02-15 14:08:44 +01:00
+++ 1.99/include/my_pthread.h	2007-02-15 14:08:44 +01:00
@@ -80,11 +80,17 @@ typedef struct st_pthread_link {
 
 typedef struct {
   uint32 waiting;
-#ifdef OS2
-  HEV    semaphore;
-#else
-  HANDLE semaphore;
-#endif
+  CRITICAL_SECTION lock_waiting;
+ 
+  enum {
+    SIGNAL= 0,
+    BROADCAST= 1,
+    MAX_EVENTS= 2
+  } EVENTS;
+
+  HANDLE events[MAX_EVENTS];
+  HANDLE broadcast_block_event;
+
 } pthread_cond_t;
 
 typedef int pthread_mutexattr_t;

--- 1.9/mysys/my_wincond.c	2007-02-15 14:08:44 +01:00
+++ 1.10/mysys/my_wincond.c	2007-02-15 14:08:44 +01:00
@@ -27,27 +27,48 @@
 
 int pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr)
 {
-  cond->waiting=0;
-  cond->semaphore=CreateSemaphore(NULL,0,0x7FFFFFFF,NullS);
-  if (!cond->semaphore)
+  cond->waiting= 0;
+  InitializeCriticalSection(&cond->lock_waiting);
+    
+  cond->events[SIGNAL]= CreateEvent(NULL,  /* no security */
+                                    FALSE, /* auto-reset event */
+                                    FALSE, /* non-signaled initially */
+                                    NULL); /* unnamed */
+
+  /* Create a manual-reset event. */
+  cond->events[BROADCAST]= CreateEvent(NULL,  /* no security */
+                                       TRUE,  /* manual-reset */
+                                       FALSE, /* non-signaled initially */
+                                       NULL); /* unnamed */
+
+
+  cond->broadcast_block_event= CreateEvent(NULL,  /* no security */
+                                           TRUE,  /* manual-reset */
+                                           TRUE,  /* signaled initially */
+                                           NULL); /* unnamed */
+  
+  if( cond->events[SIGNAL] == NULL ||
+      cond->events[BROADCAST] == NULL ||
+      cond->broadcast_block_event == NULL )
     return ENOMEM;
   return 0;
 }
 
 int pthread_cond_destroy(pthread_cond_t *cond)
 {
-  return CloseHandle(cond->semaphore) ? 0 : EINVAL;
+  DeleteCriticalSection(&cond->lock_waiting);
+
+  if (CloseHandle(cond->events[SIGNAL]) == 0 ||
+      CloseHandle(cond->events[BROADCAST]) == 0 ||
+      CloseHandle(cond->broadcast_block_event) == 0)
+    return EINVAL;
+  return 0;
 }
 
 
 int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex)
 {
-  InterlockedIncrement(&cond->waiting);
-  LeaveCriticalSection(mutex);
-  WaitForSingleObject(cond->semaphore,INFINITE);
-  InterlockedDecrement(&cond->waiting);
-  EnterCriticalSection(mutex);
-  return 0 ;
+  return pthread_cond_timedwait(cond,mutex,NULL);
 }
 
 
@@ -57,52 +78,104 @@ int pthread_cond_timedwait(pthread_cond_
   int result;
   long timeout; 
   union ft64 now;
-  
-  GetSystemTimeAsFileTime(&now.ft);
 
-  /*
-    Calculate time left to abstime
-    - subtract start time from current time(values are in 100ns units)
-    - convert to millisec by dividing with 10000
+  if( abstime != NULL )
+  {
+    GetSystemTimeAsFileTime(&now.ft);
+
+    /*
+      Calculate time left to abstime
+      - subtract start time from current time(values are in 100ns units)
+      - convert to millisec by dividing with 10000
+    */
+    timeout= (long)((abstime->tv.i64 - now.i64) / 10000);
+    
+    /* Don't allow the timeout to be negative */
+    if (timeout < 0)
+      timeout= 0L;
+
+    /*
+      Make sure the calucated timeout does not exceed original timeout
+      value which could cause "wait for ever" if system time changes
+    */
+    if (timeout > abstime->max_timeout_msec)
+      timeout= abstime->max_timeout_msec;
+
+  }
+  else
+  {
+    /* No time specified; don't expire */
+    timeout= INFINITE;
+  }
+
+  /* 
+    Block access if previous broadcast hasn't finished.
+    This is just for safety and should normally not
+    affect the total time spent in this function.
   */
-  timeout= (long)((abstime->tv.i64 - now.i64) / 10000);
-  
-  /* Don't allow the timeout to be negative */
-  if (timeout < 0)
-    timeout= 0L;
+  WaitForSingleObject(cond->broadcast_block_event, INFINITE);
 
-  /*
-    Make sure the calucated timeout does not exceed original timeout
-    value which could cause "wait for ever" if system time changes
-  */
-  if (timeout > abstime->max_timeout_msec)
-    timeout= abstime->max_timeout_msec;
+  EnterCriticalSection(&cond->lock_waiting);
+  cond->waiting++;
+  LeaveCriticalSection(&cond->lock_waiting);
 
-  InterlockedIncrement(&cond->waiting);
   LeaveCriticalSection(mutex);
-  result= WaitForSingleObject(cond->semaphore,timeout);
-  InterlockedDecrement(&cond->waiting);
+
+  result= WaitForMultipleObjects(2, cond->events, FALSE, timeout);
+  
+  EnterCriticalSection(&cond->lock_waiting);
+  cond->waiting--;
+  
+  if (cond->waiting == 0 && result == (WAIT_OBJECT_0+BROADCAST))
+  {
+    /*
+      We're the last waiter to be notified or to stop waiting, so
+      reset the manual event. 
+    */
+    /* Close broadcast gate */
+    ResetEvent(cond->events[BROADCAST]);
+    /* Open block gate */
+    SetEvent(cond->broadcast_block_event);
+  }
+  LeaveCriticalSection(&cond->lock_waiting);
+  
   EnterCriticalSection(mutex);
 
   return result == WAIT_TIMEOUT ? ETIMEDOUT : 0;
 }
 
-
 int pthread_cond_signal(pthread_cond_t *cond)
 {
-  long prev_count;
-  if (cond->waiting)
-    ReleaseSemaphore(cond->semaphore,1,&prev_count);
+  EnterCriticalSection(&cond->lock_waiting);
+  
+  if(cond->waiting > 0)
+    SetEvent(cond->events[SIGNAL]);
+
+  LeaveCriticalSection(&cond->lock_waiting);
+  
   return 0;
 }
 
 
 int pthread_cond_broadcast(pthread_cond_t *cond)
 {
-  long prev_count;
-  if (cond->waiting)
-    ReleaseSemaphore(cond->semaphore,cond->waiting,&prev_count);
-  return 0 ;
+  EnterCriticalSection(&cond->lock_waiting);
+  /*
+     The mutex protect us from broadcasting if
+     there isn't any thread waiting to open the
+     block gate after this call has closed it.
+   */
+  if(cond->waiting > 0)
+  {
+    /* Close block gate */
+    ResetEvent(cond->broadcast_block_event); 
+    /* Open broadcast gate */
+    SetEvent(cond->events[BROADCAST]);
+  }
+
+  LeaveCriticalSection(&cond->lock_waiting);  
+
+  return 0;
 }
 
 


Thread
bk commit into 5.0 tree (Kristofer.Pettersson:1.2412) BUG#25042kpettersson15 Feb