List:Commits« Previous MessageNext Message »
From:kpettersson Date:January 30 2007 9:34am
Subject:bk commit into 5.0 tree (Kristofer.Pettersson:1.2353) BUG#25042
View as plain text  
Below is the list of changes that have just been committed into a local
5.0 repository of Kristofer Pettersson. When Kristofer Pettersson does a push these
changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html

ChangeSet@stripped, 2007-01-30 09:33:33+01:00, Kristofer.Pettersson@naruto. +2 -0
  Bug#25042 OPTIMIZE TABLE cause race condition in IO CACHE SHARE
  
  - The condition variable implementation "lost" a signal to 
    WaitOnSingleObject when a semaphore was released.
  - The signal could be consumed by a new call to pthread_cond_wait
    before all waiting threads had awoken.
  - The new implementation of pthread_cond_* uses events
    instead of semaphores. It also uses an extra lock to protect entry
    into a new cond wait before the broadcast has finished.

  include/my_pthread.h@stripped, 2007-01-30 09:33:27+01:00, Kristofer.Pettersson@naruto. +41
-12
    - New implementation of pthread_cond_init. This version uses events
      instead of semaphores.

  mysys/my_wincond.c@stripped, 2007-01-30 09:33:27+01:00, Kristofer.Pettersson@naruto. +123 -28
    - New implementation of pthread_cond_*. This version uses events
      instead of semaphores.

# This is a BitKeeper patch.  What follows are the unified diffs for the
# set of deltas contained in the patch.  The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User:	Kristofer.Pettersson
# Host:	naruto.
# Root:	C:/cpp/bug25042/my50-bug25042

--- 1.92/include/my_pthread.h	2007-01-30 09:33:51 +01:00
+++ 1.93/include/my_pthread.h	2007-01-30 09:33:51 +01:00
@@ -62,20 +62,19 @@ typedef struct st_pthread_link {
 
 typedef struct {
   uint32 waiting;
-#ifdef OS2
-  HEV    semaphore;
-#else
-  HANDLE semaphore;
-#endif
-} pthread_cond_t;
+  CRITICAL_SECTION lock_waiting;
 
+  enum {
+    SIGNAL = 0,
+    BROADCAST = 1,
+    MAX_EVENTS = 2
+  } EVENTS;
+
+  HANDLE events[MAX_EVENTS];
+  HANDLE broadcast_block_event;
+
+} pthread_cond_t;
 
-#ifndef OS2
-struct timespec {		/* For pthread_cond_timedwait() */
-    time_t tv_sec;
-    long tv_nsec;
-};
-#endif
 
 typedef int pthread_mutexattr_t;
 #define win_pthread_self my_thread_var->pthread_self
@@ -86,6 +85,36 @@ typedef void * (_Optlink *pthread_handle
 #define pthread_handler_t EXTERNC void * __cdecl
 typedef void * (__cdecl *pthread_handler)(void *);
 #endif
+
+/*
+  Struct and macros to be used in combination with the
+  windows implementation of pthread_cond_timedwait
+*/
+
+/*
+   Declare a union to make sure FILETIME is properly aligned
+   so it can be used directly as a 64 bit value. The value
+   stored is in 100ns units.
+ */
+ union ft64 {
+  FILETIME ft;
+  __int64 i64;
+ };
+struct timespec {
+  union ft64 tv;
+  /* The max timeout value in millisecond for pthread_cond_timedwait */
+  long max_timeout_msec;
+};
+#define set_timespec(ABSTIME,SEC) { \
+  GetSystemTimeAsFileTime(&((ABSTIME).tv.ft)); \
+  (ABSTIME).tv.i64+= (__int64)(SEC)*10000000; \
+  (ABSTIME).max_timeout_msec= (long)((SEC)*1000); \
+}
+#define set_timespec_nsec(ABSTIME,NSEC) { \
+  GetSystemTimeAsFileTime(&((ABSTIME).tv.ft)); \
+  (ABSTIME).tv.i64+= (__int64)(NSEC)/100; \
+  (ABSTIME).max_timeout_msec= (long)((NSEC)/1000000); \
+}
 
 void win_pthread_init(void);
 int win_pthread_setspecific(void *A,void *B,uint length);

--- 1.6/mysys/my_wincond.c	2007-01-30 09:33:51 +01:00
+++ 1.7/mysys/my_wincond.c	2007-01-30 09:33:51 +01:00
@@ -28,65 +28,160 @@
 
 int pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr)
 {
-  cond->waiting=0;
-  cond->semaphore=CreateSemaphore(NULL,0,0x7FFFFFFF,NullS);
-  if (!cond->semaphore)
+  cond->waiting= 0;
+  InitializeCriticalSection(&cond->lock_waiting);
+    
+  cond->events[SIGNAL]= CreateEvent(NULL,  /* no security */
+                                    FALSE, /* auto-reset event */
+                                    FALSE, /* non-signaled initially */
+                                    NULL); /* unnamed */
+
+  /* Create a manual-reset event. */
+  cond->events[BROADCAST]= CreateEvent(NULL,  /* no security */
+                                       TRUE,  /* manual-reset */
+                                       FALSE, /* non-signaled initially */
+                                       NULL); /* unnamed */
+
+
+  cond->broadcast_block_event= CreateEvent(NULL,  /* no security */
+                                           TRUE,  /* manual-reset */
+                                           TRUE,  /* signaled initially */
+                                           NULL); /* unnamed */
+  
+  if( cond->events[SIGNAL] == NULL ||
+      cond->events[BROADCAST] == NULL ||
+      cond->broadcast_block_event == NULL )
     return ENOMEM;
   return 0;
 }
 
 int pthread_cond_destroy(pthread_cond_t *cond)
 {
-	return CloseHandle(cond->semaphore) ? 0 : EINVAL;
+  int signal;
+  int broadcast;
+  int broadcast_block;
+
+  DeleteCriticalSection(&cond->lock_waiting);
+  signal= CloseHandle(cond->events[SIGNAL]);
+  broadcast= CloseHandle(cond->events[BROADCAST]);
+  broadcast_block= CloseHandle(cond->broadcast_block_event);
+  if( signal == 0 || broadcast == 0 || broadcast_block == 0 )
+    return EINVAL;
+  return 0;
 }
 
 
 int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex)
 {
-  InterlockedIncrement(&cond->waiting);
-  LeaveCriticalSection(mutex);
-  WaitForSingleObject(cond->semaphore,INFINITE);
-  InterlockedDecrement(&cond->waiting);
-  EnterCriticalSection(mutex);
-  return 0 ;
+  return pthread_cond_timedwait(cond,mutex,NULL);
 }
 
+
 int pthread_cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex,
                            struct timespec *abstime)
 {
-  struct _timeb curtime;
   int result;
-  long timeout;
-  _ftime(&curtime);
-  timeout= ((long) (abstime->tv_sec - curtime.time)*1000L +
-		    (long)((abstime->tv_nsec/1000) - curtime.millitm)/1000L);
-  if (timeout < 0)				/* Some safety */
-    timeout = 0L;
-  InterlockedIncrement(&cond->waiting);
+  long timeout; 
+  union ft64 now;
+
+  if( abstime != NULL )
+  {
+    GetSystemTimeAsFileTime(&now.ft);
+
+    /*
+      Calculate time left to abstime
+      - subtract start time from current time(values are in 100ns units)
+      - convert to millisec by dividing with 10000
+    */
+    timeout= (long)((abstime->tv.i64 - now.i64) / 10000);
+    
+    /* Don't allow the timeout to be negative */
+    if (timeout < 0)
+      timeout= 0L;
+
+    /*
+      Make sure the calucated timeout does not exceed original timeout
+      value which could cause "wait for ever" if system time changes
+    */
+    if (timeout > abstime->max_timeout_msec)
+      timeout= abstime->max_timeout_msec;
+
+  }
+  else
+  {
+    /* No time specified; don't expire */
+    timeout= INFINITE;
+  }
+
+  /* 
+    Block access if previous broadcast hasn't finished.
+    This is just for safety and should normally not
+    affect the total time spent in this function.
+  */
+  WaitForSingleObject(cond->broadcast_block_event, INFINITE);
+
+  EnterCriticalSection(&cond->lock_waiting);
+  cond->waiting++;
+  LeaveCriticalSection(&cond->lock_waiting);
+
   LeaveCriticalSection(mutex);
-  result=WaitForSingleObject(cond->semaphore,timeout);
-  InterlockedDecrement(&cond->waiting);
+
+  result= WaitForMultipleObjects(2, cond->events, FALSE, timeout);
+  
+  EnterCriticalSection(&cond->lock_waiting);
+  cond->waiting--;
+  
+  /* Some thread called <pthread_cond_broadcast>. */
+  if (cond->waiting == 0 && result == (WAIT_OBJECT_0+BROADCAST))
+  {
+    /*
+      We're the last waiter to be notified or to stop waiting, so
+      reset the manual event. 
+    */
+    /* Close broadcast gate */
+    ResetEvent(cond->events[BROADCAST]);
+    /* Open block gate */
+    SetEvent(cond->broadcast_block_event);
+  }
+  LeaveCriticalSection(&cond->lock_waiting);
+  
   EnterCriticalSection(mutex);
 
   return result == WAIT_TIMEOUT ? ETIMEDOUT : 0;
 }
 
-
 int pthread_cond_signal(pthread_cond_t *cond)
 {
-  long prev_count;
-  if (cond->waiting)
-    ReleaseSemaphore(cond->semaphore,1,&prev_count);
+  my_bool have_waiters;
+
+  EnterCriticalSection(&cond->lock_waiting);
+  have_waiters= cond->waiting > 0;
+  LeaveCriticalSection(&cond->lock_waiting);
+
+  if(have_waiters)
+    SetEvent(cond->events[SIGNAL]);
+
   return 0;
 }
 
 
 int pthread_cond_broadcast(pthread_cond_t *cond)
 {
-  long prev_count;
-  if (cond->waiting)
-    ReleaseSemaphore(cond->semaphore,cond->waiting,&prev_count);
-  return 0 ;
+  my_bool have_waiters;
+
+  EnterCriticalSection(&cond->lock_waiting);
+  have_waiters= cond->waiting > 0;
+  LeaveCriticalSection(&cond->lock_waiting);
+
+  if(have_waiters)
+  {
+    /* Close block gate */
+    ResetEvent(cond->broadcast_block_event); 
+    /* Open broadcast gate */
+    SetEvent(cond->events[BROADCAST]);
+  }
+
+  return 0;
 }
 
 


Thread
bk commit into 5.0 tree (Kristofer.Pettersson:1.2353) BUG#25042kpettersson30 Jan