Below is the list of changes that have just been committed into a local
5.1 repository of andrey. When andrey does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html
ChangeSet
1.2201 06/02/28 17:36:04 andrey@lmy004. +1 -0
Merge ahristov@stripped:/home/bk/mysql-5.1-new
into lmy004.:/work/mysql-5.1-bug17619
sql/event_executor.cc
1.38 06/02/28 17:35:59 andrey@lmy004. +3 -5
Auto merged
# This is a BitKeeper patch. What follows are the unified diffs for the
# set of deltas contained in the patch. The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User: andrey
# Host: lmy004.
# Root: /work/mysql-5.1-bug17619/RESYNC
--- 1.37/sql/event_executor.cc 2006-02-28 17:02:22 +01:00
+++ 1.38/sql/event_executor.cc 2006-02-28 17:35:59 +01:00
@@ -17,6 +17,7 @@
#include "event_priv.h"
#include "event.h"
#include "sp.h"
+#include "event_manager.h"
#define WAIT_STATUS_READY 0
#define WAIT_STATUS_EMPTY_QUEUE 1
@@ -55,7 +56,7 @@ static int
evex_load_events_from_db(THD *thd);
bool
-evex_print_warnings(THD *thd, event_timed *et);
+evex_print_warnings(THD *thd, Event_timed *et);
/*
TODO Andrey: Check for command line option whether to start
@@ -108,6 +109,7 @@ evex_init_mutexes()
return;
evex_mutexes_initted= TRUE;
+ pthread_mutex_init(&Event_scheduler_manager::LOCK_manager, MY_MUTEX_INIT_FAST);
pthread_mutex_init(&LOCK_event_arrays, MY_MUTEX_INIT_FAST);
pthread_mutex_init(&LOCK_workers_count, MY_MUTEX_INIT_FAST);
pthread_mutex_init(&LOCK_evex_running, MY_MUTEX_INIT_FAST);
@@ -203,16 +205,21 @@ init_events()
evex_is_running= false;
VOID(pthread_mutex_unlock(&LOCK_evex_running));
- if (event_executor_running_global_var)
+ VOID(pthread_mutex_lock(&LOCK_evex_running));
+ if (event_executor_running_global_var && !evex_is_running)
{
#ifndef DBUG_FAULTY_THR
/* TODO Andrey: Change the error code returned! */
if (pthread_create(&th, &connection_attrib, event_executor_main,(void*)NULL))
- DBUG_RETURN(ER_SLAVE_THREAD);
+ {
+ VOID(pthread_mutex_unlock(&LOCK_evex_running));
+ DBUG_RETURN(1);
+ }
#else
event_executor_main(NULL);
#endif
}
+ VOID(pthread_mutex_unlock(&LOCK_evex_running));
DBUG_RETURN(0);
}
@@ -304,6 +311,9 @@ init_event_thread(THD* thd)
/*
This function waits till the time next event in the queue should be
executed.
+ SYNOPSIS
+ executor_wait_till_next_event_exec()
+ thd THD of the main event thread
Returns
WAIT_STATUS_READY There is an event to be executed right now
@@ -316,9 +326,10 @@ init_event_thread(THD* thd)
static int
executor_wait_till_next_event_exec(THD *thd)
{
- event_timed *et;
+ Event_timed *et;
TIME time_now;
int t2sleep;
+ bool new_top;
DBUG_ENTER("executor_wait_till_next_event_exec");
/*
@@ -326,12 +337,12 @@ executor_wait_till_next_event_exec(THD *
element in the queue.
*/
VOID(pthread_mutex_lock(&LOCK_event_arrays));
- if (!evex_queue_num_elements(EVEX_EQ_NAME))
+ if (!EVEX_EQ_NAME.elements)
{
VOID(pthread_mutex_unlock(&LOCK_event_arrays));
DBUG_RETURN(WAIT_STATUS_EMPTY_QUEUE);
}
- et= evex_queue_first_element(&EVEX_EQ_NAME, event_timed*);
+ et= evex_queue_first_element(&EVEX_EQ_NAME, Event_timed*);
DBUG_ASSERT(et);
if (et->status == MYSQL_EVENT_DISABLED)
{
@@ -342,7 +353,7 @@ executor_wait_till_next_event_exec(THD *
evex_queue_delete_element(&EVEX_EQ_NAME, 0);// 0 is top, internally 1
VOID(pthread_mutex_unlock(&LOCK_event_arrays));
sql_print_information("Event found disabled, dropping.");
- DBUG_RETURN(1);
+ DBUG_RETURN(WAIT_STATUS_EMPTY_QUEUE);
}
DBUG_PRINT("evex main thread",("computing time to sleep till next exec"));
@@ -353,6 +364,7 @@ executor_wait_till_next_event_exec(THD *
VOID(pthread_mutex_unlock(&LOCK_event_arrays));
DBUG_PRINT("evex main thread",("unlocked LOCK_event_arrays"));
+ t2sleep*=10;
if (t2sleep > 0)
{
/*
@@ -360,11 +372,11 @@ executor_wait_till_next_event_exec(THD *
has been killed, or there is a new candidate
*/
while (t2sleep-- && !thd->killed && event_executor_running_global_var &&
- evex_queue_num_elements(EVEX_EQ_NAME) &&
- (evex_queue_first_element(&EVEX_EQ_NAME, event_timed*) == et))
+ EVEX_EQ_NAME.elements && !et->killed &&
+ (new_top= (et==evex_queue_first_element(&EVEX_EQ_NAME,Event_timed*))))
{
- DBUG_PRINT("evex main thread",("will sleep a bit more"));
- my_sleep(1000000);
+ DBUG_PRINT("evex main thread", ("will sleep a bit more"));
+ my_sleep(100000);
}
}
@@ -400,18 +412,16 @@ event_executor_main(void *arg)
{
THD *thd; /* needs to be first for thread_stack */
uint i=0, j=0;
- my_ulonglong cnt= 0;
+ static my_ulonglong cnt= 0;
TIME time_now;
DBUG_ENTER("event_executor_main");
DBUG_PRINT("event_executor_main", ("EVEX thread started"));
-
- /* init memory root */
- init_alloc_root(&evex_mem_root, MEM_ROOT_BLOCK_SIZE, MEM_ROOT_PREALLOC);
-
- /* needs to call my_thread_init(), otherwise we get a coredump in DBUG_ stuff*/
- my_thread_init();
+ VOID(pthread_mutex_lock(&LOCK_evex_running));
+ if (evex_is_running)
+ UNLOCK_MUTEX_AND_BAIL_OUT(LOCK_evex_running, err_no_thd);
+ VOID(pthread_mutex_unlock(&LOCK_evex_running));
if (sizeof(my_time_t) != sizeof(time_t))
{
@@ -427,8 +437,16 @@ event_executor_main(void *arg)
sql_print_error("SCHEDULER: Cannot create THD for the main thread.");
goto err_no_thd;
}
- thd->thread_stack = (char*)&thd; // remember where our stack is
+ /* init memory root */
+ init_alloc_root(&evex_mem_root, MEM_ROOT_BLOCK_SIZE, MEM_ROOT_PREALLOC);
+
+ /* needs to call my_thread_init(), otherwise we get a coreDUMB in DBUG_ stuff*/
+ my_thread_init();
+
+ /* remember where our stack is */
+ thd->thread_stack = (char*)&thd;
+
pthread_detach_this_thread();
if (init_event_thread(thd))
@@ -456,11 +474,16 @@ event_executor_main(void *arg)
VOID(pthread_mutex_lock(&LOCK_event_arrays));
evex_queue_init(&EVEX_EQ_NAME);
VOID(pthread_mutex_unlock(&LOCK_event_arrays));
+ if (evex_is_running)
+ UNLOCK_MUTEX_AND_BAIL_OUT(LOCK_evex_running, finish);
evex_is_running= true;
VOID(pthread_mutex_unlock(&LOCK_evex_running));
thd->security_ctx->user= my_strdup("event_scheduler", MYF(0));
+ if (!event_executor_running_global_var)
+ goto finish;
+
if (evex_load_events_from_db(thd))
goto finish;
@@ -470,8 +493,8 @@ event_executor_main(void *arg)
while (!thd->killed)
{
TIME time_now;
- event_timed *et;
-
+ Event_timed *et;
+
cnt++;
DBUG_PRINT("info", ("EVEX External Loop %d thd->k", cnt));
@@ -482,9 +505,9 @@ event_executor_main(void *arg)
break;
}
- if (!evex_queue_num_elements(EVEX_EQ_NAME))
+ if (!EVEX_EQ_NAME.elements)
{
- my_sleep(1000000);// sleep 1s
+ my_sleep(100000);// sleep 0.1s
continue;
}
@@ -513,13 +536,13 @@ restart_ticking:
thd->end_time();
my_tz_UTC->gmt_sec_to_TIME(&time_now, thd->query_start());
- if (!evex_queue_num_elements(EVEX_EQ_NAME))
+ if (!EVEX_EQ_NAME.elements)
{
VOID(pthread_mutex_unlock(&LOCK_event_arrays));
DBUG_PRINT("evex main thread",("empty queue"));
continue;
}
- et= evex_queue_first_element(&EVEX_EQ_NAME, event_timed*);
+ et= evex_queue_first_element(&EVEX_EQ_NAME, Event_timed*);
DBUG_PRINT("evex main thread",("got event from the queue"));
if (!et->execute_at_null && my_time_compare(&time_now,&et->execute_at) == -1)
@@ -534,17 +557,17 @@ restart_ticking:
{
int fork_ret_code;
- DBUG_PRINT("evex main thread", ("[%10s] this exec at [%llu]", et->name.str,
+ DBUG_PRINT("evex main thread", ("[%10s] this exec at [%llu]", et->ident.name.str,
TIME_to_ulonglong_datetime(&et->execute_at)));
et->mark_last_executed(thd);
if (et->compute_next_execution_time())
{
sql_print_error("SCHEDULER: Error while computing time of %s.%s . "
"Disabling after execution.",
- et->dbname.str, et->name.str);
+ et->ident.dbname.str, et->ident.name.str);
et->status= MYSQL_EVENT_DISABLED;
}
- DBUG_PRINT("evex main thread", ("[%10s] next exec at [%llu]", et->name.str,
+ DBUG_PRINT("evex main thread", ("[%10s] next exec at [%llu]", et->ident.name.str,
TIME_to_ulonglong_datetime(&et->execute_at)));
et->update_fields(thd);
@@ -555,11 +578,6 @@ restart_ticking:
thread_safe_decrement(workers_count, &LOCK_workers_count);
sql_print_error("SCHEDULER: Problem while trying to create a thread");
UNLOCK_MUTEX_AND_BAIL_OUT(LOCK_event_arrays, finish);
- case EVENT_EXEC_ALREADY_EXEC:
- thread_safe_decrement(workers_count, &LOCK_workers_count);
- sql_print_information("SCHEDULER: %s.%s in execution. Skip this time.",
- et->dbname.str, et->name.str);
- break;
default:
DBUG_ASSERT(!fork_ret_code);
if (fork_ret_code)
@@ -588,10 +606,6 @@ finish:
/* First manifest that this thread does not work and then destroy */
VOID(pthread_mutex_lock(&LOCK_evex_running));
- evex_is_running= false;
- evex_main_thread_id= 0;
- VOID(pthread_mutex_unlock(&LOCK_evex_running));
-
/*
TODO: A better will be with a conditional variable
@@ -604,6 +618,7 @@ finish:
while (1)
{
VOID(pthread_mutex_lock(&LOCK_workers_count));
+ DBUG_PRINT("info", ("workers count %d", workers_count));
if (!workers_count)
{
VOID(pthread_mutex_unlock(&LOCK_workers_count));
@@ -619,9 +634,9 @@ finish:
*/
sql_print_information("SCHEDULER: Emptying the queue.");
VOID(pthread_mutex_lock(&LOCK_event_arrays));
- for (i= 0; i < evex_queue_num_elements(EVEX_EQ_NAME); ++i)
+ for (i= 0; i < EVEX_EQ_NAME.elements; ++i)
{
- event_timed *et= evex_queue_element(&EVEX_EQ_NAME, i, event_timed*);
+ Event_timed *et= evex_queue_element(&EVEX_EQ_NAME, i, Event_timed*);
et->free_sp();
delete et;
}
@@ -644,12 +659,13 @@ finish:
pthread_mutex_unlock(&LOCK_thread_count);
-err_no_thd:
- VOID(pthread_mutex_lock(&LOCK_evex_running));
- evex_is_running= false;
+ free_root(&evex_mem_root, MYF(0));
+ evex_is_running= false;
+ evex_main_thread_id= 0;
VOID(pthread_mutex_unlock(&LOCK_evex_running));
+err_no_thd:
- free_root(&evex_mem_root, MYF(0));
+ event_executor_running_global_var= false;
sql_print_information("SCHEDULER: Stopped.");
#ifndef DBUG_FAULTY_THR
@@ -673,7 +689,7 @@ pthread_handler_t
event_executor_worker(void *event_void)
{
THD *thd; /* needs to be first for thread_stack */
- event_timed *event = (event_timed *) event_void;
+ Event_timed *event = (Event_timed *) event_void;
MEM_ROOT worker_mem_root;
DBUG_ENTER("event_executor_worker");
@@ -712,24 +728,33 @@ event_executor_worker(void *event_void)
{
int ret;
+ if (!event->can_spawn_now_n_lock(thd))
+ {
+ sql_print_information("SCHEDULER: %s.%s in execution. Skip this time.",
+ event->ident.dbname.str, event->ident.name.str);
+ goto err;
+ }
+
sql_print_information("SCHEDULER: Executing event %s.%s of %s [EXPR:%d]",
- event->dbname.str, event->name.str,
- event->definer.str, (int) event->expression);
+ event->ident.dbname.str, event->ident.name.str,
+ event->ident.definer.str, (int) event->expression);
ret= event->execute(thd, &worker_mem_root);
evex_print_warnings(thd, event);
sql_print_information("SCHEDULER: Executed event %s.%s of %s [EXPR:%d]. "
- "RetCode=%d", event->dbname.str, event->name.str,
- event->definer.str, (int) event->expression, ret);
+ "RetCode=%d", event->ident.dbname.str,
+ event->ident.name.str, event->ident.definer.str,
+ (int) event->expression, ret);
if (ret == EVEX_COMPILE_ERROR)
sql_print_information("SCHEDULER: COMPILE ERROR for event %s.%s of",
- event->dbname.str, event->name.str,
- event->definer.str);
+ event->ident.dbname.str, event->ident.name.str,
+ event->ident.definer.str);
else if (ret == EVEX_MICROSECOND_UNSUP)
sql_print_information("SCHEDULER: MICROSECOND is not supported");
+
+ event->spawn_thread_finish(thd);
}
- event->spawn_thread_finish(thd);
err:
@@ -808,8 +833,8 @@ evex_load_events_from_db(THD *thd)
init_read_record(&read_record_info, thd, table ,NULL,1,0);
while (!(read_record_info.read_record(&read_record_info)))
{
- event_timed *et;
- if (!(et= new event_timed))
+ Event_timed *et;
+ if (!(et= new Event_timed))
{
DBUG_PRINT("evex_load_events_from_db", ("Out of memory"));
ret= -1;
@@ -825,13 +850,15 @@ evex_load_events_from_db(THD *thd)
}
if (et->status != MYSQL_EVENT_ENABLED)
{
- DBUG_PRINT("evex_load_events_from_db",("%s is disabled",et->name.str));
+ DBUG_PRINT("evex_load_events_from_db", ("%s.%s of %s is disabled",
+ et->ident.dbname.str, et->ident.name.str,
+ et->ident.definer.str));
delete et;
continue;
}
-
- DBUG_PRINT("evex_load_events_from_db",
- ("Event %s loaded from row. Time to compile", et->name.str));
+
+ DBUG_PRINT("evex_load_events_from_db", ("Event %s.%s loaded from row. "
+ "Time to compile", et->ident.dbname.str, et->ident.name.str));
switch (ret= et->compile(thd, &evex_mem_root)) {
case EVEX_MICROSECOND_UNSUP:
@@ -840,7 +867,7 @@ evex_load_events_from_db(THD *thd)
goto end;
case EVEX_COMPILE_ERROR:
sql_print_error("SCHEDULER: Error while compiling %s.%s. Aborting load.",
- et->dbname.str, et->name.str);
+ et->ident.dbname.str, et->ident.name.str);
goto end;
default:
break;
@@ -850,7 +877,7 @@ evex_load_events_from_db(THD *thd)
if (et->compute_next_execution_time())
{
sql_print_error("SCHEDULER: Error while computing execution time of %s.%s."
- " Skipping", et->dbname.str, et->name.str);
+ " Skipping", et->ident.dbname.str, et->ident.name.str);
continue;
}
@@ -858,7 +885,7 @@ evex_load_events_from_db(THD *thd)
evex_queue_insert(&EVEX_EQ_NAME, (EVEX_PTOQEL) et);
DBUG_PRINT("evex_load_events_from_db", ("%p %*s",
- et, et->name.length,et->name.str));
+ et, et->ident.name.length, et->ident.name.str));
count++;
}
@@ -897,9 +924,12 @@ end:
bool
sys_var_event_executor::update(THD *thd, set_var *var)
{
+ bool wait= false;
/* here start the thread if not running. */
DBUG_ENTER("sys_var_event_executor::update");
VOID(pthread_mutex_lock(&LOCK_evex_running));
+ wait= (*value && !var->save_result.ulong_value);
+
*value= var->save_result.ulong_value;
DBUG_PRINT("new_value", ("%d", *value));
@@ -909,7 +939,8 @@ sys_var_event_executor::update(THD *thd,
init_events();
} else
VOID(pthread_mutex_unlock(&LOCK_evex_running));
-
+ if (wait)
+ my_sleep(200000);
DBUG_RETURN(0);
}
@@ -941,8 +972,8 @@ static sql_print_xxx_func sql_print_xxx_
*/
bool
-evex_print_warnings(THD *thd, event_timed *et)
-{
+evex_print_warnings(THD *thd, Event_timed *et)
+{
MYSQL_ERROR *err;
DBUG_ENTER("evex_show_warnings");
char msg_buf[1024];
@@ -964,9 +995,10 @@ evex_print_warnings(THD *thd, event_time
prefix.append('@');
append_identifier(thd,&prefix,et->definer_host.str,et->definer_host.length);
prefix.append("][", 2);
- append_identifier(thd,&prefix, et->dbname.str, et->dbname.length);
+ append_identifier(thd,&prefix, et->ident.dbname.str,
+ et->ident.dbname.length);
prefix.append('.');
- append_identifier(thd,&prefix, et->name.str, et->name.length);
+ append_identifier(thd, &prefix, et->ident.name.str, et->ident.name.length);
prefix.append("] ", 2);
}
| Thread |
|---|
| • bk commit into 5.1 tree (andrey:1.2201) | ahristov | 28 Feb |