#At bzr+ssh://bk-internal.mysql.com/bzrroot/server/mysql-maria/
2657 Sergei Golubchik 2008-07-03
intermediate commit to have a checkpoint to roll back to
modified:
client/mysqltest.c
include/waiting_threads.h
mysys/waiting_threads.c
unittest/mysys/waiting_threads-t.c
=== modified file 'client/mysqltest.c'
--- a/client/mysqltest.c 2008-04-28 16:24:05 +0000
+++ b/client/mysqltest.c 2008-07-03 21:33:43 +0000
@@ -2815,7 +2815,7 @@ void do_mkdir(struct st_command *command
int error;
static DYNAMIC_STRING ds_dirname;
const struct command_arg mkdir_args[] = {
- "dirname", ARG_STRING, TRUE, &ds_dirname, "Directory to create"
+ {"dirname", ARG_STRING, TRUE, &ds_dirname, "Directory to create"}
};
DBUG_ENTER("do_mkdir");
@@ -2845,7 +2845,7 @@ void do_rmdir(struct st_command *command
int error;
static DYNAMIC_STRING ds_dirname;
const struct command_arg rmdir_args[] = {
- "dirname", ARG_STRING, TRUE, &ds_dirname, "Directory to remove"
+ { "dirname", ARG_STRING, TRUE, &ds_dirname, "Directory to remove" }
};
DBUG_ENTER("do_rmdir");
=== modified file 'include/waiting_threads.h'
--- a/include/waiting_threads.h 2008-06-23 17:15:24 +0000
+++ b/include/waiting_threads.h 2008-07-03 21:33:43 +0000
@@ -18,6 +18,7 @@
#include <lf.h>
typedef struct st_wt_resource_id WT_RESOURCE_ID;
+typedef struct st_wt_thd WT_THD;
typedef struct st_wt_resource_type {
int (*compare)(void *a, void *b);
@@ -32,7 +33,7 @@ struct st_wt_resource_id {
} value;
};
-extern uint wt_timeout;
+extern uint wt_timeout, wt_deadlock_search_depth_short;
/*
'mutex' protects 'owners', 'state', and 'waiter_count'
@@ -50,8 +51,11 @@ extern uint wt_timeout;
- it's returned pinned.
a) take a mutex
b) check the state, it should be ACTIVE
+ c) unpin
2. by a direct reference
- could only used if a resource cannot be freed
+ e.g. accessing a resource by thd->waiting_for is safe,
+ a resource cannot be freed as there's a thread waiting for it
*/
typedef struct st_wt_resource {
@@ -66,6 +70,7 @@ typedef struct st_wt_resource {
pthread_mutex_t mutex;
pthread_cond_t cond;
DYNAMIC_ARRAY owners;
+ WT_THD *mutex_owner; /* for loop detection in deadlock() */
} WT_RESOURCE;
typedef struct st_wt_thd {
@@ -78,15 +83,25 @@ typedef struct st_wt_thd {
If not, we'll need to add a mutex
*/
DYNAMIC_ARRAY my_resources;
+ /*
+ 'waiting_for' is modified under waiting_for->mutex, and only by thd itself
+ 'waiting_for' is read lock-free (using pinning protocol), but a thd object
+ can read its own 'waiting_for' without any locks or tricks.
+ */
WT_RESOURCE *waiting_for;
LF_PINS *pins;
-} WT_THD;
+};
+
+#define WT_TIMEOUT ETIMEDOUT
+#define WT_DEADLOCK -1
+#define WT_DEPTH_EXCEEDED -2
void wt_init(void);
void wt_end(void);
void wt_thd_init(WT_THD *thd);
void wt_thd_destroy(WT_THD *thd);
-void wt_thd_will_wait_for(WT_THD *thd, WT_THD *blocker, WT_RESOURCE_ID *resid);
+int wt_thd_will_wait_for(WT_THD *thd, WT_THD *blocker, WT_RESOURCE_ID *resid);
+void wt_thd_dontwait(WT_THD *thd);
int wt_thd_cond_timedwait(WT_THD *thd, pthread_mutex_t *mutex);
void wt_thd_release(WT_THD *thd, WT_RESOURCE_ID *resid);
#define wt_thd_release_all(THD) wt_thd_release((THD), 0)
=== modified file 'mysys/waiting_threads.c'
--- a/mysys/waiting_threads.c 2008-06-23 17:15:24 +0000
+++ b/mysys/waiting_threads.c 2008-07-03 21:33:43 +0000
@@ -16,7 +16,16 @@
#include <waiting_threads.h>
#include <m_string.h>
-uint wt_timeout=100;
+/*
+ TODO
+ rwlocks instead of mutexes
+ two timeouts, two deadlock() calls with different depths
+ statistics (avg. loop length, avg. wait time ?)
+ self-tuning ?
+ status (dump of the wait-for graph ?)
+ deadlock reaction - killing a transaction, killing everybody in that loop
+*/
+uint wt_timeout=100, wt_deadlock_search_depth_short=4;
static LF_HASH reshash;
@@ -81,12 +90,118 @@ int wt_resource_id_memcmp(void *a, void
}
/*
+ loop detection in a wait-for graph with a limited search depth.
+*/
+static int deadlock(WT_THD *thd, WT_THD *blocker, int depth)
+{
+ WT_RESOURCE *rc, *volatile *shared_ptr= &blocker->waiting_for;
+ WT_THD *cursor;
+ uint i;
+ int ret= 0;
+ LF_REQUIRE_PINS(1);
+
+ if (depth == 0)
+ return WT_DEPTH_EXCEEDED;
+
+retry:
+ /* safe dereference as explained in lf_alloc-pin.c */
+ do
+ {
+ rc= *shared_ptr;
+ lf_pin(thd->pins, 0, rc);
+ } while (rc != *shared_ptr && LF_BACKOFF);
+
+ if (rc == 0)
+ return 0;
+
+ if (rc->mutex_owner == thd)
+ {
+ lf_unpin(thd->pins, 0);
+ return WT_DEPTH_EXCEEDED; // XXX yes, it's bad solution
+ }
+
+ pthread_mutex_lock(&rc->mutex);
+ rc->mutex_owner= thd;
+ if (unlikely(rc->state != ACTIVE))
+ {
+ rc->mutex_owner= 0;
+ pthread_mutex_unlock(&rc->mutex);
+ lf_unpin(thd->pins, 0);
+ goto retry;
+ }
+ lf_unpin(thd->pins, 0);
+
+ for (i=0; i < rc->owners.elements; i++)
+ {
+ cursor= *dynamic_element(&rc->owners, i, WT_THD**);
+ if (cursor == thd)
+ {
+ ret= WT_DEADLOCK;
+ goto end;
+ }
+ }
+ for (i=0; i < rc->owners.elements; i++)
+ {
+ cursor= *dynamic_element(&rc->owners, i, WT_THD**);
+ if ((ret= deadlock(thd, cursor, depth-1)))
+ goto end;
+ }
+end:
+ pthread_mutex_unlock(&rc->mutex);
+ return ret;
+}
+
+/*
+ Deletes an element from reshash.
+ rc->mutex must be locked by the caller and it's unlocked on return.
+*/
+static void unlock_mutex_and_free_resource(WT_THD *thd, WT_RESOURCE *rc)
+{
+ uint keylen;
+ const void *key;
+ DBUG_ENTER("unlock_mutex_and_free_resource");
+
+ DBUG_ASSERT(rc->state == ACTIVE);
+ safe_mutex_assert_owner(&rc->mutex);
+
+ if (rc->owners.elements || rc->waiter_count)
+ {
+ DBUG_PRINT("wt", ("nothing to do, %d owners, %d waiters",
+ rc->owners.elements, rc->waiter_count));
+ pthread_mutex_unlock(&rc->mutex);
+ DBUG_VOID_RETURN;
+ }
+
+ /* XXX if (rc->id.type->make_key) key= rc->id.type->make_key(&rc->id, &keylen); else */
+ {
+ key= &rc->id;
+ keylen= sizeof(rc->id);
+ }
+
+ /*
+ To free the element correctly we need to:
+ 1. take its mutex (already done).
+ 2. set the state to FREE
+ 3. release the mutex
+ 4. remove from the hash
+
+ I *think* it's safe to release the mutex while the element is still
+ in the hash. If not, the corrected procedure should be
+ 3. pin; 4; remove; 5; release; 6; unpin and it'll need pin[3].
+ */
+ rc->state=FREE;
+ pthread_mutex_unlock(&rc->mutex);
+ lf_hash_delete(&reshash, thd->pins, key, keylen);
+ DBUG_VOID_RETURN;
+}
+
+/*
called by a *waiter* to declare what resource it will wait for.
can be called many times, if many blockers own a blocking resource.
but must always be called with the same resource id - a thread cannot
wait for more than one resource at a time.
*/
-void wt_thd_will_wait_for(WT_THD *thd, WT_THD *blocker, WT_RESOURCE_ID *resid)
+int wt_thd_will_wait_for(WT_THD *thd, WT_THD *blocker, WT_RESOURCE_ID *resid)
{
WT_RESOURCE *rc;
LF_REQUIRE_PINS(3);
@@ -144,13 +259,12 @@ retry:
push_dynamic(&blocker->my_resources, (void*)&rc);
push_dynamic(&rc->owners, (void*)&blocker);
rc->waiter_count++;
- pthread_mutex_unlock(&rc->mutex);
}
else
{
uint i;
DBUG_ASSERT(thd->waiting_for->id.type == resid->type);
- DBUG_ASSERT(resid->type->compare(& thd->waiting_for->id, resid) == 0);
+ DBUG_ASSERT(resid->type->compare(&thd->waiting_for->id, resid) == 0);
DBUG_PRINT("wt", ("adding another blocker"));
/*
@@ -168,54 +282,34 @@ retry:
push_dynamic(&blocker->my_resources, (void*)&rc);
push_dynamic(&rc->owners, (void*)&blocker);
-
- pthread_mutex_unlock(&rc->mutex);
- }
- DBUG_VOID_RETURN;
-}
-
-/*
- Deletes an element from reshash.
- rc->mutex must be locked by the caller and it's unlocked on return.
-*/
-static void unlock_mutex_and_free_resource(WT_THD *thd, WT_RESOURCE *rc)
-{
- uint keylen;
- const void *key;
- DBUG_ENTER("unlock_mutex_and_free_resource");
-
- DBUG_ASSERT(rc->state == ACTIVE);
- safe_mutex_assert_owner(&rc->mutex);
-
- if (rc->owners.elements || rc->waiter_count)
- {
- DBUG_PRINT("wt", ("nothing to do, %d owners, %d waiters",
- rc->owners.elements, rc->waiter_count));
- pthread_mutex_unlock(&rc->mutex);
- DBUG_VOID_RETURN;
}
- /* XXX if (rc->id.type->make_key) key= rc->id.type->make_key(&rc->id, &keylen); else */
+ if (deadlock(thd, blocker, wt_deadlock_search_depth_short) == WT_DEADLOCK)
{
- key= &rc->id;
- keylen= sizeof(rc->id);
+ rc->waiter_count--;
+ thd->waiting_for= 0;
+ unlock_mutex_and_free_resource(thd, rc);
+ DBUG_RETURN(WT_DEADLOCK);
}
+ pthread_mutex_unlock(&rc->mutex);
+ DBUG_RETURN(0);
+}
+void wt_thd_dontwait(WT_THD *thd)
+{
+ WT_RESOURCE *rc= thd->waiting_for;
+ if (!rc)
+ return;
/*
- To free the element correctly we need to:
- 1. take its mutex (already done).
- 2. set the state to FREE
- 3. release the mutex
- 4. remove from the hash
-
- I *think* it's safe to release the mutex while the element is still
- in the hash. If not, the corrected procedure should be
- 3. pin; 4; remove; 5; release; 6; unpin and it'll need pin[3].
+ nobody's trying to free the resource now,
+ as its waiter_count is guaranteed to be non-zero
*/
- rc->state=FREE;
- pthread_mutex_unlock(&rc->mutex);
- lf_hash_delete(&reshash, thd->pins, key, keylen);
- DBUG_VOID_RETURN;
+ pthread_mutex_lock(&rc->mutex);
+ DBUG_ASSERT(rc->waiter_count);
+ DBUG_ASSERT(rc->state == ACTIVE);
+ rc->waiter_count--;
+ thd->waiting_for= 0;
+ unlock_mutex_and_free_resource(thd, rc);
}
/*
@@ -230,16 +324,7 @@ int wt_thd_cond_timedwait(WT_THD *thd, p
set_timespec_nsec(timeout, wt_timeout);
ret= pthread_cond_timedwait(&rc->cond, mutex, &timeout);
- thd->waiting_for= 0;
- /*
- nobody's trying to free the resource now,
- as its waiter_count is guaranteed to be non-zero
- */
- pthread_mutex_lock(&rc->mutex);
- DBUG_ASSERT(rc->waiter_count);
- DBUG_ASSERT(rc->state == ACTIVE);
- rc->waiter_count--;
- unlock_mutex_and_free_resource(thd, rc);
+ wt_thd_dontwait(thd);
DBUG_RETURN(ret);
}
@@ -257,7 +342,7 @@ void wt_thd_release(WT_THD *thd, WT_RESO
for (i=0; i < thd->my_resources.elements; i++)
{
rc= *dynamic_element(&thd->my_resources, i, WT_RESOURCE**);
- if (!resid || (resid->type->compare(& rc->id, resid) == 0))
+ if (!resid || (resid->type->compare(&rc->id, resid) == 0))
{
pthread_mutex_lock(&rc->mutex);
/*
=== modified file 'unittest/mysys/waiting_threads-t.c'
--- a/unittest/mysys/waiting_threads-t.c 2008-06-23 17:15:24 +0000
+++ b/unittest/mysys/waiting_threads-t.c 2008-07-03 21:33:43 +0000
@@ -21,7 +21,7 @@ struct test_wt_thd {
WT_THD thd;
pthread_mutex_t lock;
} thds[THREADS];
-int cnt, stats[2];
+int cnt, stats[3];
pthread_mutex_t lock;
WT_RESOURCE_TYPE restype={ wt_resource_id_memcmp, 0};
@@ -50,22 +50,28 @@ pthread_handler_t test_wt(void *arg)
resid.value.num= x % THREADS;
resid.type= &restype;
+ res= 0;
x= (x*m+0x87654321) % (INT_MAX32-1);
- for (n= (x % THREADS)/10; n >= 0; n--)
+ for (n= (x % THREADS)/10; !res && n >= 0; n--)
{
x= (x*m+0x87654321) % (INT_MAX32-1);
i= x % (THREADS-1);
if (i >= id) i++;
pthread_mutex_lock(& thds[i].lock);
- wt_thd_will_wait_for(& thds[id].thd, & thds[i].thd, &resid);
+ res= wt_thd_will_wait_for(& thds[id].thd, & thds[i].thd, &resid);
pthread_mutex_unlock(& thds[i].lock);
}
- pthread_mutex_lock(&lock);
- res= wt_thd_cond_timedwait(& thds[id].thd, &lock);
- stats[res != 0]++;
- pthread_mutex_unlock(&lock);
+ if (res)
+ stats[2]++; /* deadlock detected */
+ else
+ {
+ pthread_mutex_lock(&lock);
+ res= wt_thd_cond_timedwait(& thds[id].thd, &lock);
+ stats[res != 0]++; /* ok or timeout */
+ pthread_mutex_unlock(&lock);
+ }
if (res)
{
@@ -84,7 +90,7 @@ pthread_handler_t test_wt(void *arg)
void do_tests()
{
- plan(3);
+ plan(9);
DBUG_PRINT("wt", ("================= initialization ==================="));
@@ -98,33 +104,57 @@ void do_tests()
wt_thd_init(& thds[cnt].thd);
pthread_mutex_init(& thds[cnt].lock, 0);
}
- wt_timeout=10000;
{
- WT_RESOURCE_ID resid;
- bzero(&resid, sizeof(resid));
- resid.value.num= 1;
- resid.type= &restype;
+ int i;
+ WT_RESOURCE_ID resid[3];
+ for (i=0; i < 3; i++)
+ {
+ bzero(&resid[i], sizeof(resid[i]));
+ resid[i].value.num= i+1;
+ resid[i].type= &restype;
+ }
DBUG_PRINT("wt", ("================= manual test ==================="));
- wt_thd_will_wait_for(& thds[0].thd, & thds[1].thd, &resid);
- wt_thd_will_wait_for(& thds[0].thd, & thds[2].thd, &resid);
- wt_thd_will_wait_for(& thds[0].thd, & thds[3].thd, &resid);
+#define ok_wait(X,Y, R) \
+ ok(wt_thd_will_wait_for(& thds[X].thd, & thds[Y].thd, &resid[R]) == 0, \
+ "thd[" #X "] will wait for thd[" #Y "]")
+#define ok_deadlock(X,Y,R) \
+ ok(wt_thd_will_wait_for(& thds[X].thd, & thds[Y].thd, &resid[R]) == WT_DEADLOCK, \
+ "thd[" #X "] will wait for thd[" #Y "] - deadlock")
+
+ ok_wait(0,1,0);
+ ok_wait(0,2,0);
+ ok_wait(0,3,0);
pthread_mutex_lock(&lock);
bad= wt_thd_cond_timedwait(& thds[0].thd, &lock);
pthread_mutex_unlock(&lock);
+ ok(bad == ETIMEDOUT, "timeout test returned %d", bad);
+ ok_wait(0,1,0);
+ ok_wait(1,2,1);
+ ok_deadlock(2,0,2);
+
+ wt_thd_dontwait(& thds[0].thd);
+ wt_thd_dontwait(& thds[1].thd);
+ wt_thd_dontwait(& thds[2].thd);
+ wt_thd_dontwait(& thds[3].thd);
wt_thd_release_all(& thds[0].thd);
-
- ok(bad == ETIMEDOUT, "timeout test returned %d", bad);
+ wt_thd_release_all(& thds[1].thd);
+ wt_thd_release_all(& thds[2].thd);
+ wt_thd_release_all(& thds[3].thd);
}
+ //wt_deadlock_search_depth_short=0;
+ wt_timeout=10000;
DBUG_PRINT("wt", ("================= stress test ==================="));
cnt=stats[0]=stats[1]=0;
test_concurrently("waiting_threads", test_wt, THREADS, CYCLES);
- diag("%d successul, %d timeouts (wt_timeout=%d ns)",
- stats[0], stats[1], wt_timeout);
+ diag("%d successul, %d timeouts, %d deadlocks",
+ stats[0], stats[1], stats[2]);
+ diag("timeout=%d ns, deadlock_search_depth_short=%d",
+ wt_timeout, wt_deadlock_search_depth_short);
DBUG_PRINT("wt", ("================= cleanup ==================="));
for (cnt=0; cnt < THREADS; cnt++)
| Thread | Author | Date |
|---|---|---|
| • bzr commit into MySQL/Maria:mysql-maria branch (serg:2657) | Sergei Golubchik | 3 Jul |