List: Maria Storage Engine « Previous Message | Next Message »
From: Sergei Golubchik  Date: July 3 2008 9:33pm
Subject:bzr commit into MySQL/Maria:mysql-maria branch (serg:2657)
View as plain text  
#At bzr+ssh://bk-internal.mysql.com/bzrroot/server/mysql-maria/

 2657 Sergei Golubchik	2008-07-03
      intermediate commit to have a checkpoint to roll back to
modified:
  client/mysqltest.c
  include/waiting_threads.h
  mysys/waiting_threads.c
  unittest/mysys/waiting_threads-t.c

=== modified file 'client/mysqltest.c'
--- a/client/mysqltest.c	2008-04-28 16:24:05 +0000
+++ b/client/mysqltest.c	2008-07-03 21:33:43 +0000
@@ -2815,7 +2815,7 @@ void do_mkdir(struct st_command *command
   int error;
   static DYNAMIC_STRING ds_dirname;
   const struct command_arg mkdir_args[] = {
-    "dirname", ARG_STRING, TRUE, &ds_dirname, "Directory to create"
+    {"dirname", ARG_STRING, TRUE, &ds_dirname, "Directory to create"}
   };
   DBUG_ENTER("do_mkdir");
 
@@ -2845,7 +2845,7 @@ void do_rmdir(struct st_command *command
   int error;
   static DYNAMIC_STRING ds_dirname;
   const struct command_arg rmdir_args[] = {
-    "dirname", ARG_STRING, TRUE, &ds_dirname, "Directory to remove"
+    { "dirname", ARG_STRING, TRUE, &ds_dirname, "Directory to remove" }
   };
   DBUG_ENTER("do_rmdir");
 

=== modified file 'include/waiting_threads.h'
--- a/include/waiting_threads.h	2008-06-23 17:15:24 +0000
+++ b/include/waiting_threads.h	2008-07-03 21:33:43 +0000
@@ -18,6 +18,7 @@
 #include <lf.h>
 
 typedef struct st_wt_resource_id WT_RESOURCE_ID;
+typedef struct st_wt_thd WT_THD;
 
 typedef struct st_wt_resource_type {
   int (*compare)(void *a, void *b);
@@ -32,7 +33,7 @@ struct st_wt_resource_id {
   } value;
 };
 
-extern uint wt_timeout;
+extern uint wt_timeout, wt_deadlock_search_depth_short;
 
 /*
   'mutex' protects 'owners', 'state', and 'waiter_count'
@@ -50,8 +51,11 @@ extern uint wt_timeout;
        - it's returned pinned.
         a) take a mutex
         b) check the state, it should be ACTIVE
+        c) unpin
     2. by a direct reference
        - could only used if a resource cannot be freed
+       e.g. accessing a resource by thd->waiting_for is safe,
+       a resource cannot be freed as there's a thread waiting for it
 */
 
 typedef struct st_wt_resource {
@@ -66,6 +70,7 @@ typedef struct st_wt_resource {
   pthread_mutex_t mutex;
   pthread_cond_t  cond;
   DYNAMIC_ARRAY   owners;
+  WT_THD         *mutex_owner; /* for loop detection in deadlock() */
 } WT_RESOURCE;
 
 typedef struct st_wt_thd {
@@ -78,15 +83,25 @@ typedef struct st_wt_thd {
     If not, we'll need to add a mutex
   */
   DYNAMIC_ARRAY   my_resources;
+  /*
+    'waiting_for' is modified under waiting_for->mutex, and only by thd itself
+    'waiting_for' is read lock-free (using pinning protocol), but a thd object
+    can read its own 'waiting_for' without any locks or tricks.
+  */
   WT_RESOURCE    *waiting_for;
   LF_PINS        *pins;
-} WT_THD;
+};
+
+#define WT_TIMEOUT              ETIMEDOUT /* the timed wait expired */
+#define WT_DEADLOCK             (-1)      /* waiting would close a wait-for loop */
+#define WT_DEPTH_EXCEEDED       (-2)      /* deadlock search was cut short */
 
 void wt_init(void);
 void wt_end(void);
 void wt_thd_init(WT_THD *thd);
 void wt_thd_destroy(WT_THD *thd);
-void wt_thd_will_wait_for(WT_THD *thd, WT_THD *blocker, WT_RESOURCE_ID *resid);
+int wt_thd_will_wait_for(WT_THD *thd, WT_THD *blocker, WT_RESOURCE_ID *resid);
+void wt_thd_dontwait(WT_THD *thd);
 int wt_thd_cond_timedwait(WT_THD *thd, pthread_mutex_t *mutex);
 void wt_thd_release(WT_THD *thd, WT_RESOURCE_ID *resid);
 #define wt_thd_release_all(THD) wt_thd_release((THD), 0)

=== modified file 'mysys/waiting_threads.c'
--- a/mysys/waiting_threads.c	2008-06-23 17:15:24 +0000
+++ b/mysys/waiting_threads.c	2008-07-03 21:33:43 +0000
@@ -16,7 +16,16 @@
 #include <waiting_threads.h>
 #include <m_string.h>
 
-uint wt_timeout=100;
+/*
+  TODO
+  rwlocks instead of mutexes
+  two timeouts, two deadlock() calls with different depths
+  statistics (avg. loop length, avg. wait time ?)
+  self-tuning ?
+  status (dump of the wait-for graph ?)
+  deadlock reaction - killing a transaction, killing everybody in that loop
+*/
+uint wt_timeout=100, wt_deadlock_search_depth_short=4; /* read by wt_thd_cond_timedwait() and wt_thd_will_wait_for() */
 
 static LF_HASH      reshash;
 
@@ -81,12 +90,118 @@ int wt_resource_id_memcmp(void *a, void 
 }
 
 /*
+  Depth-limited search for a wait-for loop leading from blocker back to thd.
+  Returns WT_DEADLOCK if found, WT_DEPTH_EXCEEDED if the search was cut
+  short, 0 otherwise. Called with thd->waiting_for->mutex held.
+*/
+static int deadlock(WT_THD *thd, WT_THD *blocker, int depth)
+{
+  WT_RESOURCE *rc, *volatile *shared_ptr= &blocker->waiting_for;
+  WT_THD *cursor;
+  uint i;
+  int ret= 0;
+  LF_REQUIRE_PINS(1);
+  if (depth == 0)
+    return WT_DEPTH_EXCEEDED;
+retry:
+  /* safe dereference as explained in lf_alloc-pin.c */
+  do
+  {
+    rc= *shared_ptr;
+    lf_pin(thd->pins, 0, rc);
+  } while (rc != *shared_ptr && LF_BACKOFF);
+
+  if (rc == 0)
+    return 0; /* blocker isn't waiting: no outgoing edge */
+  /* thd already holds rc->mutex: its own resource, or one on this path */
+  if (rc->mutex_owner == thd || rc == thd->waiting_for)
+  {
+    lf_unpin(thd->pins, 0);
+    return WT_DEPTH_EXCEEDED; // XXX yes, it's bad solution
+  }
+  pthread_mutex_lock(&rc->mutex);
+  rc->mutex_owner= thd;
+  if (unlikely(rc->state != ACTIVE))
+  {
+    rc->mutex_owner= 0;
+    pthread_mutex_unlock(&rc->mutex);
+    lf_unpin(thd->pins, 0);
+    goto retry;
+  }
+  lf_unpin(thd->pins, 0);
+
+  for (i=0; i < rc->owners.elements; i++)
+  {
+    cursor= *dynamic_element(&rc->owners, i, WT_THD**);
+    if (cursor == thd)
+    {
+      ret= WT_DEADLOCK;
+      goto end;
+    }
+  }
+  for (i=0; i < rc->owners.elements; i++)
+  {
+    cursor= *dynamic_element(&rc->owners, i, WT_THD**);
+    if ((ret= deadlock(thd, cursor, depth-1)))
+      goto end;
+  }
+end:
+  rc->mutex_owner= 0; /* a stale value would fool the check above later */
+  pthread_mutex_unlock(&rc->mutex);
+  return ret;
+}
+
+/*
+  Frees rc (state FREE, then removed from reshash) unless it still has owners
+  or waiters. rc->mutex must be locked by the caller; it's unlocked on return.
+*/
+static void unlock_mutex_and_free_resource(WT_THD *thd, WT_RESOURCE *rc)
+{
+  uint keylen;
+  const void *key;
+  DBUG_ENTER("unlock_mutex_and_free_resource");
+
+  DBUG_ASSERT(rc->state == ACTIVE);
+  safe_mutex_assert_owner(&rc->mutex);
+
+  if (rc->owners.elements || rc->waiter_count)
+  {
+    DBUG_PRINT("wt", ("nothing to do, %d owners, %d waiters",
+                      rc->owners.elements, rc->waiter_count));
+    pthread_mutex_unlock(&rc->mutex);
+    DBUG_VOID_RETURN;
+  }
+
+  /* XXX if (rc->id.type->make_key) key= rc->id.type->make_key(&rc->id, &keylen); else */
+  {
+    key= &rc->id;
+    keylen= sizeof(rc->id);
+  }
+
+  /*
+    To free the element correctly we need to:
+     1. take its mutex (already done).
+     2. set the state to FREE
+     3. release the mutex
+     4. remove from the hash
+
+     I *think* it's safe to release the mutex while the element is still
+     in the hash. If not, the corrected procedure should be
+     3. pin; 4. remove; 5. release; 6. unpin - and it'll need pin[3].
+  */
+  rc->state=FREE;
+  pthread_mutex_unlock(&rc->mutex);
+  lf_hash_delete(&reshash, thd->pins, key, keylen);
+  DBUG_VOID_RETURN;
+}
+
+/*
   called by a *waiter* to declare what resource it will wait for.
   can be called many times, if many blockers own a blocking resource.
   but must always be called with the same resource id - a thread cannot
   wait for more than one resource at a time.
 */
-void wt_thd_will_wait_for(WT_THD *thd, WT_THD *blocker, WT_RESOURCE_ID *resid)
+int wt_thd_will_wait_for(WT_THD *thd, WT_THD *blocker, WT_RESOURCE_ID *resid)
 {
   WT_RESOURCE *rc;
   LF_REQUIRE_PINS(3);
@@ -144,13 +259,12 @@ retry:
     push_dynamic(&blocker->my_resources, (void*)&rc);
     push_dynamic(&rc->owners, (void*)&blocker);
     rc->waiter_count++;
-    pthread_mutex_unlock(&rc->mutex);
   }
   else
   {
     uint i;
     DBUG_ASSERT(thd->waiting_for->id.type == resid->type);
-    DBUG_ASSERT(resid->type->compare(& thd->waiting_for->id, resid) == 0);
+    DBUG_ASSERT(resid->type->compare(&thd->waiting_for->id, resid) == 0);
     DBUG_PRINT("wt", ("adding another blocker"));
 
     /*
@@ -168,54 +282,34 @@ retry:
 
     push_dynamic(&blocker->my_resources, (void*)&rc);
     push_dynamic(&rc->owners, (void*)&blocker);
-
-    pthread_mutex_unlock(&rc->mutex);
-  }
-  DBUG_VOID_RETURN;
-}
-
-/*
-  Deletes an element from reshash.
-  rc->mutex must be locked by the caller and it's unlocked on return.
-*/
-static void unlock_mutex_and_free_resource(WT_THD *thd, WT_RESOURCE *rc)
-{
-  uint keylen;
-  const void *key;
-  DBUG_ENTER("unlock_mutex_and_free_resource");
-
-  DBUG_ASSERT(rc->state == ACTIVE);
-  safe_mutex_assert_owner(&rc->mutex);
-
-  if (rc->owners.elements || rc->waiter_count)
-  {
-    DBUG_PRINT("wt", ("nothing to do, %d owners, %d waiters",
-                      rc->owners.elements, rc->waiter_count));
-    pthread_mutex_unlock(&rc->mutex);
-    DBUG_VOID_RETURN;
   }
 
-  /* XXX if (rc->id.type->make_key) key= rc->id.type->make_key(&rc->id, &keylen); else */
+  if (deadlock(thd, blocker, wt_deadlock_search_depth_short) == WT_DEADLOCK)
   {
-    key= &rc->id;
-    keylen= sizeof(rc->id);
+    rc->waiter_count--;
+    thd->waiting_for= 0;
+    unlock_mutex_and_free_resource(thd, rc);
+    DBUG_RETURN(WT_DEADLOCK);
   }
+  pthread_mutex_unlock(&rc->mutex);
+  DBUG_RETURN(0);
+}
 
+void wt_thd_dontwait(WT_THD *thd)
+{
+  WT_RESOURCE *rc= thd->waiting_for;
+  if (!rc)
+    return; /* thd isn't waiting for anything */
   /*
-    To free the element correctly we need to:
-     1. take its mutex (already done).
-     2. set the state to FREE
-     3. release the mutex
-     4. remove from the hash
-
-     I *think* it's safe to release the mutex while the element is still
-     in the hash. If not, the corrected procedure should be
-     3. pin; 4; remove; 5; release; 6; unpin and it'll need pin[3].
+    thd is still counted in rc->waiter_count, so it's non-zero and nobody
+    can free rc before we lock it - no pinning needed here
   */
-  rc->state=FREE;
-  pthread_mutex_unlock(&rc->mutex);
-  lf_hash_delete(&reshash, thd->pins, key, keylen);
-  DBUG_VOID_RETURN;
+  pthread_mutex_lock(&rc->mutex);
+  DBUG_ASSERT(rc->waiter_count);
+  DBUG_ASSERT(rc->state == ACTIVE);
+  rc->waiter_count--;
+  thd->waiting_for= 0;
+  unlock_mutex_and_free_resource(thd, rc);
 }
 
 /*
@@ -230,16 +324,7 @@ int wt_thd_cond_timedwait(WT_THD *thd, p
 
   set_timespec_nsec(timeout, wt_timeout);
   ret= pthread_cond_timedwait(&rc->cond, mutex, &timeout);
-  thd->waiting_for= 0;
-  /*
-    nobody's trying to free the resource now,
-    as its waiter_count is guaranteed to be non-zero
-  */
-  pthread_mutex_lock(&rc->mutex);
-  DBUG_ASSERT(rc->waiter_count);
-  DBUG_ASSERT(rc->state == ACTIVE);
-  rc->waiter_count--;
-  unlock_mutex_and_free_resource(thd, rc);
+  wt_thd_dontwait(thd);
   DBUG_RETURN(ret);
 }
 
@@ -257,7 +342,7 @@ void wt_thd_release(WT_THD *thd, WT_RESO
   for (i=0; i < thd->my_resources.elements; i++)
   {
     rc= *dynamic_element(&thd->my_resources, i, WT_RESOURCE**);
-    if (!resid || (resid->type->compare(& rc->id, resid) == 0))
+    if (!resid || (resid->type->compare(&rc->id, resid) == 0))
     {
       pthread_mutex_lock(&rc->mutex);
       /*

=== modified file 'unittest/mysys/waiting_threads-t.c'
--- a/unittest/mysys/waiting_threads-t.c	2008-06-23 17:15:24 +0000
+++ b/unittest/mysys/waiting_threads-t.c	2008-07-03 21:33:43 +0000
@@ -21,7 +21,7 @@ struct test_wt_thd {
   WT_THD thd;
   pthread_mutex_t lock;
 } thds[THREADS];
-int cnt, stats[2];
+int cnt, stats[3];
 pthread_mutex_t lock;
 
 WT_RESOURCE_TYPE restype={ wt_resource_id_memcmp, 0};
@@ -50,22 +50,28 @@ pthread_handler_t test_wt(void *arg)
     resid.value.num= x % THREADS;
     resid.type= &restype;
 
+    res= 0;
     x= (x*m+0x87654321) % (INT_MAX32-1);
-    for (n= (x % THREADS)/10; n >= 0; n--)
+    for (n= (x % THREADS)/10; !res && n >= 0; n--)
     {
 
       x= (x*m+0x87654321) % (INT_MAX32-1);
       i= x % (THREADS-1);
       if (i >= id) i++;
       pthread_mutex_lock(& thds[i].lock);
-      wt_thd_will_wait_for(& thds[id].thd, & thds[i].thd, &resid);
+      res= wt_thd_will_wait_for(& thds[id].thd, & thds[i].thd, &resid);
       pthread_mutex_unlock(& thds[i].lock);
     }
 
-    pthread_mutex_lock(&lock);
-    res= wt_thd_cond_timedwait(& thds[id].thd, &lock);
-    stats[res != 0]++;
-    pthread_mutex_unlock(&lock);
+    if (res)
+      stats[2]++; /* deadlock detected */
+    else
+    {
+      pthread_mutex_lock(&lock);
+      res= wt_thd_cond_timedwait(& thds[id].thd, &lock);
+      stats[res != 0]++; /* ok or timeout */
+      pthread_mutex_unlock(&lock);
+    }
 
     if (res)
     {
@@ -84,7 +90,7 @@ pthread_handler_t test_wt(void *arg)
 
 void do_tests()
 {
-  plan(3);
+  plan(9);
 
   DBUG_PRINT("wt", ("================= initialization ==================="));
 
@@ -98,33 +104,57 @@ void do_tests()
     wt_thd_init(& thds[cnt].thd);
     pthread_mutex_init(& thds[cnt].lock, 0);
   }
-  wt_timeout=10000;
   {
-    WT_RESOURCE_ID resid;
-    bzero(&resid, sizeof(resid));
-    resid.value.num= 1;
-    resid.type= &restype;
+    int i;
+    WT_RESOURCE_ID resid[3];
+    for (i=0; i < 3; i++)
+    {
+      bzero(&resid[i], sizeof(resid[i]));
+      resid[i].value.num= i+1;
+      resid[i].type= &restype;
+    }
 
     DBUG_PRINT("wt", ("================= manual test ==================="));
 
-    wt_thd_will_wait_for(& thds[0].thd, & thds[1].thd, &resid);
-    wt_thd_will_wait_for(& thds[0].thd, & thds[2].thd, &resid);
-    wt_thd_will_wait_for(& thds[0].thd, & thds[3].thd, &resid);
+#define ok_wait(X,Y, R) \
+    ok(wt_thd_will_wait_for(& thds[X].thd, & thds[Y].thd, &resid[R]) == 0, \
+      "thd[" #X "] will wait for thd[" #Y "]")
+#define ok_deadlock(X,Y,R) \
+    ok(wt_thd_will_wait_for(& thds[X].thd, & thds[Y].thd, &resid[R]) == WT_DEADLOCK, \
+      "thd[" #X "] will wait for thd[" #Y "] - deadlock")
+
+    ok_wait(0,1,0);
+    ok_wait(0,2,0);
+    ok_wait(0,3,0);
 
     pthread_mutex_lock(&lock);
     bad= wt_thd_cond_timedwait(& thds[0].thd, &lock);
     pthread_mutex_unlock(&lock);
+    ok(bad == ETIMEDOUT, "timeout test returned %d", bad);
 
+    ok_wait(0,1,0);
+    ok_wait(1,2,1);
+    ok_deadlock(2,0,2);
+
+    wt_thd_dontwait(& thds[0].thd);
+    wt_thd_dontwait(& thds[1].thd);
+    wt_thd_dontwait(& thds[2].thd);
+    wt_thd_dontwait(& thds[3].thd);
     wt_thd_release_all(& thds[0].thd);
-
-    ok(bad == ETIMEDOUT, "timeout test returned %d", bad);
+    wt_thd_release_all(& thds[1].thd);
+    wt_thd_release_all(& thds[2].thd);
+    wt_thd_release_all(& thds[3].thd);
   }
 
+  //wt_deadlock_search_depth_short=0;
+  wt_timeout=10000;
   DBUG_PRINT("wt", ("================= stress test ==================="));
   cnt=stats[0]=stats[1]=0;
   test_concurrently("waiting_threads", test_wt, THREADS, CYCLES);
-  diag("%d successul, %d timeouts (wt_timeout=%d ns)",
-       stats[0], stats[1], wt_timeout);
+  diag("%d successul, %d timeouts, %d deadlocks",
+       stats[0], stats[1], stats[2]);
+  diag("timeout=%d ns, deadlock_search_depth_short=%d",
+       wt_timeout, wt_deadlock_search_depth_short);
 
   DBUG_PRINT("wt", ("================= cleanup ==================="));
   for (cnt=0; cnt < THREADS; cnt++)

Thread
bzr commit into MySQL/Maria:mysql-maria branch (serg:2657) Sergei Golubchik 3 Jul